RocketMQ-(三)SpringBoot整合RocketMQ

本文最后更新于:February 12, 2023 pm

RocketMQ作为一款纯Java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,主要功能是异步解耦和流量削峰。RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

目录

部署的RocketMQ版本:4.9.3

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version> <!-- 2.2.2 对应MQ4.9.3版本 -->
</dependency>

配置

1
2
3
4
5
rocketmq:
name-server: 13.39.8.9:9876 # 服务地址
producer:
group: ToTheFor # 生产者组
send-message-timeout: 10000 # 发送消息超时时间

生产者

调用RocketMQTemplate的convertAndSend()方法进行消息的发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 11:14
* @Title
* @Description
*/
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

// 发送消息的实例,如果需要增加Tag,只需要拼在topic后面即可,如:topic:tag
public void sendMessage(String topic, String msg) {
System.out.println("发送消息了");
rocketMQTemplate.convertAndSend(topic, msg);
}
}

消费者

实现RocketMQListener<?>接口,重写onMessage()方法。具体的业务逻辑在此方法中进行处理。

增加@RocketMQMessageListener注解:

  • consumerGroup:消费者组。
  • topic:设置Topic。
  • selectorExpression:选择Tag,默认所有(*)。部分选择:tag1 || tag2。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 12:46
* @Title
* @Description
*/

@Component
@RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",selectorExpression = "*")
public class RocketMQConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("接受到的消息为:" + s);
}
}

测试

1
2
3
4
5
6
7
8
9
10
@Autowired
private RocketMQProducer rocketMQProducer;
@Token
@RequestMapping("/sendM")
public Object testRM() {
rocketMQProducer.sendMessage("baseTopic","少时诵诗书");
return new JSONObject()
.fluentPut("flag", "ok")
;
}

启动

在上述完成后,进行启动,会发现报错如下:

1
invokeSync call the addr[null] timeout

在经过漫长的寻找和处理后,解决办法如下,降低依赖版本:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>

然后再次启动,访问对应请求即可成功。

其他消息

均为生产者发送消息。

同步消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 11:14
* @Title
* @Description
*/
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

// 发送消息的实例,如果需要增加Tag,只需要拼在topic后面即可,如:topic:tag
public void sendMessage(String topic, String msg) {
System.out.println("发送消息了");
rocketMQTemplate.syncSend(topic,msg);
}
}

异步消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.tothefor.console.consumer;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 11:14
* @Title
* @Description
*/
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

// 发送消息的实例,如果需要增加Tag,只需要拼在topic后面即可,如:topic:tag
public void sendMessage(String topic, String msg) {
System.out.println("发送消息了");
rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}

@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
}
}

单向消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.tothefor.console.consumer;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 11:14
* @Title
* @Description
*/
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

// 发送消息的实例,如果需要增加Tag,只需要拼在topic后面即可,如:topic:tag
public void sendMessage(String topic, String msg) {
System.out.println("发送消息了");
rocketMQTemplate.sendOneWay(topic,msg);
}
}

延时消息

必须发同步消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.tothefor.console.consumer;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 11:14
* @Title
* @Description
*/
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

// 发送消息的实例,如果需要增加Tag,只需要拼在topic后面即可,如:topic:tag
public void sendMessage(String topic, String msg) {
System.out.println("发送消息了");
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build(),10000,3);
}
}

  • 发送超时时间10s,延时等级为3级(10s)。

过滤

Tag过滤

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 12:46
* @Title
* @Description
*/

@Component
@RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",selectorExpression = "*")
public class RocketMQConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("接受到的消息为:" + s);
}
}


  • selectorExpression:需要的Tag。

语法过滤

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 12:46
* @Title
* @Description
*/

@Component
@RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",selectorType = SelectorType.SQL92,selectorExpression = "age > 10")
public class RocketMQConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("接受到的消息为:" + s);
}
}

  • selectorType:指定过滤类型。默认SelectorType.TAG。
  • selectorExpression:过滤条件。

消费者模式

修改消费者的消费模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 12:46
* @Title
* @Description
*/

@Component
@RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",messageModel = MessageModel.BROADCASTING)
public class RocketMQConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("接受到的消息为:" + s);
}
}

  • messageModel:设置消费者模式。默认MessageModel.CLUSTERING(集群模式)。

顺序消息

生产者的多种发送方式:

1
2
3
rocketMQTemplate.syncSendOrderly(); // 发送同步顺序消息;
rocketMQTemplate.asyncSendOrderly(); // 发送异步顺序消息;
rocketMQTemplate.sendOneWayOrderly(); // 发送单向顺序消息;

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 12:46
* @Title
* @Description
*/

@Component
@RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic",consumeMode = ConsumeMode.ORDERLY)
public class RocketMQConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("接受到的消息为:" + s);
}
}

  • consumeMode:消费模式。默认并发消费模式(ConsumeMode.CONCURRENTLY)。

事务消息

正常事务

生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.tothefor.console.consumer;

import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 11:14
* @Title
* @Description
*/
@Component
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

// 发送消息的实例,如果需要增加Tag,只需要拼在topic后面即可,如:topic:tag
public void sendMessage(String topic, String msg) {
System.out.println("发送消息了");
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(msg).build(), null);
System.out.println("发送状态:"+transactionSendResult.getSendStatus().name());
System.out.println("本地事务执行状态:"+transactionSendResult.getLocalTransactionState().name());
}
}

消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.tothefor.console.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* @Author DragonOne
* @Date 2023/2/10 12:46
* @Title
* @Description
*/

@Component
@RocketMQMessageListener(consumerGroup = "ToTheFor", topic = "baseTopic")
public class RocketMQConsumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println("接受到的消息为:" + s);
}
}

事务监听器

发送事务消息除了生产者和消费者以外,我们还需要创建生产者的消息监听器,来监听本地事务执行的状态和检查本地事务状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.tothefor.console.consumer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
* 生产者消息监听器:
* 用于监听本地事务执行的状态和检查本地事务状态。
* @author qzz
*/
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {

/**
* 执行本地事务(在发送消息成功时执行)
* @param message
* @param o
* @return commit or rollback or unknown
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("正常本地事务执行...");
return RocketMQLocalTransactionState.COMMIT;
}

/**
* 检查本地事务的状态
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
System.out.println("调用回查本地事务接口:"+jsonStr);
return RocketMQLocalTransactionState.COMMIT;
}
}


RocketMQLocalTransactionState有三种状态:

  • COMMIT:提交。
  • ROLLBACK:回滚。
  • UNKNOWN:未知。
访问接口
1
2
3
4
5
6
7
8
9
@Autowired
private RocketMQProducer rocketMQProducer;
@RequestMapping("/sendM")
public Object testRM() {
rocketMQProducer.sendMessage("baseTopic","少时诵诗书");
return new JSONObject()
.fluentPut("flag", "ok")
;
}
结果
1
2
3
4
5
发送消息了
正常本地事务执行...
发送状态:SEND_OK
本地事务执行状态:COMMIT_MESSAGE
接受到的消息为:少时诵诗书

回滚

上面是正常的一个流程,现在演示回滚场景。只需要修改事务监听器里面的返回值即可。

1
2
3
4
5
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("正常本地事务执行...");
return RocketMQLocalTransactionState.ROLLBACK; // 返回值修改
}

然后访问接口:

1
2
3
4
发送消息了
正常本地事务执行...
发送状态:SEND_OK
本地事务执行状态:ROLLBACK_MESSAGE

补偿

需要修改事务监听器里面的返回值。

1
2
3
4
5
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("正常本地事务执行...");
return RocketMQLocalTransactionState.UNKNOWN; // 修改
}

输出为:

1
2
3
4
5
6
发送消息了
正常本地事务执行...
发送状态:SEND_OK
本地事务执行状态:UNKNOW
调用回查本地事务接口:少时诵诗书
接受到的消息为:少时诵诗书