rocketMQ笔记
- 教学地址:b站
- 本地使用mq
cd /Users/cyt/huanjing/java/middle/rocketmq-all-4.8.0-bin-releasesh start.sh开启mqsh close.sh关闭mqcd ../webjava -jar rocketmq-console-ng-2.0.0.jar开启mq控制台- 后续使用docker搭建mq
基本知识
MQ介绍
对比
介绍
优势和劣势
优势
应用解耦
提高系统容错性和可维护性- 生产者发送消息到MQ
- mq可以复制多份到其他系统
异步提速
提升用户体验和系统吞吐量从下单到结果只需要100ms
先发给mq
存入mysql
后面的200ms不用管
削峰填谷
提高系统稳定性- 把mq当做缓存
- 10w个请求,从mq里面取数据,放入mysql
示意图
劣势
工作原理(重点)
- 生产者 消费者 服务器三大集群都要去
命名服务器注册自己的ip信息- 消费者对服务器集群获取消息有两种方式
主动拉去消息用监听器- 为了判断三大集群是否挂掉
- 三大集群设置心跳
- 消息信息
- 一级
主题Topic- 二级分类:
标题tag
安装
#这个路径是你rocketmq解压后的文件夹路径 export rocketMq_home=/Users/cyt/huanjing/java/middle/rocketmq-all-4.8.0-bin-release #这个路径是你安装的jdk版本的路径,你需要查看你的jdk版本,改变 export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_321.jdk/Contents/Home" export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home export JAVA_HOME=/Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
* https://blog.csdn.net/qq_41833935/article/details/125315386?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522b3f9dadfe714d8126d35743d4131786e%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=b3f9dadfe714d8126d35743d4131786e&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_positive~default-1-125315386-null-null.142^v102^pc_search_result_base8&utm_term=mac%20%E5%AE%89%E8%A3%85rocketmq&spm=1018.2226.3001.4187
# 消息发送(重点)
1. 基于Java环境构建消息发送与消息接收基础程序
* 单生产者单消费者
* 单生产者多消费者
* 多生产者多消费者
2. 发送不同类型的消息
1. 同步消息
2. 异步消息
3. 单向消息
3. 特殊的消息发送
1. 延时消息
2. 批量消息
4. 特殊的消息接收
1. 消息过滤
5. 消息发送与接收顺序控制
6. 事务消息
## 单生产者单消费者消息发送(OneToOne)
### 生产者
```java
public class Producer {
public static void main(String[] args) throws Exception {
// 1.谁来发?
DefaultMQProducer producer = new DefaultMQProducer("group");
// 2.发给谁
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 3.怎么发
// 4.发什么
String msg = "hello word!";
Message message = new Message("topic1", "tag1", msg.getBytes());
SendResult sendResult = producer.send(message);
// 5.发的结果是什么
System.out.println(sendResult);
// 6.打扫战场
producer.shutdown();
}
}
消费者
1 | public class Consumer { |
单生产者多消费者消息发送(OneToMany)
生产者
- for循环发送十条消息
1 | public class Producer { |
消费者-负载均衡( 默认)
1 | public class Consumer { |
编辑启动类

允许多个实例
运行两个消费者类(不同实例)
- 发现mq默认使用负载均衡
- 生产者发送10个消息
- 第一个消费者随机消费104598
- 第二个随机消费2367


消费者-广播模式
- 生产者发送
group1 topic3 tag1- 消费者
- 1号
group1 topic3- 2号
group2 topic3- 此时发现消费者都消费了10条消息
- 使用
consumer.setMessageModel(MessageModel.BROADCASTING);修改配置

消息类别
同步消息
异步消息
1 | public class Producer { |
- 如订单消息 可以过一小时才获取消息结果
单向消息
producer.sendOneway(message);
延时消息
批量消息
批量消息
注意点:
这些批量消息应该有相同的topic
2相同的waitStoreMsgOK
3不能是延时消息
4消息内容总长度不超过4M
消息内容总长度包含如下:topic(字符串字节数)
body(字节数组长度)
消息追加的属性(key与value对应字符串字节数)
日志(固定20字节)
消息过滤
tag过滤
consumer.subscribe("topic8", "tag1");过滤tag1的消息consumer.subscribe("topic8", "tag1 || vip");过滤tag1或者vip的
sql过滤 (属性过滤)
- 生产者设置属性
1 | message.putUserProperty("name", "zhangsan"); |
- 消费者设置表达式过滤
1 | consumer.subscribe("topic8", MessageSelector.bySql("age > 18")); |
- 在
broker.conf文件下设置 开启sql过滤
1 | enablePropertyFilter=true |
SpringBoot整合
1. 导包
1 | <dependency> |
2 配置文件
1 | server: |
实体类
1 | public class User implements Serializable { |
3 生产者
1 |
|
4 消费者
1 |
|
5 其他消息
同步消息
1 | // 同步消息 |
异步消息
1 | // 异步消息 |
单向消息
1 | // 单向消息 |
延迟消息
1 | // 延迟消息 2000超时时间 3:等级3 10s后延迟 |
批量消息
1 | // 批量消息 |
过滤
1 |
|
消息特殊处理
消息顺序
错乱的顺序原因
原因:
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。 但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取, 则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分 区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消 息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
想要的效果
算法实现
1 | List<OrderStep> orderList = new Producer().buildOrders(); |
消息事务
事务消息概念
- 红色为正常事务过程
- 蓝色为事务补偿过程
- 理解案例: 生产者是我 broker兄弟 本地事务
老婆管钱
- 我同意借钱
- 兄弟回复ok
- 问老婆意愿是否借钱
- 提交或回归 借或者不借
- 补偿
- 本地事务出问题
- 检测本地事务
- 提交或者回滚
事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
- 注意:事务消息仅与生产者有关,与消费者无关
代码实现
事务提交
1 | public class Producer { |
事务回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
事务补偿
- 如果事务补偿仍返回
UNKNOW则人工排查
1 | producer.setTransactionListener(new TransactionListener() { |
集群搭建
集群分类
集群特征
- 主从broker
- 规定
id=0的为主节点brokerName=bs的为一个集群- 每个broker需要再每个
nameserver注册topic等信息
工作流程
两主两从集群
5 高级特性(重点)
5.1 消息的存储
5.2 消息的存储介质
- 直接向数据库的文件放数据 (文件系统)
- 速度快
- 防止DB挂掉
5.3 高效的消息存储与读写方式
随机写(100k/s)
顺序写 (600MB/s)
1秒1部电影
mq方式
- 预先申请空间(如1GB)
顺序读写- 配置文件可以修改

零拷贝技术
5.4 消息存储结构
commitlog目录下存放消息的信息 各个队列queue
consumequeue消费逻辑队列
- 消费者重启后 可以定位上次读写的位置
- 包含偏移量信息
index索引
- 存放队列索引
- 时间
- 读到第几个队列
5.5 刷盘机制
- broker把数据存放到
commitlog文件夹下面, 此时采用刷盘机制
同步刷盘
异步刷盘
- 只包含1,3, 7三步
- 生产者给broker
- broker立即返回ACK 保存成功
- 此时数据在内存
- 当数据达到一定量写入磁盘持久化
对比&配置
5.6 高可用性
nameserver部署多台 数据不共享broker主从架构producertopic到不同的多个group组consumer读写分离
5.7 负载均衡
producer
consumer
循环平均分配:
当broker-b挂了,此时三个消费者仍是平均地从
broker-a中取数据
5.8 消息重试
顺序消息重试
无序消息重试
- 每隔一定时间自动重发
- 间隔时间递增
5.9 死信队列
- 当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息 (Dead-Letter Message)
- 死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
- 死信队列特征
- 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
- 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
- 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
- 死信队列中消息特征
- 不会被再次重复消费
- 死信队列中的消息有效期为3天,达到时限后将被清除
5.10 消息重复消费与消息幂等
原因重复原因
消息幂等

本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 cyt的笔记屋!










