您现在的位置是:亿华云 > 域名
一文带你理解 RocketMQ 广播模式实现机制
亿华云2025-10-04 03:08:03【域名】6人已围观
简介大家好,我是君哥。今天聊聊 RocketMQ 的广播消息实现机制。RocketMQ 有两种消费模式,集群模式和广播模式。集群模式是指 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消
大家好,文带我是理解君哥。今天聊聊 RocketMQ 的广播广播消息实现机制。
RocketMQ 有两种消费模式,模式集群模式和广播模式。实现
集群模式是机制指 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,文带Producer 向 TopicTest 这个 Topic 并发写入 3 条新消息,理解分别被分配到了 MessageQueue1~MessageQueue3 这 3 个队列,广播然后 Group 中的模式三个 Consumer 分别消费了一条消息:
广播模式是 RocketMQ 中的消息会被消费组中的每个消费者都消费一次,如下图:
使用 RocketMQ 的实现广播模式时,需要在消费端进行定义,机制下面是文带一段官方示例:
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}从代码中可以看到,在定义 Consumer 时,理解通过 messageModel 这个属性指定消费模式,广播这里指定为 BROADCASTING,也就启动了广播模式的消费者。
1、消费者启动
以 RocketMQ 推模式为例,看一下消费者调用关系类图:
DefaultMQPushConsumer 作为启动入口类,它的 start 方法调用了 DefaultMQPushConsumerImpl 类的源码下载 start 方法,下面重点看一下这个方法。
(1)拷贝订阅关系start 方法中调用了 copySubscription 方法,代码如下:
private void copySubscription() throws MQClientException {
try {
//拷贝订阅关系
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}这里的代码有一点需要注意:集群模式会创建一个重试 Topic 的订阅关系,而广播模式是不会创建这个订阅关系的。也就是说广播模式不考虑重试。
(2)初始化偏移量下面是初始化 offset 的代码:
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}从上面的代码可以看到,广播模式使用了 LocalFileOffsetStore,也就是说偏移量保存在客户端本地,除了在内存中会保存,在本地文件中也会保存。
2、消息拉取
ConsumeMessageService 是真正拉取消息的地方,消费者初始化时会初始化 ConsumeMessageService,并且这里会区分并发消息还是顺序消息。
(1)顺序消息在集群模式下,需要获取到 processQueue 的锁才会拉取消息,而在广播模式下,不用获取锁,直接就可以拉取消息。高防服务器判断逻辑如下:
//ConsumeMessageOrderlyService.ConsumeRequest
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
}
}这里有个疑问,对于顺序消息,获取锁是必须的,这样才能保证一个 processQueue 只能由一个线程进行处理,从而保证消费的顺序性。那对于广播模式,为什么不用获取 processQueue 的锁呢?难道广播模式不支持顺序消息?
(2)并发消息对于并发消息,广播模式不同的是,对消费结果的处理。集群模式消费失败后需要把消息发送回 Broker 等待再次被拉取,而广播模式则不需要重试。代码如下:
//ConsumeMessageConcurrentlyService.rocessConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, { }", msg.toString());
}
break;
case CLUSTERING:
List
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}这再次说明,广播模式是不支持消息重试的。
3、重平衡
在消费者启动过程中,会调用 RebalanceService 的 start 方法,进行重平衡。从重平衡的代码中可以看到,广播模式消费者会消费所有 MessageQueue,而集群模式下会根据负载均衡策略选择其中几个 MessageQueue。代码如下:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
//省略部分逻辑
} else {
}
break;
}
case CLUSTERING: {
Set
List
//省略部分逻辑
if (mqSet != null && cidAll != null) {
//省略部分逻辑
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
Set
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//省略部分逻辑
}
break;
}
default:
break;
}
}上面 updateProcessQueueTableInRebalance 这个方法调用前,要获取到需要消费的服务器租用 MessageQueue 集合。广播模式下,直接取了订阅的 Topic 下的所有集合元素,而集群模式下,则需要通过负责均衡获取当前消费者自己要消费的 MessageQueue 集合。
4、总结
本文主要讲解了 RocketMQ 广播消息的实现机制,理解广播消息,要把握下面几点:
1.偏移量保存在消费者本地内存和文件中。
2.广播消息不支持重试。
3.从源码上看,广播模式并不能支持顺序消息。
4.广播模式消费者订阅了 Topic 下的所有 MessageQueue,不会重平衡。
很赞哦!(8356)
热门文章
站长推荐
因为域名解析需要同步到DNS根服务器,而DNS根服务器会不定时刷,只有DNS根服务器刷新后域名才能正常访问,新增解析一般会在10分钟左右生效,最长不会超过24小时,修改解析时间会稍微延长。
新手网站转让要如何做?网站转让域名有什么操作?
现在AI域名在域名市场咋样?域名AI将会赶超.com吗?
新手注册的域名需要绑定使用吗?该如何操作?
众所周知,com域名拥有最大的流通市场和流通历史。最好选择com域名,特别是在购买域名时处理域名。其次可以是cn域名、net域名、org域名等主流域名,现在比较流行的王域名和顶级域名,都是值得注册和投资的。
wang域名为何遭域名投资者疯狂抢注?什么缘由?
新手怎么获得网络域名?有什么方面需要知道?
企业应该需要怎样的域名?新企业选域名取决什么方面?