RocketMQ学习笔记
一、MQ简介
MQ定义
-
MQ:Message Queue,消息队列
-
Message:消息,不同进程之间传递的数据
-
Queue:队列,具有FIFO(先进先出)特性,用于缓存数据
广义上,只要能实现消息跨进程传输及队列数据缓存,都可称为消息队列
MQ的作用
-
异步
- 例子:快递员发快递,先放到菜鸟驿站,客户按自己时间取快递
- 作用:提高系统响应速度、吞吐量
-
解耦
- 例子:《Thinking in JAVA》翻译成其他语言,实现英语与其他语言交流
- 作用:
- 服务间解耦,减少相互影响,提高系统稳定性及可扩展性
- 实现数据分发:生产者发送消息后,可由一个或多个消费者消费,消费者增减对生产者无影响
-
削峰
- 例子:长江涨水,引入三峡大坝储存水,下游稳定排水
- 作用:以稳定系统资源应对突发流量冲击
二、RocketMQ产品特点
1、RocketMQ介绍
- 阿里巴巴开源的消息中间件
- 2016年开源后捐赠给Apache,现为Apache顶级项目
- 早期使用ActiveMQ,后因IO性能瓶颈转向Kafka,但Kafka不适合阿里业务场景
- 2012年自研,最早叫MetaQ,后改名RocketMQ
- 从阿里巴巴双11高并发场景中打磨,适合金融互联网场景
2、RocketMQ特点
产品 | 优点 | 缺点 | 适合场景 |
---|---|---|---|
Apache Kafka | 吞吐量非常大,性能好,集群高可用 | 有丢数据可能,功能较单一 | 日志分析、大数据采集 |
RabbitMQ | 消息可靠性高,功能全面 | Erlang语言不好定制,吞吐量较低 | 企业内部小规模服务调用 |
Apache Pulsar | 基于BookKeeper构建,消息可靠性非常高 | 周边生态有差距,使用公司较少 | 企业内部大规模服务调用 |
Apache RocketMQ | 高吞吐、高性能、高可用,功能全面,客户端协议丰富,使用Java语言开发方便定制 | 服务加载较慢 | 几乎全场景,特别适合金融场景 |
RocketMQ优势:
- 经历全球最严苛高并发场景(双11)考验
- 消息吞吐量比Kafka稍低,但比RabbitMQ高很多
- 阿里内部每天处理请求超5万亿次,支持核心应用超3000个
- 天生为金融互联网而生,消息可靠性比Kafka高,吞吐量比RabbitMQ高
- 高级功能全面:广播消费、延迟队列、死信队列等,事务消息领先潮流
三、RocketMQ快速实战
1、快速搭建RocketMQ服务
- 官网:http://rocketmq.apache.org
- 下载:https://rocketmq.apache.org/download
- 版本:5.3.0(推荐最新版,2022年7月发布5.0大版本,重构代码超60%)
- 注意:4.x版本已于2024年3月停止维护,不建议使用
配置步骤:RocketMQ的后端服务分为nameserver和broker两个服务
- 下载运行包(Binary),解压到/app/rocketmq目录
- 调整JVM内存配置(学习环境,非生产环境)
- 修改bin/runserver.sh和runbroker.sh
JAVA_OPT="${JAVA_OPT}-server-Xms1g-Xmx1g-Xmn512m-XX:MetaspaceSize=128m-XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT}-server-Xms2g-Xmx2g"
- 启动nameserver服务
nohup bin/mqnamesrv&
- 日志确认:
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
- 启动broker服务
- 配置环境变量:
export NAMESRV_ADDR='localhost:9876'
nohup bin/mqbroker&
- 日志确认:
The broker[xxxxx] boot success. serializeType=JSON and name server is localhost:9876
- 配置环境变量:
常用命令:
mqshutdown namesrv
:关闭nameserver服务mqshutdown broker
:关闭broker服务jps
:检查服务启动状态
2、快速实现消息收发
1》命令行快速实现消息收发
-
发送消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
- 默认发送1000条消息
- 日志示例:
SendResult[sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31103E6,...]
-
接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 消息接收日志:
ConsumeMessageThread_please_rename_unique_group_name_4_18 Receive New Messages: [MessageExt[...]]
- 消费者会持续挂起,等待新消息,用CTRL+C停止
2》Java客户端实现
-
Maven依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version> </dependency>
-
消息生产者:
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");producer.setNamesrvAddr("192.168.65.112:9876");producer.start();for (int i = 0; i < 2; i++) {try {Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();} }
-
消息消费者:
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DemoConsumer");consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.print("Consumer Started");} }
3、搭建Java客户端项目
- 创建标准Maven项目,引入rocketmq-client依赖
- 实现消息生产者和消费者
- 重点:生产者和消费者都需要指定nameserver地址
- 消费者需要订阅具体的Topic,只有发送到该Topic的消息才会被消费
4、搭建RocketMQ可视化管理服务
- 下载:RocketMQ Dashboard源码(官网下载)
- 编译:
mvn clean package -Dmaven.test.skip=true
- 生成jar包:rocketmq-dashboard-1.0.1-SNAPSHOT.jar
- 配置:
- 创建application.yml文件,配置namesrv地址
rocketmq:config:namesrvAddrs:- 192.168.65.112:9876
- 启动:
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1>dashboard.log 2>&1&
- 访问:http://服务器IP:8080
Dashboard功能:
- 驾驶舱:展示RocketMQ近期运行情况
- 运维:管理nameserver服务
- 集群:管理broker服务
5、升级分布式集群
- 目标:解决单点故障问题,防止数据丢失
- 架构:主从架构,Master节点响应请求,Slave节点备份数据
集群规划:
机器名 | nameServer服务部署 | broker服务部署 |
---|---|---|
worker1 | nameServer | |
worker2 | nameServer | broker-a, broker-b-s |
worker3 | nameServer | broker-a-s, broker-b |
配置步骤:
- 在三台服务器上部署nameServer服务
- 配置broker集群:
- 使用2m-2s-async模板(2主2从,异步同步)
- 配置broker-a(Master):
brokerClusterName=rocketmq-cluster brokerName=broker-a brokerId=0 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 brokerRole=ASYNC_MASTER
- 配置broker-a-s(Slave):
brokerClusterName=rocketmq-cluster brokerName=broker-a brokerId=1 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 brokerRole=SLAVE
- 配置broker-b(Master):
brokerClusterName=rocketmq-cluster brokerName=broker-b brokerId=0 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 brokerRole=ASYNC_MASTER
关键配置参数:
brokerClusterName
:集群名,相同名字的节点组成一个集群brokerName
:Broker服务名,相同名字的节点组成一组主从brokerId
:节点唯一标识,0表示Master,>0表示SlavebrokerRole
:节点角色,ASYNC_MASTER/SYNC_MASTER/SLAVEnamesrvAddr
:nameserver地址,分号分隔
6、升级Dledger高可用集群
Dledger集群概述
- 核心目标:解决主从架构中服务高可用性不足的问题,实现节点角色自动切换。
- 设计原理:
- 基于 Raft协议 实现分布式一致性。
- 所有节点通过 随机选举 机制动态决定Leader(Master)和Follower(Slave)角色。
- 强一致性:确保数据在集群中的同步,避免单点故障导致的消息丢失。
- 容错能力:只要集群中超过半数节点正常运行,即可提供服务。
Dledger集群配置步骤
1. 环境准备
-
服务器要求:至少3台服务器(奇数台,保证选举机制的容错性)。
-
配置示例:
机器名 NameServer地址 Broker配置 worker1 worker1:9876 RaftNode00 (Leader) worker2 worker2:9876 RaftNode00 (Follower) worker3 worker3:9876 RaftNode00 (Follower)
2. 配置文件修改
- 关键配置项:
enableDLegerCommitLog=true # 启用Dledger功能 dLegerGroup=RaftNode00 # Raft组名 dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911 # 节点列表 dLegerSelfId=n0 # 当前节点ID
- 存储路径配置:
storePathRootDir=/app/rocketmq/storeDledger/ storePathCommitLog=/app/rocketmq/storeDledger/commitlog
3. 启动Dledger集群
- 启动命令:
nohup bin/mqbroker -c conf/dledger/broker.conf &
- 验证集群状态:
- 使用
mqadmin clusterList
命令查看节点角色。 - 通过 Dashboard 检查 Leader/Follower 分布。
- 使用
4. 故障切换测试
- 模拟故障:
- 关闭 Leader 节点(如 worker1)。
- 观察 Raft 协议自动选举新的 Leader(worker2 或 worker3)。
- 恢复验证:
- 重启原 Leader 节点,验证其重新加入集群后是否降级为 Follower。
Dledger集群优势与局限
- 优势:
- 高可用:自动角色切换,避免服务中断。
- 强一致性:Raft 协议保证数据同步。
- 容错性:支持超过半数节点故障下的持续运行。
- 局限:
- 性能开销:相比主从架构,写入性能略有下降。
- 部署复杂度:需严格遵循奇数节点部署原则。
四、总结RocketMQ的运行架构
核心组件与职责
-
NameServer(命名服务)
- 作用:独立运行,管理 Broker 和 Topic 的注册信息。
- 类比:相当于系统的 “CPU”,协调所有组件的通信。
- 特点:
- 无状态,可水平扩展。
- 客户端和 Broker 启动时需指定 NameServer 地址。
-
Broker(核心服务)
- 作用:负责消息的存储、转发和查询。
- 类比:相当于系统的 “硬盘” 和 “显卡”,处理核心业务逻辑。
- 关键功能:
- 消息持久化(CommitLog、ConsumeQueue)。
- 主从同步(异步/同步模式)。
- 支持 Dledger 高可用架构。
-
Client(客户端)
- 作用:生产者和消费者,负责消息的发送和接收。
- 类比:相当于系统的 “输入输出设备”,通过 NameServer 和 Broker 协作完成任务。
- 特点:
- 生产者需指定 NameServer 地址。
- 消费者需订阅 Topic 并维护消费进度(Offset)。
架构图解
五、理解RocketMQ的消息模型
核心概念
-
Topic
- 定义:消息的逻辑分类,如订单、日志等业务类型。
- 管理:
- 默认需手动创建(生产环境)。
- 测试环境可通过配置
autoCreateTopic=true
自动创建。
-
MessageQueue
- 定义:Topic 下的物理队列,按 FIFO 原则存储消息。
- 分布:
- 一个 Topic 可包含多个 MessageQueue。
- MessageQueue 均匀分布到多个 Broker 上,实现负载均衡。
-
Offset(偏移量)
- 定义:消息在 MessageQueue 中的唯一标识。
- 作用:
- 生产者发送消息后,记录当前最大 Offset。
- 消费者通过 Offset 记录消费进度,确保消息不重复处理。
消息处理流程
-
生产者发送消息
- 根据 Topic 和 MessageQueue 的分布策略(如轮询)选择目标队列。
- 将消息写入 Broker 的 CommitLog 和 ConsumeQueue。
-
消费者消费消息
- 订阅指定 Topic。
- 从 MessageQueue 中拉取消息,基于 Offset 记录消费进度。
- 支持广播模式(所有消费者组消费)和集群模式(消费者组内负载均衡)。
-
消息存储结构
- CommitLog:消息的物理存储文件,顺序写入。
- ConsumeQueue:消息的逻辑索引文件,记录 MessageQueue 中消息的 Offset 和大小。
与 Kafka 的对比
特性 | RocketMQ | Kafka |
---|---|---|
Topic 支持 | 高并发下性能更优,适合金融场景 | Topic 过多时性能下降 |
消息可靠性 | 强一致性,适合对可靠性要求高的场景 | 弱一致性,优先保证可用性 |
消费者组管理 | 消费者组进度独立,支持广播模式 | 消费者组进度共享,仅支持集群模式 |
高可用架构 | Dledger + Raft 协议 | 多副本 + ISR 机制 |
六、章节总结:RocketMQ 快速实战与核心概念详解
核心知识点回顾
-
MQ 的作用
- 异步:提高系统响应速度和吞吐量。
- 解耦:降低服务间依赖,提升稳定性。
- 削峰:应对突发流量冲击,保护系统资源。
-
RocketMQ 的优势
- 高吞吐:适合大规模消息处理。
- 高可靠性:金融场景下的消息不丢失保障。
- 功能全面:支持延迟队列、死信队列、事务消息等高级特性。
-
快速实战步骤
- 搭建单机环境:NameServer + Broker。
- Java 客户端集成:生产者/消费者代码示例。
- 部署可视化管理(Dashboard)。
- 构建分布式集群(主从架构 + Dledger 高可用)。
-
核心概念
- Topic 与 MessageQueue:消息的逻辑分类与物理存储。
- Offset 与消费组:确保消息消费的顺序性和幂等性。
- Dledger 集群:通过 Raft 协议实现高可用。
后续学习建议
-
深入 RocketMQ 内核:
- 学习 CommitLog 和 ConsumeQueue 的底层实现。
- 掌握 Dledger 的 Raft 协议细节(如选举、日志复制)。
-
高级功能实践:
- 实现事务消息、延迟队列、死信队列。
- 优化生产环境配置(JVM 调优、磁盘 I/O 优化)。
-
对比其他 MQ 产品:
- 对比 Kafka、RabbitMQ 的设计差异,选择适合业务场景的 MQ。
关键提示
- 生产环境建议:
- 避免使用
autoCreateTopic=true
,提前规划 Topic。 - 使用 Dledger 集群时确保奇数节点部署。
- 定期监控 Dashboard,关注 Broker 和消费者组状态。
- 避免使用