• 教学地址:b站
  • 本地使用mq
    • cd /Users/cyt/huanjing/java/middle/rocketmq-all-4.8.0-bin-release
    • sh start.sh 开启mq
    • sh close.sh 关闭mq
    • cd ../web
    • java -jar rocketmq-console-ng-2.0.0.jar 开启mq控制台
  • 后续使用docker搭建mq

基本知识

MQ介绍

对比

介绍

优势和劣势

优势

应用解耦

  • 提高系统容错性和可维护性
  • 生产者发送消息到MQ
  • mq可以复制多份到其他系统

异步提速

  • 提升用户体验和系统吞吐量

  • 从下单到结果只需要100ms

    • 先发给mq

    • 存入mysql

    • 后面的200ms不用管

削峰填谷

  1. 提高系统稳定性
  2. 把mq当做缓存
  3. 10w个请求,从mq里面取数据,放入mysql

示意图

劣势

工作原理(重点)

  • 生产者 消费者 服务器三大集群都要去 命名服务器 注册自己的ip信息
  • 消费者对服务器集群获取消息有两种方式
    • 主动拉去消息
    • 用监听器
  • 为了判断三大集群是否挂掉
    • 三大集群设置心跳
  • 消息信息
    • 一级 主题Topic
    • 二级分类:标题tag

安装

  • https://blog.csdn.net/qq_46111881/article/details/127110518?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522b3f9dadfe714d8126d35743d4131786e%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=b3f9dadfe714d8126d35743d4131786e&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-2-127110518-null-null.142^v102^pc_search_result_base8&utm_term=mac%20%E5%AE%89%E8%A3%85rocketmq&spm=1018.2226.3001.4187

  • #这个路径是你rocketmq解压后的文件夹路径
    export rocketMq_home=/Users/cyt/huanjing/java/middle/rocketmq-all-4.8.0-bin-release
    #这个路径是你安装的jdk版本的路径,你需要查看你的jdk版本,改变
    export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_321.jdk/Contents/Home"
    
      export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home
    export JAVA_HOME=/Library/Java/JavaVirtualMachines/zulu-8.jdk/Contents/Home
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48

    * https://blog.csdn.net/qq_41833935/article/details/125315386?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522b3f9dadfe714d8126d35743d4131786e%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=b3f9dadfe714d8126d35743d4131786e&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_positive~default-1-125315386-null-null.142^v102^pc_search_result_base8&utm_term=mac%20%E5%AE%89%E8%A3%85rocketmq&spm=1018.2226.3001.4187

    # 消息发送(重点)

    1. 基于Java环境构建消息发送与消息接收基础程序

    * 单生产者单消费者

    * 单生产者多消费者

    * 多生产者多消费者

    2. 发送不同类型的消息
    1. 同步消息
    2. 异步消息
    3. 单向消息
    3. 特殊的消息发送
    1. 延时消息
    2. 批量消息
    4. 特殊的消息接收
    1. 消息过滤
    5. 消息发送与接收顺序控制
    6. 事务消息

    ## 单生产者单消费者消息发送(OneToOne)

    ### 生产者

    ```java
    public class Producer {
    public static void main(String[] args) throws Exception {
    // 1.谁来发?
    DefaultMQProducer producer = new DefaultMQProducer("group");
    // 2.发给谁
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    // 3.怎么发
    // 4.发什么
    String msg = "hello word!";
    Message message = new Message("topic1", "tag1", msg.getBytes());
    SendResult sendResult = producer.send(message);
    // 5.发的结果是什么
    System.out.println(sendResult);
    // 6.打扫战场
    producer.shutdown();
    }
    }

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Consumer {
public static void main(String[] args) throws Exception {
/**
* 1. 谁来发?
* 2. 发给谁?
* 3. 怎么发?
* 4. 发什么?
* 5. 发的结果是什么?
* 6. 打扫战场
**/
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("localhost:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic1", "*");
//4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 写业务逻辑
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//6 不要关闭消费者!
}
}

单生产者多消费者消息发送(OneToMany)

生产者

  • for循环发送十条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Producer {
public static void main(String[] args) throws Exception {
// 1.谁来发?
DefaultMQProducer producer = new DefaultMQProducer("group");
// 2.发给谁
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 3.怎么发
// 4.发什么
for (int i = 0; i < 10; i++) {
String msg = "hello word!" + i;
Message message = new Message("topic2", "tag1", msg.getBytes());
SendResult sendResult = producer.send(message);
// 5.发的结果是什么
System.out.println(sendResult);
}
// 6.打扫战场
producer.shutdown();
}
}

消费者-负载均衡( 默认)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("localhost:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic2", "*");
//4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 写业务逻辑
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
System.out.println("================================");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//6 不要关闭消费者!
}
}

编辑启动类

image-20250930171120696

允许多个实例

运行两个消费者类(不同实例)

  • 发现mq默认使用负载均衡
    • 生产者发送10个消息
    • 第一个消费者随机消费104598
    • 第二个随机消费2367

image-20250930171418561

image-20250930171431879

消费者-广播模式

  1. 生产者发送 group1 topic3 tag1
  2. 消费者
    1. 1号 group1 topic3
    2. 2号 group2 topic3
  3. 此时发现消费者都消费了10条消息
  4. 使用 consumer.setMessageModel(MessageModel.BROADCASTING); 修改配置

image-20250930172543062

消息类别

同步消息

异步消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Producer {
public static void main(String[] args) throws Exception {
// 1.谁来发?
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.发给谁
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 3.怎么发
// 4.发什么
for (int i = 0; i < 10; i++) {
String msg = "hello word!" + i;
Message message = new Message("topic5", "tag1", msg.getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}

@Override
public void onException(Throwable e) {
// 业务逻辑
System.out.println(e);
}
});
}
// 6.不要关闭!
// producer.shutdown();
}
}
  • 如订单消息 可以过一小时才获取消息结果

单向消息

  • producer.sendOneway(message);

延时消息

批量消息

批量消息
注意点:

  1. 这些批量消息应该有相同的topic
    2相同的waitStoreMsgOK
    3不能是延时消息
    4消息内容总长度不超过4M
    消息内容总长度包含如下:

  2. topic(字符串字节数)

  3. body(字节数组长度)

  4. 消息追加的属性(key与value对应字符串字节数)

  5. 日志(固定20字节)

消息过滤

tag过滤

  • consumer.subscribe("topic8", "tag1"); 过滤tag1的消息
  • consumer.subscribe("topic8", "tag1 || vip"); 过滤tag1或者vip的

sql过滤 (属性过滤)

  1. 生产者设置属性
1
2
message.putUserProperty("name", "zhangsan");
message.putUserProperty("age", "22");
  1. 消费者设置表达式过滤
1
consumer.subscribe("topic8", MessageSelector.bySql("age > 18"));
  1. broker.conf 文件下设置 开启sql过滤
1
enablePropertyFilter=true

SpringBoot整合

1. 导包

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>

2 配置文件

1
2
3
4
5
6
7
8
9
10
11
server:
port: 8081

rocketmq:
name-server: localhost:9876
producer:
group: group1


#rocketmq.name-server=localhost:9876
#rocketmq.producer.group=demo_producer

实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class User implements Serializable {
String userName;

int age;

@Override
public String toString() {
return "User{" +
"userName='" + userName + '\'' +
", age=" + age +
'}';
}


public void setUserName(String userName) {
this.userName = userName;
}

public void setAge(int age) {
this.age = age;
}

public String getUserName() {
return userName;
}

public int getAge() {
return age;
}

public User(String userName, int age) {
this.userName = userName;
this.age = age;
}

public User() {

}
}

3 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RestController
@RequestMapping("/demo")
public class SendController {

// 模本类 建立连接
@Autowired
RocketMQTemplate rocketMqTemplate;

@GetMapping("/send")
public String send() {

String msg = "hello world";

User cyt = new User("cyt", 23);

// rocketMqTemplate.convertAndSend("topic10", cyt);
rocketMqTemplate.convertAndSend("topic10", msg);
return "success";
}

}

4 消费者

1
2
3
4
5
6
7
8
9
@Service
@RocketMQMessageListener(topic = "topic10", consumerGroup = "group1")
public class Consumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println(s);
}
}

5 其他消息

同步消息

1
2
// 同步消息
rocketMqTemplate.syncSend("topic10", "同步消息" + cyt);

异步消息

1
2
3
4
5
6
7
8
9
10
11
12
13
// 异步消息
rocketMqTemplate.asyncSend("topic10", "异步消息" + cyt, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("SendCallback" + sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.println("onException" + throwable);
}
// 超时
}, 5000);

单向消息

1
2
// 单向消息
rocketMqTemplate.sendOneWay("topic10", "单向消息" + cyt);

延迟消息

1
2
// 延迟消息 2000超时时间 3:等级3 10s后延迟
rocketMqTemplate.syncSend("topic10", MessageBuilder.withPayload("延迟消息" + msg).build(), 2000, 3);

批量消息

1
2
3
4
5
6
// 批量消息
List<Message> msgList = new ArrayList<>();
msgList.add(new Message("topic6", "tag1", "msg1".getBytes()));
msgList.add(new Message("topic6", "tag1", "msg2".getBytes()));
msgList.add(new Message("topic6", "tag1", "msg3".getBytes()));
rocketMQTemplate.syncSend("topic8", msgList, 1000);

过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
// tag过滤
//@RocketMQMessageListener(topic = "topic10", consumerGroup = "group1", selectorExpression = "tag1 || tag2")
// sql 过滤 + 广播模式
@RocketMQMessageListener(topic = "topic10",
consumerGroup = "group1",
selectorType = SelectorType.SQL92, selectorExpression = "age > 18",
messageModel = MessageModel.BROADCASTING)
public class Consumer implements RocketMQListener<String> {

@Override
public void onMessage(String s) {
System.out.println(s);
}
}

消息特殊处理

消息顺序

错乱的顺序原因

  • 原因:

    • 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序或者全局有序。

    • 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。 但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取, 则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分 区有序,即相对每个queue,消息都是有序的。

    • 下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消 息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

想要的效果

算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<OrderStep> orderList = new Producer().buildOrders();
//设置消息进入到指定的消息队列中
for (final OrderStep order : orderList) {
Message msg = new Message("topic1", order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List<MessageQueue> list, Message
message, Object o) {
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
long orderId = order.getOrderId();
long mqIndex = orderId % list.size();
return list.get((int) mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();

消息事务

事务消息概念

  • 红色为正常事务过程
  • 蓝色为事务补偿过程
  • 理解案例: 生产者是我 broker兄弟 本地事务 老婆管钱
  1. 我同意借钱
  2. 兄弟回复ok
  3. 问老婆意愿是否借钱
  4. 提交或回归 借或者不借
  • 补偿
  1. 本地事务出问题
  2. 检测本地事务
  3. 提交或者回滚

事务消息状态

  1. 提交状态:允许进入队列,此消息与非事务消息无区别
  2. 回滚状态:不允许进入队列,此消息等同于未发送过
  3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
  • 注意:事务消息仅与生产者有关,与消费者无关

代码实现

事务提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Producer {
public static void main(String[] args) throws Exception {
// 1.谁来发? 事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("group1");
// 2.发给谁
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听
producer.setTransactionListener(new TransactionListener() {
// 正常事务过程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 把消息报错到数据库中
// sql insert
System.out.println("执行了正常的事务过程");
return LocalTransactionState.COMMIT_MESSAGE; // 提交
}

// 事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
// 4.发什么
producer.start();
String msg = "hello word!";
Message message = new Message("topic13", "tag1", msg.getBytes());
// 发送事务消息
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.println(transactionSendResult);
}
}

事务回滚

  • return LocalTransactionState.ROLLBACK_MESSAGE;

事务补偿

  • 如果事务补偿仍返回 UNKNOW 则人工排查
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
producer.setTransactionListener(new TransactionListener() {
// 正常事务过程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 把消息报错到数据库中
// sql insert
System.out.println("执行了正常的事务过程");
return LocalTransactionState.UNKNOW;
}

// 事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("执行事务补偿过程");
// sql 业务
return LocalTransactionState.COMMIT_MESSAGE;
}
});

集群搭建

集群分类

集群特征

  • 主从broker
    • 规定 id=0 的为主节点
    • brokerName=bs 的为一个集群
    • 每个broker需要再每个 nameserver 注册topic等信息

工作流程

两主两从集群

5 高级特性(重点)

5.1 消息的存储

5.2 消息的存储介质

  • 直接向数据库的文件放数据 (文件系统)
    • 速度快
    • 防止DB挂掉

5.3 高效的消息存储与读写方式

随机写(100k/s)

顺序写 (600MB/s)

  • 1秒1部电影

mq方式

  • 预先申请空间(如1GB)
  • 顺序读写
  • 配置文件可以修改

image-20251009171229997

零拷贝技术

5.4 消息存储结构

  • commitlog 目录下存放消息的信息 各个队列queue
  • consumequeue 消费逻辑队列
    • 消费者重启后 可以定位上次读写的位置
    • 包含偏移量信息
  • index 索引
    • 存放队列索引
    • 时间
    • 读到第几个队列

5.5 刷盘机制

  • broker把数据存放到 commitlog 文件夹下面, 此时采用 刷盘机制

同步刷盘

异步刷盘

  • 只包含1,3, 7三步
  • 生产者给broker
  • broker立即返回ACK 保存成功
    • 此时数据在内存
  • 当数据达到一定量写入磁盘持久化

对比&配置

5.6 高可用性

  • nameserver 部署多台 数据不共享
  • broker 主从架构
  • producer topic到不同的多个group组
  • consumer 读写分离

5.7 负载均衡

producer

consumer

循环平均分配:

当broker-b挂了,此时三个消费者仍是平均地从 broker-a中取数据

5.8 消息重试

顺序消息重试

无序消息重试

  • 每隔一定时间自动重发
    • 间隔时间递增

5.9 死信队列

  1. 当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息 (Dead-Letter Message)
  2. 死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
  3. 死信队列特征
    1. 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
    2. 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
    3. 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
  4. 死信队列中消息特征
    1. 不会被再次重复消费
    2. 死信队列中的消息有效期为3天,达到时限后将被清除

5.10 消息重复消费与消息幂等

原因重复原因

消息幂等