Kafka and Knative Eventing
Kafka Architecture
Kafka has five key components:
- Producer
- Consumer
- Kafka Cluster
  - Broker: a Kafka server, also called a Kafka node
  - Zookeeper: stores the cluster state (being superseded by KRaft over the Kafka 3.x series)
- Connector: connects applications to Topics
- Stream Processor: receives a stream of data from one Topic, processes it, and writes the result to another Topic

There are also two important logical components:
- Topic
- Partition

Topic and Partition
- A Topic is a categorized message stream; the messages themselves are kept in Partitions
  - The data of one Topic may be spread across one or more Partitions
  - Each Partition normally has one leader plus one or more replicas/followers
- The Topic is the endpoint Producers publish messages to and Consumers consume them from

Records in a Topic
- Messages and how they are stored
  - In Kafka, each record normally consists of a key, a value, a timestamp, and some metadata.
  - When a record bound for a Topic does not name a target Partition, Kafka picks one based on the hash of the record's key; when no timestamp is given, the Producer uses the current time (creation time or log-append time) as the timestamp.
  - Kafka persists data in the directory named by the log.dir parameter; each topic maps to a subdirectory of it.
    - Kafka keeps all records, whether or not they have been consumed
    - Records are kept for the retention period defined in the broker configuration, 7 days (168 hours) by default
  - Kafka builds Topics on both the Pub/Sub and the Queue model: a Consumer Group splits processing across a set of consumer processes running in parallel, and a message can still be broadcast to multiple groups.

Partition
- A Partition is a shard of a Topic's data; other database systems usually call this a replica or a shard
  - Each Partition is an immutable, time-ordered sequence of records stored in a log
  - Consumers read messages in the order in which they were written to the log
  - Every message carries an id called the offset
- Partitions let one Topic's data be stored in parallel on multiple brokers
- With redundancy (a replication factor greater than 1), a Topic keeps multiple copies and tolerates up to N-1 server failures, where N is the replication factor
- When a consumer reads a Topic, it reads from all of its Partitions

Knative Eventing and Kafka
- Knative Eventing has three kinds of Kafka components; they do not depend on each other, and each can be used on its own together with the other Eventing components
  - KafkaSource: reads messages from a Kafka cluster, converts them to CloudEvents, and feeds them into Eventing
  - KafkaChannel: one implementation of the Knative Eventing Channel; functionally similar to InMemoryChannel, but adds persistence and is the recommended type for production
  - KafkaBroker: one implementation of the Knative Eventing Broker; functionally similar to the MT-Channel-Based Broker, and depends on a Channel of type KafkaChannel

Deploy Eventing first
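The two manifests applied below come from the Knative Eventing release page; a minimal sketch of fetching them first, assuming the knative-v1.2.0 release assets (the exact download URLs are my assumption, not part of the original notes):

# assumed download step: grab the v1.2 Eventing manifests before applying them locally
wget https://github.com/knative/eventing/releases/download/knative-v1.2.0/eventing-crds.yaml
wget https://github.com/knative/eventing/releases/download/knative-v1.2.0/eventing-core.yaml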
root@master:/root/knative-in-practise/knative-deploy-v1.2/eventing# kubectl apply -f eventing-crds.yaml
root@master:/root/knative-in-practise/knative-deploy-v1.2/eventing# kubectl apply -f eventing-core.yaml
[root@master bak]# cat eventing-core.yaml | grep gcr
          image: gcr.io/knative-releases/knative.dev/eventing/cmd/controller@sha256:65b4fbd331d9db65d4a27c6462fac20971de540f9fea7cb7a16884265e1bd9bc
          value: gcr.io/knative-releases/knative.dev/eventing/cmd/apiserver_receive_adapter@sha256:b2d41d5daaca21b63ba884e1456563ee4379bfb0325be6570dbe2b1a46741bfd
          image: gcr.io/knative-releases/knative.dev/eventing/cmd/mtping@sha256:d23d9e0c54c2ed2e95a81106c921fd67e648f17c06e37a2f08acab9e126a3328
          image: gcr.io/knative-releases/knative.dev/eventing/cmd/webhook@sha256:d5666785e0ac6cbab414142485297e575a38c9c1d9a3f17520e7f58eb4de3f80
[root@master eventing]# cat eventing-core.yaml | grep chuan
          image: gexuchuan123/eventingcmdcontroller:v1
          value: gexuchuan123/eventingcmdapiserver_receive_adapter:v1
          image: gexuchuan123/eventingcmdmtping.v1
          image: gexuchuan123/eventingcmdwebhook:v1
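The grep output above shows the gcr.io image references replaced with the author's mirrored copies. A minimal sketch of scripting one such substitution with sed; the exact expression (and editing in place) is an assumption, not taken from the original notes:

# assumed helper: rewrite the controller image reference to the mirrored copy
sed -i 's|gcr.io/knative-releases/knative.dev/eventing/cmd/controller@sha256:.*|gexuchuan123/eventingcmdcontroller:v1|' eventing-core.yaml
# repeat with matching expressions for the apiserver_receive_adapter, mtping, and webhook images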
Verification
knative-eventing   eventing-controller-6f4b48b8db-2cxpl   1/1   Running   0   3m34s
knative-eventing   eventing-webhook-67f848c6ff-gj7zh      1/1   Running   0   3m34s
Deploy Kafka
How to deploy the Kafka cluster
- All three Kafka components in Knative Eventing rely on a properly running Kafka cluster behind them
- The Kafka operator from the Strimzi project is an Operator dedicated to managing Kafka clusters on Kubernetes; it greatly reduces the complexity of deploying and using Kafka on Kubernetes

Deploy the kafka-operator
- Create a dedicated namespace, e.g. kafka: kubectl create namespace kafka
- Deploy the strimzi-cluster-operator from its manifest
  - docker pull quay.io/strimzi/operator:0.28.0
  - kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
- Check the result:

root@master01:~# kubectl get pods -l strimzi.io/kind=cluster-operator -n kafka
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-587cb79468-w5lqk   1/1     Running   0          118s

- View the generated CRDs
root@master01:~# kubectl api-resources --api-group='kafka.strimzi.io'
NAME                 SHORTNAMES   APIVERSION                 NAMESPACED   KIND
kafkabridges         kb           kafka.strimzi.io/v1beta2   true         KafkaBridge
kafkaconnectors      kctr         kafka.strimzi.io/v1beta2   true         KafkaConnector
kafkaconnects        kc           kafka.strimzi.io/v1beta2   true         KafkaConnect
kafkamirrormaker2s   kmm2         kafka.strimzi.io/v1beta2   true         KafkaMirrorMaker2
kafkamirrormakers    kmm          kafka.strimzi.io/v1beta2   true         KafkaMirrorMaker
kafkarebalances      kr           kafka.strimzi.io/v1beta2   true         KafkaRebalance
kafkas               k            kafka.strimzi.io/v1beta2   true         Kafka
kafkatopics          kt           kafka.strimzi.io/v1beta2   true         KafkaTopic
kafkausers           ku           kafka.strimzi.io/v1beta2   true         KafkaUser

Deploy an example Kafka cluster
- To help users quickly deploy a Kafka cluster based on the Kafka CRD, Strimzi ships several example configurations:
  - kafka-ephemeral-single.yaml: non-persistent storage, single-node cluster
  - kafka-ephemeral.yaml: non-persistent storage, multi-node cluster
  - kafka-jbod.yaml: JBOD storage, multi-node cluster
  - kafka-persistent-single.yaml: persistent storage, single-node cluster
  - kafka-persistent.yaml: persistent storage, multi-node cluster
- Take kafka-ephemeral-single, which defines a single-node cluster with ephemeral storage, as the example (using the latest release at the time, 0.28.0)
  https://strimzi.io/quickstarts/
  https://github.com/strimzi/strimzi-kafka-operator/tree/0.28.0/examples
  - kubectl apply -f https://github.com/strimzi/strimzi-kafka-operator/blob/0.28.0/examples/kafka/kafka-ephemeral-single.yaml
  - cat kafka-ephemeral-single.yaml    # ephemeral storage
root@master03:~/knative-in-practise/eventing/kafka/01-create-kafka-cluster# cat 02-kafka-ephemeral-single.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.1.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.1"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
root@master03:~/knative-in-practise/eventing/kafka/01-create-kafka-cluster# kubectl get kafka -nkafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster   1                        3
root@master02:~# kubectl apply -f kafka.yaml -nkafka
root@master01:~# kubectl get po -nkafka
NAME                                         READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-6d5ff97f6-r8msx   3/3     Running   0          73s
my-cluster-kafka-0                           1/1     Running   0          96s
my-cluster-zookeeper-0                       1/1     Running   0          2m
my-cluster-zookeeper-1                       1/1     Running   0          2m
my-cluster-zookeeper-2                       1/1     Running   0          2m
strimzi-cluster-operator-587cb79468-w5lqk    1/1     Running   0          33m

docker pull quay.io/strimzi/kafka:0.28.0-kafka-3.1.0

kubectl -nkafka run kafka-producer02 -ti --image=quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic topic01

kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic topic01 --from-beginning

- Wait for the cluster deployment to finish
  - kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
- Deploying the cluster also creates several related Service resources for it; the bootstrap Service is the access endpoint for the cluster's messaging service, e.g. my-cluster-kafka-bootstrap in the example below
  - kubectl get svc -n kafka

KafkaSource and KafkaChannel
https://knative.dev/docs/install/yaml-install/eventing/install-eventing-with-yaml/#optional-install-a-default-channel-messaging-layer
https://github.com/knative-sandbox/eventing-kafka
# Install the Kafka Source
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml                 #01
# Install the Kafka "Consolidated" Channel
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-consolidated.yaml   #02
# Install the Kafka "Distributed" Channel
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-distributed.yaml    # not deployed here
Deploy KafkaSource
1. KafkaSource turns message records in Kafka into CloudEvents.
2. KafkaSource is only needed when messages have to be loaded from Kafka and delivered to applications running on Knative Eventing.
3. Command: kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml
   - This creates a set of resources in the knative-eventing namespace, among them a Pod whose name is prefixed with kafka-controller-manager.

Deploy KafkaChannel
1. KafkaChannel provides a Channel implementation for Knative Eventing backed by a Kafka cluster; each channel is backed by a Kafka Topic.
- There are two implementations, each defined in its own manifest:
  - Consolidated Channel (channel-consolidated.yaml): the native Knative KafkaChannel implementation
  - Distributed Channel (channel-distributed.yaml): the KafkaChannel implementation contributed by SAP's Kyma project
  - Note: with either manifest, replace "REPLACE_WITH_CLUSTER_URL" with the access endpoint of the Kafka cluster deployed earlier, e.g. "my-cluster-kafka-bootstrap.kafka:9092"
- Using the Consolidated Channel as the example:
  - kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-consolidated.yaml

[root@master ~]# cd knative-in-practise/eventing/kafka/02-deploy-kafka-channel/
# kubectl apply -f 01-source.yaml
root@master01:~# kn source list-types
TYPE              S   NAME                                    DESCRIPTION
ApiServerSource   X   apiserversources.sources.knative.dev    Watch and send Kubernetes API events to addressable
ContainerSource   X   containersources.sources.knative.dev    Generate events by Container image and send to addressable
GitLabSource          gitlabsources.sources.knative.dev
KafkaSource           kafkasources.sources.knative.dev        Route events from Apache Kafka Server to addressable
root@master01:~# docker 336fff6a61c8 gexuchuan123/sourcereceive_adapter:v1
root@master01:~# docker 2c27c7efff43 gexuchuan123/sourcecontroller:v1
root@master01:~# kubectl get po -nknative-eventing        # verify the source controller
knative-eventing   kafka-controller-manager-57dbf5d694-8k6ss   1/1   Running   0   31s
02-channel-consolidated
root@master01:~/kafkachannel# docker images        # images used
gexuchuan123/consolidateddispatcher   v1   c9916b27aad9   52 years ago   53.7MB
gexuchuan123/cmdwebhook               v1   2bd6ce654802   52 years ago   48.4MB
gexuchuan123/consolidatedcontroller   v1   b4c14a773343   52 years ago   56.5MB
root@master02:/opt/knative-in-practise/eventing/kafka/01-deploy-kafka-channel# kubectl apply -f 02-channel-consolidated.yaml
brokers: my-cluster-kafka-bootstrap.kafka:9092
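The brokers value above is what "REPLACE_WITH_CLUSTER_URL" was replaced with before the manifest was applied. A minimal sketch of doing that replacement directly against the upstream manifest (the piped workflow is an assumption; the URL and the bootstrap address come from the steps above):

curl -sL https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-consolidated.yaml \
  | sed 's/REPLACE_WITH_CLUSTER_URL/my-cluster-kafka-bootstrap.kafka:9092/g' \
  | kubectl apply -f -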
root@master01:~# kubectl get po -nknative-eventing
knative-eventing   kafka-ch-controller-594964d76d-ngjc6   1/1   Running   0   79s   # verify kafka-channel; Eventing must be deployed first
knative-eventing   kafka-webhook-544cfd8c64-wb592         1/1   Running   0   79s
Set the default Channel type for the default namespace to KafkaChannel
https://knative.dev/docs/eventing/configuration/channel-configuration/
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# cat default-channel-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: default-ch-webhook
  namespace: knative-eventing
data:
  default-ch-config: |
    clusterDefault:
      apiVersion: messaging.knative.dev/v1
      kind: InMemoryChannel
    namespaceDefaults:
      default:
        apiVersion: messaging.knative.dev/v1beta1
        kind: KafkaChannel
        spec:
          numPartitions: 5
          replicationFactor: 1    # in production the replication factor should not be 1
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel create kc01
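kn creates a generic Channel object here; because of the ConfigMap above, the webhook backs it with a KafkaChannel in the default namespace. A roughly equivalent manifest (a sketch, not taken from the notes):

apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: kc01
  namespace: default
# No spec.channelTemplate is set, so the default-ch-config ConfigMap decides the
# backing implementation: KafkaChannel for the default namespace.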
kn channel describe kc01
Type:   KafkaChannel (messaging.knative.dev/v1beta1)    # note the type

[root@master ~]# kn channel list        # channels in the default namespace are KafkaChannels
NAME   TYPE           URL                                                AGE   READY   REASON
kc02   KafkaChannel   http://kc02-kn-channel.default.svc.cluster.local   34s   True
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kubectl create ns test    # other namespaces still default to InMemoryChannel
namespace/test created
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel create ch01 -ntest
Channel 'ch01' created in namespace 'test'.
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel list -ntest
NAME   TYPE              URL                                             AGE   READY   REASON
ch01   InMemoryChannel   http://ch01-kn-channel.test.svc.cluster.local   9s    True
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel create ch02 --type messaging.knative.dev:v1beta1:KafkaChannel -ntest
Channel 'ch02' created in namespace 'test'.
Create a KafkaTopic (the manifest places it in the kafka namespace, alongside the Strimzi cluster)
root@master01:/opt/knative-in-practise/eventing/kafka/04-kafkasource-demo# cat 01-kafka-topic-demo.yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic01
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 5
  replicas: 1
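A quick check that the Topic Operator has reconciled the topic (this step is not in the original transcript; kafkatopics/kt is the CRD listed earlier):

kubectl get kafkatopics -n kafka
kubectl get kt topic01 -n kafka -o yaml    # kt is the short name registered by the CRD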
Create a KafkaSource in the default namespace, pointing at a ksvc
[root@master 04-kafkasource-demo]# cat 02-kafkasource-demo.yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafkasource-demo
  namespace: default
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - topic01
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
[root@master 04-kafkasource-demo]# kubectl get po
NAME                                                              READY   STATUS    RESTARTS   AGE
event-display-00001-deployment-6f64bdfbbf-289q5                   2/2     Running   0          27m   # verify the ksvc
kafkasource-kafkasource-demo-2af62e5dd709203478a0eeb6ecc37xb4j6   1/1     Running   0          15m   # verify the KafkaSource
kn service create event-display --image ikubernetes/event_display --port 8080 --scale-min 1 -nkafka    # create the ksvc
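The kn command above maps roughly to the Service manifest below; the min-scale annotation is my reading of --scale-min, and the manifest as a whole is a sketch rather than part of the original notes:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: kafka
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/min-scale: "1"   # keep one replica warm, as --scale-min 1 does
    spec:
      containers:
        - image: ikubernetes/event_display
          ports:
            - containerPort: 8080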
# Create a producer
kubectl -nkafka run kafka-producer02 -ti --image=quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic topic01
Check the KafkaSource
[root@master 04-kafkasource-demo]# kubectl get KafkaSource -A
NAMESPACE   NAME               TOPICS        BOOTSTRAPSERVERS                            READY   REASON   AGE
kafka       kafkasource-demo   ["topic01"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             3m57s
[root@master event]# kubectl logs -f event-display-00001-deployment-6ffd8d8b8d-6jsxf -c user-container -nkafka
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/kafka/kafkasources/kafkasource-demo#topic01
  subject: partition:4#0
  id: partition:4/offset:0
  time: 2022-03
>{"msg": "hi from kafka"}
[root@master ~]# kubectl get kafkachannel kc02 -o yaml
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: kc02
#################
Create a KafkaSource in the default namespace, pointing at a KafkaChannel
[root@master ~]# kubectl get Channel kc01 -oyaml    # ignore
apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: kc01
[root@master ~]# kubectl get kafkachannel kc01 -oyaml    # check the KafkaChannel format
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: kc01
[root@master 04-kafkasource-demo]# cat 04-kafkasource-demo.yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafkasource-demo02
  namespace: default
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - topic01
  sink:
    ref:
      apiVersion: messaging.knative.dev/v1beta1
      kind: KafkaChannel
      name: kc01
Verification
[root@master 04-kafkasource-demo]# kubectl get KafkaSource
NAME                 TOPICS        BOOTSTRAPSERVERS                            READY   REASON   AGE
kafkasource-demo02   ["topic01"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26m
Create a Subscription; change v1alpha1 to v1beta1 in the channel reference
[root@master 04-kafkasource-demo]# kn subscription create sub01 --channel messaging.knative.dev:v1beta1:KafkaChannel:kc01 --sink ksvc:event-display
Subscription 'sub01' created in namespace 'default'.
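The kn subscription command is roughly equivalent to applying a Subscription manifest like this sketch (the manifest itself is not from the original; the names are taken from the demo objects):

apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: sub01
  namespace: default
spec:
  channel:
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    name: kc01
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display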
[root@master ~]# kubectl logs -f event-display-00001-deployment-6f64bdfbbf-vkv6l -c user-container
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/default/kafkasources/kafkasource-demo02#topic01
  subject: partition:0#5
Data,
  {"msg": "hi from kafka"}