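Last updated: February 12, 2023
RocketMQ is an open-source, distributed message middleware written in pure Java and built on a queue model. It supports transactional, ordered, batch, and scheduled messages as well as message replay, and is mainly used for asynchronous decoupling and traffic peak shaving. It has four core components: NameServer, Broker, Producer, and Consumer.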
Deployed RocketMQ version: 4.9.3
Dependency
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
Configuration
    rocketmq:
      name-server: 13.39.8.9:9876      # NameServer address (host:port)
      producer:
        group: ToTheFor                # producer group name
        send-message-timeout: 10000    # send timeout in milliseconds
Producer
Call RocketMQTemplate's convertAndSend() method to send a message.
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class RocketMQProducer {

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        // Sends msg to the given topic.
        public void sendMessage(String topic, String msg) {
            System.out.println("发送消息了"); // "message sent"
            rocketMQTemplate.convertAndSend(topic, msg);
        }
    }
Consumer
Implement the RocketMQListener<?> interface and override onMessage(). The business logic goes in this method.
Add the @RocketMQMessageListener annotation:
- consumerGroup: the consumer group.
- topic: the topic to subscribe to.
- selectorExpression: the tags to select. Defaults to all tags (*). To select only some tags: tag1 || tag2.
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic", selectorExpression = "*")
    public class RocketMQConsumer implements RocketMQListener<String> {

        // Called once for every message received on the topic.
        @Override
        public void onMessage(String s) {
            System.out.println("接受到的消息为:" + s); // "received message: ..."
        }
    }
Test
    @Autowired
    private RocketMQProducer rocketMQProducer;

    @Token // annotation from the author's own project, not part of RocketMQ
    @RequestMapping("/sendM")
    public Object testRM() {
        rocketMQProducer.sendMessage("baseTopic", "少时诵诗书");
        return new JSONObject().fluentPut("flag", "ok");
    }
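Startup
With everything above in place, start the application. It fails with the following error:
    invokeSync call the addr[null] timeout
After a long search, the fix that worked was to downgrade the dependency version:
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
Start the application again and call the endpoint; the message is now sent successfully.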
Other message types
All of the examples below are producer-side sends.
Synchronous messages
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class RocketMQProducer {

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        public void sendMessage(String topic, String msg) {
            System.out.println("发送消息了"); // "message sent"
            // Blocks until the broker acknowledges the message.
            rocketMQTemplate.syncSend(topic, msg);
        }
    }
Asynchronous messages
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class RocketMQProducer {

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        public void sendMessage(String topic, String msg) {
            System.out.println("发送消息了"); // "message sent"
            // Returns immediately; the callback is invoked with the send outcome.
            rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送成功"); // "send succeeded"
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送失败"); // "send failed"
                }
            });
        }
    }
One-way messages
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class RocketMQProducer {

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        public void sendMessage(String topic, String msg) {
            System.out.println("发送消息了"); // "message sent"
            // Fire and forget: no result and no callback.
            rocketMQTemplate.sendOneWay(topic, msg);
        }
    }
Delayed messages
The delay level is an extra argument on the send call. This example uses syncSend(); asyncSend() has a corresponding overload, while sendOneWay() takes no delay-level argument. In the call below, 10000 is the send timeout in milliseconds and 3 is the delay level.
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class RocketMQProducer {

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        public void sendMessage(String topic, String msg) {
            System.out.println("发送消息了"); // "message sent"
            // Arguments: destination, message, send timeout (ms), delay level.
            rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build(), 10000, 3);
        }
    }
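The delay level is an index into a level table configured on the broker, not a duration. As a minimal sketch, the class below parses the default RocketMQ 4.x value of the broker setting messageDelayLevel and looks up the delay for a level. The class and method names (DelayLevelSketch, parse, delayFor) are made up for illustration, and a broker with a customized table will give different results.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class DelayLevelSketch {
    // Default messageDelayLevel table of a RocketMQ 4.x broker.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses a table such as "1s 5s 10s" into durations; list index 0 is level 1.
    static List<Duration> parse(String table) {
        List<Duration> levels = new ArrayList<>();
        for (String token : table.trim().split("\\s+")) {
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': levels.add(Duration.ofSeconds(amount)); break;
                case 'm': levels.add(Duration.ofMinutes(amount)); break;
                case 'h': levels.add(Duration.ofHours(amount)); break;
                case 'd': levels.add(Duration.ofDays(amount)); break;
                default: throw new IllegalArgumentException("unknown unit in " + token);
            }
        }
        return levels;
    }

    // Returns the delay for a 1-based delay level using the default table.
    static Duration delayFor(int level) {
        List<Duration> levels = parse(DEFAULT_LEVELS);
        if (level < 1 || level > levels.size()) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        System.out.println("level 3 -> " + delayFor(3).getSeconds() + " seconds");
    }
}
```

With the default table, level 3 corresponds to 10 seconds, so the message sent above would become visible to consumers roughly 10 seconds after it is stored.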
Filtering
Tag filtering
Consumer:
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic", selectorExpression = "*")
    public class RocketMQConsumer implements RocketMQListener<String> {

        @Override
        public void onMessage(String s) {
            System.out.println("接受到的消息为:" + s); // "received message: ..."
        }
    }
- selectorExpression: the tag or tags to receive (the example above uses * to accept every tag).
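To make the tag expression concrete, here is a simplified model of how an expression such as tag1 || tag2 selects messages. It is a sketch for illustration only, not RocketMQ's implementation, and the names TagFilterSketch, destination, parseTags, and matches are hypothetical. On the producer side, rocketmq-spring takes the tag as part of the destination string in the form topic:tag, which the destination helper mirrors.

```java
import java.util.HashSet;
import java.util.Set;

class TagFilterSketch {
    // Builds a destination string in the "topic:tag" form accepted by RocketMQTemplate.
    static String destination(String topic, String tag) {
        return topic + ":" + tag;
    }

    // Splits an expression such as "tag1 || tag2" into the set of tags it names.
    static Set<String> parseTags(String selectorExpression) {
        Set<String> tags = new HashSet<>();
        for (String part : selectorExpression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // Returns true if a message carrying messageTag would pass the expression.
    static boolean matches(String selectorExpression, String messageTag) {
        if (selectorExpression == null || selectorExpression.trim().equals("*")) {
            return true; // "*" subscribes to every tag
        }
        return messageTag != null && parseTags(selectorExpression).contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(destination("baseTopic", "tag1"));
        System.out.println(matches("tag1 || tag2", "tag2"));
        System.out.println(matches("tag1 || tag2", "tag3"));
    }
}
```

Under this model, a producer call such as convertAndSend("baseTopic:tag1", msg) would reach a consumer whose selectorExpression is tag1 || tag2, but not one subscribed only to tag3.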
SQL expression filtering
Consumer:
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.annotation.SelectorType;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",
            selectorType = SelectorType.SQL92, selectorExpression = "age > 10")
    public class RocketMQConsumer implements RocketMQListener<String> {

        @Override
        public void onMessage(String s) {
            System.out.println("接受到的消息为:" + s); // "received message: ..."
        }
    }
- selectorType: the filter type. Defaults to SelectorType.TAG.
- selectorExpression: the filter condition, evaluated against the message's user properties (here, a property named age).
SQL92 filtering only works if the broker is started with enablePropertyFilter=true.
Consumer message model
Change the message model used by the consumer.
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",
            messageModel = MessageModel.BROADCASTING)
    public class RocketMQConsumer implements RocketMQListener<String> {

        @Override
        public void onMessage(String s) {
            System.out.println("接受到的消息为:" + s); // "received message: ..."
        }
    }
- messageModel: sets the message model. Defaults to MessageModel.CLUSTERING (cluster mode).
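The difference between the two models is easiest to see with a small simulation. The sketch below is a toy model, not RocketMQ code: MessageModelSketch and deliver are hypothetical names, and the round-robin split used for clustering is a simplification (a real broker assigns whole queues to the consumers of a group). It only shows the delivery semantics: in clustering mode each message goes to one consumer of the group, in broadcasting mode every consumer gets every message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MessageModelSketch {
    enum Model { CLUSTERING, BROADCASTING }

    // Returns, for each consumer in the group, the messages it would receive.
    static Map<String, List<String>> deliver(Model model, List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            if (model == Model.BROADCASTING) {
                // Every consumer in the group sees every message.
                for (String consumer : consumers) {
                    received.get(consumer).add(messages.get(i));
                }
            } else {
                // Exactly one consumer sees each message (round-robin is a simplification).
                received.get(consumers.get(i % consumers.size())).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2");
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println(deliver(Model.CLUSTERING, consumers, messages));
        System.out.println(deliver(Model.BROADCASTING, consumers, messages));
    }
}
```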
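Ordered messages
The producer can send ordered messages in several ways. Each method takes a hashKey in addition to the destination and payload; messages with the same hashKey are routed to the same queue. The variables topic, msg, hashKey, and sendCallback below are placeholders.
    rocketMQTemplate.syncSendOrderly(topic, msg, hashKey);
    rocketMQTemplate.asyncSendOrderly(topic, msg, hashKey, sendCallback);
    rocketMQTemplate.sendOneWayOrderly(topic, msg, hashKey);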
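Ordering relies on all messages that belong together landing in the same queue. A minimal sketch of that idea, assuming a queue is picked by hashing the key modulo the number of queues: the names OrderlyQueueSketch and selectQueue are hypothetical, and the exact index computed by the RocketMQ client may differ from this simplified formula.

```java
class OrderlyQueueSketch {
    // Maps a hash key to a queue index in the range [0, queueCount).
    static int selectQueue(Object hashKey, int queueCount) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;
        // Two messages of the same order pick the same queue, so their relative order is kept.
        System.out.println(selectQueue("order-1", queueCount) == selectQueue("order-1", queueCount));
        // A different order may pick a different queue and is consumed independently.
        System.out.println(selectQueue("order-2", queueCount));
    }
}
```

In practice the hash key is usually a business identifier such as an order ID, so that every step of one order is consumed in sequence while different orders are still processed in parallel.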
Consumer:
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",
            consumeMode = ConsumeMode.ORDERLY)
    public class RocketMQConsumer implements RocketMQListener<String> {

        @Override
        public void onMessage(String s) {
            System.out.println("接受到的消息为:" + s); // "received message: ..."
        }
    }
- consumeMode: the consume mode. Defaults to concurrent consumption (ConsumeMode.CONCURRENTLY).
Transactional messages
Normal transaction
Producer
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @Component
    public class RocketMQProducer {

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        public void sendMessage(String topic, String msg) {
            System.out.println("发送消息了"); // "message sent"
            // The third argument is handed to executeLocalTransaction() as its Object parameter.
            TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
                    topic, MessageBuilder.withPayload(msg).build(), null);
            // "send status: ..."
            System.out.println("发送状态:" + transactionSendResult.getSendStatus().name());
            // "local transaction state: ..."
            System.out.println("本地事务执行状态:" + transactionSendResult.getLocalTransactionState().name());
        }
    }
Consumer
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component
    @RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic")
    public class RocketMQConsumer implements RocketMQListener<String> {

        @Override
        public void onMessage(String s) {
            System.out.println("接受到的消息为:" + s); // "received message: ..."
        }
    }
Transaction listener
Besides the producer and the consumer, a transactional message also needs a listener on the producer side. It executes the local transaction and answers the broker's check-back requests about the local transaction state.
    package com.tothefor.console.consumer;

    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.springframework.messaging.Message;

    import java.nio.charset.StandardCharsets;

    @RocketMQTransactionListener
    public class TransactionMsgListener implements RocketMQLocalTransactionListener {

        // Runs the local transaction after the half message has been sent.
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println("正常本地事务执行..."); // "executing local transaction..."
            return RocketMQLocalTransactionState.COMMIT;
        }

        // Called by the broker to check the transaction state when it is still unknown.
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
            System.out.println("调用回查本地事务接口:" + jsonStr); // "transaction check-back called: ..."
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
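RocketMQLocalTransactionState has three states:
- COMMIT: commit the transaction; the message is delivered to consumers.
- ROLLBACK: roll back the transaction; the message is discarded.
- UNKNOWN: the state is not yet known; the broker checks back later through checkLocalTransaction().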
Calling the endpoint
    @Autowired
    private RocketMQProducer rocketMQProducer;

    @RequestMapping("/sendM")
    public Object testRM() {
        rocketMQProducer.sendMessage("baseTopic", "少时诵诗书");
        return new JSONObject().fluentPut("flag", "ok");
    }
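Result
The log shows, in order: the send, the local transaction running, send status SEND_OK, local transaction state COMMIT_MESSAGE, and the consumer receiving the message.
    发送消息了
    正常本地事务执行...
    发送状态:SEND_OK
    本地事务执行状态:COMMIT_MESSAGE
    接受到的消息为:少时诵诗书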
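Rollback
The flow above is the normal case. To demonstrate a rollback, only the return value in the transaction listener needs to change.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("正常本地事务执行..."); // "executing local transaction..."
        return RocketMQLocalTransactionState.ROLLBACK;
    }
Calling the endpoint again prints the following. The send still succeeds, but the consumer never receives the message:
    发送消息了
    正常本地事务执行...
    发送状态:SEND_OK
    本地事务执行状态:ROLLBACK_MESSAGE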
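Compensation (transaction check-back)
Change the return value in the transaction listener to UNKNOWN. The broker then calls checkLocalTransaction() to resolve the transaction.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("正常本地事务执行..."); // "executing local transaction..."
        return RocketMQLocalTransactionState.UNKNOWN;
    }
The output is below. The check-back method runs and returns COMMIT, after which the consumer receives the message:
    发送消息了
    正常本地事务执行...
    发送状态:SEND_OK
    本地事务执行状态:UNKNOW
    调用回查本地事务接口:少时诵诗书
    接受到的消息为:少时诵诗书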
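The three runs above (commit, rollback, and check-back) can be summarised as a small state model. The sketch below is a toy simulation and not RocketMQ code: TransactionFlowSketch, State, and delivered are hypothetical names, and maxChecks stands in for the broker's limit on how many times it checks back before giving up.

```java
import java.util.function.Supplier;

class TransactionFlowSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Returns true if the half message ends up delivered to consumers.
    // executeResult models the value returned by executeLocalTransaction();
    // checkBack models checkLocalTransaction(), called while the state is UNKNOWN.
    static boolean delivered(State executeResult, Supplier<State> checkBack, int maxChecks) {
        State state = executeResult;
        int checks = 0;
        while (state == State.UNKNOWN && checks < maxChecks) {
            state = checkBack.get();
            checks++;
        }
        // Only a committed transaction makes the message visible to consumers.
        return state == State.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(delivered(State.COMMIT, () -> State.COMMIT, 3));
        System.out.println(delivered(State.ROLLBACK, () -> State.COMMIT, 3));
        System.out.println(delivered(State.UNKNOWN, () -> State.COMMIT, 3));
    }
}
```

In this model a message whose state stays UNKNOWN through every check is never delivered, which is why checkLocalTransaction() should eventually return a definite COMMIT or ROLLBACK based on the real outcome of the local transaction.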