Kafka and Knative Eventing


Kafka Architecture

There are five key components in a Kafka system:

- Producer
- Consumer
- Kafka Cluster
  - Broker: a Kafka server, also called a Kafka node
  - Zookeeper: stores the cluster state
- Connector: connects applications to Topics
- Stream Processor: receives a stream of data from one Topic, processes it, and writes the results to another Topic

There are also two important logical components:

- Topic
- Partition

Topic and Partition

- A Topic is a categorized message stream; the related messages are stored in Partitions
  - The data of a single Topic can be spread across one or more Partitions
  - Each Partition usually has one leader and one or more replicas/followers
- A Topic is the endpoint that Producers publish messages to and that Consumers consume messages from

Message records in a Topic

- Messages and how they are stored
  - In Kafka, the standard format of each message record usually consists of a key, a value, a timestamp, and some metadata
  - When a record written to a Topic does not explicitly specify a target Partition, Kafka chooses one based on the hash of the record's key; when no timestamp is specified, the Producer uses the current time (creation time or log-append time) as the timestamp
  - Kafka persists data in the directory specified by the log.dir parameter, with each topic mapped to a subdirectory of it
    - Kafka retains all records, whether or not they have been consumed
    - Records are kept for the retention period defined in the broker configuration, 7 days (168 hours) by default
- Kafka builds Topics on both the Pub/Sub and the Queue model: it uses the concept of a Consumer Group to divide processing across a set of consumer processes running in parallel, and it can broadcast messages to multiple groups

Partition

- A Partition is a shard of a Topic's data; in other database systems this is usually called a replica or shard
  - Each Partition is an immutable, time-ordered sequence of records, stored in a log
  - Consumers read messages in the order in which the records were stored in the log
  - Every message has an id called the offset
- Partitions allow the data of one Topic to be stored in parallel across multiple brokers
- With a replication factor greater than 1, multiple replicas are stored redundantly, tolerating up to N-1 server failures, where N is the replication factor
- When a consumer reads a Topic, it reads data from all of its Partitions

Knative Eventing and Kafka

Knative Eventing has three kinds of Kafka components. They do not depend on each other, and each can be used independently alongside the other Eventing components:

- KafkaSource
  - Reads messages from a Kafka cluster, converts them to CloudEvents, and feeds them into Eventing
- KafkaChannel
  - One of the implementations of the Knative Eventing Channel
  - Similar in function to InMemoryChannel, but adds persistence and other features; the recommended type for production
- KafkaBroker
  - One of the implementations of the Knative Eventing Broker, similar in function to the MT-Channel-Based Broker
  - Depends on a Channel implementation of type KafkaChannel
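To make the key-to-partition mapping described above concrete, here is a minimal shell sketch. It is illustrative only: real Kafka clients hash the key with murmur2 modulo the partition count, while this sketch substitutes `cksum` as the hash; the key and partition count are made-up example values.

```shell
# Illustrative sketch of key-based partition selection (NOT Kafka's real
# murmur2 hash; cksum stands in for it). Records with the same key always
# map to the same partition.
key="user-42"
num_partitions=5
hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
partition=$(( hash % num_partitions ))
echo "key=$key -> partition $partition"
```

Because the mapping is deterministic, all records sharing a key land in the same partition, which is what preserves per-key ordering.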

First, deploy Eventing

root@master:/root/knative-in-practise/knative-deploy-v1.2/eventing# kubectl apply -f eventing-crds.yaml

root@master:/root/knative-in-practise/knative-deploy-v1.2/eventing# kubectl apply -f eventing-core.yaml

[root@master bak]# cat eventing-core.yaml |grep gcr
          image: gcr.io/knative-releases/knative.dev/eventing/cmd/controller@sha256:65b4fbd331d9db65d4a27c6462fac20971de540f9fea7cb7a16884265e1bd9bc
              value: gcr.io/knative-releases/knative.dev/eventing/cmd/apiserver_receive_adapter@sha256:b2d41d5daaca21b63ba884e1456563ee4379bfb0325be6570dbe2b1a46741bfd
          image: gcr.io/knative-releases/knative.dev/eventing/cmd/mtping@sha256:d23d9e0c54c2ed2e95a81106c921fd67e648f17c06e37a2f08acab9e126a3328
          image: gcr.io/knative-releases/knative.dev/eventing/cmd/webhook@sha256:d5666785e0ac6cbab414142485297e575a38c9c1d9a3f17520e7f58eb4de3f80
[root@master eventing]# cat eventing-core.yaml |grep chuan
          image: gexuchuan123/eventingcmdcontroller:v1
              value: gexuchuan123/eventingcmdapiserver_receive_adapter:v1
          image: gexuchuan123/eventingcmdmtping:v1
          image: gexuchuan123/eventingcmdwebhook:v1

Verification

knative-eventing   eventing-controller-6f4b48b8db-2cxpl          1/1     Running   0              3m34s
knative-eventing   eventing-webhook-67f848c6ff-gj7zh             1/1     Running   0              3m34s

Deploy Kafka

Deployment paths for a Kafka cluster:

- All three kinds of Kafka components in Knative Eventing depend on a running Kafka cluster as their backend
- The Kafka Operator from the Strimzi project is an Operator dedicated to managing Kafka clusters on Kubernetes; it greatly simplifies deploying and using Kafka on Kubernetes

Deploy the kafka-operator

- Create a dedicated namespace, e.g. kafka: kubectl create namespace kafka
- Deploy the strimzi-cluster-operator from the configuration file
  - docker pull quay.io/strimzi/operator:0.28.0
  - kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
- Check the result of the deployment
root@master01:~# kubectl get pods -l strimzi.io/kind=cluster-operator -n kafka
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-587cb79468-w5lqk   1/1     Running   0          118s
- Check the generated CRDs
root@master01:~# kubectl api-resources --api-group='kafka.strimzi.io'
NAME                 SHORTNAMES   APIVERSION                 NAMESPACED   KIND
kafkabridges         kb           kafka.strimzi.io/v1beta2   true         KafkaBridge
kafkaconnectors      kctr         kafka.strimzi.io/v1beta2   true         KafkaConnector
kafkaconnects        kc           kafka.strimzi.io/v1beta2   true         KafkaConnect
kafkamirrormaker2s   kmm2         kafka.strimzi.io/v1beta2   true         KafkaMirrorMaker2
kafkamirrormakers    kmm          kafka.strimzi.io/v1beta2   true         KafkaMirrorMaker
kafkarebalances      kr           kafka.strimzi.io/v1beta2   true         KafkaRebalance
kafkas               k            kafka.strimzi.io/v1beta2   true         Kafka
kafkatopics          kt           kafka.strimzi.io/v1beta2   true         KafkaTopic
kafkausers           ku           kafka.strimzi.io/v1beta2   true         KafkaUser
Deploy a sample Kafka cluster

- To help users quickly deploy a Kafka cluster based on the Kafka CRD, Strimzi provides several example configurations:
  - kafka-ephemeral-single.yaml: non-persistent storage, single-node cluster
  - kafka-ephemeral.yaml: non-persistent storage, multi-node cluster
  - kafka-jbod.yaml: JBOD storage, multi-node cluster
  - kafka-persistent-single.yaml: persistent storage, single-node cluster
  - kafka-persistent.yaml: persistent storage, multi-node cluster
- As an example, take kafka-ephemeral-single, which defines a single-node cluster with ephemeral storage (using the latest 0.28.0 release here):
  - https://strimzi.io/quickstarts/
  - https://github.com/strimzi/strimzi-kafka-operator/tree/0.28.0/examples
  - kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.28.0/examples/kafka/kafka-ephemeral-single.yaml

cat kafka-ephemeral-single.yaml   # ephemeral storage
root@master03:~/knative-in-practise/eventing/kafka/01-create-kafka-cluster# cat 02-kafka-ephemeral-single.yaml 
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.1.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.1"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
root@master03:~/knative-in-practise/eventing/kafka/01-create-kafka-cluster# kubectl get kafka -nkafka
NAME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
my-cluster   1                        3 

 root@master02:~# kubectl apply -f kafka.yaml -nkafka

root@master01:~# kubectl get po -nkafka
NAME                                         READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-6d5ff97f6-r8msx   3/3     Running   0          73s
my-cluster-kafka-0                           1/1     Running   0          96s
my-cluster-zookeeper-0                       1/1     Running   0          2m
my-cluster-zookeeper-1                       1/1     Running   0          2m
my-cluster-zookeeper-2                       1/1     Running   0          2m
strimzi-cluster-operator-587cb79468-w5lqk    1/1     Running   0          33m
docker pull quay.io/strimzi/kafka:0.28.0-kafka-3.1.0

kubectl -n kafka run kafka-producer02 -ti --image=quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic topic01

kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic topic01 --from-beginning

- Wait for the cluster deployment to finish:
  - kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
- Deploying the cluster also automatically generates several related Service resources; the bootstrap Service is the access endpoint of the cluster's messaging service, e.g. my-cluster-kafka-bootstrap in the example below:
  - kubectl get svc -n kafka

KafkaSource and KafkaChannel

https://knative.dev/docs/install/yaml-install/eventing/install-eventing-with-yaml/#optional-install-a-default-channel-messaging-layer
https://github.com/knative-sandbox/eventing-kafka
# Install the Kafka Source
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml  #01

# Install the Kafka "Consolidated" Channel
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-consolidated.yaml   #02

# Install the Kafka "Distributed" Channel
kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-distributed.yaml  # not deployed here

Deploy KafkaSource

1. KafkaSource converts the message records in Kafka into CloudEvents
2. KafkaSource is only needed when messages must be loaded from Kafka and handed to applications running on Knative Eventing
3. Command: kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/source.yaml
   - This deployment creates a set of resources in the knative-eventing namespace, among them a Pod whose name is prefixed kafka-controller-manager

Deploy KafkaChannel

- Provides a Channel implementation for Knative Eventing backed by a Kafka cluster, with each channel backed by a Kafka Topic
- There are two implementations, defined in separate resource configuration files:
  - Consolidated Channel (channel-consolidated.yaml): the native Knative KafkaChannel implementation
  - Distributed Channel (channel-distributed.yaml): a KafkaChannel implementation contributed by SAP's Kyma project
  - Note: for either configuration, replace "REPLACE_WITH_CLUSTER_URL" in the file with the access endpoint of the Kafka cluster deployed earlier, e.g. "my-cluster-kafka-bootstrap.kafka:9092"
- Using the Consolidated Channel as the example:
  - kubectl apply -f https://storage.googleapis.com/knative-nightly/eventing-kafka/latest/channel-consolidated.yaml
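The placeholder substitution mentioned above can be scripted with sed. A minimal sketch; the /tmp stand-in file exists only to make the example self-contained, and in practice you would run the sed line against the downloaded channel-consolidated.yaml:

```shell
# Demonstrate replacing REPLACE_WITH_CLUSTER_URL with the bootstrap endpoint.
printf 'brokers: REPLACE_WITH_CLUSTER_URL\n' > /tmp/channel-demo.yaml
sed -i 's|REPLACE_WITH_CLUSTER_URL|my-cluster-kafka-bootstrap.kafka:9092|' /tmp/channel-demo.yaml
cat /tmp/channel-demo.yaml
# prints: brokers: my-cluster-kafka-bootstrap.kafka:9092
```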
[root@master ~]# cd knative-in-practise/eventing/kafka/02-deploy-kafka-channel/
[root@master 02-deploy-kafka-channel]# kubectl apply -f 01-source.yaml
root@master01:~# kn source list-types
TYPE              S     NAME                                   DESCRIPTION
ApiServerSource   X     apiserversources.sources.knative.dev   Watch and send Kubernetes API events to addressable
ContainerSource   X     containersources.sources.knative.dev   Generate events by Container image and send to addressable
GitLabSource            gitlabsources.sources.knative.dev      
KafkaSource             kafkasources.sources.knative.dev       Route events from Apache Kafka Server to addressable
# the running containers use the mirrored images:
336fff6a61c8   gexuchuan123/sourcereceive_adapter:v1
2c27c7efff43   gexuchuan123/sourcecontroller:v1
root@master01:~# kubectl get po -nknative-eventing   # verify the source controller
knative-eventing   kafka-controller-manager-57dbf5d694-8k6ss              1/1     Running   0          31s

Deploy the consolidated channel (02-channel-consolidated.yaml)

root@master01:~/kafkachannel# docker images  # the images used
gexuchuan123/consolidateddispatcher                                             v1            c9916b27aad9   52 years ago    53.7MB
gexuchuan123/cmdwebhook                                                         v1            2bd6ce654802   52 years ago    48.4MB
gexuchuan123/consolidatedcontroller                                             v1            b4c14a773343   52 years ago    56.5MB
root@master02:/opt/knative-in-practise/eventing/kafka/01-deploy-kafka-channel# kubectl apply -f  02-channel-consolidated.yaml
# in 02-channel-consolidated.yaml, the placeholder is replaced with:
      brokers: my-cluster-kafka-bootstrap.kafka:9092
root@master01:~# kubectl get po -nknative-eventing
knative-eventing   kafka-ch-controller-594964d76d-ngjc6   1/1   Running   0   79s   # verify kafka-channel; Eventing must be deployed first
knative-eventing   kafka-webhook-544cfd8c64-wb592         1/1   Running   0   79s

Change the default channel of the default namespace to KafkaChannel

https://knative.dev/docs/eventing/configuration/channel-configuration/

root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# cat default-channel-config.yaml 
apiVersion: v1
kind: ConfigMap
metadata:
  name: default-ch-webhook
  namespace: knative-eventing
data:
  default-ch-config: |
    clusterDefault:
      apiVersion: messaging.knative.dev/v1
      kind: InMemoryChannel
    namespaceDefaults:
      default:
        apiVersion: messaging.knative.dev/v1beta1
        kind: KafkaChannel
        spec:
          numPartitions: 5
          replicationFactor: 1  # use a value greater than 1 in production

root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel create kc01

kn channel describe kc01   # note the Type below
Type:         KafkaChannel (messaging.knative.dev/v1beta1)
[root@master ~]# kn channel list            # channels in the default namespace default to KafkaChannel
NAME   TYPE           URL                                                AGE   READY   REASON
kc02   KafkaChannel   http://kc02-kn-channel.default.svc.cluster.local   34s   True
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kubectl create ns test  # other namespaces default to InMemoryChannel
namespace/test created
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel create ch01 -ntest
Channel 'ch01' created in namespace 'test'.
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel list -ntest
NAME   TYPE              URL                                             AGE   READY   REASON
ch01   InMemoryChannel   http://ch01-kn-channel.test.svc.cluster.local   9s    True
root@master01:/opt/knative-in-practise/eventing/kafka/03-channel-defaults# kn channel create ch02 --type messaging.knative.dev:v1beta1:KafkaChannel -ntest
Channel 'ch02' created in namespace 'test'.

Create a KafkaTopic (in the kafka namespace)

root@master01:/opt/knative-in-practise/eventing/kafka/04-kafkasource-demo# cat 01-kafka-topic-demo.yaml 
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: topic01
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 5
  replicas: 1

Create a KafkaSource in the default namespace, pointing at a ksvc

[root@master 04-kafkasource-demo]# cat 02-kafkasource-demo.yaml 
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafkasource-demo
  namespace: default
spec:
  consumerGroup: knative-group
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092 
  topics:
   - topic01
  sink: 
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
[root@master 04-kafkasource-demo]# kubectl get po
NAME                                                              READY   STATUS    RESTARTS      AGE
event-display-00001-deployment-6f64bdfbbf-289q5                   2/2     Running   0             27m  # verify the ksvc
kafkasource-kafkasource-demo-2af62e5dd709203478a0eeb6ecc37xb4j6   1/1     Running   0             15m  # verify the KafkaSource

kn service create event-display --image ikubernetes/event_display --port 8080 --scale-min 1 -nkafka     # create the ksvc

# Create a producer

kubectl -nkafka run kafka-producer02 -ti --image=quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic topic01

Inspect the KafkaSource

[root@master 04-kafkasource-demo]# kubectl get KafkaSource -A
NAMESPACE   NAME               TOPICS        BOOTSTRAPSERVERS                            READY   REASON   AGE
kafka       kafkasource-demo   ["topic01"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             3m57s
[root@master event]# kubectl logs -f event-display-00001-deployment-6ffd8d8b8d-6jsxf -c user-container -nkafka
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/kafka/kafkasources/kafkasource-demo#topic01
  subject: partition:4#0
  id: partition:4/offset:0
  time: 2022-03

>{"msg": "hi from kafka"} 
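The id attribute in the output above encodes the record's source partition and offset as partition:&lt;p&gt;/offset:&lt;o&gt;. A quick shell sketch splits it with plain parameter expansion (the sample value is copied from the output above):

```shell
# Split a KafkaSource CloudEvent id of the form "partition:<p>/offset:<o>".
id="partition:4/offset:0"
partition=${id%%/*}                 # -> "partition:4"
partition=${partition#partition:}   # -> "4"
offset=${id##*offset:}              # -> "0"
echo "partition=$partition offset=$offset"
# prints: partition=4 offset=0
```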

[root@master ~]# kubectl get kafkachannel kc02 -o yaml
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: kc02

#################

Create a KafkaSource in the default namespace, pointing at a KafkaChannel

[root@master ~]# kubectl get Channel kc01 -oyaml  # generic Channel view; can be ignored
apiVersion: messaging.knative.dev/v1
kind: Channel
metadata:
  name: kc01
[root@master ~]# kubectl get kafkachannel kc01 -oyaml  # view the KafkaChannel format
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: kc01
[root@master 04-kafkasource-demo]# cat 04-kafkasource-demo.yaml 
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafkasource-demo02
  namespace: default
spec:
  consumerGroup: knative-group
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092 
  topics:
   - topic01
  sink: 
    ref:
      apiVersion: messaging.knative.dev/v1beta1
      kind: KafkaChannel
      name: kc01

Verification

[root@master 04-kafkasource-demo]# kubectl get KafkaSource
NAME                 TOPICS        BOOTSTRAPSERVERS                            READY   REASON   AGE
kafkasource-demo02   ["topic01"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26m

Create a Subscription; change v1alpha1 to v1beta1 in the channel type

[root@master 04-kafkasource-demo]# kn subscription create sub01 --channel messaging.knative.dev:v1beta1:KafkaChannel:kc01 --sink ksvc:event-display
Subscription 'sub01' created in namespace 'default'.
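For reference, an equivalent Subscription manifest for the kn command above. This is a sketch: the field names follow the messaging.knative.dev/v1 Subscription schema, and the heredoc only writes the manifest to a temp file for inspection.

```shell
# Write an equivalent Subscription manifest; apply it with:
#   kubectl apply -f /tmp/sub01.yaml
cat <<'EOF' > /tmp/sub01.yaml
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: sub01
  namespace: default
spec:
  channel:
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    name: kc01
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF
grep -c 'kind:' /tmp/sub01.yaml
# prints: 3
```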

[root@master ~]# kubectl logs -f event-display-00001-deployment-6f64bdfbbf-vkv6l  -c user-container

☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/default/kafkasources/kafkasource-demo02#topic01
  subject: partition:0#5

Data,
  {"msg": "hi from kafka"}

Related