Kafka 物理存储机制

2023-04-28,,

一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从 Kafka文件存储机制和物理结构角度,分析 Kafka是如何实现高效文件存储,及实际应用效果。Kafka 的基本存储单位是分区。在配置 Kafka 的时候,管理员指定了一个用于存储分区的目录清单 log.dirs 参数的值。

一、分区分配


创建主题时,Kafka 首先决定如何在 broker 之间分配分区。假设有 6个 broker,打算创建一个包含 10个分区的主题。并且复制系数是3,相当于 30个分区副本。在被分配到6个broker 上时,要达到如下的目标:
【1】在 broker 间平均分配分区副本。对于上述例子来说,就是要保证每个 broker 可以分到 5个副本。
【2】确保每个分区的每个副本分布在不同的 broker 上。
【3】如果为 broker 指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的 broker 上。这样做是为了保证一个机架不可用不会导致整个分区不可用。
为了实现这个目标,我们先随机选择一个 broker(假设是2),然后通过轮询给每个 broker 分配分区来确定首领的位置。如果分区0的首领在 broker2上,那么分区1的首领就在 broker3上,以此类推。然后,从分区首领开始,以此分配跟随者副本。如分区0首领在 broker2 上,那么它的第一个副本会出现在 broker3 上,第二个出现在 broker4 上。如果配置了机架信息,那么就不是按照数字顺序来选择 broker 了,而是按照交替机架的方式来选择 broker。假设 broker0、broker1、broker2 放在同一个机架,broker3、broker4、broker5 放在其他不同的机架。此时就不是按照0到5的顺序来选择 broker,而是按照0,3,1,4,2,5的顺序进行选择的。

二、文件管理


保留数据时 Kafka 的一个基本特性,Kafka 不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,Kafka 管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。 因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段。默认情况下,index 大小为10M,每个片段(log)包含 1GB 或一周数据,以较小的那个为准。当前正在写入数据的片段叫做活跃片段,活跃片段永远不会被删除。

三、文件格式


我们把 Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的。因为使用相同的消息格式进行磁盘存储和网络传输,Kafka 可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压缩。除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳。时间戳可以是生产者发送消息的时间,也可以是消息到达 broker的时间,这个是可配置的。如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩再一次,被当做包装消息进行发送。下面是普通消息和包装消息图:

四、文件存储机制


【1】Broker:消息中间件处理结点,一个 Kafka节点就是一个 Broker,多个 Broker可以组成一个 Kafka集群。
【2】Topic:主题,如 page view日志、click日志等都可以以 Topic的形式存在,Kafka集群能够同时负责多个 Topic的分发。
【3】Partition:Topic物理上的分组,一个 Topic可以分为多个 Partition,每个 Partition是一个有序的队列。
【4】Segment:Partition 物理上由多个 Segment组成。
【5】offset:每个 Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 Partition中。Partition中的每个消息都有一个连续的序列号叫做 offset,用于 Partition唯一标识一条消息。

分析过程分为以下4个步骤:
【1】Topic 中 Partition存储分布:假设 Kafka集群只有一个 Broker,xxx/message-folder为数据文件存储根目录,在 Kafka Broker 中 server.properties 文件配置(参数 log.dirs=xxx/message-folder),例如创建2个 Topic名称分别为 report_pushlaunch_info,Partitions 数量都为 partitions=4(将一个 Topic分为4个部分存储)存储路径和目录规则为:
在 Kafka文件存储中,同一个 Topic下有多个不同 Partition,每个 Partition为一个目录,Partiton命名规则为 Topic名称+有序序号,第一个 Partiton序号从0开始,序号最大值为 Partitions数量减1。

【2】Partiton 中文件存储方式:每个 Partion(目录)相当于一个巨型文件被平均分配到多个大小相等 Segment(段)数据文件中。但每个段 Segment file 消息数量不一定相等,这种特性方便 old segment file 快速被删除。每个 Partiton只需要支持顺序读写就行了,Segment 文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

【3】partiton中 segment文件存储结构:segment file 由2大部分组成,分别为 index file和 data file,此2个文件对应,成对出现,后缀".index"和“.log”分别表示为 segment索引文件、数据文件。segment 文件命名规则:partion 全局的第一个segment 从0开始,后续每个 segment文件名为上一个 segment文件最后一条消息的 offset值。数值最大为 64位 long大小,19位数字字符长度,没有数字用0填充。下面文件列表是笔者在 Kafka broker上做的一个实验,创建一个 topicXXX包含1 partition,设置每个 segment大小为 500MB,并启动 producer向 Kafka broker写入大量数据,如下图所示 segment文件列表形象说明了上述2个规则以及 segment 中 index<—->data file对应关系物理结构如下:

索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中 message的物理偏移地址。 其中以索引文件中元数据 3,497为例,依次在数据文件中表示第3个 message(在全局 partiton表示第 368772个 message)、以及该消息的物理偏移地址为497。从上述图3了解到 segment data file 由许多 message组成,下面详细说明 message物理结构如下:
【参数说明】:8 byte offset:在 Parition(分区)内的每条消息都有一个有序的 id号,这个 id号被称为偏移(offset),它可以唯一确定每条消息在 Parition 内的位置。即 offset表示 Partiion的第多少 message
4 byte message size:
message 大小;
4 byte CRC32:用 crc32校验 message;
1 byte “magic":表示本次发布 Kafka服务程序协议版本号;
1 byte “attributes":表示为独立版本、或标识压缩类型、或编码类型;
4 byte key length:表示 key的长度,当key为 -1时,K byte key字段不填;
value bytes payload:表示实际消息数据;

【4】在 partition 中如何通过 offset查找 message:例如读取 offset=368776 的 Message,需要通过下面2个步骤查找。
第一步查找 segment file:上述图为例,其中 00000000000000000000.index 表示最开始的文件,起始偏移量(offset)为0。第二个文件 00000000000000368769.index 的消息量起始偏移量为 368770 = 368769 + 1。同样,第三个文件00000000000000737337.index 的起始偏移量为 737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。当 offset=368776 时定位到00000000000000368769.index|log;
第二步通过 segment file 查找 message:通过第一步定位到 segment file,当 offset=368776 时,依次定位到00000000000000368769.index 的元数据物理位置和 00000000000000368769.log 的物理偏移地址,然后再通过00000000000000368769.log 顺序查找直到 offset=368776为止。从上述图可知这样做的优点,segment index file 采取稀疏索引存储方式,它减少索引文件大小,通过 mmap可以直接内存操作,稀疏索引为数据文件的每个对应 message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。通过上述过程详细分析,我们就可以清楚认识到 kafka文件存储机制的奥秘。

五、Kafka 文件存储机制的实际运行效果


实验环境Kafka 集群 = 由2台虚拟机组成;CPU = 4核;物理内存 = 8GB;网卡 = 千兆网卡;JVM HEAP = 4GB;详细 Kafka服务端配置及其优化请参考:Kafka server.properties配置详解:

从上述图5可以看出,Kafka 运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟 Kafka文件存储中读写 message的设计是息息相关的。Kafka 中读写 message有如下特点:写 message,消息从 java堆转入 page cache即物理内存。由异步线程刷盘,消息从 page cache刷入磁盘。读 message 消息直接从 page cache转入 socket发送出去。当从 page cache 没有找到相应数据时,此时会产生磁盘IO,从磁盘 Load消息到 page cache,然后直接从 socket发出去。

六、Kafka 中的 Partition 和 Offset


【1】Log机制:说到分区,就要说 Kafka对消息的存储,首先,kafka是通过 log(日志)来记录消息发布的。每当产生一个消息,Kafka 会记录到本地的 log文件中,这个 log和我们平时的 log有一定的区别。这里可以参考一下 The Log,不多解释。这个 log文件默认的位置在 config/server.properties 中指定的,默认的位置是 log.dirs=/tmp/kafka-logs,Linux不用说,windows的话就在你对应磁盘的根目录下。

分区 Partition:Kafka是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小。因此有了 Partition的概念。Kafka 对消息进行一定的计算,通过 hash来进行分区。这样,就把一份 log文件分成了多份。如上面的分区读写日志图,分成多份以后,在单台 Broker上,比如快速上手中,如果新建 Topic的时候,我们选择 replication-factor 1 partitions 2,那么在 log目录里,我们会看到 test-0目录和 test-1目录就是两个分区了。你可能会想,这没啥区别呀。注意,当有了多个 broker之后,这个意义就存在了。这里上一张图:
【2】Kafka 分布式分区存储:这是一个 Topic包含4个Partition2 Replication(拷贝),也就是说全部的消息被放在了4个分区存储,为了高可用,将4个分区做了2份冗余,然后根据分配算法。将总共8份数据,分配到 Broker集群上。结果就是每个 Broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用。比如图中的 Broker1,宕机了那么剩下的三台 Broker依然保留了全量的分区数据。所以还能使用,如果再宕机一台,那么数据不完整了。当然你可以设置更多的冗余,比如设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行。需要在存储占用和高可用之间做衡量。至于宕机后,zookeeper会选出新的 partition leader。

偏移 offset上一段说了分区,分区就是一个有序的,不可变的消息队列。新来的 commit log 持续往后面加数据。这些消息被分配了一个下标(或者偏移),就是offset,用来定位这一条消息。消费者消费到了哪条消息,是保持在消费者这一端的。消息者也可以控制,消费者可以在本地保存最后消息的 offset,并间歇性的向 zookeeper注册 offset。也可以重置 offset。

如何通过 offset算出分区:其实 Partition存储的时候,又分成了多个 segment(段),然后通过一个 index索引,来标识第几段。这里先可以去看一下本地 log目录的分区文件夹。在我这里,test-0,这个分区里面,会有一个 index文件和一个 log文件,对于某个指定的分区,假设每5个消息作为一个段大小,当产生了10条消息的情况下,目前有会分段。
0.index (表示这里 index是对0-4做的索引)、5.index (表示这里index是对5-9做的索引)、10.index (表示这里index是对10-15做的索引,目前还没满) 和 0.log、5.log、10.log。当消费者需要读取 offset=8 的时候,首先 kafka 对 index文件列表进行二分查找,可以算出。应该是在 5.index对应的 log文件中,然后对对应的 5.log文件,进行顺序查找,5->6->7->8,直到顺序找到8就好了。

七、索引


消费者可以从 Kafka 的任意可用偏移量位置开始读取消息,假设消费者要读取从偏移量 100开始的 1MB 消息,那么 Broker 必须立即定位到偏移量 100,为了帮组 broker 更快地定位到指定的偏移量,Kafka 为每个分区维护一个索引。索引吧偏移量映射到片段文件和偏移量在文件里的位置。索引也被分成片段,所以再删除消息时,也可以删除相应的索引。Kafka 不维护索引的校验和。如果索引出现损坏,Kafka 会通过重新读取消息并录制偏移量和位置来重新生成索引。如果有必要,管理员是可以删除索引的,这样做是绝对安全的,Kafka 会自动重新生成这些索引。

八、Kafka高效文件存储设计特点


【1】Kafka把 topic中一个 parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
【2】通过索引信息可以快速定位 message和确定 response的最大大小。
【3】通过 index元数据全部映射到 memory,可以避免 segment file的 IO磁盘操作。
【4】通过索引文件稀疏存储,可以大幅降低 index文件元数据占用空间大小。

Kafka 物理存储机制的相关教程结束。

《Kafka 物理存储机制.doc》

下载本文的Word格式文档,以方便收藏与打印。