环球体育HQBET下载官网


 

RocketMQ中各类重复消费的原理浅析

日期:2024-01-22 04:02:19 浏览次数:112 分类:公司动态 来源:环球体育HQBET下载官网

  时代的到来,我国的各个产业每天都在产生不可估计的数据,以及对数据的各式各样的需求,消息中间件在处理数据、消费数据的过程中慢慢的受到重视。在高并发、微服务、分布式的场景下,如何合理地利用消息中间件,如何保证MQ消费消息的幂等性?所谓知其然,才能知其所以然,本文将通过RocketMQ作为例子,来扒一扒什么情况下会导致重复消费。

  1.生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个Queue,然后跟这个Queue所在的机器建立连接,把消息发送到这个Queue上。

  当异常情况出现时,如消息发送超时或者消息消费超时,RocketMQ为保证消息发送成功,会启动重试机制,选择另一台机器的Queue重发。现在假设有这样一种情况,消费者实际正确接收到了消息,只是由于网络波动导致响应超时了,那就会出现消息重复发送,导致消费者重复消费的情况出现。

  那除此之外,还有无另外的情况会导致消息重复消费的情况呢?总结起来一共有如下几种情况。

  传入的是msgs集合。上述原因一中消息处理之后,不管成功失败,都会对结果做处理。而集合中的任意一个失败,都可能会导致status被设置为RECONSUME_LATER。在对结果处理是,判断到RECONSUME_LATER时,就会对msgs重新遍历并发送消息,重新消费,因此导致之前成功处理的消息都会被重复消费。不过好在msgs消息的数量默认情况下是1。

  producer发送消息到broker,Rocketmq会将消息的内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数传入的起始offset来获取需要消费的消息,再从commitLog中获取具体的消息内容返回给consumer。消费成功之后,消费者提交offset,来记录这个queue消费到哪个位置了。

  RocketMq设计的时候,消费完消息,并不是同步提交offset,而是将offset保存到内存中,通过一个定时任务(默认是5S一次),以网络请求的方式将offset提交给broker。如果最新的offset还没提交,此时服务器宕机了,那么重启之后,就会从broker中读取到之前的提交的offset,并从此处开始消费,此时就会出现重复消费的情况了。

  与消费者提交offset同理,Broker为避免数据丢失,会将offset持久化到磁盘中。同样的也是通过一个默认5S的定时任务来处理持久化操作。所以offset的完整过程就如下图。当broker宕机时,就会导致offset丢失,此时如果消费者重新拉取消费进度,就会比实际消费的进度要低,导致重复消费。

  为保证RocketMQ服务的高可用,一般项目中都会启用主从备份的模式,当主节点挂掉之后,从节点就会升级为主节点对外提供服务。因此就有必要进行主从同步,保证数据的一致性。默认情况下每隔10S,从节点会向主节点请求,同步元数据,包括消费进度。此时如果主节点宕机了,从节点就无法获取到10S之内的消费进度,自然也就会导致重复消费。

  RocketMQ的消费者有两种模式,集群消费模式和广播消费模式,绝大多数场景采用的都是集群消费模式。前面提到的消费进度就是在集群消费模式下才会存在。集群消费模式中有一个消费组的概念。一个消费组可以有多个消费者,不同消费组之间消费消息互不干扰,而同一消费组的消费者按照一定的算法分配消息队列进行消息消费,保证一个消息只能被一个消费组消费一次。当消费组中的消费组增加或者减少时就会触发重平衡。如图,原先消费组中有两个消费者,平均消费4个队列,每个消费组2个队列;当加入了一个新的消费者时,为了能够更好的保证新的花钱的人可以消费消息,就会进行重平衡,重新分配消息队列。

  假设在重平衡发生时,此时消费者2还在正常消费Queue4,当消费者3加入,重平衡完成时,此时消费者2判断到Queue4已经不属于自身个人消费了,就会将Queue4设置为dropped,消费完成时,发现队列是dropped状态,那么消费者2的消费进度offset就不会被提交。成功消费了消息,但是消费进度却没有被提交,于是当消费者3开始消费消息时,就会从服务端拉取到之前的消费进度,造成队列4的消息被重复消费。

  RocketMQ中有一个机制会定时清理长时间正在消费的消息,默认是15分钟执行一次清理任务。之所以这么做,是有原因的。我们说过,消息被消费之后,就会提交offset。当一个线程消费了所有消息时,就会把消息从集合中移除,提交的消息进度offset就是msg5的offset+1。

  假设,现在是两个线消费完成,之后提交offset,但是此时线还在处理前两条消息,因此为了能够更好的保证消费消息的不丢失,移除之后发现集合中还有剩余消息,就会把msg1的offset返回提交上去。而一旦集合最前面的消息长时间处理,就会导致这个消费进度一直在最前面。此时如果服务器重启,就会导致很多消费过的消息都会被重复消费。因此引入了清理长时间消费的机制。

  引入清理长时间消费的消息机制后,一经发现某个消息已经处理超过15分钟了,就会将消息移除,保障后续消息消费进度的正常提交,之后会隔一定的时间再次消费这个被移除的消息。但是,这一条消息虽然被移除了,却并不是没有消费过,因此再次消费就会导致重复消费的问题出现。

  RocketMq的官方文档中对消息传递有这样的解释:RocketMq确保所有消息至少被传递一次,在大多数情况下,消息不会重复。可见RocketMq为了能够更好的保证消息的不丢失,牺牲了消息投递的重复率。因此我们在使用RokcetMq时需要合理使用它的特点,设计合理的幂等技术方案来解决重复消费的问题。

  文章出处:【微信号:5G通信,微信公众号:5G通信】欢迎添加关注!文章转载请注明出处。

  单机实例,在此之前需要已配置JAVA环境。下载程序包直接用一般就下载已经编译好的二进制文件就好了,下载好以后&

  场景下的丰富实践,我们将互联网业务对消息的需求来做抽象,形成了一套能满足大多数业务场景的标准模型,慢慢地发展成与厂商无关、平台无关的分布式消息及流处理领域的应用开发

  主要由 Producer、Broker、Consumer 三部分所组成,其中

  MQTT协议架构模型 /

  的扩展项目 /

  的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,

  消息的原因 /

  的主从复制 /

  是基于主题(Topic)的发布/订阅模式,而RabbitMQ则是基于队列(Queue)的消息代理系统。 语言支持

  5.0 是消息事件流一体的实时数据处理平台,是业务消息领域的事实标准,很多网络公司在业务消息场景会使用

  在业务消息场景的优势有哪些呢? /

  业务场景中,在实际生产场景中,用户通常会选择消息 ID 或者特定的业务 Key(例如学号,订单号)来查询和定位特定的一批消息,进而定位分布式系统中的复杂问题。传统方案

  构建的索引结构 /

  者之间的高效可靠通信。它支持同步和异步消息传递模式,能轻松实现灵活和响应迅速的通信方式。

  【RISC-V开放架构设计之道阅读体验】先睹为快-学习RISC-V的案头好书