您现在的位置是:亿华云 > 人工智能
关于 RocketMQ ClientID 相同引发的消息堆积的问题
亿华云2025-10-03 07:02:09【人工智能】5人已围观
简介首先,造成这个问题的 BUG RocketMQ 官方已经在 3月16号 的这个提交中修复了,这里只是探讨一下在修复之前造成问题的具体细节,更多的上下文可以参考我之前写的 《RocketMQ Consu
首先,关于造成这个问题的相同息堆 BUG RocketMQ 官方已经在 3月16号 的这个提交中修复了,这里只是问题探讨一下在修复之前造成问题的具体细节,更多的关于上下文可以参考我之前写的 《RocketMQ Consumer 启动时都干了些啥?》 ,这篇文章讲解了 RocketMQ 的相同息堆 Consumer 启动之后都做了哪些操作,对理解本次要讲解的问题 BUG 有一定的帮助。
其中讲到了:
消息堆积
重复消费自不必说,关于你 ClientID 都相同了。相同息堆本篇着重聊聊为什么会消息堆积。问题
文章中讲到,关于初始化 Consumer 时,相同息堆会初始化 Rebalance 的问题策略。你可以大致将 Rebalance 策略理解为如何将一个 Topic 下的关于 m 个 MessageQueue 分配给一个 ConsumerGroup 下的 n 个 Consumer 实例的策略,看着有些绕,相同息堆其实就长这样:
rebalance策略
而从 Consumer 初始化的问题源码中可以看出,默认情况下 Consumer 采取的 Rebalance 策略是 AllocateMessageQueueAverage()。
默认的 Rebalance 策略
默认的亿华云策略很好理解,将 MessageQueue 平均的分配给 Consumer。举个例子,假设有 8 个 MessageQueue,2 个 Consumer,那么每个 Consumer 就会被分配到 4 个 MessageQueue。
那如果分配不均匀怎么办?例如只有 7 个 MessageQueue,但是 Consumer 仍然是 2 个。此时 RocketMQ 会将多出来的部分,对已经排好序的 Consumer 再做平均分配,一个一个分发给 Consumer,直到分发完。例如刚刚说的 7 个 MessageQueue 和 2 个 ConsumerGroup 这种 case,排在第一个的 Consumer 就会被分配到 4 个 MessageQueue,而第二个会被分配到 3 个 MessageQueue。
大家可以先理解一下 AllocateMessageQueueAveragely 的实现,作为默认的 Rebalance 的策略,其实现位于这里:
默认策略的实现位置
接下来我们看看,AllocateMessageQueueAveragely 内部具体都做了哪些事情。
其核心其实就是实现的 AllocateMessageQueueStrategy 接口中的网站模板 allocate 方法。实际上,RocketMQ 对该接口总共有 5 种实现:
AllocateMachineRoomNearby AllocateMessageQueueAveragely AllocateMessageQueueAveragelyByCircle AllocateMessageQueueByConfig AllocateMessageQueueByMachineRoom AllocateMessageQueueConsistentHash其默认的 AllocateMessageQueueAveragely 只是其中的一种实现而已,那执行 allocate 它需要什么参数呢?
入参
需要以下四个:
ConsumerGroup 消费者组的名字 currentCID 当前消费者的 clientID mqAll 当前 ConsumerGroup 所消费的 Topic 下的所有的 MessageQueue cidAll 当前 ConsumerGroup 下所有消费者的 ClientID实际上是将某个 Topic 下的所有 MessageQueue 分配给属于同一个消费者的所有消费者实例,粒度是 By Topic 的。
所以到这里剩下的事情就很简单了,无非就是怎么样把这一堆 MessageQueue 分配给这一堆 Consumer。这个怎么样,就对应了 AllocateMessageQueueStrategy 的不同实现。
接下来我们就来看看 AllocateMessageQueueAveragely 是如何对 MessageQueue 进行分配的,之前讲源码我一般都会一步一步的来,结合源码跟图,但是云南idc服务商这个源码太短了,我就直接先给出来吧。
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List<MessageQueue> result = new ArrayList<MessageQueue>(); // 判断一下当前的客户端是否在 cidAll 的集合当中 if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: { } The consumerId: { } not in cidAll: { }", consumerGroup, currentCID, cidAll); return result; } // 拿到当前消费者在所有的消费者实例数组中的位置 int index = cidAll.indexOf(currentCID); // 用 messageQueue 的数量 对 消费者实例的数量取余数, 这个实际上就把不够均匀分的 MessageQueue 的数量算出来了 // 举个例子, 12 个 MessageQueue, 有 5 个 Consumer, 12 % 5 = 2 int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; }其实前半部分都是些常规的 check,可以忽略不看,从这里:
int index = cidAll.indexOf(currentCID);开始,才是核心逻辑。为了避免逻辑混乱,还是假设有 12 个 MessageQueue,5 个 Consumer,同时假设 index=0 。
那么 mod 的值就为 12 % 5 = 2 了。
而 averageSize 的值,稍微有点绕。如果 MessageQueue 的数量比消费者的数量还少,那么就为 1 ;否则,就走这一堆逻辑(mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())。我们 index 是 0,而 mod 是 2,index < mod 则是成立的,那么最终 averageSize 的值就为 12 / 5 + 1 = 3。
接下来是 startIndex,由于这个三元运算符的条件是成立的,所以其值为 0 * 3 ,就为 0。
看了一大堆逻辑,是不是已经晕了?直接举实例:
12 个 Message Queue
5 个 Consumer 实例
按照上面的分法:
排在第 1 的消费者 分到 3 个
排在第 2 的消费者 分到 3 个
排在第 3 的消费者 分到 2 个
排在第 4 的消费者 分到 2 个
排在第 5 的消费者 分到 2 个
具体分配流程
所以,你可以大致认为:
先“均分”,12 / 5 取整为 2。然后“均分”完之后还剩下 2 个,那么就从上往下,挨个再分配,这样第 1、第 2 个消费者就会被多分到 1 个。
所以如果有 13 个 MessageQueue,5 个 Consumer,那么第 1、第 2、第 3 就会被分配 3 个。
但并不准确,因为分配的 MessageQueue 是一次性的,例如那 3 个 MessageQueue 是一次性获取的,不会先给 2 个,再给 1 个。
而我们开篇提到的 Consumer 的 ClientID 相同,会造成什么?
当然是 index 的值相同,进而造成 mod、averageSize、startIndex、range 全部相同。那么最后 result.add(mqAll.get((startIndex + i) % mqAll.size())); 时,本来不同的 Consumer,会取到相同的 MessageQueue(举个例子,Consumer 1 和 Consumer 2 都取到了前 3 个 MessageQueue),从而造成有些 MessageQueue(如果有的话) 没有 Consumer 对其消费,而没有被消费,消息也在不停的投递进来,就会造成消息的大量堆积。
当然,现在的新版本从代码上看已经修复这个问题了,这个只是对之前的版本的原因做一个探索。
很赞哦!(235)
相关文章
- 有了戴尔Precision 5570工作站 音效师可随时随地开展工作
- 4、参加域名拍卖会
- 二、如何选择合适的域名
- 为什么说注册域名注意细节?哪些我们不能忽视?
- 如何改造数据中心以提高效率和可持续性
- 众所周知,com域名拥有最大的流通市场和流通历史。最好选择com域名,特别是在购买域名时处理域名。其次可以是cn域名、net域名、org域名等主流域名,现在比较流行的王域名和顶级域名,都是值得注册和投资的。
- CNAME:对应解析的记录值为域名地址
- 4、企业无形资产:通用网站已成为企业网络知识产权的重要组成部分,属于企业的无形资产,也有助于提升企业的品牌形象和技术领先形象。它是企业品牌资产不可或缺的一部分。
- 为什么有了HTTP,还需要WebSocket协议?
- 在数以亿计的网站中,我们应该抓住每一个可能带来宣传的机会,域名可以带有企业的名字,一般可以使用汉语拼音或者英语单词或者是相关缩写的形式,只要用户记住了你企业的名字,就能很容易的打出你的网站域名,同样的,记住了网站域名也能很快的记住你公司的名字。
热门文章
站长推荐
抢先看!华为伙伴暨开发者大会2022计算产业精彩不断、干货满满
为什么现在中文域名觉得好?使用中文域名有什么好处?
a、变更前的公司证件扫描件(代码证或者营业执照)及联系人身份证复印件、变更后的公司证件扫描件(代码证或者营业执照)及新的联系人身份证复印件;身份证复印件需本人签名,公司证件复印件需加盖公章。
一下域名,看有没有显示出你所解析的IP,如果有,就说明解析是生效的;如果没有,就说明解析是不生效的。
算力革命来袭,异构计算带给我们的三大思考
在此期间,他们每天仍在这里卖大米,在理财方面个人感情有待提高。因为现在是收米的最佳时机。
2. 不要花大价钱买域名,新手鉴别能力不足,容易投资失误。
如果你的潜在终端必须是这个米(域名),那么潜在终端并不多,也没有硬通货,那么你的域名应该在终端有兴趣购买时出售。否则,你可能得自己留着吃。