RocketMQ基本介绍
大约 5 分钟
1. 核心组件
1.1 介绍
如上图所示,RocketMQ的核心组件包括:
- NameServer(注册中心):整个服务的注册中心,Broker在启动时向所有的NameServer注册,生产者和消费者在生产和消费之前也通过NameServer获取一个Broker的地址
- Broker(主机):服务的核心,用于暂存和传输消息
- Producer(生产者):生产消息
- Consumer(消费者):消费消息
1.2 NameServer
1. 路由维护和删除机制
- Broker在启动后向NameServer注册,此时NameServer将Broker的信息维护至
brokerLiveTable
中,主要记录了Broker的最后一次心跳时间lastUpdateTime
- Broker每30s向NameServer发送心跳,更新
brokerLiveTable
的lastUpdateTime
- NameServer每10秒扫描一次注册的Broker,如果发现有broker超过120s失联,则删除该broker相关的路由信息
- Producer和Consumer每隔30s从NameServer获取一次路由信息,也意味着如果Broker不可用需要30s才能被感知
2. 集群机制
- NameServer是无状态的节点,多节点间互不通信,所有数据均存在内存
- 当客户端连接NameServer后,会通过负载均衡算法随机连接一个Broker节点
- NameServer如果宕机,对于已经获取节点信息的客户端来说不会受到影响
1.3 Broker
- Broker启动后向所有的NameServer都注册信息
- Broker以group分隔,每个group存在一个master和多个slave
- 只有master可以写入,slave可以采取同步双写、异步复制这两种逻辑来实现同步数据
- 客户端默认从master消费,slave当作热备,但也可以配置从两者都消费
- Producer和Consumer每30s向Broker发送心跳,Broker每10s扫描一次,如果发现有超过120s没发送心跳的客户端就断开连接
2. 消息
2.1 消息模型
这里贴一个官网的消息模型图片,RocketMQ的消息模型包含以下几部分:
- Topic:用于一级分类
- Tag:用于Topic下的二级分类
- Group:生产者组或消费者组,同一组下有多个节点,可以订阅多个Topic
- MessageQueue:消息队列,一个Topic下有多个Queue。在MessageQueue中每个存储单元是定长的,对应的索引为64位的offset,理论上是无限长的
- Offset:由于一个消息可能会被多个消费组消费,所以需要Offset维护各个消费组的消费位点
2.3 消息发送和接收
1. 消息类型:
- 普通消息
- 延时消息:消费者延时响应的消息。生产者发送的延时消息存在一个临时topic中,Broker中的delay server对临时topic定时扫描,时间到了再发送到指定的topic。RocketMQ从1s-2h分为18个延时等级
- 顺序消息
- 全局有序:生产者以单线程(保证顺序)发送消息到Topic的同一MessageQueue中(使用相同的hashKey),消费者也以单线程进行消费(否则消费速率可能不同,导致消息无序)
- 分区有序:需要Producer端自己实现MessageQueue的选取策略(例如多个订单的消费顺序)
- 事务消息:先发送half消息再执行本地事务,Broker根据执行情况判断是投递(执行成功)还是删除消息(执行失败)
- 批量消息:大批量消息同时发送(无法延时)
2. 消息发送方式:
生产者创建指定Topic、Tag、Content和Callback等信息发送消息,分为同步、异步和单向三种:
- 同步发送
send
:等待broker响应结果,稳定性高但会阻塞线程 - 异步发送
syncSend
:用回调函数来判断发送的响应情况 - 单向发送
sendOneWay
:只发送,不采用回调等方式判断是否发送成功
此外,为了保证消息的幂等性,防止因为网络问题导致Broker发送的消息没有被确认,进而重复消费,可以设置额外的MessageKey在业务中进行二次判断
3. 消息接收方式:
消费者订阅Topic和Tag,并对消息进行消费,分为pull和push两种:
- 拉取式消费pull:Consumer拉取,系统消耗低,实时性差
- 推送式消费push:Broker推送,系统消耗高,实时性强
4. 消息消费模式: Consumer对消息进行消费,并进行自动或手动的ACK确认,根据消费的模式可以分为两种:
- 集群消费:同group只会有一个consumer消费一次
- 广播消费:同group每个consumer都会消费一次
2.4 消息的删除
Broker的消息删除机制有三种:
- 手动删除
- 空间删除:设置一定内存的上限规则,达到90%后则拒绝接收Producer的消息
- 定时删除:默认72小时
3. 用途
- 异构解耦:将同步的业务分割成异步的业务,如订单支付环节中"扣减库存->订单生成->发送短信->增加积分"的环节,发送短信和增加积分完全可以异步进行
- 削峰填谷:因为RocketMQ的性能远大于数据库等中间件,当大流量进入的时候可以先将请求推入MQ,然后按照限流的策略慢慢的执行
- 数据分发:比如将mysql的数据同步到es、redis等,可以用顺序发送来保证一致性