菜单

消息队列和 RocketMQ

相关源文件

本文档概述了消息队列,并详细介绍了Apache RocketMQ。它涵盖了RocketMQ作为分布式消息和流平台的根本概念、架构、特性和常用使用模式。有关高可用性配置的信息,请参阅高可用性设计

消息队列介绍

消息队列是充当中间系统的系统,它们使分布式系统的不同部分之间能够进行异步通信。它们有助于解耦系统中的组件,使整体架构更健壮、更具可扩展性。

消息队列的关键优势

  1. 异步处理:允许系统在不等待整个过程完成的情况下处理请求
  2. 解耦:通过消除直接的点对点连接来减少组件之间的依赖
  3. 峰值负载处理:通过缓冲请求来帮助管理突发的流量高峰
  4. 可靠性:即使服务暂时不可用,也能确保消息不会丢失

来源:docs/high-performance/message-queue/rocketmq-questions.md16-77

消息队列模型

队列模型 vs 主题模型

有两种主要的实现消息队列的模型

!消息队列模型

  1. 队列模型:传统的生产者将消息放入队列,消费者从队列中获取消息。每条消息只由一个消费者处理。

  2. 主题模型(发布/订阅):发布者将消息发送到主题,主题的所有订阅者都会收到每条消息的副本。

RocketMQ 实现的是一种基于主题的模型,但具有独特的特性,提供了更高的灵活性。

来源:docs/high-performance/message-queue/rocketmq-questions.md162-182

RocketMQ 架构

核心组件

RocketMQ 由四个主要组件组成

!RocketMQ 架构

  1. 生产者:向代理发送消息。

    • 可以将其分组为生产者组,用于类似的生产者
  2. 消费者:接收和处理消息。

    • 组织成消费者组
    • 支持推送和拉取消费模式
  3. 代理:存储和转发消息。

    • 可以部署为主从配置,以实现高可用性
    • 每个主题可以有多个队列,分布在不同的代理上
  4. NameServer:充当轻量级的服务发现和路由组件。

    • 维护主题和代理的路由信息
    • 代理向NameServer注册并发送心跳以提供元数据
    • 生产者和消费者查询NameServer以获取路由信息

消息流

  1. 代理向所有NameServer节点注册并发送心跳
  2. 生产者向NameServer查询主题路由信息
  3. 生产者根据路由信息向代理发送消息
  4. 消费者向NameServer查询主题路由信息
  5. 消费者从代理拉取消息或订阅推送通知

来源:docs/high-performance/message-queue/rocketmq-questions.md222-272

RocketMQ 消息结构和特性

消息类型

RocketMQ 支持多种消息类型以适应不同的用例

  1. 普通消息:适用于通用场景的标准消息

    • 生命周期包括初始化、待消费、消费、消费确认和删除
    • 用于服务解耦和事件驱动应用程序
  2. 延迟/定时消息:在未来的特定时间传递的消息

    • RocketMQ 5.x 提供精确的时间控制
    • 适用于延迟任务或超时处理
  3. 顺序消息:必须按照生产顺序消费的消息

    • 需要使用 FIFO 主题
    • 顺序保证在队列级别
  4. 事务消息:确保本地事务与消息发布的事务一致性

    • 遵循“半消息”机制进行事务检查

来源:docs/high-performance/message-queue/rocketmq-questions.md277-323

消费者类型

RocketMQ 提供多种消费模式以满足不同需求

  1. PushConsumer:

    • 高级抽象,消息似乎被推送到消费者
    • 通过监听器回调管理消费
    • 实际上实现为高效的长轮询
    • 易于使用,但对消费速率的控制较少
  2. SimpleConsumer:

    • 更原始的 API,提供对消息获取的直接控制
    • 提供显式确认
    • 允许自定义消费速率处理
    • 更适用于需要精确控制的场景
  3. 消费者组和消息分发:

    • 同一组内的消费者共同分担处理负载
    • 一个队列在同一时间只能由组内的一个消费者消费
    • 不同组对同一主题的消费进度可能不同

!RocketMQ 队列与消费者关系

来源:docs/high-performance/message-queue/rocketmq-questions.md347-411 docs/high-performance/message-queue/rocketmq-questions.md200-216

高级特性

顺序消息消费

在 RocketMQ 中,顺序可以保证在队列级别,但不能在整个主题级别保证。为确保相关消息的严格顺序

  1. 需要顺序的消息必须发送到同一个队列
  2. 这可以通过实现一个 MessageQueueSelector 来实现,该选择器根据消息内容确定队列
  3. 通常使用基于哈希的路由来确保相关消息(如对同一订单的操作)发送到同一个队列

来源:docs/high-performance/message-queue/rocketmq-questions.md436-484

分布式事务

RocketMQ 使用“半消息”机制实现分布式事务

!分布式事务流程

  1. 生产者将“半消息”发送到代理(消费者不可见)
  2. 生产者执行本地事务
  3. 生产者根据事务结果提交或回滚消息
  4. 如果代理未收到提交/回滚,它将执行事务状态检查

此机制可确保仅在本地事务成功时才将消息对消费者可见,从而保持分布式组件之间的一致性。

来源:docs/high-performance/message-queue/rocketmq-questions.md524-587

重复消息处理

消息队列通常保证“至少一次”传递,这意味着可能会出现重复消息。为处理此问题,消费者必须实现幂等处理

  1. 跟踪已处理的消息:使用数据库唯一键或 Redis 记录已处理的消息 ID
  2. 自然幂等性:某些操作是天然幂等的(例如设置一个值)
  3. 业务特定检查:处理前,检查操作是否已执行

来源:docs/high-performance/message-queue/rocketmq-questions.md509-522

RocketMQ 性能考量

高性能读写

RocketMQ 通过多种优化实现了高性能

  1. 顺序磁盘写入:使用顺序文件访问来最大化 I/O 性能
  2. 零拷贝传输:使用 mmapsendfile 来减少内核和用户空间之间的数据复制
  3. 页缓存优化:利用操作系统页缓存以加快访问速度
  4. 批量处理:批量处理消息以提高网络和磁盘操作效率
  5. 读写分离线程:防止操作之间发生阻塞

来源:docs/high-performance/message-queue/rocketmq-questions.md773-777

消息积压处理

当消息积压速度超过消费者处理速度时,RocketMQ 提供了一些策略

  1. 消费者扩容:添加更多消费者实例(以及对应的队列)以增加处理能力
  2. 性能调优:优化消费者处理逻辑以更高效地处理消息
  3. 流量控制:当积压增加时,实施反压机制以减缓生产者
  4. 主题分区:将负载分布到更多队列以实现更并行化的处理

来源:docs/high-performance/message-queue/rocketmq-questions.md753-767

最佳实践

生产者最佳实践

  1. 复用生产者实例,而不是为每条消息创建新实例
  2. 避免在单个进程中创建过多的生产者实例
  3. 设置适当的超时值以防止资源耗尽
  4. 实施适当的错误处理和重试机制

消费者最佳实践

  1. 保持消费者与队列的平衡比例(最好是 1:1)
  2. 实现幂等消息处理
  3. 根据需求选择合适的消费者类型
    • PushConsumer 用于具有可预测处理时间的简单场景
    • SimpleConsumer 用于对消费速率和时序进行更精细控制

来源:docs/high-performance/message-queue/rocketmq-questions.md326-346

常见问题和故障排除

问题可能原因解决方案
消息丢失网络问题,配置不当启用持久化存储,使用事务消息,实现适当的 ACK 机制
重复消息ACK 失败,消费者重启实现幂等处理,跟踪已处理的消息 ID
消费缓慢资源不足,处理效率低下扩容消费者,优化处理逻辑,调整批量大小
顺序问题队列选择不当使用 MessageQueueSelector 确保相关消息发送到同一个队列
系统崩溃资源耗尽,内存泄漏监控系统资源,实现熔断器,设置适当的超时值

来源:docs/high-performance/message-queue/rocketmq-questions.md509-522 docs/high-performance/message-queue/rocketmq-questions.md436-484 docs/high-performance/message-queue/rocketmq-questions.md753-767