本文最后更新于:February 12, 2023 pm
RocketMQ作为一款纯Java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,主要功能是异步解耦和流量削峰。RocketMQ主要有四大核心组成部分:NameServer 、Broker 、Producer 以及Consumer 四部分。
目录
RocketMQ版本:4.9.3
简单生产者
创建生产者步骤:
谁来发?
发给谁?
怎么发?
发什么?
发的结果是什么?
关闭。
依赖
最好版本是和MQ版本一样。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.3</version> </dependency>
生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.tothefor.mqdemo.produce;import com.alibaba.fastjson.JSON;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;public class MyProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1" ); producer.setMqClientApiTimeout(1000 *10 ); producer.setNamesrvAddr("13.39.8.9:9876" ); producer.start(); for (int i = 1 ; i <= 20 ; ++i) { Message message = new Message("Topic1" , "Tags1" , ("message" + i).getBytes(StandardCharsets.UTF_8)); SendResult send = producer.send(message,10000 ); System.out.println("第" + i + "个的发送结果为:" + JSON.toJSONString(send)); } producer.shutdown(); } }
简单消费者
步骤同生产者类似。
谁来收?
从哪儿收?
监听哪一个消息队列。(push模式)
注册监听器。
处理业务。
📢注意:不能关闭消费者,因为有监听器监听消息,是一个长连接。
消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.tothefor.mqdemo;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.io.File;import java.util.List;public class MyConsumer { public static void main (String[] args) throws Exception { System.out.println(System.getProperty("user.home" ) + File.separator + ".rocketmq_offsets" ); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1" ); consumer.setMqClientApiTimeout(1000 * 10 ); consumer.setNamesrvAddr("13.39.8.9:9876" ); consumer.subscribe("Topic1" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { System.out.println(messageExt); String body = new String(messageExt.getBody()); System.out.println(body); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者启动..." ); } }
遇坑
org.apache.rocketmq.client.exception.MQClientException: readLocalOffset Exception, maybe fastjson version too low See http://rocketmq.apache.org/docs/faq/ for further details.
报错fastjson版本太低。
原因:广播模式会在本地生成一些文件。把删除即可。查看文件位置:(Java语句打印)
System.out.println(System.getProperty("user.home" ) + File.separator + ".rocketmq_offsets" );
多消费者模式
IDEA同时启动多个实例:在右上角的修改配置Edit Configurations -> 选择需要多实例启动的Application -> Modify options -> Allow Multiple Instances。
一个生产者,多个消费者。
分享
即生产者生成的消息会分给不同的消费者,每个消费者拿到一部分信息进行消费。
同一个消费者组,同一个Topic。每个消费者只能拿到一部分。(分享)
比如:生产者发送了10条信息。看一张图,理解一下:
示例代码同上面的一样即可,只是在生产者发送完消息后,同时启动两个消费者进行消费。(在两个消费者启动完成后,生产者再发送消息。所以,可以先启动两个消费者,再启动生产者。)
共享
生产者生成的消息会完整的复制多份,给每个消费者的信息都是完整的。
不同消费者组,同一个Topic。每个消费者都会拿到该Topic下完整的信息。(共享)
比如:生产者发送了10条信息。看一张图,理解一下:
生成者同上。消费者有点小区别。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.tothefor.mqdemo;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.io.File;import java.util.List;public class MyConsumer { public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1" ); consumer.setMqClientApiTimeout(1000 * 10 ); consumer.setNamesrvAddr("13.39.8.9:9876" ); consumer.subscribe("Topic1" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { System.out.println(messageExt); String body = new String(messageExt.getBody()); System.out.println(body); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者启动..." ); } }
设置消费者模式
在上面的图片中,如果要Group1中的consumer1和consumer2都收到10条信息,需要怎么处理?
在上面是示例中,RocketMQ默认消费者同一个组中是负载均衡模式(分享),也就是集群模式。但是我们可以进行修改消费者模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package com.tothefor.mqdemo;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.io.File;import java.util.List;public class MyConsumer { public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1" ); consumer.setMqClientApiTimeout(1000 * 10 ); consumer.setNamesrvAddr("13.39.8.9:9876" ); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("Topic1" , "*" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt : list) { System.out.println(messageExt); String body = new String(messageExt.getBody()); System.out.println(body); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("消费者启动..." ); } }
多生产者 不管有多少个生产者,只要Topic一样,那么在消费者进行消费时和上述一样。
消息类别 同步消息
及时性强,是重要的消息,且必须有回执的消息,例如:通知转账成功。
for (int i = 1 ; i <= 20 ; ++i) { Message message = new Message("Topic1" , "Tags1" , ("message" + i).getBytes(StandardCharsets.UTF_8)); SendResult send = producer.send(message, 10000 ); System.out.println("第" + i + "个的发送结果为:" + JSON.toJSONString(send)); } producer.shutdown();
发送消息后有一个结果,且是在发送完成后才能继续向下执行,从而再执行打印语句,是顺序执行的。
第1 个的发送结果为:xxxxx 第2 个的发送结果为:xxxxx ...
异步消息
即时性较弱,但需要有回执的消息。也就是发送了消息,但是不一定非要等到结果才继续向下执行。
就好像你先发你的,我先走我的,等你有了结果再说。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 for (int i = 1 ; i <= 20 ; ++i) { Message message = new Message("Topic1" , "Tags1" , ("message" + i).getBytes(StandardCharsets.UTF_8)); int finalI = i; producer.send(message, new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.println("第 " + finalI + " 个执行成功了" ); } @Override public void onException (Throwable throwable) { System.out.println("第 " + finalI + " 个执行失败了" ); } }); System.out.println("第 " + i + " 个发完了" ); }
最后的关闭操作必须进行注释掉,因为是异步,消息还没发出去,可能就应该被关了。
输出:
第 1 个发完了 第 2 个发完了 ... 第 19 个发完了 第 20 个发完了 第 14 个执行成功了 第 1 个执行成功了 ... 第 11 个执行成功了
单向信息
不需要有回执的信息。即:只管发,不管结果。可能成功了,也可能失败了。
producer.sendOneway(message);
延时消息
消息发送时并不直接发送到信息服务器,而是根据设定的等待时间等级达到,起到一定的延时到达的缓冲作用。消息等级从左到右依从增加(从1开始):
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m10m 20m 30m 1h 2h
示例:
for (int i = 1 ; i <= 20 ; ++i) { Message message = new Message("Topic1" , "Tags1" , ("message" + i).getBytes(StandardCharsets.UTF_8)); message.setDelayTimeLevel(3 ); SendResult send = producer.send(message, 10000 ); System.out.println("第" + i + "个的发送结果为:" + JSON.toJSONString(send)); } producer.shutdown();
批量消息
一次发送多条消息,从而节约网络开销。
示例:
List<Message> msgs = new ArrayList<>();for (int i = 1 ; i <= 20 ; ++i) { Message message = new Message("Topic1" , "Tags1" , ("message" + i).getBytes(StandardCharsets.UTF_8)); msgs.add(message); } SendResult send = producer.send(msgs, 10000 ); producer.shutdown();
限制
批量消息应该有相同的Topic。
相同的waitStoreMsgOK。
不能是延时信息。
消息内容总长度不能超过4M。
消息内容总长度包含如下:
topic(字符串字节数)
body(字节数组长度)
消息追加的属性(key与value对应字符串字节数)
日志(固定20字节)
消息过滤 分类过滤
按照Tag过滤信息。
示例:
Message message = new Message("Topic1" , "Tags1" , ("message" + i).getBytes(StandardCharsets.UTF_8)); consumer.subscribe("Topic1" , "*" ); consumer.subscribe("Topic1" , "Tags1 || Tags2" );
语法过滤
过滤的是消息追加的属性。
基本语法:
数值比较:>、>=、<、<=、BETWEEN、=
字符比较:=、<>、IN
IS NULL 或者 IS NOT NULL
逻辑符号:AND、OR、NOT
常量支持类型:
数值:123、123.456
字符:’abc’。必须用单引号括起来。
NULL:特殊的常量
布尔值:TRUE 或 FALSE。
生产者:
Message message = new Message("Topic1" , "Tags1" , ("message" + i).getBytes(StandardCharsets.UTF_8)); message.putUserProperty("name" ,"墨水记忆" ); message.putUserProperty("age" ,"13" );
消费者:
consumer.subscribe("Topic1" , MessageSelector.bySql("age >10" ));
注意
默认的broker并没有开启对SQL语法的支持,需要修改配置。
编辑broker.conf文件:
添加配置项:
enablePropertyFilter=true
然后重启breoker。