本文是极客时间《消息队列高手课》的学习笔记。
常用场景
- 异步处理
- 流量控制(令牌桶)
- 服务解耦
- 发布、订阅系统(观察者模式)
- 连接流计算任务和数据
- 消息广播给大量接收者
局限
- 引入消息队列带来的延迟问题
- 增加系统复杂度
- 可能产生数据不一致问题
主流开源MQ
RabbitMQ
- 服务端采用Erlang语言编写
- 轻量级,部署使用简单
- 性能略差,每秒几万到十几万条消息
- 大量消息堆积将导致性能急剧下降
- 生产者和队列之间灵活的路由规则配置
RocketMQ
- 阿里开源
- 服务端Java语言编写
- 毫秒级响应,低延迟,实时性高,金融级稳定性,适合在线业务场景
- 性能好,每秒钟大概能处理几十万条消息
- 国产,与国际周边生态系统集成兼容度低
kafka
- LinkedIn开源
- 周边生态兼容好,尤其在大数据和流式计算领域
- 使用Scala和Java语言
- 设计上大量使用批量和异步思想,优点是吞吐量大,每秒几十万条消息,适合处理海量消息
- 批量和异步设计缺点是同步收发消息的响应时延比较高,不太适合在线业务场景
Pulsar
- 新兴的开源消息队列产品
- Yahoo开发并开源
- 采用存储和计算分离的设计(消息数据保存在BookKeeper,元数据ZooKeeper,Broker无状态)
MQ选择要点:
如果说,消息队列并不是你将要构建系统的主角之一,你对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,我建议你使用 RabbitMQ 。
如果你的系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性是你需要的。
如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合你的消息队列。
术语和概念
- 主题(Topic):服务端存放消息的容器
- 队列(Queue):先进先出FIFO线性表,可以保证消息的有序性
- 分区(Partition):和队列的概念类似
- 生产者(Producer):发出消息的程序
- 消费者(ConSumer):接收消息的程序
- 消费组(Consumer Group):多个特定消费者组成的组
- 发布者(Publisher):同生产者
- 订阅者(Subscriber):同消费者
- 消息关键字(Key):用于生产者按hash等规则将消息映射到队列
- 消息标签(Tag):用于消费者消费指定标签的消息
消息确认机制
几乎所有的消息队列产品都使用一种非常朴素的 “ 请求 - 确认 ” 机制,确保消息不会在传递过程中由于网络或服务器故障丢失。
在生产端,生产者先将消息发送给服务端,也就是Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。 如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息。
在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。
确认机制很好地保证了消息传递过程中的可靠性。但带来的问题是在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞。
两种消息模型
- 队列模型:服务代理端消息存储在FIFO队列中,生产者发消息入队,消费者收消息出队删除
- 发布-订阅模型:服务端将消息存放在主题中,发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”,订阅的主题将被发送给订阅者消费
这两种消息模型本质是相同的,生产者就是发布者,消费者就是订阅者,队列就是主题。他们最大的区别是一份消息数据能不能被消费多次。
发布订阅模型中,如果只有一个订阅者,那和队列模型就基本一样了。发布订阅模型在功能层面可以兼容队列模型。
现代消息队列大多是发布订阅模型,比如RocketMq和Kafka。RabbitMq是队列模型。
RabbitMq消息模型
RabbitMq是少数坚持使用队列模型的产品。也支持多消费者,服务端有多个队列,每个消费者对应一个队列,通过位于生产者和队列之间Exchange模块上配置的策略来决定将消息投递到哪些队列中,消息将被复制多份投入到不同队列。
RocketMq消息模型
RocketMq采用标准的发布订阅模型。
每个主题包含多个队列(队列数量可以水平进行扩容),通过多个队列来实现多实例并行生产和消费。 RocketMq只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
生产者将消息发往主题中的队列,某条消息只会发送到其中的一个队列,队列的选择方式可以是轮询或哈希等规则。
每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
由于主题中有多个队列,因此消费组内的不同消费者可以并行消费,消息只能被消费组中的一个消费者消费,不同队列中的消息可以同时被多个消费者消费。
在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后 的消息都没有被消费过,每成功消费一条消息,消费位置就加一。
在消费的时候,为了保证消息的不丢失和严格顺序,每个队列只能串行消费,无法做到并发,否则会出现消费空洞的问题。如果队列消息不要求严格顺序,则可以做到单个队列的并行消费。
Kafka消息模型
Kafka的消息模型和RocketMq完全一样,唯一区别是在Kafka中,队列这个概念的对应名称是“分区”。
利用事务消息实现分布式事务
什么是事务
对若干数据进行更新操作,为了保证这些数据的完整性和一致性,我们希望这些更新操作 要么都成功,要么都失败。
一个严格意义的事务实现,应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为 ACID 特性。
- 原子性,是指一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情况。
- 一致性,是指这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据。严格一致性可能难以实现,需要有折中,比如顺序一致性、最终一致性等等。
- 隔离性,是指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰,这个有点儿像我们打网游中的副本,我们在副本中打的怪和掉的装备,与其他副本没有任何关联也不会互相影响。
- 持久性,是指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何影响。
消息队列的事务
消息队列中的“事务”主要解决的是消息生产者和消息消费者的数据一致性问题。
比如电商系统中,用户从购物车中选择商品进行支付,订单系统生成订单结算后,发消息给购物车系统,将已下单的商品从购物车中移除。这个过程可以使用消息队列异步清理购物车,但需要保证只有订单成功创建后,才能从购物车移除对应商品,同时也需要保证订单创建后,购物车内的商品必须移除,否则将会出现数据不一致的情况。可见创建订单和移除购物车必须在一个事务内处理,要么都成功,要么回滚。上述例子可以分解为三个操作:
- 订单系统创建订单
- 发送消息通知购物车系统
- 购物车系统消费订单通知消息
其中步骤3比较容易实现,只要在消费消息成功(购物车中移除商品)后再提交消费确认即可,如果失败,由于没有提交消费确认,消息系统会自动重发。
问题关键在订单系统,创建订单和发送消息要么都成功,要么都失败,可以用消息系统提供的事务消息实现:
- 在消息队列上开启事务
- 向消息队列发送“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
- 执行本地事务,创建订单
- 提交半消息或回滚取消
- 消息系统发送消息
如果在第四步提交事务消息时失败了怎么办?对于这个问题, Kafka 和 RocketMQ 给出了2种不同的解决方案。
Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。 RocketMQ 则给出了另外一种解决方案。
在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果 Producer 也就是订单系统,在提交或者回滚事务消息时发生网络异常, RocketMQ 的 Broker 没有收到提交或者回滚的请求, Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。
在我们这个例子中,反查本地事务的逻辑也很简单,我们只要根据消息中的订单 ID ,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。 RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。
RocketMq 和 Kafka 事务对比
相同点与差异:
- 事务都是针对生产端
- RocketMq事务确保producer执行本地事务和发消息这两个操作,要么都成功,要么都失败
- RocketMq有事务反查机制,补偿producer提交事务失败的情况
- Kafka事务确保在一个事务中发送的多条消息,要么都成功,要么都失败
- Kafka事务也可以在执行过程中加入本地事务,实现RocketMq的效果,但没有事务反查补偿机制
- Kafka这种事务机制主要用于流式计算中,数据源在Kafka主题中,流计算结果也存入Kafka不同主题中,保证每个消息只计算一次(Exactly Once)
- 实现原理类似,都是基于两阶段提交,第一阶段通知、执行事务,第二阶段提交或回滚
- 都有用于记录事务的主题,RocketMq将消息暂存在这个事务主题中,Kafka直接将消息放到实际主题,通过消费端处理暂时不消费,Kafka的事务主题用于开启和提交事务的通知消息
RocketMq事务实现流程:
- Producer给消息标记为半消息并发送给Broker
- Producer执行本地事务
- Producer发送消息给Broker,提交或回滚事务
- Broker根据消息属性判断为半消息
- Broker记录半消息原始的主题队列,并放入事务专用主题的消息队列
- Broker的事务反查服务启动定时器,定时检查事务消息队列,找到需要反查的半消息
- Broker给Producer发送事务反查请求,根据返回结果确定提交、回滚、还是继续反查
- Broker把半消息标记为已处理,提交:消息复制到真正的主题队列;回滚:什么也不做。结束事务
Kafka事务实现流程:
- 引入事务协调者,负责服务端协调事务,作为Broker的一部分,通过选举保证自身可用性
- 生产者给协调者发请求开启事务,协调者在事务主题记录事务ID
- 生产者发送消息告知事务消息所属的主题和分区,协调者记录到事务日志中
- 生产者发送事务消息,和普通消息发送逻辑相同
- Broker收到消息放入对应主题分区,和普通消息处理逻辑相同
- 消费者消费消息时,将事务消息过滤出来暂存不消费
- 生产者给协调者发送事务提交或回滚请求
- 协调者进行二阶段提交,将事务设置为预提交,写入事务日志;协调者在事务相关的分区中写入事务结束的特殊消息
- 消费者根据事务结束特殊消息,处理未提交的事务消息
- 协调者记录最后一条事务日志,标记事务结束
确保消息不丢失
检测消息丢失的方法
利用消息队列的有序性来验证是否有消息丢失。在 Producer端,给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。如果没有消息丢失, Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1 。如果检测到序号不连续,那就是丢消息了。
需要注意消息在分区或队列上是有序的,在发消息的时候,必须指定分区或队列,在每个分区或队列上单独检测有序性。
如果生产者是多实例的,需要给每个消息加上生产者编号,并生成各自的消息序号,在消费端分别检测每个生产者消息的连续性。
确保消息可靠传递
生产者发消息到代理,代理发消息给消费者,都需要进行ack确认,如果发送方没有收到ack确认,则会重发消息,确保消息可靠传递。带来的问题是,消息重复。
- 生产阶段:在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这生产阶段的消息不会丢失。
- 存储阶段:如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。
- 消费阶段:不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑 之后,再发送消费确认。
存储阶段保证消息的及时持久化:
对于单个节点的 Broker ,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。
如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker ,也不会发生消息丢失。
如何处理重复消息
由于消息可靠传递会导致消息重发,那么重复消息必然存在。
MQTT 协议给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
- At most once: 至多一次。消息在传递时,最多会被送达一次。
- At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
- Exactly once :恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
常用的绝大部分消息队列提供的服务质量都是 At least once ,包括 RocketMQ 、 RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。
用幂等性解决消息重复问题
一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。
一个幂等操作的特点是, 其任意多次执行所产生的影响均与一次执行的影响相同。
- 利用数据库的唯一约束实现幂等
- 为更新的数据设置前置条件,版本号,CAS
- 记录并检查操作,Token 机制或者 GUID (全局唯一 ID )机制,需要配合分布式锁、事务实现
处理消息堆积
发送端性能优化
代码发送消息的性能上不去,可以采用批量发送、增加并发。发送端增加并发并不需要对应增加代理的分区或队列。
消费端性能优化
如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。
在设计系统的时候, 一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这样的扩容实际上是没有效果的。因为代理发送消息给消费者后,需要收到确认才能发送下一个消息,在每个分区上实际上只能支持单线程消费。
消息短时间大量积压排查
粗粒度的原因只有两种:发送变快了,消费变慢了。
通过内置的消息监控功能,确定是哪一种原因。单位时间发送消息变多,可以通过扩容消费端实例数提升总体消费能力。无法扩容则将系统降级,关闭不重要业务等。
还有一种问题是消费失败导致一条消息被反复消费的情况,需要检查消费失败的原因。
Kafka高性能设计要点
- 使用批量处理的方式来提升系统吞吐能力。生产、存储、消费都是批量。
- 基于磁盘文件高性能顺序读写的特性来设计的存储结构。
- 利用操作系统的 PageCache 来缓存数据,减少 IO 并提升读性能。
- 使用零拷贝技术(sendfile)加速消费流程。
- 数据压缩,生产端批压缩,服务端不解压,消费端解压。
1 :消息批处理 —— 减少网络通信开销 2 :磁盘顺序写 —— 减少寻道移臂开销 3 :缓存页 —— 减少磁盘 IO 开销 4 :零拷贝 —— 减少数据多次拷贝的开销
以上基本是一个快速的数据处理组件或系统的标配了,再加上池化技术、异步化技术、不可变技术、多线程并发编程、事件驱动模型、无锁化技术。
Kafka Producer 调用同步 send() 成功返回,其实没法保证消息已经成功发送到 Kafka 服务器。在 Kafka 中,这个 Send 是一个异步方法。如果要确保发送成功,你必须在提供的回调方法中去检查发送结果。或者你也可以调用 producer.send(record).get() 来同步获取发送结果。
Kafka Consumer 在消费过程中是需要维护消费位置的,Consumer每次从当前消费位置拉取一批消息,这些消息都被正常消费后,Consumer会给Coordinator发一个提交位置的请求,然后消费位置会向后移动,完成一批消费过程。
消息复制
为了保证消息系统高可用性和可靠性,一般都采用集群模式,一份消息复制到多个代理节点保存,以应对代理节点单机故障的情况。
- CAP 一致性、可用性、分区容错性只能三选二
- 主从复制模型
- 写入一半以上副本即认为复制成功
- 主本选举
RocketMq消息复制
RocketMq中复制的基本单位是Broker,也就是服务端的进程,采用主从复制。
两种复制方式:
- 异步复制,消息先发送到主节点上,就返回 “ 写入成功 ”,然后消息再异步复制到从节点上。
- 同步双写,消息同步双写到主从节点上,主从都写成功,才返回 “ 写入成功 ” 。
这两种方式本质上的区别是,写入多少个副本再返回 “ 写入成功 ” 的问题,异步复制需要的副本数是 1 ,同步双写需要的副本数是 2 。
异步复制
Broker的主从关系是通过配置固定,不支持动态切换。主Broker宕机,生产者无法生产消息,消费者自动切换到从Broker消费。没有来得及复制到从节点的消息仍然在主节点的磁盘上,主节点恢复后,这些消息可以继续消费,不会丢失。
这种主从复制方式,牺牲了可用性,换取了性能和数据一致性。可用性通过多对主从节点解决,支持把一个主题分布到多对主从节点,每对主从节点承担主题的一部分队列,某个主节点宕机,自动切换到其它主节点上发消息,解决可用性和水平扩容。类似于数据库分片的实现。
这种方式的问题:主题层面无法保证消息有序,队列层面才能保证消息有序,如果主节点宕机,那么主节点上的队列无法转移到其它主节点,否则就无法保证严格顺序。
- 可用性:主题队列分布到多对主从节点,主节点宕机,主节点上的队列转移到其它主节点
- 一致性、严格顺序:主节点宕机,主节点上未消费消息存储在磁盘,无法转移到其它主节点,如果队列转移到其它主节点,那么不保证严格顺序
- 性能:主节点写入磁盘即返回成功,从节点异步复制,性能高
新的Dledger复制方式
Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举(Raft)来动态切换主节点的。
- 可用性:主节点宕机,从节点通过投票选出新的主节点提供服务
- 一致性、严格顺序:消息至少复制半数以上副本才返回写入成功,重新选举时只选举数据和主节点一样的从节点
- 性能:选举过程中不能提供服务,最少需要3个节点,复制需要半数以上节点,性能不如异步复制快
Kafka的消息复制
Kafka中,消息复制的基本单位是分区。分区的几个副本构成小的复制集群。Broker只是这些分区副本的容器,Kafka的Broker不分主从。
分区的多个副本中也是采用一主多从的方式。
消息写入主节点后,等待足够多(可配置ISR:In-Sync-Replicas)副节点复制成功后再返回消息写入成功。
通过Zookeeper选取新节点,新节点必须在ISR节点中选取,保证数据一致性。抢占式选举,在ZooKeeper中创建/controller节点,先创建的成为主节点,其它创建失败的称为副节点。
实例
假设我们有一个 5 节点的 RocketMQ 集群,采用 Dledger 5 副本的复制方式,集群中只有一个主题, 50 个队列均匀地分布到5个Broker上。即每个节点1个Broker,每个Broker有10个队列,5个节点中一个为主本,其它为副本,最多允许宕机2台。
对应Kafka的相同配置:
5副本,10个分区,至少保持ISR集合中有三个broker: bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 5 –partitions 10 –topic test min.insync.replicas=3
RocketMq namingServer
namingServer为rocketmq的客户端(生产者和消费者)提供寻址服务,即找到主题所在Broker和队列信息。
namingServer自身是一个集群,各namingServer之间独立运行,没有数据交互,每个节点都可以独立提供服务。多节点namingServer主要为了避免单点故障。
Broker与所有namingServer实例之间存在心跳,定期上送Broker包含的主题和队列信息。namingServer还负责监控每个Broker的存活状态。
客户端在生产或消费某个主题的消息之前,会先从NameServer上查询这个主题的路由信息,然后根据路由信息获取到当前主题和队列对应的 Broker 物理地址,再连接到 Broker 节点上进行生产或消费。
如果 NameServer 检测到与 Broker 的连接中断了, NameServer 会认为这个Broker不再能提供服务。 NameServer 会立即把这个Broker从路由信息中移除掉,避免客户端连接到一个不可用的Broker上去。而客户端在与 Broker 通信失败之后,会重新去 NameServer 上拉取路由信息,然后连接到其他Broker 上继续生产或消费消息,这样就实现了自动切换失效 Broker 的功能。
优点:实现简单,集群节点平等,比较容易的水平扩展节点数量提供高可用性。路由数据读写都是内存, QPS 比较高。
缺点:每个 broker 需要与所有 nameserver 节点心跳通信,通信成本较大,无法保证强一致性。
Kafka 的协调服务 ZooKeeper
Kafka选择ZooKeeper作为分布式协调服务,用于保存元数据、监控 Broker 和分区的存活状态,并利用 ZooKeeper 来进行选举。
ZooKeeper特点:
- 分布式协调服务框架
- 集群奇数部署,选举产生Leader
- 高可用、高可靠的分布式一致性存储系统
- 数据组织类似UNIX文件树,ZNode节点
- 临时节点:创建改节点的客户端离线,临时节点就会自动消失
- Watcher订阅ZNode变化通知
ZooKeeper缺点:
- 只适合存储少量数据,数据量达到百兆时,性能和稳定性下降
- 选举过程较慢,网络抖动敏感,选举期间不可用
Kafka 客户端如何找到主题、队列对应的Broker呢?先根据主题和队列,在右边的树中找到分区对应的 state 临时节点, state 节点中保存了这个分区 Leader 的 BrokerID 。拿到这个 Leader 的 BrokerID 后,再去左侧的树中,找到BrokerID对应的临时节点,就可以获取到Broker真正的访问地址了。
Kafka 的客户端并不会去直接连接ZooKeeper,它只会和Broker进行远程通信,ZooKeeper上的元数据应该是通过 Broker 中转给每个客户端的。
Kafka 与 Flink 流式计算实现端到端 Exactly Once 语义
Kafka 与 Flink 流式计算配合使用可实现端到端 Exactly Once 语义,保证流计算数据恰好被计算一次,不重复也不丢失。端到端指数据从Kafka的A主题消费,发送给Flink的计算集群进行计算,计算结果再发给Kafka的B主题。
Flink自身的Exactly Once支持
Flink自身通过CheckPoint机制定期保存计算任务的快照。Flink首先在数据流中定期插入一个Barrier屏障将数据流分隔成多个单元,Barrier依次流过各计算节点,计算节点将节点任务状态(计算过程的临时状态数据)和数据源位置信息(Kafka主题消费位置)持久化,当一个Barrier流过所有计算节点,流出计算集群后,生成一个CheckPoint快照。
计算任务故障恢复时,Flink集群从最近的CheckPoint恢复计算任务和数据源位置,这样解决了状态与恢复位置对应的问题。
CheckPoint无法解决输入和输出两端据不丢不重的问题。回退到检查点之后,需要数据源能够重新提供检查点之后的数据,但检查点之后故障前计算的部分数据结果可能已经存储到Kafka结果主题中,再次回退到检查点则会重复计算这部分数据。此时需要Kafka的Exactly Once机制进行配合。
Kafka 的 Exactly Once 语义
Kafka 的 Exactly Once 语义是通过它的事务和生产幂等两个特性来共同实现的。事务保证一个事务内的所有消息,要么都成功投递,要么都不投递。生产幂等这个特性保证生产者给 Kafka Broker 发送消息这个过程中,消息不会重复发送,具体实现通过消息连续递增的序号进行检测。
数据源端Exactly Once语义的实现
主要由FlinkKafkaConsumer负责数据源端的Exactly-once实现。
正常运行时,定期生成CheckPoint,将当前消费的offset记录下来,CheckPoint持久化后(对应的计算结果也通过二阶段提交提交到B主题了,此时这批源数据算消费完成了),触发通知FlinkKafkaConsumer将commitedOffset提交到Kafka Brokers,更新已完成消费的数据源位置。
故障恢复时,从最近的一次CheckPoint恢复计算状态和消费位置信息,在CheckPoint之后的数据由于没有进行消费位置提交,将重新进行消费计算。
一句话总结:将 offset 提交权交给 FlinkKafkaConsumer,其内部维护 Kafka 消费及提交的状态。基于 Kafka 可重复消费能力并配合 Checkpoint 机制和状态后端存储能力,就能实现 FlinkKafkaConsumer 容错性,即 Source 端的 Exactly-once 语义。
数据输出端Exactly Once语义的实现
主要由FlinkKafkaProducer负责写出数据端的Exactly-once实现。
正常运行时,FlinkKafkaProducer接收上游算子Sink的结果数据,通过调用KafkaProducer.send方法写入本地缓冲区。
在CheckPoint快照生成阶段,需要完成持久化CheckPoint,同时提交本地缓冲区的结果数据到Kafka Broker,采用两阶段提交完成这个分布式事务。而本地缓冲区的这批结果数据必须都提交到Kafka或者都失败,因此这里用到Kafka的事务功能,两阶段提交实际完成的是持久化CheckPoint和Kafka事务提交这两个操作。
FlinkKafkaProducer实现了TwoPhaseCommitSinkFunction抽象类,这个抽象类包括CheckpointedFunction和CheckpointListener两个功能,前者对应阶段一,后者对应阶段二,阶段二由阶段一完成后进行通知实现。
阶段一:调用KafkaProducer.flush方法将缓冲区全部记录发送到Kafka Broker,但不提交。持久化CheckPoint,将将当前事务对象状态也持久化到CheckPoint中(用于阶段二提交失败,故障重启之后反射出Kafka事务进行再提交)。阶段一实际已经完成了CheckPoint的持久化。
阶段二:FlinkKafkaProducer收到CheckPoint快照完成通知后,提交上一阶段产生的Kafka事务,如果提交成功后,flush到Kafka的数据从UNCOMMITTED变为COMMITTED,此时消费端可以poll到这批结果数据了。
2PC(两阶段提交)理论的两个阶段分别对应了 FlinkKafkaProducer 的状态快照处理阶段和快照结束处理阶段,前者是通过 Kafka 的事务初始化、事务开启、flush 等操作预提交事务,后者是通过 Kafka 的 commit 操作真正执行事务提交。
故障恢复时,数据源端返回到最近的CheckPoint记录的offset位置重新消费数据,而数据输出端本地缓存区尚未flush到Kafka Broker的结果数据也会随着重新消费再生成。
一种极端情况是,如果上述二阶段提交的阶段一完成,阶段二还未来得及提交事务时发生故障,将导致阶段一生成的CheckPoint对应的计算结果数据没有提交到Kafka主题上,在这个CheckPoint上进行故障恢复时,先通过反射还原出CheckPoint中保存的Kafka事务对象,再执行commit操作,提交成功后,消费端依然可以poll到恢复后提交的结果数据。如果这个重置并提交的动作失败了,可能会造成数据丢失。
消息队列的存储
消息队列的存储原理:每个主题包含若干个分区,每个分区其实就是一个 WAL ( Write Ahead Log),写入的时候只能尾部追加,不允许修改。读取的时候,根据一个索引序号进行查询,然后连续顺序往下读。
Kafka 存储结构
- 存储单位:分区Partition
- 文件:每个分区包含一组消息文件(.log)和一组索引文件(.index/.timeindex)
- 文件命名:消息文件和索引文件一一对应,文件名就是文件中第一条消息的索引序号
- 文件大小:1G,超过则重新生成新日志文件
- 索引:key是分区中消息的逻辑偏移量(即消费位置offset),.index索引是offset -> log file position,.timeindex索引是timestamp -> offset
- 索引是否稀疏:因为索引的key是连续递增的offset,采用稀疏存储,每隔几条消息创建一条索引
- 索引检索方式:根据输入offset对索引文件名二分查找,找到最近的小于offset的索引文件,再在索引文件里二分查找到最近的小于offset的索引,定位到日志文件中对应的消息条目位置,顺序遍历消息找到offset对应的消息
- 文件访问方式:索引文件读写基于mmap方式,避免用户态拷贝,日志文件读写基于FileChannel
- 二分查找page fault问题改进:索引分为热区和冷区,分别在这两部分中二分查找
RocketMq 存储结构
- 存储单位:Broker,Broker上所有主题共享一个存储文件
- 文件:每个代理节点包含一组commitlog文件,存储消息内容;多个consumequeue文件,对应这个Broker代理的主题的队列,每个队列一个consumequeue,用于在队列上消费消息;index索引文件,存储消息key到消息存储位置的索引信息,索引文件按主题和队列分别建立
- 文件命名:消息文件采用起始物理偏移量命名;
- 文件大小:1G,超过则重新生成新日志文件
- 索引:key是消息的key,索引类似hash的数组+链表结构,500w个slot对应hash数组,2000w个index对应hash链表;msg key -> log file position
- 索引是否稀疏:否,每个消息都有一个索引条目
- 索引检索方式:根据输入的消息key,计算hash值,对500wslot求模得到slot下标,slot中存储index偏移量,定位到index位置,比较index中的hash值是否和消息key的hash值相同,不同则继续找index存储的preindex,直到找到hash相同的index,其中的日志文件物理偏移量即可定位到消息
- consumerqueue文件存储了队列对应消息的偏移信息,用于消费,消费消息时,先根据consumerqueue文件定位到消息的物理偏移,再到commitlog文件中找到消息;写入消息时,先写入commitlog文件,通过后台线程异步分发到对应的consumerqueue文件中
- 文件访问方式:读盘mmap,写盘默认mmap,可配置为FileChannel,避免pagecache锁竞争
Kafka和RocketMq存储对比
- Kafka每个分区一个文件,当分区较多时,顺序写磁盘退化为随机写磁盘,而RocketMq所有主题消息都在一个commitlog文件,顺序写磁盘性能好;但RocketMq读消息仍然为每个队列一个文件
- Kafka的稀疏索引设计可以应对更多数量的小消息,节省存储空间
其它
Q:RocketMq在创建或更改topic时,需要配置writeQueueNums和readQueueNums数,这里的读写队列有什么作用?
A:用于对队列扩容或缩容,比如由128队列缩小到64队列,先将写队列数由128减小为64,此时生产者不会再写入64-127号队列,只有消费者读,等待64-127号队列中没有消息后,将读队列数由128缩小到64,完成缩容。由64扩大到128队列时,先将读队列由64扩大为128,此时64-127号队列没有生产者写入,只有消费者空读,再将写队列由64扩大到128,完成扩容。
Q:生产者发送消息时,需要使用分片算法把消息均匀分发到队列或分区中。如果需要保证Key相同的消息的严格顺序,并且能够支持对分区数量进行水平扩容,可以选择的分片算法有哪些?
A:具备单调性的分片算法,即保证扩容后原有的消息能映射到原来或者新的分区,而不会被映射到旧分区的其它集合中。只要是满足单调性的分片算法,就可以按照“先扩容分区->将旧分区中的遗留消息消费完->同时消费所有分区”这样方式确保扩容过程中消息严格有序。可选方法有检索表法,在检索表中存储key和分区的对应关系,通过查表确定分区号;一致性hash算法。
文档信息
- 本文作者:amagcatdog
- 本文链接:https://amagcatdog.github.io/2021/03/11/message-queue/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)