kafka06-日志存储


参考文献:
1、《深入理解Kafka-核心设计与实践原理(201901)》(第5章 日志存储)

1、文件目录布局

  • 不考虑多副本的情况,一个分区对应一个日志(Log)。
  • 日志分段(LogSegment)是为了防止Log过大,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。
    • Log在物理上只以文件夹的形式存储。
    • 每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".txnindex"为后缀的事务索引文件)。
  • 主题、分区、副本和Log、LogSegment之间的关系

  • 创建一个4个分区,2个副本的主题topic-log。并向主题中发送一定量的消息,某一时刻topic-log-0目录中的布局如下所示。
128 ~]# kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-log --partitions 4 --replication-factor 2

128 ~]# ll /tmp/kafka/log/topic-log-0
-rw-r--r--. 1 root root 10485760 11月 19 16:41 00000000000000000000.index
-rw-r--r--. 1 root root 5111     11月 19 16:41 00000000000000000000.log
-rw-r--r--. 1 root root 10485756 11月 19 16:41 00000000000000000000.timeindex
-rw-r--r--. 1 root root 10485760 11月 19 16:45 00000000000000000133.index
-rw-r--r--. 1 root root 4085     11月 19 16:45 00000000000000000133.log
-rw-r--r--. 1 root root 10485756 11月 19 16:45 00000000000000000133.timeindex
-rw-r--r--. 1 root root 10485760 11月 19 16:49 00000000000000000251.index
-rw-r--r--. 1 root root 3869     11月 19 16:49 00000000000000000251.log
-rw-r--r--. 1 root root 10485756 11月 19 16:49 00000000000000000251.timeindex
  • 为了便于消息的检索,每个LogSegment中的日志文件(以".log"为文件后缀)都有对应的两个索引文件:
    • 偏移量索引文件(以".index"为文件后缀)
    • 时间戳索引文件(以".timeindex"为文件后缀)
  • 每个LogSegment都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offset。偏移量是一个64位的长整型数。
  • 日志文件和两个索引文件都是根据基准偏移量(baseOffst)命名的,名称固定为20位数字,没有达到的位数则用0填充。
    • 比如第1个LogSegment的日志文件名是00000000000000000000.log,因此它的基准偏移量是0。
    • 比如第2个LogSegment的日志文件名是00000000000000000133.log,因此它的基准偏移量是133,也说明了该LogSegment中的第一条消息的偏移量为133,同时可以反映出第一个LogSegment中共有133条消息(偏移量从0至132的消息)。
  • 向Log中追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment都不能写入数据。
    • 为了方便描述,我们将最后一个LogSegment称为"activeSegment" ,即表示当前活跃的日志分段
    • 随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。
  • 注意每个LogSegment中不只包含".log"、".index"、".timeindex"这3种文件,还可能包含"deleted"、".cleaned"、".swap"等临时文件,以及可能的".snapshot"、".txnindex"、"leader-epoch-checkpoint"等文件。
  • 在某一时刻,Kafka中的文件目录布局如图所示。
    • 每一个根目录都会包含最基本的4个检查点文件(xxx-checkpoint)和meta.properties文件。
    • 初始情况下主题__consumer_offisets并不存在,当第一次有消费者消费消息时会自动创建这个主题。
    • 在创建主题的时候,如果当前broker中不止配置了一个根目录,那么会挑选分区数最少的那个根目录来完成本次创建任务。

2、日志格式的演变

  • Kafka的消息格式有3个版本:
    • v0版本:在Kafka 0.10.0之前使用
    • v1版本:Katka从0.10.0到0.11.0版本之前使用(比v0版本就多了一个timestamp字段)
    • v2版本:Katka从0.11.0版本开始所使用(比v0和v1的版本而言变化很大)
  • 每个分区由内部的每一条消息组成,如果消息格式设计得不够精炼,那么其功能和性能都会大打折扣。
    • 如果有冗余字段,势必会不必要地增加分区的占用空间,进而不仅使存储的开销变大、网络传输的开销变大,也会使Kafka的性能下降。
    • 如果缺少字段,比如在最初的Kafka消息版本中没有timestamp字段,对内部而言,其影响了日志保存、切分策略,对外部而言,其影响了消息审计、端到端延迟、大数据应用等功能的扩展。

1、v0版本和v1版本

  • Kafka消息格式的第一个版本通常称为v0版本(在0.8.x版之前,Kafka还使用过一个更古老的消息格式,不过不需要了解这个版本的消息格式)。
  • Katka从0.10.0版本到0.11.0版本之前所使用的消息格式版本为v1,比v0版本就多了一个timestamp字段,表示消息的时间戳。
  • v0和v1版本的消息格式
    • 图1中左边的"RECORD"部分就是v0版本的消息格式,大多数人会把图1中左边的整体(即包括offset和message size字段)都看作消息,因为每个RECORD(v0和v1版)必定对应一个offset和message size
      • 每条消息都有一个offset用来标志它在分区中的偏移量,这个offset是逻辑值,而非实际物理偏移值。
      • message size表示消息的大小。
      • 这两者在一起被称为日志头部(LOG OVERHEAD),固定为12B。
    • LOG_OVERHEAD和RECORD一起用来描述一条消息。
      • 在讲述具体消息格式时会偏向于将单纯的RECORD看作消息,而在其他地方则偏向于将LOG OVERHEAD和RECORD的整体看作消息,读者需要留意其中的区别。
    • 消息集(图1的右边部分,Message Set)中包含一条或多条消息,消息集不仅是存储于磁盘及在网络上传输(Produce & Fetch)的基本形式,而且是Kafka中压缩的基本单元

  • v0版本的消息格式中个字段的释义:
    • crc32(4B):crc32校验值。校验范围为magic至value之间。
    • magic(1B):消息格式版本号,此版本的magic值为0。
    • attributes(1B):消息的属性。总共占1个字节。
      • 低3位表示压缩类型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入)。
      • 其余位保留。
    • key length(4B):表示消息的key的长度。如果为-1,则表示没有设置key,即key=null。
    • key:可选,如果没有key则无此字段。
    • value length 4B):实际消息体的长度。如果为-1,则表示消息为空。
    • value:消息体。可以为空,比如墓碑(tombstone)消息。
  • v1版本的消息格式中个字段的释义:(没有解释的和v0的含义相同)
    • crc32(4B):
    • magic(1B):消息格式版本号,此版本的magic值为1。
    • timestamp:表示消息的时间戳(v0中没有)。
      • timestamp类型由broker端参数log.message.timestamp.type来配置,默认值为CreateTime,即采用生产者创建消息时的时间戳。
      • 如果在创建ProducerRecord时没有显式指定消息的时间戳,那么Kafkaproducer也会在发送这条消息前自动添加上。
    • attributes(1B):消息的属性。总共占1个字节。
      • 低3位和v0版本的一样。
      • 第4位(bit):0表示timestamp类型为CreateTime,1表示timestamp类型为LogAppendTime。
      • 其余位保留。
    • key length(4B):
    • key:
    • value length 4B):
    • value:
  • 破损消息
    • v0版本中一个消息的最小长度(RECORD_OVERHEAD_vo)为crc32 + magic + attributes +key length + value length = 4B + 1B + 1B +4B +4B = 14B,v0版本中一条消息的最小长度为14B。
    • v1版本的消息的最小长度(RECORD_OVERHEAD_V1 )要比v0版本的大8个字节,即22B。
    • 如果消息的长度小于最小长度,那么这就是一条破损的消息而不被接收

2、消息压缩

  • Kafka的压缩方式是将多条消息一起进行压缩,这样可以保证较好的压缩效果(因为一条消息通常不会太大,压缩效果不太好)。
  • 在一般情况下,生产者发送的压缩数据在broker中也是保持压缩状态进行存储的,消费者从broker获取的也是压缩的消息,消费者在处理消息之前才会解压消息,这样保持了端到端的压缩
  • Kafka日志中使用哪种压缩方式是通过参数compression.type来配置的。
    • 默认值为"producer",表示保留生产者使用的压缩方式。
    • 这个参数还可以配置为"gzip"、"snappy"、"lz4",分别对应GZIP、SNAPPY、LZ4这3种压缩算法。
    • 如果参数compression.type配置为"uncompressed",则表示不压缩。
  • 消息压缩的过程
    • 压缩消息时是将整个消息集进行压缩作为内层消息(inner message),内层消息整体作为外层消息(wrapper message)的value,其结构如图1所示。(Record表示的是从crc32到value的消息格式)
    • 压缩后的外层消息中的key为null,所以图1的左半部分没有画出key字段,value字段中保存的是多条压缩消息(内层消息)。
    • 当生产者创建压缩消息的时候,对内部压缩消息设置的offset从0开始为每个内部消息分配offset,参考图2右半部分。
      • 其实每个从生产者发出的消息集中的消息offset都是从0开始的,当然这个offset不能直接存储在日志文件中,对offset的转换是在服务端进行的,客户端不需要做这个工作。
      • 外层消息保存了内层消息中最后一条消息的绝对位移(absolute offset),绝对位移是相对于整个分区而言的。
      • 参考图2,对于未压缩的情形,图2右半部分内层消息中最后一条的offset理应是1030,但被压缩之后就变成了5,而这个1030被赋予给了外层的offset。
    • 当消费者消费这个消息集的时候,首先解压缩整个消息集,然后找到内层消息中最后一条消息的inner offset,根据如下公式找到内层消息中最后一条消息前面的消息的absolute offiset(RO表示Relative Offset, IO表示Inner Offset,而AO表示Absolute Offset):
      • RO = IO_of_a_message - IO_of_the_last_message(相对offset = 内层消息A的offset - 最后一个内层消息的offset)
      • AO = AO_Of_Last_Inner_Message + RO (消息A的绝对offset = 最后一个内层消息的绝对offset + 相对offset)

  • 对于压缩的情形,外层消息的timestamp设置为:
    • 如果timestamp类型是CreateTime,那么设置的是内层消息中最大的时间戳。
    • 如果timestamp类型是LogAppendTime,那么设置的是Kafka服务器当前的时间戳。
    • 内层消息的timestamp设置为:
      • 如果外层消息的timestamp类型是CreateTime,那么设置的是生产者创建消息时的时间戳。
      • 如果外层消息的timestamp类型是LogAppendTime,那么所有内层消息的时间戳都会被忽略。
  • 对attributes字段而言,它的timestamp位只在外层消息中设置,内层消息中的timestamp类型一直都是CreateTime.

3、v2版本

  • Katka从0.11.0版本开始所使用的消息格式版本为v2,这个版本的消息格式相比v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。

1、变长整型(Varints)和ZigZag编码

  • Varints是使用一个或多个字节来序列化整数的一种方法。数值越小,其占用的字节数就越少。
  • 为了使编码更加高效,Varints使用了ZigZag的编码方式。
  • ZigZzag编码以一种锯齿形(zig-zags)的方式来回穿梭正负整数,将带符号整数映射为无符号整数,这样可以使绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1、1编码为、-2编码为3。

  • 不过需要注意的是,Varints并非一直会节省空间,一个int32最长会占用5个字节(大于默认的4个字节),一个int64最长会占用10个字节(大于默认的8个字节)。

2、v2版本的消息格式

  • v2版本的消息结构
    • v2版本中消息集称为Record Batch,而不是先前的Message Set,其内部也包含了一条或多条消息。
    • 消息的格式参见图的中部和右部。
    • 在消息压缩的情形下,Record Batch Header部分(参见图左部,从first offset到records count字段)是不被压缩的,而被压缩的是records字段中的所有内容。
    • 生产者客户端中的ProducerBatch对应这里的RecordBatch,而ProducerRecord对应这里的Record。

 

  • v2版本消息(Record)的关键字段,可以看到内部字段大量采用了Varints,这样Kafka可以根据具体的值来确定需要几个字节来保存。v2版本的消息格式去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且attributes字段被弃用了(key、key length、value、value length字段同v0和v1版本的一样,这里不再赘述)。
    • length:消息总长度。
    • attributes:弃用,但还是在消息格式中占据1B的大小,以备未来的格式扩展。
    • timestamp delta:时间戳增量。通常一个timestamp需要占用8个字节,如果像这里一样保存与RecordBatch的起始时间戳的差值,则可以进一步节省占用的字节数。
    • offset delta:位移增量。保存与RecordBatch起始位移的差值,可以节省占用的字节数。
    • key length:
    • key:
    • value length:
    • value:
    • headers count:headers的个数。
    • headers:这个字段用来支持应用级别的扩展,而不需要像v0和v1版本一样不得不将一些应用级别的属性值嵌入消息体。Header的格式如图最右部分所示,包含key和value,一个Record里面可以包含0至多个Header。
  • v2版本消息集(RecordBatch)的关键字段,参考图最左部分:
    • first offset:表示当前RecordBatch的起始位移。
    • length:计算从partition leader epoch字段开始到末尾的长度。
    • partition leader epoch:分区leader纪元,可以看作分区leader的版本号或更新次数。
    • magic:消息格式的版本号,对v2版本而言,magic等于2。
    • crc32:crc32校验值。
    • attributes:消息属性,注意这里占用了两个字节。
      • 低3位表示压缩格式,可以参考v0和v1。
      • 第4位表示时间戳类型。
      • 第5位表示此RecordBatch是否处于事务中,0表示非事务,1表示事务。
      • 第6位表示是否是控制消息(ControlBatch),0表示非控制消息,1表示是控制消息,控制消息用来支持事务功能。
    • last offset delta:RecordBatch中最后一个Record的offset与first offset的差值。主要被broker用来确保RecordBaich中Record组装的正确性。
    • first timestamp:RecordBatch中第一条Record的时间戳。
    • max timestamp:RecordBatch中最大的时间戳,一般情况下是指最后一个Record的时间戳,和last offset delta的作用一样,用来确保消息组装的正确性。
    • producer id:PID,用来支持幂等和事务。
    • producer epoch:和producer id一样,用来支持幂等和事务。
    • first sequence:和producer id一样,用来支持幂等和事务。
    • records count:RecordBatch中消息的个数。
    • records:被压缩的消息。

4、查看内容日志

  • kafka-dump-log.sh
    • --files :要转储的数据和索引日志文件列表,用逗号分隔。(必须)
    • --print-data-log:如果设置,转储数据日志时打印消息内容。如果指定了任何解码器选项,则自动设置。
#在Katka 2.0.0之前使用
128 ~]# kafka-run-class.sh kafka.tools.DumpLogSegments --files
128 ~]# kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka/log/topic-log-3/00000000000000000000.log --print-data-log

#从Katka 2.0.0开始使用
128 ~]# kafka-dump-log.sh --files /tmp/kafka/log/topic-log-0/00000000000000000000.log
128 ~]# kafka-dump-log.sh --files /tmp/kafka/log/topic-log-0/00000000000000000000.log --print-data-log
#                                                                                                                       #