当前位置: 首页 > news >正文

RocketMQ快速实战及核心概念

RocketMQ学习笔记

一、MQ简介

MQ定义

  • MQ:Message Queue,消息队列

  • Message:消息,不同进程之间传递的数据

  • Queue:队列,具有FIFO(先进先出)特性,用于缓存数据

广义上,只要能实现消息跨进程传输队列数据缓存,都可称为消息队列

MQ的作用

  1. 异步

    • 例子:快递员发快递,先放到菜鸟驿站,客户按自己时间取快递
    • 作用:提高系统响应速度、吞吐量
  2. 解耦

    • 例子:《Thinking in JAVA》翻译成其他语言,实现英语与其他语言交流
    • 作用:
      • 服务间解耦,减少相互影响,提高系统稳定性及可扩展性
      • 实现数据分发:生产者发送消息后,可由一个或多个消费者消费,消费者增减对生产者无影响
  3. 削峰

    • 例子:长江涨水,引入三峡大坝储存水,下游稳定排水
    • 作用:以稳定系统资源应对突发流量冲击

二、RocketMQ产品特点

1、RocketMQ介绍

  • 阿里巴巴开源的消息中间件
  • 2016年开源后捐赠给Apache,现为Apache顶级项目
  • 早期使用ActiveMQ,后因IO性能瓶颈转向Kafka,但Kafka不适合阿里业务场景
  • 2012年自研,最早叫MetaQ,后改名RocketMQ
  • 从阿里巴巴双11高并发场景中打磨,适合金融互联网场景

2、RocketMQ特点

产品 优点 缺点 适合场景
Apache Kafka 吞吐量非常大,性能好,集群高可用 有丢数据可能,功能较单一 日志分析、大数据采集
RabbitMQ 消息可靠性高,功能全面 Erlang语言不好定制,吞吐量较低 企业内部小规模服务调用
Apache Pulsar 基于BookKeeper构建,消息可靠性非常高 周边生态有差距,使用公司较少 企业内部大规模服务调用
Apache RocketMQ 高吞吐、高性能、高可用,功能全面,客户端协议丰富,使用Java语言开发方便定制 服务加载较慢 几乎全场景,特别适合金融场景

RocketMQ优势

  • 经历全球最严苛高并发场景(双11)考验
  • 消息吞吐量比Kafka稍低,但比RabbitMQ高很多
  • 阿里内部每天处理请求超5万亿次,支持核心应用超3000个
  • 天生为金融互联网而生,消息可靠性比Kafka高,吞吐量比RabbitMQ高
  • 高级功能全面:广播消费、延迟队列、死信队列等,事务消息领先潮流

三、RocketMQ快速实战

1、快速搭建RocketMQ服务

  • 官网:http://rocketmq.apache.org
  • 下载:https://rocketmq.apache.org/download
  • 版本:5.3.0(推荐最新版,2022年7月发布5.0大版本,重构代码超60%)
  • 注意:4.x版本已于2024年3月停止维护,不建议使用

配置步骤:RocketMQ的后端服务分为nameserver和broker两个服务

  1. 下载运行包(Binary),解压到/app/rocketmq目录
  2. 调整JVM内存配置(学习环境,非生产环境)
    • 修改bin/runserver.sh和runbroker.sh
    • JAVA_OPT="${JAVA_OPT}-server-Xms1g-Xmx1g-Xmn512m-XX:MetaspaceSize=128m-XX:MaxMetaspaceSize=320m"
    • JAVA_OPT="${JAVA_OPT}-server-Xms2g-Xmx2g"
  3. 启动nameserver服务
    • nohup bin/mqnamesrv&
    • 日志确认:The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
  4. 启动broker服务
    • 配置环境变量:export NAMESRV_ADDR='localhost:9876'
    • nohup bin/mqbroker&
    • 日志确认:The broker[xxxxx] boot success. serializeType=JSON and name server is localhost:9876

常用命令

  • mqshutdown namesrv:关闭nameserver服务
  • mqshutdown broker:关闭broker服务
  • jps:检查服务启动状态

2、快速实现消息收发

1》命令行快速实现消息收发

  • 发送消息

    • bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    • 默认发送1000条消息
    • 日志示例:SendResult[sendStatus=SEND_OK, msgId=C0A841708122246B179D98C9E31103E6,...]
  • 接收消息

    • bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    • 消息接收日志:ConsumeMessageThread_please_rename_unique_group_name_4_18 Receive New Messages: [MessageExt[...]]
    • 消费者会持续挂起,等待新消息,用CTRL+C停止

2》Java客户端实现

  • Maven依赖

    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
    </dependency>
    
  • 消息生产者

    public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");producer.setNamesrvAddr("192.168.65.112:9876");producer.start();for (int i = 0; i < 2; i++) {try {Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
    }
    
  • 消息消费者

    public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DemoConsumer");consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.print("Consumer Started");}
    }
    

3、搭建Java客户端项目

  • 创建标准Maven项目,引入rocketmq-client依赖
  • 实现消息生产者和消费者
  • 重点:生产者和消费者都需要指定nameserver地址
  • 消费者需要订阅具体的Topic,只有发送到该Topic的消息才会被消费

4、搭建RocketMQ可视化管理服务

  • 下载:RocketMQ Dashboard源码(官网下载)
  • 编译
    • mvn clean package -Dmaven.test.skip=true
    • 生成jar包:rocketmq-dashboard-1.0.1-SNAPSHOT.jar
  • 配置
    • 创建application.yml文件,配置namesrv地址
    rocketmq:config:namesrvAddrs:- 192.168.65.112:9876
    
  • 启动
    • java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar 1>dashboard.log 2>&1&
  • 访问:http://服务器IP:8080

Dashboard功能

  • 驾驶舱:展示RocketMQ近期运行情况
  • 运维:管理nameserver服务
  • 集群:管理broker服务

5、升级分布式集群

  • 目标:解决单点故障问题,防止数据丢失
  • 架构:主从架构,Master节点响应请求,Slave节点备份数据

集群规划

机器名 nameServer服务部署 broker服务部署
worker1 nameServer
worker2 nameServer broker-a, broker-b-s
worker3 nameServer broker-a-s, broker-b

配置步骤

  1. 在三台服务器上部署nameServer服务
  2. 配置broker集群:
    • 使用2m-2s-async模板(2主2从,异步同步)
    • 配置broker-a(Master):
      brokerClusterName=rocketmq-cluster
      brokerName=broker-a
      brokerId=0
      namesrvAddr=worker1:9876;worker2:9876;worker3:9876
      brokerRole=ASYNC_MASTER
      
    • 配置broker-a-s(Slave):
      brokerClusterName=rocketmq-cluster
      brokerName=broker-a
      brokerId=1
      namesrvAddr=worker1:9876;worker2:9876;worker3:9876
      brokerRole=SLAVE
      
    • 配置broker-b(Master):
      brokerClusterName=rocketmq-cluster
      brokerName=broker-b
      brokerId=0
      namesrvAddr=worker1:9876;worker2:9876;worker3:9876
      brokerRole=ASYNC_MASTER
      

关键配置参数

  • brokerClusterName:集群名,相同名字的节点组成一个集群
  • brokerName:Broker服务名,相同名字的节点组成一组主从
  • brokerId:节点唯一标识,0表示Master,>0表示Slave
  • brokerRole:节点角色,ASYNC_MASTER/SYNC_MASTER/SLAVE
  • namesrvAddr:nameserver地址,分号分隔

6、升级Dledger高可用集群

Dledger集群概述

  • 核心目标:解决主从架构中服务高可用性不足的问题,实现节点角色自动切换。
  • 设计原理
    • 基于 Raft协议 实现分布式一致性。
    • 所有节点通过 随机选举 机制动态决定Leader(Master)和Follower(Slave)角色。
    • 强一致性:确保数据在集群中的同步,避免单点故障导致的消息丢失。
    • 容错能力:只要集群中超过半数节点正常运行,即可提供服务。

Dledger集群配置步骤

1. 环境准备

  • 服务器要求:至少3台服务器(奇数台,保证选举机制的容错性)。

  • 配置示例

    机器名 NameServer地址 Broker配置
    worker1 worker1:9876 RaftNode00 (Leader)
    worker2 worker2:9876 RaftNode00 (Follower)
    worker3 worker3:9876 RaftNode00 (Follower)

2. 配置文件修改

  • 关键配置项
    enableDLegerCommitLog=true  # 启用Dledger功能
    dLegerGroup=RaftNode00      # Raft组名
    dLegerPeers=n0-worker1:40911;n1-worker2:40911;n2-worker3:40911  # 节点列表
    dLegerSelfId=n0             # 当前节点ID
    
  • 存储路径配置
    storePathRootDir=/app/rocketmq/storeDledger/
    storePathCommitLog=/app/rocketmq/storeDledger/commitlog
    

3. 启动Dledger集群

  • 启动命令
    nohup bin/mqbroker -c conf/dledger/broker.conf &
    
  • 验证集群状态
    • 使用 mqadmin clusterList 命令查看节点角色。
    • 通过 Dashboard 检查 Leader/Follower 分布。

4. 故障切换测试

  • 模拟故障
    • 关闭 Leader 节点(如 worker1)。
    • 观察 Raft 协议自动选举新的 Leader(worker2 或 worker3)。
  • 恢复验证
    • 重启原 Leader 节点,验证其重新加入集群后是否降级为 Follower。

Dledger集群优势与局限

  • 优势
    • 高可用:自动角色切换,避免服务中断。
    • 强一致性:Raft 协议保证数据同步。
    • 容错性:支持超过半数节点故障下的持续运行。
  • 局限
    • 性能开销:相比主从架构,写入性能略有下降。
    • 部署复杂度:需严格遵循奇数节点部署原则。

四、总结RocketMQ的运行架构

核心组件与职责

  1. NameServer(命名服务)

    • 作用:独立运行,管理 Broker 和 Topic 的注册信息。
    • 类比:相当于系统的 “CPU”,协调所有组件的通信。
    • 特点
      • 无状态,可水平扩展。
      • 客户端和 Broker 启动时需指定 NameServer 地址。
  2. Broker(核心服务)

    • 作用:负责消息的存储、转发和查询。
    • 类比:相当于系统的 “硬盘” 和 “显卡”,处理核心业务逻辑。
    • 关键功能
      • 消息持久化(CommitLog、ConsumeQueue)。
      • 主从同步(异步/同步模式)。
      • 支持 Dledger 高可用架构。
  3. Client(客户端)

    • 作用:生产者和消费者,负责消息的发送和接收。
    • 类比:相当于系统的 “输入输出设备”,通过 NameServer 和 Broker 协作完成任务。
    • 特点
      • 生产者需指定 NameServer 地址。
      • 消费者需订阅 Topic 并维护消费进度(Offset)。

架构图解

rocketmq消息模型结构

五、理解RocketMQ的消息模型

核心概念

  1. Topic

    • 定义:消息的逻辑分类,如订单、日志等业务类型。
    • 管理
      • 默认需手动创建(生产环境)。
      • 测试环境可通过配置 autoCreateTopic=true 自动创建。
  2. MessageQueue

    • 定义:Topic 下的物理队列,按 FIFO 原则存储消息。
    • 分布
      • 一个 Topic 可包含多个 MessageQueue。
      • MessageQueue 均匀分布到多个 Broker 上,实现负载均衡。
  3. Offset(偏移量)

    • 定义:消息在 MessageQueue 中的唯一标识。
    • 作用
      • 生产者发送消息后,记录当前最大 Offset。
      • 消费者通过 Offset 记录消费进度,确保消息不重复处理。

消息处理流程

  1. 生产者发送消息

    • 根据 Topic 和 MessageQueue 的分布策略(如轮询)选择目标队列。
    • 将消息写入 Broker 的 CommitLog 和 ConsumeQueue。
  2. 消费者消费消息

    • 订阅指定 Topic。
    • 从 MessageQueue 中拉取消息,基于 Offset 记录消费进度。
    • 支持广播模式(所有消费者组消费)和集群模式(消费者组内负载均衡)。
  3. 消息存储结构

    • CommitLog:消息的物理存储文件,顺序写入。
    • ConsumeQueue:消息的逻辑索引文件,记录 MessageQueue 中消息的 Offset 和大小。

与 Kafka 的对比

特性 RocketMQ Kafka
Topic 支持 高并发下性能更优,适合金融场景 Topic 过多时性能下降
消息可靠性 强一致性,适合对可靠性要求高的场景 弱一致性,优先保证可用性
消费者组管理 消费者组进度独立,支持广播模式 消费者组进度共享,仅支持集群模式
高可用架构 Dledger + Raft 协议 多副本 + ISR 机制

六、章节总结:RocketMQ 快速实战与核心概念详解

核心知识点回顾

  1. MQ 的作用

    • 异步:提高系统响应速度和吞吐量。
    • 解耦:降低服务间依赖,提升稳定性。
    • 削峰:应对突发流量冲击,保护系统资源。
  2. RocketMQ 的优势

    • 高吞吐:适合大规模消息处理。
    • 高可靠性:金融场景下的消息不丢失保障。
    • 功能全面:支持延迟队列、死信队列、事务消息等高级特性。
  3. 快速实战步骤

    • 搭建单机环境:NameServer + Broker。
    • Java 客户端集成:生产者/消费者代码示例。
    • 部署可视化管理(Dashboard)。
    • 构建分布式集群(主从架构 + Dledger 高可用)。
  4. 核心概念

    • Topic 与 MessageQueue:消息的逻辑分类与物理存储。
    • Offset 与消费组:确保消息消费的顺序性和幂等性。
    • Dledger 集群:通过 Raft 协议实现高可用。

后续学习建议

  1. 深入 RocketMQ 内核

    • 学习 CommitLog 和 ConsumeQueue 的底层实现。
    • 掌握 Dledger 的 Raft 协议细节(如选举、日志复制)。
  2. 高级功能实践

    • 实现事务消息、延迟队列、死信队列。
    • 优化生产环境配置(JVM 调优、磁盘 I/O 优化)。
  3. 对比其他 MQ 产品

    • 对比 Kafka、RabbitMQ 的设计差异,选择适合业务场景的 MQ。

关键提示

  • 生产环境建议
    • 避免使用 autoCreateTopic=true,提前规划 Topic。
    • 使用 Dledger 集群时确保奇数节点部署。
    • 定期监控 Dashboard,关注 Broker 和消费者组状态。
http://www.wxhsa.cn/company.asp?id=4902

相关文章:

  • 【南方科技大学主办】第五届电气工程与机电一体化手艺国际学术会议(ICEEMT 2025)
  • 为什么不建议在 Docker 中跑 MySQL?
  • reLeetCode 热题 100-1 指针283. 移动零 - MKT
  • 解决c# DocX生成的word文档wps打开排版外边距错乱微软office正常问题
  • The 2025 ICPC Asia East Continent Online Contest (II)
  • 工厂方法模式(Factory Method) - 指南
  • 拾忆录
  • 从零搭建RAG应用:跳过LangChain,掌握文本分块、向量检索、指代消解等核心技术实现
  • python高阶技巧
  • 机器视觉之图像处理篇 - 指南
  • 尝试hikari和jdbctemplate
  • 配置Nginx根据IP地址进行流量限制以及返回JSON格式数据
  • 回归
  • CSS纯文本渐变动效
  • 泛微流程共享
  • MySQL报错:未知系统变量tx_isolation及隔离级别查询
  • Redssion
  • if __name__ == __main__:
  • 提升系统可靠性:Air8000多串口硬件设计的黄金法则
  • 20250915笔记
  • enumerate函数
  • 2025国内 HR SaaS 竞争格局:易路以AI深度融合引领行业转型
  • HyperWorks许可激活
  • f-string用法
  • OpenStack Nova instance 常见操作
  • libdpi.dll libdatareport.dll libdash_plugin.dll libcurl-x86.dll libcurl-x64.dll libcurl_x64.dll - 指南
  • 理解 Kubernetes CSI
  • 9.15
  • 常用数学定理公式
  • 线性规划