消息队列入门了解

之前看了许多博客,零零散散的,差不多了解了个大概。

(1 封私信) 消息队列(mq)是什么 - 知乎 (zhihu.com)

Kafka简明教程 - 知乎 (zhihu.com)

消息队列本身是一个消息中间件,如同一个队列一样,生产者生产消息到消息队列里,然后消费者对队列中的消息进行消费。

它的作用主要有两点:

  • 解耦消息的生产和消费。
  • 缓冲。(流量高时削峰)
  • 让消息被异步处理

消息队列造成的一些问题:

  • 系统复杂性

本来蛮简单的一个系统,我代码随便写都没事,现在你凭空接入一个中间件在那,我是不是要考虑去维护他,而且使用的过程中是不是要考虑各种问题,比如消息重复消费消息丢失消息的顺序消费等等,反正用了之后就是贼烦。

  • 数据一致性

这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点。

你下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败你就不管了?

我说了保证自己的业务数据对的就好了,其实还是比较不负责任的一种说法,这样就像个渣男,没有格局这样呀你的路会越走越窄的

  • 可用性

你搞个系统本身没啥问题,你现在突然接入一个中间件在那放着,万一挂了怎么办?我下个单MQ挂了,优惠券不扣了,积分不减了,这不是一个程序员能搞定的吧。

消息队列与消费者之间的投递方式

在 「消息队列由哪些角色组成?」 中,我们已经提到消息队列有 push 推送pull 拉取两种投递方式。

一种模型的某些场景下的优点,在另一些场景就可能是缺点。无论是 push 还是 pull ,都存在各种的利弊。

  • push

    • 优点,就是及时性。
    • 缺点,就是受限于消费者的消费能力,可能造成消息的堆积,Broker 会不断给消费者发送不能处理的消息。
  • pull

    • 优点,就是主动权掌握在消费方,可以根据自己的消息速度进行消息拉取。
    • 缺点,就是消费方不知道什么时候可以获取的最新的消息,会有消息延迟和忙等。

目前的消息队列,基于 push + pull 模式结合的方式,Broker 仅仅告诉 Consumer 有新的消息,具体的消息拉取,还是 Consumer 自己主动拉取。

  1. 一个功能的实现,有多种实现方式,有优点就有缺点。并且,一个实现的缺点,恰好是另外一个实现的优点。
  2. 一个功能的实现,可能是多种实现方式的结合,取一个平衡点,不那么优,也不那么缺。再说一句题外话,是和否之间,还有灰色地方。

kafka利用的就是生产者push到消息队列中,消费者pull拉取消息队列中消息的方式进行通信。

消息系统都致力于让 consumer 以最大的速率最快速的消费消息,但不幸的是,push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。最终 Kafka 还是选取了传统的 pull 模式。

Pull 模式的另外一个好处是 consumer 可以自主决定是否批量的从 broker 拉取数据 。Push 模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull 模式下,consumer 就可以根据自己的消费能力去决定这些策略。 Pull 有个缺点是,如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达。为了避免这点,Kafka 有个参数可以让 consumer阻塞知道新消息到达(有点类似java中cas轮询锁升级成为重量级锁的过程,当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。

如何实现消息的幂等性

这里实现幂等性,不是在消息队列层面实现的,而是在消费者层面实现的。因为,就算消息队列实现了处理生产者消息的幂等性,在消息队列传送消息到消费者的时候,也会有网络原因造成的消息重发,最终,消费者仍然需要处理幂等性的问题。

在 「消息队列有几种消费语义?」 中,我们已经看了三种消费语义。如果要达到消费者的消费消息的幂等性,就需要消息仅被消费一次,且每条消息从 Producer 保证被送达,并且被 Consumer 仅消费一次

那么,我们就基于这个场景,来思考下,为什么会出现消息重复的问题?

  • 对于 Producer 来说

    • 可能因为网络问题,Producer 重试多次发送消息,实际第一次就发送成功,那么就会产生多条相同的消息。
    • ….
  • 对于 Consumer 来说

    • 可能因为 Broker 的消息进度丢失,导致消息重复投递给 Consumer 。
    • Consumer 消费成功,但是因为 JVM 异常崩溃,导致消息的消费进度未及时同步给 Consumer 。 对于大多数消息队列,考虑到性能,消费进度是异步定时同步给 Broker 。

如何解决

所以,上述的种种情况,都可能导致消费者会获取到重复的消息,那么我们的思考就无法是解决不发送、投递重复的消息,而是消费者在消费时,如何保证幂等性。

消费者实现幂等性,有两种方式:

  1. 框架层统一封装。
  2. 业务层自己实现。

框架层统一封装

首先,需要有一个消息排重的唯一标识,该编号只能由 Producer 生成,例如说使用 uuid、或者其它唯一编号的算法 。

然后,就需要有一个排重的存储器,例如说:

  • 使用关系数据库,增加一个排重表,使用消息编号作为唯一主键。
  • 使用 KV 数据库,KEY 存储消息编号,VALUE 任一。此处,暂时不考虑 KV 数据库持久化的问题

那么,我们要什么时候插入这条排重记录呢?

  • 在消息消费执行业务逻辑之前,插入这条排重记录。但是,此时会有可能 JVM 异常崩溃。那么 JVM 重启后,这条消息就无法被消费了。因为,已经存在这条排重记录。

  • 在消息消费执行业务逻辑之后,插入这条排重记录。

    • 如果业务逻辑执行失败,显然,我们不能插入这条排重记录,因为我们后续要消费重试。
    • 如果业务逻辑执行成功,此时,我们可以插入这条排重记录。但是,万一插入这条排重记录失败呢?那么,需要让插入记录和业务逻辑在同一个事务当中,此时,我们只能使用数据库

业务层自己实现

方式很多,这个和 HTTP 请求实现幂等是一样的逻辑:

  • 先查询数据库,判断数据是否已经被更新过。如果是,则直接返回消费完成,否则执行消费。
  • 更新数据库时,带上数据的状态。如果更新失败,则直接返回消费完成,否则执行消费。

如果胖友的系统的并发量非常大,可以使用 Zookeeper 或者 Redis 实现分布式锁,避免并发带来的问题。当然,引入一个组件,也会带来另外的复杂性:

  1. 系统的并发能力下降。
  2. Zookeeper 和 Redis 在获取分布式锁时,发现它们已经挂掉,此时到底要不要继续执行下去呢?

选择

正常情况下,出现重复消息的概率其实很小,如果由框架层统一封装来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务层自己实现处理消息重复的问题。

当然,这两种方式不是冲突的。可以提供不同类型的消息,根据配置,使用哪种方式。例如说:

  • 默认情况下,开启【框架层统一封装】的功能。
  • 可以通过配置,关闭【框架层统一封装】的功能。

当然,如果可能的话,尽可能业务层自己实现。但是,实际上,很多时候,开发者不太会注意,哈哈哈哈。

关于kafka的更多细节(异步通信)

名词解释

1)Broker: Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

2)Producer: 负责发布消息到Kafka broker

3)Consumer: 消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。

4)Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

5)Partition: Partition是物理上的概念,每个Topic包含一个或多个Partition.

6)Consumer Group: 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

7)Topic & Partition Topic: 在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1)。

kafka 维护消费状态

kafka在底层使用了offset,对于每一个消费者,其消费的topic的每一个partition区域都会维护一个offset变量,记录着在当前partition中consumer所阅读的下标位置。Consumer 必须自己从 Topic 的 Partition 拉取消息。一个 Consumer 连接到一个 Broker 的 Partition,从中依次读取消息。

img

这里有一个消费者组的概念:同一个消费者组中的两个消费者,不会去消费同一条消息,也就是说,其实这一个消费者组,就相当于一个大的消费者,那么这个大的消费者,一定是每条消息只会消费一次,不会重复消费的。

而如果,两个消费者处于不同的消费者组,那么,完全有可能消费到相同的消息。

kafka保证顺序一致性

img

发送端只保证了partition级别的顺序一致性,不保证topic级别的顺序。可以指定key值让相同key的数据到同一个partition保证顺序一致性或者是对某个需要保证顺序的topic,设置其partition为1,这样也可以保证其消费端使用offset保证读入数据的一致性。

Kafka的ack机制

Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡

ack有可选值 1,0,-1(默认是1)。

  • 当ack=1时,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。(异步复制)
  • 当ack=0时,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。(这种效率最高,吞吐量最大)
  • 当ack=-1时,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。也就是说,不仅仅是leader副本(同步复制

Partition 为 Kafka 提供了扩展能力

img

一个 Kafka 集群由多个 Broker(就是 Server) 构成,每个 Broker 中含有集群的部分数据。

Kafka 把 Topic 的多个 Partition 分布在多个 Broker 中。

这样会有多种好处:

  • 如果把 Topic 的所有 Partition 都放在一个 Broker 上,那么这个 Topic 的可扩展性就大大降低了,会受限于这个 Broker 的 IO 能力。把 Partition 分散开之后,Topic 就可以水平扩展 。
  • 一个 Topic 可以被多个 Consumer 并行消费。如果 Topic 的所有 Partition 都在一个 Broker,那么支持的 Consumer 数量就有限,而分散之后,可以支持更多的 Consumer。
  • 一个 Consumer 可以有多个实例,Partition 分布在多个 Broker 的话,Consumer 的多个实例就可以连接不同的 Broker,大大提升了消息处理能力。可以让一个 Consumer 实例负责一个 Partition,这样消息处理既清晰又高效。

Zookeeper对于Kafka的作用

Zookeeper 是一个开放源码的、高性能的协调服务(我感觉就像是redis主从架构里面的增强版哨兵角色),它用于 Kafka 的分布式应用。Zookeeper 主要用于在集群中不同节点之间进行通信。在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。

Zookeeper中offset的存储方式

由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以groupid-topic-partition -> offset的方式保存。如图所示:

image-20220520205548302

Kafka在保存Offset的时候,实际上是将Consumer Group和partition对应的offset以消息的方式保存在__consumers_offsets这个topic中

kafka如何解决消息不丢失

从producer端 —> kafka 不丢失

producer端:

  • 把producer端的异步发送改成同步发送,这样producer就能实时知道发送消息的结果。
  • 添加异步回调函数来监听消息发送的结果,如果发送失败,可以在回调中重试。

kafka的broker端:

  • 将消息持久化,broker先把消息写入到pagecache,然后操作系统统一对pagecache进行刷盘。(如果刷盘之前宕机了,就会导致数据丢失)

    ![屏幕截图 2022-05-20 214131](https://rufuspic.oss-cn-chengdu.aliyuncs.com/markdown_imgs/屏幕截图 2022-05-20 214131.png)

  • Partition副本集机制,对于每一个topic,每个broker中都存了唯一的一个leader(保证了分区容错性)

    • 在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
    • Kafka 的副本机制比其他分布式系统要更严格一些。在 Kafka 中,追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
    • 当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
  • ack响应机制,这里参考上文写的。(ack主要针对的是)

从 kafka —> producer 端:

  • 主要是一个offset超前提交的问题,其实是可以通过重新调整offset的值来挽回,问题不大。

消息队列的三个基本语义

根据我自己的理解,我觉得这涉及到消息队列的三个基本语义,一共有 3 种,分别如下:

  1. 消息至多被消费一次(At most once):消息可能会丢失,但绝不重传。
  2. 消息至少被消费一次(At least once):消息可以重传,但绝不丢失。
  3. 消息仅被消费一次(Exactly once):每一条消息只被传递一次。

消息队列的Replicas机制

追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,就存在着不可能与 Leader 实时同步的风险。在探讨如何正确应对这种风险之前,我们必须要精确地知道同步的含义是什么。或者说,Kafka 要明确地告诉我们,追随者副本到底在什么条件下才算与 Leader 同步。

基于这个想法,Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。那么,到底什么副本能够进入到 ISR 中呢?

我们首先要明确的是,Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。

另外,能够进入到 ISR 的追随者副本要满足一定的条件。至于是什么条件,我先卖个关子,我们先来一起看看下面这张图。

img

图中有 3 个副本:1 个领导者副本和 2 个追随者副本。Leader 副本当前写入了 10 条消息,Follower1 副本同步了其中的 6 条消息,而 Follower2 副本只同步了其中的 3 条消息。

现在,请你思考一下,对于这 2 个追随者副本,你觉得哪个追随者副本与 Leader 不同步?

答案是,要根据具体情况来定。换成英文,就是那句著名的“It depends”。看上去好像 Follower2 的消息数比 Leader 少了很多,它是最有可能与 Leader 不同步的。的确是这样的,但仅仅是可能。

事实上,这张图中的 2 个 Follower 副本都有可能与 Leader 不同步,但也都有可能与 Leader 同步。也就是说,Kafka 判断 Follower 是否与 Leader 同步的标准,不是看相差的消息数,而是另有“玄机”。

**这个标准就是 Broker 端参数 replica.lag.time.max.ms 参数值。**这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。

我们在前面说过,Follower 副本唯一的工作就是不断地从 Leader 副本拉取消息,然后写入到自己的提交日志中。如果这个同步过程的速度持续慢于 Leader 副本的消息写入速度,那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。

值得注意的是,倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的。

Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:**/brokers/topics/[topic]/partitions/[partition]/state。**目前有两个地方会对这个Zookeeper的节点进行维护:

Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。

Leader来维护:leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。

Unclean 领导者选举(Unclean Leader Election)

既然 ISR 是可以动态调整的,那么自然就可以出现这样的情形:ISR 为空。因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。可是 ISR 是空,此时该怎么选举新 Leader 呢?

**Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。**通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。

开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

如果你听说过 CAP 理论的话,你一定知道,一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。

你可以根据你的实际业务场景决定是否开启 Unclean 领导者选举。不过,我强烈建议你不要开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。

Zookeeper 在 Kafka 中的作用概括

leader 选举 和 follower 信息同步

3149801-304988f765bfa28a 如上图所示,kafaka集群的 broker,和 Consumer 都需要连接 Zookeeper。 Producer 直接连接 Broker(producer不用连接zk)。

Producer 把数据上传到 Broker,Producer可以指定数据有几个分区、几个备份。上面的图中,数据有两个分区 0、1,每个分区都有自己的副本:0’、 1’。

黄色的分区为 leader,白色的为 follower。

leader 处理 partition 的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。 如下图所示,红色的为 leader,绿色的为 follower,leader复制自己到其他 Broker 中:

img

如果leader发生故障或挂掉,一个新leader被选举并接收客户端的消息。Kafka确保从同步副本列表中选举一个副本为 leader。关于follower 的同步机制可参考:https://blog.csdn.net/lizhitao/article/details/51718185

Topic 分区被放在不同的 Broker 中,保证 Producer 和 Consumer 错开访问 Broker,避免访问单个 Broker造成过度的IO压力,使得负载均衡。

Zookeeper 在 Kafka 中的作用(详细)

image-20220521162910306

1、Broker注册

Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:

/brokers/ids

每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。

Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。

2、Topic注册

在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:

/borkers/topics

Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。

3、生产者负载均衡

由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

(1) 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。

(2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。

4、消费者负载均衡

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。

5、分区与消费者的关系

消费组 (Consumer Group): consumer group 下有多个 Consumer(消费者)。 对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。 同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。

在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。

6、消息消费进度Offset 记录

在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

节点内容就是Offset的值。

7、消费者注册

消费者服务器在初始化启动时加入消费者分组的步骤如下

注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

对消费者分组中的消费者的变化注册监听。每个消费者都需要关注所属消费者分组 中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。

对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。

进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消息分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。