抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Kafka深入理解

集群的成员关系

​ Kafka 使用Zoo keeper 来维护集群成员的信息。每个broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在broker 启动的时候,它通过创建临时节点把自己的ID 注册到Zookeeper 。Kafka 组件订阅Zoo keeper 的/brokers/ids 路径(bro ker 在Zoo keeper 上的注册路径),当有broker 加入集群或退出集群时,这些组件就可以获得通知。

​ 如果你要启动另一个具有相同ID 的broker ,会得到一个错误一一新broker 会试着进行注册,但不会成功,因为Zoo keeper 里已经有一个具有相同ID 的broker 。

​ 在broker 停机、出现网络分区或长时间垃圾回收停顿时, broker 会从Zookeeper 上断开连接,此时broker 在启动时创建的临时节点会自动从Zoo keeper 上移除。监听broker 列表的Kafka 组件会被告知该broker 已移除。

​ 在关闭broker 时,它对应的节点也会消失,不过它的ID 会继续存在于其他数据结构中。例如,主题的副本列表(下面会介绍)里就可能包含这些白。在完全关闭一个broker 之后,如果使用相同的m 启动另一个全新的broker ,它会立即加入集群,井拥有与旧broker相同的分区和主题。

上面说的kafka启动之后会向zk注册id,可以在zk上查看这些注册的消息!

1
2
3
4
5
[zk: localhost:2181(CONNECTED) 3] ls /           #显示当前zk上的消息信息
[isr_change_notification, zookeeper, admin, consumers, config, controller, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/ids #显示注册的brkers的id。
[3, 2, 1]
#可以尝试着停掉一个对应kafka,对应的broker id就会自动从这里删除。

工作流程

Kafka集群将 Record 流存储在称为 Topic 的类别中,每个记录由一个键、一个值和一个时间戳组成。

Kafka 是一个分布式流平台,这到底是什么意思?

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  • 以容错的持久方式存储记录流。
  • 处理记录流。

​ Kafka 中消息是以 Topic 进行分类的,生产者生产消息,消费者消费消息,面向的都是同一个 Topic。

​ Topic 是逻辑上的概念,而 Partition 是物理上的概念,每个 Partition 对应于一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。

​ Producer 生产的数据会不断追加到该 log 文件末端,且每条数据都有自己的 Offset。

​ 消费者组中的每个消费者,都会实时记录自己消费到了哪个 Offset,以便出错恢复时,从上次的位置继续消费。

控制器

​ 控制器是kafka集群中蛮重要的一个组件,下面我们会说明broker如何成为控制器的。

​ 控制器其实就是一个broker,只不过它除了具有一般broker 的功能之外,还负责分区首领的选举。集群里第一个启动的broker通过在zookeeper里创建一个临时节点/controller让自己成为控制器。其他broker 在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。其他broker在控制器节点上创建Zookeeper watch对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。

​ 如果控制器被关闭或者与Zookeeper 断开连接,Zookeeper上的临时节点就会消失。集群里的其他broker 通过watch 对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper 里成功创建控制器节点的broker 就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建watch 对象。每个新选出的控制器通过Zookeeper 的条件递增操作获得一个全新的、数值更大的controller epoch。其他broker 在知道当前controller epoch后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们。

​ 当控制器发现一个broker已经离开集群(通过观察相关的Zookeeper 路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个broker 上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本),然后向所有包含新首领或现有跟随者的broker 发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者开始从新首领那里复制消息。

​ 当控制器发现一个broker加入集群时,它会使用broker来检查新加入的broker 是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加入的broker 和其他broker,新broker 上的副本开始从首领那里复制消息。简而言之, Kafka 使用Zookeeper的临时节点来选举控制器, 并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用epoch 来避免“脑裂” 。“脑裂”是指两个节点同时认为自己是当前的控制器。

上面的过程说明了broker怎么选举为controller,以及当前的控制器宕之后,新的controller怎么选举过程!在zk上查看控制器的相关信息,如下!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[zk: localhost:2181(CONNECTED) 10] get /controller             
{"version":1,"brokerid":2,"timestamp":"1545987445714"} #显示了当前那个broker是首领
cZxid = 0x600000142
ctime = Fri Dec 28 16:57:25 CST 2018
mZxid = 0x600000142
mtime = Fri Dec 28 16:57:25 CST 2018
pZxid = 0x600000142
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x367ed5d87210001
dataLength = 54
numChildren = 0

[zk: localhost:2181(CONNECTED) 13] get /controller_epoch
27 #这个反应了首领的变更次数
cZxid = 0x200000017
ctime = Thu Dec 20 14:38:04 CST 2018
mZxid = 0x600000143
mtime = Fri Dec 28 16:57:25 CST 2018
pZxid = 0x200000017
cversion = 0
dataVersion = 26
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0

kafka复制

​ 复制功能是Kafka 架构的核心。在Kafka 的文档里, Kafka 把自己描述成“ 一个分布式的、可分区的、可复制的提交日志服务”。复制之所以这么关键,是因为它可以在个别节点失效时仍能保证Kafka 的可用性和持久性。

​ Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker 上,每个broker 可以保存成百上千个属于不同主题和分区的副本。

replication-factor

​ 用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的 broker 上,也就是说副本的数量不能超过 broker 的数量, 否则创建主题时会失败。

比如partions 设置为20,replicationFactor设置为1. Broker为2.可以看出,分区会均匀在broker

比如partions 设置为10,replicationFactor设置为2. Broker为2.每个broker都有副本存在。

首领副本

​ 每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。

跟随者副本

​ 首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩渍,其中的一个跟随者会被提升为新首领。

优先副本

​ 除了当前首领之外, 每个分区都有一个优先副本(首选首领),创建主题时选定的首领分区就是分区的优先副本。 之所以把它叫作优先副本, 是因为在创建分区时, 需要在 broker 之间均衡首领副本。因此, 我们希望首选首领在成为真正的首领时, broker 间的负载最终会得到均衡。 默认情况下, Kafka的 auto.leader.rebalance.enable 被设为 true,它会检査优先副本是不是当前首领,如果不是,并且该副本是同步的, 那么就会触发首领选举, 让优先副本成为当前首领

工作机制

​ 首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的状态一致、在有新消息到达时尝试从首领那里复制消息,不过有各种原因会导致同步失败。例如,网络拥塞导致复制变慢, broker 发生崩横导致复制滞后,直到重启brok er 后复制才会继续。为了与首领保持同步,跟随者向首领发送获取数据的请求,这种请求与悄费者为了读取悄息而发送的请求是一样的。首领将响应消息发给跟随者。请求消息里包含了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。

​ 通过查看每个跟随者请求的最新偏移量,首领就会知道每个跟随者复制的进度。如果跟随者在10 s 内没有请求任何消息,或者虽然在请求消息,但在10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无陆与首领保持一致,在首领发生失效时,它就不可能成为新首领一一毕竟它没有包含全部的消息。相反,持续请求得到的最新悄息副本被称为同步的副本。在首领发生失效时,只有同步副本才有可能被选为新首领。

​ 除了当前首领之外,每个分区都有一个首选首领创建主题时选定的首领就是分区的首选首领。之所以把它叫作首选首领,是因为在创建分区时,需要在broker 之间均衡首领。

​ 因此,我们希望首选首领在成为真正的首领时, broker 间的负载最终会得到均衡。默认情况下,Kafka auto.leader.rebalance被设为true ,它会检查首选首领是不是当前首领, 如果不是,并且该副本是同步的,那么就会触发首领选举,让首选首领成为当前首领。

处理请求的内部机制

​ broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。Kafka 提供了一个二进制协议(基于TCP ),指定了请求消息的格式以及broker 如何对请求作出响应一一包括成功处理请求或在处理请求过程中遇到错误。

客户端发起连接并发送请求,broker 处理请求井作出响应。broker 按照请求到达的顺序来处理它们一一这种顺序保证让Kafka 具有了消息队列的特性,同时保证保存的消息也是有序的。

所有的请求消息都包含一个标准消息头:

  • Request type(也就是 API key)
  • Request version( broker 可以处理不同版本的客户端请求,并根据客户端版本作出不同的响应)
  • Correlation id-一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)
  • Client Id 用于标识发送请求的客户端

​ broker 会在它所监听的每一个端口上运行一个acceptor线程,这个线程会创建一个连接,并把它交给processor线程去处理。processor线程(也被叫作“网络线程”)的数量是可配置的。网络线程负责从客户端获取请求悄息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。请求消息被放到请求队列后,IO 线程会负责处理它们。

生产请求和获取请求都必须发送给分区的首领副本。如果broker 收到一个针对特定分区的请求,而该分区的首领在另一个broker 上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的broker上,也会出现同样的错误。Kafka 客户端要自己负责把生产请求和获取请求发送到正确的broker 上。

​ 那么客户端怎么知道该往哪里发送请求呢?客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本, 以及哪个副本是首领。元数据请求可以发送给任意一个broker ,因为所有broker 都缓存了这些信息

​ 一般情况下,客户端会把这些信息缓存起来,并直接往目标broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过meta.max.age.ms参数来配置,2.1.3 的客户端默认参数30S),从而知道元数据是否发生了变更一一比如,在新broker 加入集群时,部分副本会被移动到新的broker 上。另外,如果客户端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息,之前的请求被发到了错误的broker 上。

生产者的请求

​ 我们曾经说过,acks 这个配置参数,该参数指定了需要多少个 broker 确认才可以认为一个消息写入是成功的。 不同的配置对“写入成功” 的界定是不一样的,如果 acks=1,那么只要首领收到消息就认为写入成功,如果 acks=all,那么需要所有同步副本收到消息才算写入成功; 如果 acks=0, 那么生产者在把消息发出去之后, 完全不需要等待 broker 的响应,包含首领副本的 broker 在收到生产请求时, 会对请求做一些验证。

发送数据的用户是否有主题写入权限?

​ 请求里包含的 acks 值是否有效(只允许出现 0、1 或 all) ?(ack=-1 等同于 ack=all)如果 acks=all, 是否有足够多的同步副本保证消息已经被安全写入?

​ 之后,消息被写入本地磁盘。 在 Linux 系统上,消息会被写到文件系统缓存里,并不保证它们何时会被刷新到磁盘上。 Kafka 不会一直等待数据被写到磁盘上, 它依赖复制功能来保证消息的持久性。
​ 在消息被写入分区的首领之后, broker 开始检査 acks 配置参数一如果 acks 被设为 0 或 1, 那么 broker 立即返回响应;如果 acks 被设为 all,那么请求会被保存在一个叫作炼狱的缓冲区里, 直到首领发现所有跟随者副本都复制了消息, 响应才会被返回给客户端

消费者请求

​ broker 处理获取请求的方式与处理生产请求的方式很相似。 客户端发送请求,向 broker 请求主题分区里具有特定偏移量的消息, 好像在说: “请把主题 Test 分区 0 偏移量从 53 开始的消息以及主题 Test 分区 3 偏移量从 64 开始的消息发给我。 ” 客户端还可以指定 broker 最多可以从一个分区里返回多少数据。 这个限制是非常重要的, 因为客户端需要为 broker 返回的数据分配足够的内存。 如果没有这个限制, broker 返回的大量数据有可能耗尽客户端的内存 。

​ 我们之前讨论过,请求需要先到达指定的分区首领上,然后客户端通过査询元数据来确保请求的路由是正确的。 首领在收到请求时,它会先检査请求是否有效,比如,指定的偏移量在分区上是否存在?如果客户端请求的是已经被删除的数据,或者请求的偏移量不存在, 那么 broker 将返回一个错误 。

​ 如果请求的偏移量存在, broker 将按照客户端指定的数量上限从分区里读取消息, 再把消息返回给客户端。Kafka 使用零复制技术向客户端发送消息一一也就是说, Kafka 直接把消息从文件(或者更确切地说是 Linux 文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。这是 Kafka 与其他大部分数据库系统不一样的地方, 其他数据库在将数据发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制, 也不需要管理内存缓冲区, 从而获得更好的性能。

​ 客户端除了可以设置 broker 返回数据的上限, 也可以设置下限。例如, 如果把下限设置为 10KB,就好像是在告诉 broker:“等到有 10KB 数据的时候再把它们发送给我。” 在主题消息流量不是很大的情况下,这样可以减少 CPU 和网络开销。客户端发送一个请求, broker 等到有足够的数据时才把它们返回给客户端, 然后客户端再发出情求, 而不是让客户端每隔几毫秒就发送一次请求,每次只能得到很少的数据甚至没有数据。对比这两种情况, 它们最终读取的数据总量是一样的, 但前者的来回传送次数更少, 因此开销也更小。

​ 当然,我们不会让客户端一直等待 broker 累积数据。在等待了一段时间之后,就可以把可用的数据拿回处理,而不是一直等待下去。所以,客户端可以定义一个超时时间,告诉 broker: “如果你无法在 K 毫秒内累积满足要求的数据量, 那么就把当前这些数据返回给我。”

ISR

并不是所有保存在分区首领上的数据都可以被客户端读取。大部分客户端只能读取已经被写入所有同步副本的消息。分区首领知道每个消息会被复制到哪个副本上,在消息还没有被写入所有同步副本之前,是不会发送给消费者的,尝试获取这些消息的请求会得到空的响应而不是错误。

​ 因为还没有被足够多副本复制的消息被认为是“不安全” 的,如果首领发生崩横,另一 个副本成为新首领,那么这些消息就丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者读取并处理了这样的一个消息,而另一个消费者发现这个消息其实并不存在。所以,我们会等到所有同步副本复制了这些消息,才允许消费者读取它们。这也意味着,如果 broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕) 。延迟时间可以通过参数 replica. lag. time. max. ms 来配置, 它指定了副本在复制消息时可被允许的最大延迟时间。

​ Kafka 的数据复制是以 Partition 为单位的。而多个备份间的数据复制,通过 Follower 向 Leader 拉取数据完成。 从一这点来讲,有点像 Master-Slave 方案。不同的是,Kafka 既不是完全的同步复制,也不是完全的异步复制,而是基于 ISR 的动态复制方案。

​ ISR,也即 In-Sync Replica。每个 Partition 的 Leader 都会维护这样一个列表,该列表中,包含了所有与之同步的 Replica(包含 Leader 自己)。每次数据写入时,只有 ISR 中的所有 Replica 都复制完,Leader 才会将其置为 Commit,它才能被 Consumer 所消费。

​ 这种方案,与同步复制非常接近。但不同的是,这个 ISR 是由 Leader 动态维护的。如果 Follower 不能紧“跟上”Leader,它将被 Leader 从 ISR 中移除,待它又重新“跟上”Leader 后,会被 Leader 再次加加 ISR 中。每次改变 ISR 后,Leader 都会将最新的 ISR 持久化到 Zookeeper 中。

​ 至于如何判断某个 Follower 是否“跟上”Leader,不同版本的 Kafka 的策略稍微有些区别。

​ 从 0.9.0.0 版本开始,replica.lag.max.messages 被移除,故 Leader 不再考虑 Follower 落后的消息条数。 另外,Leader 不仅会判断 Follower 是否在replica.lag.time.max.ms 时间内向其发送 Fetch 请求,同时还会考虑 Follower 是否在该时间内与之保持同步。

​ 在第一步中,Leader A 总共收到 3 条消息,但由于 ISR 中的 Follower 只同步了第 1 条消息(m1) , 故只有 m1 被 Commit, 也即只有 m1 可被 Consumer消费。此时 Follower B 与 Leader A 的差距是 1, 而 Follower C 与 Leader A 的差距是 2,虽然有消息的差距, 但是满足同步副本的要求保留在 ISR 中。 同步副本概念参见《复制》

​ 在第二步中,由于旧的 Leader A 宕机,新的 Leader B 在 replica.lag.time.max.ms 时间内未收到来自 A 的Fetch 请求,故将 A 从 ISR 中移除,此时 ISR={B,C}。同时,由于此时新的 Leader B 中只有 2 条消息,并未包含 m3(m3 从未被任何 Leader 所 Commit),所以 m3 无法被 Consumer 消费。
(上图中就是因为 acks 不为 all 或者-1,不全部复制, 就会导致单台服务器宕机时的数据丢失 m3 丢失了)

使用ISR方案的原因
  1. 由于Leader可移除不能及时与之同步的Follower,故与同步复制相比可避免最慢的Follower拖慢整体速度,也即ISR提高了系统可用性。

  2. ISR中的所有Follower都包含了所有Commit过的消息,而只有Commit过的消息才会被Consumer消费,故从Consumer的角度而言,ISR中的所有Replica都始终处于同步状态,从而与异步复制方案相比提高了数据一致性。

  3. ISR可动态调整,极限情况下,可以只包含Leader,极大提高了可容忍的宕机的Follower的数量。与Majority Quorum方案相比,容忍相同个数的节点失败,所要求的总节点数少了近一半。

ISR相关配置说明
  1. Broker的min.insync.replicas参数指定了Broker所要求的ISR最小长度,默认值为1。也即极限情况下ISR可以只包含Leader。但此时如果Leader宕机,则该Partition不可用,可用性得不到保证。

  2. 只有被ISR中所有Replica同步的消息才被Commit,但Producer发布数据时,Leader并不需要ISR中的所有Replica同步该数据才确认收到数据。Producer可以通过acks参数指定最少需要多少个Replica确认收到该消息才视为该消息发送成功。acks的默认值是1,即Leader收到该消息后立即告诉Producer收到该消息,此时如果在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。而如果将该值设置为0,则Producer发送完数据后,立即认为该数据发送成功,不作任何等待,而实际上该数据可能发送失败,并且Producer的Retry机制将不生效。更推荐的做法是,将acks设置为all或者-1,此时只有ISR中的所有Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。

ACK 应答机制

​ 对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 Follower 全部接受成功。

​ 所以 Kafka 为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡,选择以下的配置。

ACK 参数配置
  • 0:Producer 不等待 Broker 的 ACK,这提供了最低延迟,Broker 一收到数据还没有写入磁盘就已经返回,当 Broker 故障时有可能丢失数据。
  • 1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盘成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么将会丢失数据。
  • -1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盘成功后才返回 ACK。但是在 Broker 发送 ACK 时,Leader 发生故障,则会造成数据重复。

故障处理细节

  • LEO:每个副本最大的 Offset。HW:消费者能见到的最大的 Offset,ISR 队列中最小的 LEO。

  • Follower 故障:Follower 发生故障后会被临时踢出 ISR 集合,待该 Follower 恢复后,Follower 会 读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步数据操作。

    ​ 等该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了。

  • Leader 故障:Leader 发生故障后,会从 ISR 中选出一个新的 Leader,之后,为保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

物理存储机制

Kafka 的基本存储单元是分区。分区无法在多个 broker 间进行再细分,也无法在同一个 broker 的多个磁盘上进行再细分。在配置 Kafka的时候, 管理员指定了一个用于存储分区的目录清单——也就是log.dirs参数的值 (不要把它与存放错误日志的目录混淆了, 日志目录是配置在 1og4j.properties 文件里的)。 该参数一般会包含每个挂载点的目录。

存储结构

​ 由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制。

​ 它将每个 Partition 分为多个 Segment,每个 Segment 对应两个文件:“.index” 索引文件和 “.log” 数据文件。

​ 这些文件位于同一文件下,该文件夹的命名规则为:topic 名-分区号。例如,first 这个 topic 有三分分区,则其对应的文件夹为 first-0,first-1,first-2。

1
2
3
4
5
6
# ls /root/data/kafka/first-0        
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
00000000000000009014.snapshot
leader-epoch-checkpoint

index 和 log 文件以当前 Segment 的第一条消息的 Offset 命名。下图为 index 文件和 log 文件的结构示意图:

​ “.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 Message 的物理偏移量。

分区分配

​ 在创建主题时, Kafka 首先会决定如何在 broker 间分配分区。假设你有 6 个 broker, 打算创建一个包含 10 个分区的主题,并且复制系数为 3(确保至少有 3 台 broker)。那么 Kafka 就会有 30 个分区副本, 它们可以被分配给 6 个 broker。在进行分区分配时, 我们要达到如下的目标:

  • 在 broker 间平均地分布分区副本。 对于我们的例子来说, 就是要保证每个 broker 可以分到 5 个副本。
  • 确保每个分区的每个副本分布在不同的 broker 上。 假设分区 0 的首领副本在 broker2 上,,那么可以把跟随者副本放在 broker3 和 broker4 上, 但不能放在 broker2 上,也不能两个都放在 broker3 上。
  • 如果为 broker 指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的 broker 上 。 这样做是为了保证一个机架的不可用不会导致整体的分区不可用 。

​ 为了实现这个目标, 我们先随机选择一个 broker(假设是 4) , 然后使用轮询的方式给每个 broker 分配分区来确定首领分区的位置。 于是,首领分区 0会在 broker4 上,首领分区 1 会在 broker5 上, 首领分区 2 会在 broker 0 上(只有 6 个 broker), 并以此类推。 然后, 我们从分区首领开始,依次分配跟随者副本。 如果分区 0 的首领在 broker4 上,那么它的第一个跟随者副本会在 broker5 上,第二个跟随者副本会在 broker 0 上。 分区 1 的首领在 broker5 上,那么它的第一个跟随者副本在 broker0 上,第二个跟随者副本在 broker1 上。

​ 为分区和副本选好合适的 broker 之后, 接下来要决定这些分区应该使用哪个目录。 我们单独为每个分区分配目录, 规则很简单: 计算每个目录里的分区数量, 新的分区总是被添加到数量最小的那个目录里。 也就是说, 如果添加了一个新磁量, 所有新的分区都会被创建到这个磁盘上。 因为在完成分配工作之前,新磁盘的分区数量总是最少的。 (最少使用原则)

文件管理

​ 保留数据是 Kafka 的一个基本特性,Kafka 不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,Kafka 管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小 。

​ 因为在一个大文件里査找和删除消息是很费时的,也很容易出错,所以分区分成若干个片段。默认情况下,每个片段包含 1GB 或一周的数据,以较小的那个为准。在 broker 往分区写入数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。

​ 当前正在写入数据的片段叫作活跃片段,活动片段永远不会被删除,所以如果你要保留数据 1 天,但片段里包含了 5 天的数据,那么这些数据会被保留5 天,因为在片段被关闭之前这些数据无法被删除。如果你要保留数据一周,而且每天使用一个新片段,那么你就会看到,每天在使用一个新片段的同时会删除一个最老的片段一所以大部分时间该分区会有 7 个片段存在。

文件格式

​ Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的 。 因为使用了相同的消息格式进行磁盘存储和网络传输, Kafka 可以使用零复制技术给消费者发送消息, 同时避免了对生产者已经压缩过的消息进行解压和再圧缩。

​ 除了键、 值和偏移量外,消息里还包含了消息大小、 校验和、 消息格式版本号、 压缩算法(snappy、 Gzip 或 Lz4)和时间戳(在 0.10.0 版本里引入的)。 时间戳可以是生产者发送消息的时间, 也可以是消息到达 broker 的时间, 这个是可配置的。

​ 如果生产者发送的是圧缩过的消息,那么同一个批次的消息会被压缩在一起,被当作 “包装消息” 进行发送,于是,broker 就会收到一个这样的消息,然后再把它发送给消费者。消费者在解压这个消息之后,会看到整个批次的消息,它们都有自己的时间戳和偏移量。

如果在生产者端使用了压缩功能(极力推荐),那么发送的批次越大,就意味着在网络传输和磁盘存储方面会获得越好的压缩性能,同时意味着如果修改了消费者使用的消息格式 (例如, 在消息里增加了时间戳) ,那么网络传输和磁盘存储的格式也要随之修改,而且 broker 要知道如何处理包含了两种消息格式的文件。 一种是普通消息, 一种是包装消息

​ Kafka 附带了一个叫 DumpLogSegment 的工具,可以用它査看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。

索引

​ 消费者可以从Kafka的任意可用偏移量位置开始读取消息,假设消费者要读取从偏移量100开始的1MB消息,那么broker必须立即定位到偏移量100(可能是在分区的任意一个片段里),然后开始从这个位置读取消息,为了帮助broker 更快地定位到指定的偏移量, Kafka 为每个分区维护了一个索引,索引把偏移量映射到片段文件和偏移量在文件里的位置。

​ 索引也被分成片段, 所以在删除消息时, 也可以删除相应的索引 。Kafka 不维护索引的校验和。 如果索引出现损坏, Kafka 会通过重新读取消息并录制偏移量和位置来重新生成索引。如果有必要, 管理员可以删除索引, 这样做是绝对安全的, Kafka 会自动重新生成这些索引。

超时数据的清理机制

​ 一般情况下,Kafka 会根据设置的时间保留数据,把超过时效的旧数据删除掉,不过,试想一下这样的场景,如果你使用 Kafka 保存客户的收货地址,那么保存客户的最新地址比保存客户上周甚至去年的地址要有意义得多,这样你就不用担心会用错旧地址,而且短时间内客户也不会修改新地址。另外一个场景,一个应用程序使用 Kafka 保存它的状态,每次状态发生变化,它就把状态写入 Kafka。在应用程序从崩演中恢复时,它从 Kafka 读取消息来恢复最近的状态,在这种情况下,应用程序只关心它在崩粉前的那个状态,而不关心运行过程中的那些状态。

​ Kafka 通过改变主题的保留策略来满足这些使用场景 ,早于保留时间的事件会被删除, 为每个键保留最新的值, 从而达到清理的效果每个日志片段可以分为以下两个部分 :

  • 干净的部分,这些消息之前被清理过, 每个键只有一个对应的值,这个值是上一次清理时保留下来的。

  • 污浊的部分,这些消息是在上一次清理之后写入的。

清理分区

​ 为了清理分区, 清理线程会读取分区的污独部分, 并在内存里创建一个 map。 map 里的每个元素包含了消息键的散列值和消息的偏移量,键的散列值是 16B,加上偏移量总共是 24B。 如果要清理一个 1GB 的日志片段,并假设每个消息大小为 1KB,那么这个片段就包含_一百万个消息,而我们只需要用 24MB的 map 就可以清理这个片段。 (如果有重复的键, 可以重用散列项, 从而使用更少的内存。 )

​ 清理线程在创建好偏移量 map 后,开始从干净的片段处读取消息,从最旧的消息开始,把它们的内容与 map 里的内容进行比对。它会检査消息的键是否存在于 map 中, 如果不存在,那么说明消息的值是最新的,就把消息复制到替換片段上。如果键已存在,消息会被忽略,,因为在分区的后部已经有一个具有相同键的消息存在。 在复制完所有的消息之后,我们就将替换片段与原始片段进行交换,然后开始清理下一个片段。完成整个清理过程之后,每个键对应一个不同的消息一这些消息的值都是最新的。清理前后的分区片段如图所示

​ 清理的思想就是根据 Key 的重复来进行整理,注意,它不是数据删除策略,而是类似于压缩策略,如果 key 送入了值,对于业务来说,key 的值应该是最新的 value 才有意义,所以进行清理后只会保存一个 key 的最新的 value,这个适用于一些业务场景,比如说 key 代表用户 ID,Value 用户名称,如果使用清理功能就能够达到最新的用户的名称的消息(这个功能有限,请参考使用)

评论