Kafka知识总结


一、概念和基本架构

1.1 kafka介绍

Kafka在一个或多个可以跨越多个数据中心的服务器上作为集群运行。Kafka集群中按照主题分类管理,一个主题可以有多个分区,一个分区可以有多个副本分区。每个记录由一个键,一个值和一个时间戳组成。Kafka只有消息的拉取,没有推送,可以通过轮询实现消息的推送。有俩种消息传递方式:点对点传递模式,发布订阅模式。

1.2 kafka优势

1. 高吞吐量:单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能。

2. 高性能:单节点支持上千个客户端,并保证零停机和零数据丢失。

3. 持久化数据存储:将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。效率高:零拷贝,顺序读,顺序写,利用Linux的页缓存。

4. 分布式系统,易于向外扩展。所有的Producer、Broker和Consumer都会有多个,均为分布式的。无需停机即可扩展机器。多个Producer、Consumer可能是不同的应用。

5. 可靠性 - Kafka是分布式,分区,复制和容错的。

6. 客户端状态维护:消息被处理的状态是在Consumer端维护,而不是由server端维护。当失败时能自动平衡。

7. 支持online和offline的场景。

8. 支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。

1.3 应用场景

日志收集:一个公司可以用Kafka可以收集各种服务的Log,通过Kafka以统一接口服务的方式开放给各种Consumer;

消息系统:解耦生产者和消费者、缓存消息等;

用户活动跟踪:Kafka经常被用来记录Web用户或者App用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到Kafka的Topic中,然后消费者通过订阅这些Topic来做实时的监控分析,亦可保存到数据库;

运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;

1.4 消息的发送与接收

生产者主要参数配置

bootstrap.servers:broker地址,集群不需配置全部,只需配置几个。生产者连接上指定的broker之后,可通过该连接发现其他节点。

key.serializer:发送信息key数据的序列化类。

value.serializer: 发送消息value数据的序列化类。

acks:0生产者不需要等待broker对消息的确认,主要将消息放到缓存区,就认为消息已经发送完成。1消息只要写到主分区就响应客户端,不需要等待副本分区确认。 acks=-1或all首领分区需要等待所有ISR副本分区的确认。

retries: 重试次数,当消息发送出现错误的时候,系统会重发消息。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1。否则在重试此失败消息的时候,其他的消息可能发送成功了。

compression.type: 生产者生成数据的压缩格式,默认无。支持的格式none , gzip , snappy 和lz4。

二、高级特性分析

2.1  消息发送

由上图可以看出:KafkaProducer有两个基本线程:

主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;消息收集器RecoderAccumulator为每个分区都维护了一个Deque 类型的双端队列。

ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一

个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

Sender线程:

该线程从消息收集器获取缓存的消息,将其处理为 的形式, Node 表示集群的broker节点。进一步将转化为形式,此时才可以

向服务端发送数据。在发送之前,Sender线程将消息以 Map> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

2.2  主题和分区

Topic,Kafka用于分类管理消息的逻辑单元,类似与MySQL的数据库。

Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上。优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是BrokerServer数量的整数倍。

Consumer Group,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部的消息。在消费组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。

分区规则:1. 如果record提供了分区号,则使用record提供的分区号。2. 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模。3. 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。(首先在可用的分区中分配分区号,没有可用分区,在该主题所有分区中分配分区号)

2.3  拦截器

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息。

自定义拦截器:1. 实现ProducerInterceptor接口  2. 在KafkaProducer的设置中设置自定义的拦截器

2.4 心跳机制

Kafka 的心跳是 Kafka Consumer和 Broker之间的健康检查,只有当Broker Coordinator正常时,Consumer才会发送心跳。如果出现故障会触发再平衡。一个分区只能对应一个消费组中的一个消费者, 一个消费者可以对用多个分区。消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。broker宕机,主题的分区宕机也会触发再平衡。

2.5 重平衡

触发条件:1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。2. 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡 3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡。

平衡过程中,消费者无法从kafka获取消息。如果kafak节点有很多,则重平衡时间可能很长。所以要避免重平衡。

避免重平衡设置:如果消费者挂了,是无法避免重平衡的。由于网络问题没接收到心跳,可能会让kafka误以为消费者挂了。可通过以下三个参数配置。

session.timout.ms控制心跳超时时间。 推荐设置为6s

heartbeat.interval.ms控制心跳发送频率, 推荐设置为2s

max.poll.interval.ms控制poll的间隔, 推荐为消费者处理消息最长耗时再加1分钟

2.6 分区副本机制

Kafka在一定数量的服务器上对主题分区进行复制。当集群中的一个broker宕机后系统可以自动故障转移到其他可用的副本上,不会造成数据丢失。

宕机如何恢复:

少部分副本宕机,当leader宕机了,会从follower选择一个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader里pull数据。

全部副本宕机,1. 等待ISR中的一个恢复后,并选它作为leader。(等待时间较长,降低可用性) 2、选择第一个恢复的副本作为新的leader,无论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)

2.7 leader选举

Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。如果这个集合有增减,kafka会更新zookeeper上的记录。如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有N+1个副本,kafka可以容忍N个服务器不可用。

为什么不用过半机制?这种算法和kafka比起来冗余度高,耗费资源如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。kafka只要有一个副本存活即可。

如果所有的ISR副本都失败了怎么办?1. 等待ISR集合中的副本复活。2. 选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中,需要设置unclean.leader.election.enable=true。

2.8 日志删除

基于时间:日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是7天,log.retention.ms 优先级最高。

基于压缩:不删除数据,把数据压缩后存储到磁盘。具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除。

压缩好处:Kafka即是数据源又是存储工具,可以简化技术栈,降低维护成本。使用外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使用这些Key将数据取回,实现起来有一定的工程难度和复杂度。使用Kafka的日志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读回到内存就可以了。Kafka对于磁盘的读写做了大量的优化工作,比如磁盘顺序读写。相对于外部存储介质没有索引查询等工作量的负担,可以实现高性能。同时,Kafka的日志压缩机制可以充分利用廉价的磁盘,不用依赖昂贵的内存来处理,在性能相似的情况下,实现非常高的性价比(这个观点仅仅针对于异常处理和容灾的场景来说)。

2.9 高性能磁盘存储

零拷贝:零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。传统IO一般会有四次拷贝,1、将磁盘文件,读取到操作系统内核缓冲区;2、将内核缓冲区的数据,copy到application应用程序的buffer;3、将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区);4、将socket buffer的数据,copy到网络协议栈,由网卡进行网络传输。

Java Nio就是把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;消费者从broker读取数据,就是由此实现。

页缓存:页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。Kafka接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时,可以使用mmap内存文件映射。mmp工作原理是直接利用操作系统的Page来实现磁盘文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。Kafka中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。

消息先被写入页缓存,由操作系统负责刷盘任务。

顺序写入:

Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算Kafka使用磁盘作为存储介质,也能承载非常大的吞吐量。

Kafka速度快是因为:1. partition顺序读写,充分利用磁盘特性,这是基础;2. Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;3. Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。