RocketMQ-(二)RocketMQ的基本使用

本文最后更新于:February 12, 2023 pm

RocketMQ作为一款纯Java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,主要功能是异步解耦和流量削峰。RocketMQ主要有四大核心组成部分:NameServerBrokerProducer以及Consumer四部分。

目录

RocketMQ版本:4.9.3

简单生产者

创建生产者步骤:

  • 谁来发?
  • 发给谁?
  • 怎么发?
  • 发什么?
  • 发的结果是什么?
  • 关闭。

依赖

最好版本是和MQ版本一样。

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.tothefor.mqdemo.produce;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
* @Author DragonOne
* @Date 2023/2/6 21:35
* @墨水记忆 www.tothefor.com
*/
public class MyProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置超时时间
producer.setMqClientApiTimeout(1000*10);
producer.setNamesrvAddr("13.39.8.9:9876");
producer.start();
for (int i = 1; i <= 20; ++i) {
Message message = new Message("Topic1", "Tags1", ("message" + i).getBytes(StandardCharsets.UTF_8));
// 此处的timeout是等待broker响应信息的时长,设置为10s,以防超时。默认只有3s
SendResult send = producer.send(message,10000);
System.out.println("第" + i + "个的发送结果为:" + JSON.toJSONString(send));
}
producer.shutdown();
}
}

简单消费者

步骤同生产者类似。

  • 谁来收?
  • 从哪儿收?
  • 监听哪一个消息队列。(push模式)
  • 注册监听器。
  • 处理业务。

📢注意:不能关闭消费者,因为有监听器监听消息,是一个长连接。

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.tothefor.mqdemo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.io.File;
import java.util.List;

/**
* @Author DragonOne
* @Date 2023/2/6 22:17
* @墨水记忆 www.tothefor.com
*/
public class MyConsumer {
public static void main(String[] args) throws Exception {
System.out.println(System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setMqClientApiTimeout(1000 * 10);
consumer.setNamesrvAddr("13.39.8.9:9876");
consumer.subscribe("Topic1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println(messageExt);
String body = new String(messageExt.getBody());
System.out.println(body);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动...");

}
}

遇坑

org.apache.rocketmq.client.exception.MQClientException: readLocalOffset Exception, maybe fastjson version too low
See http://rocketmq.apache.org/docs/faq/ for further details.

报错fastjson版本太低。

  • 原因:广播模式会在本地生成一些文件。把删除即可。查看文件位置:(Java语句打印)
1
System.out.println(System.getProperty("user.home") + File.separator + ".rocketmq_offsets");

多消费者模式

  • IDEA同时启动多个实例:在右上角的修改配置Edit Configurations -> 选择需要多实例启动的Application -> Modify options -> Allow Multiple Instances。

一个生产者,多个消费者。

分享

即生产者生成的消息会分给不同的消费者,每个消费者拿到一部分信息进行消费。

  • 同一个消费者组,同一个Topic。每个消费者只能拿到一部分。(分享)

比如:生产者发送了10条信息。看一张图,理解一下:

示例代码同上面的一样即可,只是在生产者发送完消息后,同时启动两个消费者进行消费。(在两个消费者启动完成后,生产者再发送消息。所以,可以先启动两个消费者,再启动生产者。)

共享

生产者生成的消息会完整的复制多份,给每个消费者的信息都是完整的。

  • 不同消费者组,同一个Topic。每个消费者都会拿到该Topic下完整的信息。(共享)

比如:生产者发送了10条信息。看一张图,理解一下:

生成者同上。消费者有点小区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.tothefor.mqdemo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.io.File;
import java.util.List;

/**
* @Author DragonOne
* @Date 2023/2/6 22:17
* @墨水记忆 www.tothefor.com
*/
public class MyConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 区别在这:先group1启动,再修改为group2启动
// DefaultMQProducer producer = new DefaultMQProducer("group2");
consumer.setMqClientApiTimeout(1000 * 10);
consumer.setNamesrvAddr("13.39.8.9:9876");
consumer.subscribe("Topic1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println(messageExt);
String body = new String(messageExt.getBody());
System.out.println(body);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动...");


}
}

设置消费者模式

在上面的图片中,如果要Group1中的consumer1和consumer2都收到10条信息,需要怎么处理?

在上面是示例中,RocketMQ默认消费者同一个组中是负载均衡模式(分享),也就是集群模式。但是我们可以进行修改消费者模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.tothefor.mqdemo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.io.File;
import java.util.List;

/**
* @Author DragonOne
* @Date 2023/2/6 22:17
* @墨水记忆 www.tothefor.com
*/
public class MyConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 先group1启动,再修改为group2启动
// DefaultMQProducer producer = new DefaultMQProducer("group2");
consumer.setMqClientApiTimeout(1000 * 10);
consumer.setNamesrvAddr("13.39.8.9:9876");
// 设置广播模式,默认集群模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("Topic1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println(messageExt);
String body = new String(messageExt.getBody());
System.out.println(body);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者启动...");

}
}


多生产者

不管有多少个生产者,只要Topic一样,那么在消费者进行消费时和上述一样。

消息类别

同步消息

及时性强,是重要的消息,且必须有回执的消息,例如:通知转账成功。

1
2
3
4
5
6
7
for (int i = 1; i <= 20; ++i) {
Message message = new Message("Topic1", "Tags1", ("message" + i).getBytes(StandardCharsets.UTF_8));

SendResult send = producer.send(message, 10000);
System.out.println("第" + i + "个的发送结果为:" + JSON.toJSONString(send));
}
producer.shutdown();
  • 发送消息后有一个结果,且是在发送完成后才能继续向下执行,从而再执行打印语句,是顺序执行的。
1
2
3
1个的发送结果为:xxxxx
2个的发送结果为:xxxxx
...

异步消息

即时性较弱,但需要有回执的消息。也就是发送了消息,但是不一定非要等到结果才继续向下执行。

就好像你先发你的,我先走我的,等你有了结果再说。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for (int i = 1; i <= 20; ++i) {
Message message = new Message("Topic1", "Tags1", ("message" + i).getBytes(StandardCharsets.UTF_8));
int finalI = i;
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("第 " + finalI + " 个执行成功了");
}

@Override
public void onException(Throwable throwable) {
System.out.println("第 " + finalI + " 个执行失败了");
}
});
System.out.println("第 " + i + " 个发完了");
}
// producer.shutdown();
  • 最后的关闭操作必须进行注释掉,因为是异步,消息还没发出去,可能就应该被关了。

输出:

1
2
3
4
5
6
7
8
9
1 个发完了
2 个发完了
...
19 个发完了
20 个发完了
14 个执行成功了
1 个执行成功了
...
11 个执行成功了

单向信息

不需要有回执的信息。即:只管发,不管结果。可能成功了,也可能失败了。

1
producer.sendOneway(message);

延时消息

消息发送时并不直接发送到信息服务器,而是根据设定的等待时间等级达到,起到一定的延时到达的缓冲作用。消息等级从左到右依从增加(从1开始):

  • 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m10m 20m 30m 1h 2h

示例:

1
2
3
4
5
6
7
8
for (int i = 1; i <= 20; ++i) {
Message message = new Message("Topic1", "Tags1", ("message" + i).getBytes(StandardCharsets.UTF_8));
message.setDelayTimeLevel(3); // 第三等级 10s
// 此处的timeout是等待broker响应信息的时长,设置为10s,以防超时。默认只有3s
SendResult send = producer.send(message, 10000);
System.out.println("第" + i + "个的发送结果为:" + JSON.toJSONString(send));
}
producer.shutdown();

批量消息

一次发送多条消息,从而节约网络开销。

示例:

1
2
3
4
5
6
7
8
List<Message> msgs = new ArrayList<>();
for (int i = 1; i <= 20; ++i) {
Message message = new Message("Topic1", "Tags1", ("message" + i).getBytes(StandardCharsets.UTF_8));
msgs.add(message);
}

SendResult send = producer.send(msgs, 10000);
producer.shutdown();
  • 📢注意:批量消息不支持设置延时!!!

限制

  • 批量消息应该有相同的Topic。
  • 相同的waitStoreMsgOK。
  • 不能是延时信息。
  • 消息内容总长度不能超过4M。
  • 消息内容总长度包含如下:
    • topic(字符串字节数)
    • body(字节数组长度)
    • 消息追加的属性(key与value对应字符串字节数)
    • 日志(固定20字节)

消息过滤

分类过滤

按照Tag过滤信息。

示例:

1
2
3
4
5
6
// 生产者
Message message = new Message("Topic1", "Tags1", ("message" + i).getBytes(StandardCharsets.UTF_8));

// 消费者
consumer.subscribe("Topic1", "*"); // 获取所有的Tag
consumer.subscribe("Topic1", "Tags1 || Tags2"); // 只获取Tag为Tags1的和Tags2的

语法过滤

过滤的是消息追加的属性。

基本语法:

  • 数值比较:>、>=、<、<=、BETWEEN、=
  • 字符比较:=、<>、IN
  • IS NULL 或者 IS NOT NULL
  • 逻辑符号:AND、OR、NOT

常量支持类型:

  • 数值:123、123.456
  • 字符:’abc’。必须用单引号括起来。
  • NULL:特殊的常量
  • 布尔值:TRUE 或 FALSE。

生产者:

1
2
3
Message message = new Message("Topic1", "Tags1", ("message" + i).getBytes(StandardCharsets.UTF_8));
message.putUserProperty("name","墨水记忆");
message.putUserProperty("age","13");

消费者:

1
consumer.subscribe("Topic1", MessageSelector.bySql("age >10"));
  • 只消费age大于10的。

注意

默认的broker并没有开启对SQL语法的支持,需要修改配置。

编辑broker.conf文件:

1
vim broker.conf

添加配置项:

1
enablePropertyFilter=true

然后重启breoker。


本文作者: 墨水记忆
本文链接: https://tothefor.com/DragonOne/3b617eab.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!