您现在的位置是:亿华云 > IT科技

消息服务:项目整合RocketMQ

亿华云2025-10-03 03:17:51【IT科技】7人已围观

简介在《​在《​​SpringCloud Alibaba实战​​》专栏前面的文章中,我们实现了用户微服务、商品微服务和订单微服务之间的远程调用,并且实现了服务调用的负载均衡。也基于阿里开源的Sentine

在《​

在《​​SpringCloud Alibaba实战​​》专栏前面的消息项目文章中,我们实现了用户微服务、服务商品微服务和订单微服务之间的整合远程调用,并且实现了服务调用的消息项目负载均衡。也基于阿里开源的服务Sentinel实现了服务的限流与容错,并详细介绍了Sentinel的整合核心技术与配置规则。

简单介绍了服务网关,消息项目并对SpringCloud Gateway的服务核心架构进行了简要说明,也在项目中整合了SpringCloud Gateway网关实现了通过网关访问后端微服务。整合

同时,消息项目也基于SpringCloud Gateway整合Sentinel实现了网关的服务限流功能,详细介绍了SpringCloud Gateway网关的整合核心技术。在链路追踪章节,消息项目我们开始简单介绍了分布式链路追踪技术与解决方案,服务随后在项目中整合Sleuth实现了链路追踪,整合并使用Sleuth整合ZipKin实现了分布式链路追踪的可视化 。

在消息服务章节,我们介绍了MQ的使用场景,引入MQ后的注意事项以及MQ的选型对比。接下来,我们就在项目中整合RocketMQ。服务器租用

本章总览​

RocketMQ环境准备​

RocketMQ是阿里开源的消息中间件,目前是Apache下的顶级项目。正式在项目中接入RocketMQ之前,我们需要搭建RocketMQ的环境。这里呢,我把搭建RocketMQ的基础环境分为两个部分:搭建RocketMQ环境和搭建RocketMQ控制台。

「注意:冰河这里都是先下载RocketMQ的源码和RocketMQ控制台的源码,然后对源码进行编译后,再搭建的。目的也是让小伙伴们能够跟着冰河实现手动编译RocketMQ的源码,另外,编译RocketMQ源码和控制台源码需要JDK1.8+Maven。」

源码编译安装RocketMQ

(1)到链接https://github.com/apache/rocketmq/releases/tag/rocketmq-all-4.9.3下载RocketMQ 4.9.3版本的源码。下载并解压后的源码如下所示。

(2)打开cmd命令行,进入RocketMQ的解压目录,我这里是网站模板E:\Application\RocketMQ\rocketmq-rocketmq-all-4.9.3目录,然后在cmd命令行输入如下命令开始编译打包。

mvn clean install -Dmaven.test.skip=true -Prelease-all

编译过程如下所示。

编译打包成功后,如下图所示。

(3)编译成功后,会在RocketMQ解压目录下的distribution目录下的target目录下生成RocketMQ的安装包,在我电脑上的目录就是:E:\Application\RocketMQ\rocketmq-rocketmq-all-4.9.3\distribution\target。如下所示。

这样,我们就自己下载RocketMQ的源码,并打包成功了。

注意:这里,为了方便,我还是将RocketMQ部署到我本机Windows操作系统上,小伙伴们也可以将之前的Nacos、Sentinel和这次的RocketMQ都部署在Linux操作系统上,部署方式几乎与在Windows操作系统一样,这里,冰河就不再赘述了。」

(4)将编译出的安装包,解压到电脑的某个目录下,云服务器例如我解压后的目录为:E:\Application\microservices\RocketMQ\rocketmq-4.9.3。

(5)在RocketMQ的解压目录下的conf目录下修改broker.conf文件,修改后的文件内容如下所示。

brokerClusterName = DefaultCluster

brokerName = broker-a

brokerId = 0

deleteWhen = 04

fileReservedTime = 48

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

# 自动创建Topic

autoCreateTopicEnable=true

# nameServ地址

namesrvAddr=127.0.0.1:9876

# 存储路径

storePathRootDir=E:/RocketMQ/data/rocketmq/dataDir

# commitLog路径

storePathCommitLog=E:/RocketMQ/data/rocketmq/dataDir/commitlog

# 消息队列存储路径

storePathConsumeQueue=E:/RocketMQ/data/rocketmq/dataDir/consumequeue

# 消息索引存储路径

storePathIndex=E:/RocketMQ/data/rocketmq/dataDir/index

# checkpoint文件路径

storeCheckpoint=E:/RocketMQ/data/rocketmq/dataDir/checkpoint

# abort文件存储路径

abortFile=E:/RocketMQ/data/rocketmq/dataDir/abort

小伙伴们可以根据自己的实际情况,自行修改上述文件中配置的目录地址。

(6)非常重要的一步,在启动RocketMQ之前,需要配置下ROCKETMQ_HOME环境变量,否则在启动RocketMQ的时候,会提示如下错误信息。

E:\Application\microservices\RocketMQ\rocketmq-4.9.3\bin>mqnamesrv.cmd

Please set the ROCKETMQ_HOME variable in your environment!

「提示:设置ROCKETMQ_HOME环境变量。」

接下来,就在系统环境变量中,设置下ROCKETMQ_HOME的环境变量,如下所示。

(7)配置完RocketMQ的环境变量后,打开cmd命令行,进入RocketMQ的bin目录,例如,我电脑的目录是:E:\Application\microservices\RocketMQ\rocketmq-4.9.3\bin。执行​​mqnamesrv.cmd​​命令启动NameServer,如下所示。

打印出如下信息,说明RocketMQ的NameServer启动成功了。

The Name Server boot success. serializeType=JSON

(8)重新打开一个cmd命令行,进入RocketMQ的bin目录,输入​​mqbroker.cmd -n localhost:9876​​命令启动RocketMQ的Broker服务,如下所示。

打印出如下信息,说明RocketMQ的Broker服务启动成功了。

boot success. serializeType=JSON and name server is localhost:9876

测试RocketMQ环境

RocketMQ内置了大量的测试案例,并且这些测试案例可以通过RocketMQ的bin目录下的tools.cmd命令进行测试。接下来,我们就使用RocketMQ自带的tools.cmd命令测试RocketMQ的环境。

(1)启动生产者程序向RocketMQ发送消息。

重新打开cmd命令行,进入RocketMQ的bin目录,在命令行输入如下命令调用RocketMQ自带的生产者程序向RocketMQ发送消息。

set NAMESRV_ADDR=localhost:9876

tools.cmd org.apache.rocketmq.example.quickstart.Producer

可以看到,执行完上述两条命令后,生产者程序开始向RocketMQ发送消息。

(2)启动消费者程序消费RocketMQ中的消息。

重新打开cmd命令行,进入RocketMQ的bin目录,在命令行输入如下命令调用RocketMQ自带的消费者程序消费RocketMQ中的消息。

set NAMESRV_ADDR=localhost:9876

tools.cmd org.apache.rocketmq.example.quickstart.Consumer

可以看到,执行完上述两条命令后,消费者程序开始消费RocketMQ中的消息。

说明我们使用源码编译搭建RocketMQ环境成功了。

源码编译RocketMQ控制台

这里需要注意的是:RocketMQ控制台本质上是一个SpringBoot程序,启动后默认监听的端口是8080。RocketMQ的新版控制台已经从RocketMQ的rocketmq-externals项目中分离出来了。也就是说,新版的RocketMQ控制台已经从https://github.com/apache/rocketmq-externals链接所示的项目中分离出来,新版控制台的链接地址为:https://github.com/apache/rocketmq-dashboard。

(1)从链接https://github.com/apache/rocketmq-dashboard下载新版的RocketMQ控制台源码。下载后解压。

(2)进入到RocketMQ控制台源码解压目录的src/main/resources目录下,编辑application.yml文件,修改​​namesrvAddrs​​地址,去掉多余的namesrvAddrs地址。

application.yml文件中原来的配置如下所示。

rocketmq:

config:

# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876

# configure multiple namesrv addresses to manage multiple different clusters

namesrvAddrs:

- 127.0.0.1:9876

- 127.0.0.2:9876

将127.0.0.2:9876删除或者注释掉,如下所示。

rocketmq:

config:

# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876

# configure multiple namesrv addresses to manage multiple different clusters

namesrvAddrs:

- 127.0.0.1:9876

# - 127.0.0.2:9876

RocketMQ控制台启动时默认监听的端口是8080,由于我们项目中订单微服务监听的端口也是8080,所以,将RocketMQ控制台监听的端口修改为10003,修改前的配置如下所示。

server:

port: 8080

修改后的配置如下所示。

server:

port: 10003

(3)修改完application.yml文件后,打开cmd命令行,进入RocketMQ控制台源码的根目录,输入如下Maven命令开始编译RocketMQ控制台的源码。

mvn clean install -Dmaven.test.skip=true

编译过程如下所示。

(4)编译完成后,会在RocketMQ控制台源码的根目录下生成target目录,如下所示。

进入target目录后,可以看到生成了rocketmq-dashboard-1.0.1-SNAPSHOT.jar文件,如下所示。

这个jar文件就是RocketMQ控制台的运行文件。

(5)重新打开cmd命令行,进入rocketmq-dashboard-1.0.1-SNAPSHOT.jar文件所在的命令,在命令行直接输入如下命令启动RocketMQ控制台程序。

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

验证RocketMQ控制台

在浏览器中输入​​http://localhost:10003​​后,出现如下画面说明RocketMQ启动成功。

界面默认是英文,我们也可以点击右上角的​​changeLanguage​​切换语言,切换成中文显示,如下所示。

选择主题菜单想后如下所示。

可以看到目前RocketMQ中存在一个名称为TopicTest的主题,点击TopicTest主题的状态按钮,如下所示。

会显示TopicTest主题的消息队列信息,如下所示。

可以看到,正确显示出了TopicTest主题的消息队列信息,说明RocketMQ控制台启动成功了。

编码测试RocketMQ​

我们使用RocketMQ自带的生产者和消费者程序实现了消息的生成与消费,为了让小伙伴们能够更加直观的感受到消息中间件在项目中的作用,接下来,我们自己编码测试下RocketMQ。

导入RocketMQ依赖

在用户微服务shop-user的pom.xml中,添加RocketMQ相关的依赖,如下所示。

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

org.apache.rocketmq

rocketmq-client

4.5.2

</dependency>

编写生产者代码

在用户微服务的sec/test/java目录下新建​​io.binghe.shop.rocketmq.test​​包,在包下创建RocketMQProducer类,作为RocketMQ的生产者,代码如下所示。

/

**

* @author binghe

* @version 1.0.0

* @description RocketMQ生产者

*/

public class RocketMQProducer {

public static void main(String[] args) throws Exception {

//创建消息生产者

DefaultMQProducer producer = new DefaultMQProducer("bingheProducerGroup");

//设置NameServer地址

producer.setNamesrvAddr("127.0.0.1:9876");

//启动生产者

producer.start();

//构建消息对象

Message message = new Message("bingheTopic", "bingheTag", "Hello RocketMQ".getBytes());

System.out.println("生产者发出的消息为:" + JSONObject.toJSONString(message));

//发送消息并接收结果

SendResult sendResult = producer.send(message);

//打印结果信息

System.out.println("生产者收到的发送结果信息为:" + JSONObject.toJSONString(sendResult));

//关闭生产者

producer.shutdown();

}

}

生产者的代码比较简单,这里就不再赘述了。

编写消费者代码

在​​io.binghe.shop.rocketmq.test​​包下新建RocketMQConsumer类,作为RocketMQ的消费者,代码如下所示。

/

**

* @author binghe

* @version 1.0.0

* @description RocketMQ消费者

*/

public class RocketMQConsumer {

public static void main(String[] args) throws Exception {

try{

//创建消息消费者

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bingheConsumerGroup");

//设置NameServer地址

consumer.setNamesrvAddr("127.0.0.1:9876");

//订阅bingheTopic主题

consumer.subscribe("bingheTopic", "*");

//设置消息监听,当收到消息时RocketMQ会回调消息监听

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(Listlist,

ConsumeConcurrentlyContext consumeConcurrentlyContext) {

//打印消息消费者收到的RocketMQ消息

System.out.println("消费者收到的消息为:" + list);

//返回消息消费成功的标识

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

//启动消费者

consumer.start();

System.out.println("消费者启动成功");

}catch (Exception e){

e.printStackTrace();

}

}

}测试消息的生产与消费

(1)为了便于观察,这里我们先启动消费者程序RocketMQConsumer,启动RocketMQConsumer后会在IDEA的控制台打印如下信息。

消费者启动成功

说明消费者启动成功了。

(2)运行生产者程序RocketMQProducer,运行后RocketMQProducer程序控制台会输出如下信息。

生产者发出的消息为:{ "body":"SGVsbG8gUm9ja2V0TVE=","delayTimeLevel":0,"flag":0,"properties":{ "WAIT":"true","TAGS":"bingheTag"},"tags":"bingheTag","topic":"bingheTopic","waitStoreMsgOK":true}

生产者收到的发送结果信息为:{ "messageQueue":{ "brokerName":"DESKTOP-PSKC7T1","queueId":1,"topic":"bingheTopic"},"msgId":"C0A8006F538418B4AAC25B9EDDAC0000","offsetMsgId":"C0A8B80100002A9F0000000000036B16","queueOffset":2,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

说明生产者程序RocketMQProducer成功将消息发送到RocketMQ。

(3)接下来,再看下消费者程序RocketMQConsumer的控制台,如下所示。

消费者收到的消息为:[MessageExt [queueId=1, storeSize=206, queueOffset=2, sysFlag=0, bornTimestamp=1652871538093, bornHost=/192.168.184.1:52915, storeTimestamp=1652871538099, storeHost=/192.168.184.1:10911, msgId=C0A8B80100002A9F0000000000036B16, commitLogOffset=224022, bodyCRC=1774740973, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{ topic=bingheTopic, flag=0, properties={ MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1652871538103, UNIQ_KEY=C0A8006F538418B4AAC25B9EDDAC0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=bingheTag}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId=null}]]

说明生成者发送到RocketMQ的消息,被消费者成功消费到了。

项目整合RocketMQ​

我们在项目中模拟一个用户成功下单后,为用户发送通知,通知用户下单成功的逻辑,具体的流程就是下单成功后将订单的信息发送到RocketMQ,然后用户微服务订阅RocketMQ的消息,接收到消息后进行打印。

用户微服务整合RocketMQ

(1)编码测试RocketMQ时,导入了RocketMQ的依赖,这里就不用再次导入了。

(2)在用户微服务shop-user的application.yml文件中添加如下RocketMQ的配置。

rocketmq:

name-server: 127.0.0.1:9876

(3)在用户微服务shop-user中创建​​io.binghe.shop.user.rocketmq​​包,在包下创建RocketConsumeListener,实现org.apache.rocketmq.spring.core.RocketMQListener接口,具体代码如下所示。

/

**

* @author binghe

* @version 1.0.0

* @description 监听消费

*/

@Slf4j

@Component

@RocketMQMessageListener(consumerGroup = "user-group", topic = "order-topic")

public class RocketConsumeListener implements RocketMQListener{

@Override

public void onMessage(Order order) {

log.info("用户微服务收到了订单信息:{ }", JSONObject.toJSONString(order));

}

}

其中,RocketConsumeListener类上的@RocketMQMessageListener注解,表示当前类是一个RocketMQ的消费者,在@RocketMQMessageListener注解中配置了消费者组为user-group,主题为order-topic。

至此,用户微服务整合RocketMQ完毕。

订单微服务整合RocketMQ

(1)在订单微服务shop-order的pom.xml文件中添加RocketMQ的依赖,如下所示。

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

org.apache.rocketmq

rocketmq-client

4.5.2

</dependency>

(2)在订单微服务shop-order的application.yml文件中添加如下配置。

rocketmq:

name-server: 127.0.0.1:9876

producer:

group: order-group

(3)将​​io.binghe.shop.order.service.impl.OrderServiceV6Impl​​​类,复制一份成​​io.binghe.shop.order.service.impl.OrderServiceV7Impl​​​类,接下来,在​​io.binghe.shop.order.service.impl.OrderServiceV7Impl​​类中操作。

将​​io.binghe.shop.order.service.impl.OrderServiceV7Impl​​类上的@Service注解中的名称修改为orderServiceV7,如下所示。

@Slf4j

@Service("orderServiceV7")

public class OrderServiceV7Impl implements OrderService {

//省略具体代码

}

(4)在​​io.binghe.shop.order.service.impl.OrderServiceV7Impl​​类中,注入RocketMQTemplate对象,如下所示。

@Autowired

private RocketMQTemplate rocketMQTemplate;

(5)在​​io.binghe.shop.order.service.impl.OrderServiceV7Impl#saveOrder()​​方法中,提交订单成功后将订单信息写入RocketMQ,如下所示。

@Override

@Transactional(rollbackFor = Exception.class)

public void saveOrder(OrderParams orderParams) {

//省略上面所有代码

rocketMQTemplate.convertAndSend("order-topic", order);

}

(6)在​​io.binghe.shop.order.controller.OrderController​​中,将注入的OrderService的名称修改成orderServiceV7,如下所示。

@Autowired

@Qualifier(value = "orderServiceV7")

private OrderService orderService;

「注意:订单微服务shop-order中,修改后的代码见源码工程,冰河在这里不再粘贴完整的源代码。」

测试项目整合的RocketMQ

(1)分别启动Nacos,Sentinel,ZipKin和RocketMQ。

(2)分别启动用户微服务、商品微服务、订单微服务和网关服务。

(3)在浏览器中输入​​localhost:10001/server-order/order/submit_order?userId=1001&productId=1001&count=1​​,如下所示。

(4)查看用户微服务shop-user的控制台,发现会输出订单的信息,如下所示。

2022-05-18 20:37:26.440 INFO [server-user,,,] 18064 --- [MessageThread_1] i.b.s.u.rocketmq.RocketConsumeListener : 用户微服务收到了订单信息:{ "address":"北京","id":13176882400989185,"phone":"13212345678","totalPrice":2399.00,"userId":1001,"username":"binghe"}

说明项目中成功集成了RocketMQ。

很赞哦!(88)