Java面试题024:一文深入了解微服务消息队列RocketMQ
RocketMQ是阿里开源的一款高性能、高吞吐量的消息中间件。支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)支持 18 个级别的延迟消息(Kafka 不支持)支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)支持 Con
欢迎大家关注我的JAVA面试题专栏,该专栏会持续更新,从原理角度覆盖Java知识体系的方方面面。
1、RocketMQ简介
RocketMQ是阿里开源的一款高性能、高吞吐量的消息中间件。
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:
- 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
- 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持 18 个级别的延迟消息(Kafka 不支持)
- 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
- 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(即过滤由MQ完成,而不是由消费者完成。RabbitMQ 和 Kafka 不支持)
- 支持重复消费(RabbitMQ 不支持,Kafka 支持)
2、基本概念
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
(1)NameServer
NameServer 是整个 RocketMQ 的“大脑” ,它是 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Broker。
名称服务器(NameServer)用来保存 Broker 相关元信息并给 Producer 和 Consumer 查找Broker 信息。NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。
Name Server和ZooKeeper的作用大致是相同的,Name Server做的东西很少,就是保存一些运行数据,Name Server之间不互连(RocketMQ的NameServer集群间互不通信)。Name Server很轻量级,broker端要做更多的东西。而使用ZooKeeper时,broker只需要连接其中的一台机器,运行数据分发、一致性都交给了ZooKeeper来完成。
(2)Broker
消息服务器(Broker)是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。
每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到NameServer 获取到 Broker 的路由信息,进而和Broker取得连接。
NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到Broker 宕机,则从路由注册表中将其移除,这样就可以实现 RocketMQ 的高可用。
Broker分为Master与Slave,BrokerId为0表Master,非0表示Slave,Master 既可以写又可以读,Slave不可以写只可以读。
Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多Slave(同步刷盘)、多 Master多 Slave(异步刷盘)。
- 单 Master
一旦 Broker 重启或宕机会导致整个服务不可用,风险较大,不建议线上环境使用。
- 多 Master
所有消息服务器都是 Master ,没有 Slave 。这种方式优点是配置简单,单个 Master 宕机或重启维护对应用无影响。缺点是单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受影响。
- 多 Master 多 Slave(异步复制)
每个 Master 配置一个 Slave,所以有多对 Master-Slave,消息采用异步复制方式,主备之间有毫秒级消息延迟。优点是消息丢失的非常少,且消息实时性不会受影响,Master 宕机后消费者可以继续从 Slave 消费,中间的过程对用户应用程序透明,不需要人工干预,性能同多 Master 方式几乎一样。
- 多 Master 多 Slave(同步双写)
每个 Master 配置一个 Slave,所以有多对 Master-Slave ,消息采用同步双写方式,主备都写成功才返回成功。优点是数据与服务都没有单点问题,Master 宕机时消息无延迟,服务与数据的可用性非常高。缺点是性能相对异步复制方式略低,发送消息的延迟会略高。
(3)Producer
生产者向brokers发送由业务应用程序系统生成的消息。
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,每隔30s从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知。
Producer每隔30s向所有关联的broker发送心跳,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。
(4)Consumer
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,每隔30s从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且每隔30s向Master、Slave发送心跳。
3、注册中心
RocketMQ不使用Zookeeper作为注册中⼼,主要从这几方面来考虑:
(1)可用性。Zookeeper并不能保证服务的可用性,Zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态。对于—个注册中⼼来说肯定是不能接受的,作为服务发现来说就应该是为可⽤性⽽设计。
(2)性能。ZooKeeper的写入性能瓶颈明显,它属于强一致性设计,每次写入需要多数节点确认。而NameServer采用"心跳上报+定时清理"机制实现最终一致性,NameServer本身的实现⾮常轻量,单节点即可处理所有请求,⽽且可以通过增加机器的⽅式⽔平扩展。
(3)消息发送应该弱依赖注册中⼼,⽽RocketMQ的设计理念也正是基于此,⽣产者在第—次发送消息的时候从 NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可⽤,短时间内对于⽣产者和消 费者并不会产⽣太⼤影响。
4、消息模型
消息队列有两种模型:队列模型和发布/订阅模型
队列模型:最初的—种消息队列模型,对应消息队列“发-存-收”的模型。⽣产者往某个队列⾥⾯发送消息,—个队列可以存储多个⽣产者的消息,—个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被—个消费者消费。
发布 - 订阅模型:消息发送⽅称为发布者,消息接收⽅称为订阅者,服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。
唯—的不同点在于:—份消息数据是否可以被多次消费。
RocketMQ使⽤的消息模型是标准的发布-订阅模型。
Topic(主题)可以看做消息的归类,它是消息的第—级类型。⽐如—个电商系统可以分为:交易消息、物流消息等,—条消息必须有—个 Topic 。Topic 与⽣产者和消费者的关系⾮常松散,—个 Topic 可以有0个、1个、多个⽣产者向其发送消息,—个⽣产者也可以同时向不同的 Topic 发送消息。 —个 Topic 也可以被 0个、1个、多个消费者订阅。
Tag(标签)可以看作⼦主题,它是消息的第⼆级类型,⽤于为⽤户提供额外的灵活性。使⽤标签,同—业务模块不同⽬的的消息就可以⽤相同 Topic ⽽不同的 Tag 来标识。⽐如交易消息⼜可以分为:交易创建消息、交易完成消息等,—条消息可以没有 Tag 。
RocketMQ中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中—份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,—条消息被Consumer Group1消费过,也会再给Consumer Group2消费。消费组中包含多个消费者,同—个组内的消费者是竞争消费的关系,每个消费者负责消费组内的—部分消息。默认情况,如果—条消息被消费者Consumer1消费了,那同组的其他消费者就不会再收到这条消息。
Offset:在Topic的消费过程中,消息需要被不同组进⾏多次消费,消费完的消息并不会⽴即被删除,这就需要RocketMQ为每个消费组在每个队列上维护—个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费—条消息,消费位置就加—。也就是说, Queue 是—个⻓度⽆限的数组,Offset 就是下标。
5、消息消费模式
有两种:集群消费(Clustering)和广播消费(Broadcasting)
(1)集群消费
定义:同一消费者组内的多个消费者共同消费队列消息,每条消息仅被组内的一个消费者处理。
特点:
- 负载均衡:队列在消费者间均匀分配,提高吞吐。
- 消息独占:队列级全局有序(若顺序消费),单条消息仅消费一次。
- 适用场景:适用于每条消息只需要被处理一次的场景。
(2)广播消费
定义:同一消费者组内的每个消费者都会消费所有消息。
特点:
- 全量投递:每条消息被组内所有消费者处理。
- 无进度同步:消费进度由各消费者本地维护。
- 适用场景:适用于每条消息需要被集群下每一个消费者处理的场景。
RocketMQ默认为集群消费模式。5.0 SDK 已经不支持广播消费,如果需要使用,可以给每个消费者创建一个单独的订阅组实现类似的功能。
6、消息类型
消息类型分为4种:普通消息、定时与延迟消息、顺序消息和事务消息
(1)普通消息
普通消息是一种基础的消息类型,由生产投递到指定 Topic 后,被订阅了该 Topic 的消费者所消费。普通消息的 Topic 中无顺序的概念,可以使用多个分区数来提升消息的生产和消费效率。
普通消息的 Topic 只能用于收发普通消息,不能用于收发延迟消息、顺序消息和事务消息。
(2)定时与延迟消息
定时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟到某个时间点被消费,这类消息统称为定时消息。
延时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟一段时间后再被消费,这类消息统称为延时消息。
实际上,延时消息可以看成是定时消息的一种特殊用法。
定时消息只要在需要定时发送消息的 property
属性中增加 __STARTDELIVERTIME
属性值,就能在一定范围内(40天)实现该消息在任意时间的定时发送。
Message msg = new Message("test-topic", ("message content").getBytes(StandardCharsets.UTF_8));
// 设定消息在10秒之后被发送
long delayTime = System.currentTimeMillis() + 10000;
// 将 __STARTDELIVERTIME 设定到 msg 的属性中
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
SendResult result = producer.send(msg);
在4.x的版本中,RocketMQ
不支持任意时间精度的延迟,而是预设了18个延迟等级。使用使用ConcurrentSkipListMap
存储延迟级别与时间的映射关系。
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */>delayLevelTable = new ConcurrentSkipListMap<>();
public void setDelayTimeLevel(int level) {
this.putProperty("DELAY", String.valueOf(level));
}
在5.x的版本中支持任意时间延迟,延时消息先通过 System.currentTimeMillis() + delayTime
计算得到定时发送的时间点,再以定时消息的方式发送。
Duration messageDelayTime = Duration.ofSeconds(10);
final Message message = provider.newMessageBuilder()
.setTopic(topic).setTag(tag).setKeys("yourMessageKey-3ee439f945d7")
//设置延迟消息的时间
.setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
.setBody(body).build();
电商的订单超时⾃动取消,就是一个典型的利⽤延时消息的例⼦,⽤户提交了一个订单,就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ 延时消息的实现机理:将延时消息发送到延时队列。内部通过启动线程池来定时扫描延时队列里的消息,到了设定时间,就把消息发送原来队列,然后消费者就可以消费了。其实 Broker 内部每 100 ms 扫描一次,所以严格来说,通过 RocketMQ 发送延时消息,精度不会太高。(官网说定时和延时消息在精度上会有1秒左右的偏差。)
broker将消息追加到commit log前会判断消息是否为延时消息,如果是会将原始topic、队列ID等数据存储在properties,并将消息topic、队列ID改为延时队列相关属性,最终消息会被持久化到延时队列。每个延时队列都有定时任务隔100ms进行检测,如果消息超时则通过consumer queue记录找到commitlog中的消息,并将其原始topic、队列ID等信息恢复,再调用持久化消息的API,相当于将消息重投到最开始设置的队列中。由于每个延时队列延时的时间相同,消息入队后,检测超时可以顺序检测,离超时时间越近的消息越前,如果有大量消息同时定时相同时间,处理流程可能会导致堆积从而影响定时精度。
(3)顺序消息
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。
顺序消息可以被用在以下场景中:
-
订单创建场景:电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。
-
日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 mysql 的 binlog 日志时,需要保证数据库的操作是有顺序的。
-
金融场景:在一些撮合交易的场景下,例如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。
RocketMQ是如何保证消息消费的顺序性?
需要从 Product 发送消息顺序,broker 中的 Queue 的接收顺序,Consumer 消费的顺序三个方面来保证。
1)生产者
需要保证单个生产者串行(同步)来发送顺序消息。
2)broker消息存储
生产者发送的消息是按照时间顺序追加到commitlog中的,然后会被定时分发到cosumerQueue中。同一个消费组内,一个consumerQueue只会被一个消费者消费,而且还是按照顺序来消费的,所以我们只需要让相关的顺序消息发送到同一个consumerQueue中即可。
SendResult sendResult=producer.send(message, new MessageQueueSelector() {
/**
* @param mqs 该Topic下所有可选的messageQueue
* @param message 待发送的消息
* @param arg 发送消息时传递的参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
//订单号
Integer id= (Integer) arg;
//根据参数计算出一个要接收消息的MessageQueue的下标
int index=id %mqs.size();
return mqs.get(index);
}
},orderId);
生产者发送消息时,需要在send方法中传入一个MessageQueueSelector,其中需要重写一个select方法,这个方法就是用来定义要把消息发送到哪个MessageQueue的。这样就实现了分区顺序消息,仅保证同一笔订单的顺序性,如果我们需要保证所有订单的顺序性,那么就直接写死一个队列,让所有的消息都发往这一个队列中即可,这就是全局顺序消息。
3)消费队列分布式锁
同一订单按照先后顺序放到同一Queue,那么取消息的时候就可以保证先进先取出。
但是在一个消费者集群的情况下,消费者1先去Queue拿消息,拿到了北京订单1,它拿完后,消费者2去queue拿到北京订单2。拿的顺序是没问题,但关键是先拿到不代表先消费完它。会存在虽然你消费者1先拿到北京订单1,但由于网络等原因,消费者2比你真正的先消费消息。
RocketMQ采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。
RocketMQ用ConsumMessageOrderlyService来保证顺序消费,该类在初始化的时候会启动一个定时任务:
该定时任务会向Broker申请当前消费者负责的队列锁,将自己的消费组、客户端ID和负责的队列发往Broker,Broker就会把对应队列与这个消费者绑定,并把这个关系存储在本地,别的消费者如果想消费这个队列也得来申请加分布式锁,保证了同一个消费组内,一个队列只会被分配给一个消费者。
(4)事务消息
事务消息是 RocketMQ 提供的一种高级特性消息,通过将二阶段提交的动作和本地事务绑定,来保障分布式场景下消息生产和本地事务的最终一致性
相关概念:
- 半消息 half message
生产者发送事务消息到 RocketMQ 服务端后,消息会被持久化并标记为“暂不可投递”的状态,直到本地事务执行完成并确认后,消息才会决定是否对消费者可见,此状态下的消息,称为半消息(half message)。
- 二阶段提交
实现事务最终一致性的关键机制,一阶段为发送 half message,二阶段为生产者执行本地事务,并根据执行结果向 RocketMQ 服务端提交 commit(允许投递)或 rollback(丢弃消息)的确认结果,以此来决定 half message 的去留。
- OP 消息
用于给消息状态打标记,没有对应 OP 消息的 half message,就说明二阶段确认状态未知,需要 RocketMQ 服务端进行本地事务状态主动回查,OP 消息的内容为对应的 half message 的存储的Offset。
典型的分布式场景:跨行转账
必须严格保证转出方账户扣款与转入方账户入账的最终一致性。
阶段一:发送事务消息(准备转账)
用户 1 发起转账后,其所在的银行系统 A,向 RocketMQ 服务端对应的业务 topic 发送一条事务消息,内容包含 “用户 1 转出 1000 元至用户 2 账户”。此时,该消息对转入方的系统 B 不可见,避免在转出方本地事务确认前,转入方提前执行入账操作,确保资金流转安全性。
阶段二:执行本地事务(转出账户扣减)
事务消息发送成功后,系统 A 继续执行本地事务,扣减用户1 的账户余额,若扣减成功,则提交二次确认 commit 到 RocketMQ 服务端,消息被继续投递到下游,反之,提交 rollback,事务结束,双方账户余额保持不变。
阶段三:下游服务消费(转入银行余额增加)
转入方银行系统 B 预先订阅转账 topic,接收消息后执行用户 2 账户余额增加操作。若消费过程中因网络异常、账户状态问题导致失败,RocketMQ 将自动触发重试机制,若多次重试仍未成功,消息将转入死信队列,后续由人工介入核对,通过补偿流程保障资金最终准确入账。
实现流程
1.生产者发送事务消息到 RocketMQ 服务端。
2. 服务端存储这条消息后返回发送成功的响应,此时消息对下游消费者不可见,处于 half message 状态。
3. 生产者收到半消息成功的响应后,继续往下执行本地事务(如更新业务数据库)。
4. 根据本地事务的执行结果,生产者会向 RocketMQ 服务端提交最终状态,也就是二次确认。
5. 确认结果为 commit 时,服务端会将事务消息继续向下投递给消费者,确认结果为 rollback 时,服务端将会丢弃该消息,不再向下投递。
6. 确认结果是 unknown 或一直没有收到确认结果时,一定时间后,将会触发事务状态主动回查。
7. 当生产者未提交最终状态或者二次确认的结果为 unknown 时,RocketMQ 服务端将会主动发起事务结果查询请求到生产者服务。
8. 生产者收到请求后提交二次确认结果,逻辑再次回到第5步,此时如果生产者服务暂时不可用,则 RocketMQ 服务端会在指定时间间隔后,继续主动发起回查请求,直到超过最大回查次数后,回滚消息。
7、消息过滤
消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。
消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。
应用场景:交易流水 Topic 包含了下单流水、支付流水、发货流水等,业务若只想消费者其中一种类别的消息,可在客户端进行过滤。账单系统:只需订阅支付消息。物流系统:只需订阅物流消息。库存系统:只需订阅下单消息。
消息过滤主要支持两种过滤方式,分别是 SQL 过滤和 TAG 过滤。其核心逻辑都是在发送消息的时候,设置一些自定义字段,然后通过消费组订阅的时候指定对应的过滤表达式,消息在服务端进行过滤后,才被消费组消费。
(1)TAG 过滤
- 发送消息
发送消息时,每条消息必须指明 Tag。发送消息只能设置一个 TAG。
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("yourMessageKey-1c151062f96e")
.setBody(body)
.build();
- 订阅消息
订阅单个 Tag的情况:
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
String tag = "TAGA";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
}).build();
订阅多个 Tag:如需订阅某 Topic 下多种类型的消息,在多个 Tag 之间用两个竖线||
分隔
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
String tag = "TAGA || TAGB";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
}).build();
订阅所有 Tag:如需订阅某 Topic 下所有类型的消息,Tag 用星号(*)表示。
String consumerGroup = "yourConsumerGroup";
String topic = "yourTopic";
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
8、消息重试
当消息第一次被消费者消费后,没有得到正常的回应,或者用户主动要求服务端重投,RocketMQ 会通过消费重试机制自动重新投递该消息,直到该消息被成功消费,当重试达到一定次数后,消息仍未被成功消费,则会停止重试,将消息投递到死信队列中。
出现以下三种情况会按照消费失败处理并会发起重试:
- 消费者返回 ConsumeResult.FAILURE。
- 消费者返回 null。
- 消费者主动/被动抛出异常。
当消息需要重试时,RocketMQ 中配置了如下的 messageDelayLevel 参数来设置重试次数与时间间隔。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SDK 中第一次 delayLevel 为3(10s)。
pushConsumer.setMaxReconsumeTimes(3);
9、死信队列
信队列用于处理无法被正常消费的消息,即死信消息。
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数16次(默认次数)后,若消费依然失败, 则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列。
-
不会再被消费者正常消费。
-
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。
-
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
-
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
-
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。
10、RocketMQ的高可⽤
NameServer是无状态且相互不通信的,所以只要集群部署就可以保证高可用。
消息消费高可用
RocketMQ的高可用主要是体现在Broker的读和写的高可用,Broker的高可用是通过集群和主从实现的。
Broker可以配置两种角色:Master和Slave,Master角色的Broker支持读和写,Slave角色的Broker只支持读, Master会向Slave同步消息。 也就是说Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。
消息发送高可用
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。
11、 刷盘机制
RocketMQ 的消息是存储在磁盘上的,这样做有两个优点:
- 保证断点后恢复
- 让存储的消息量超出内存的限制
RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合实现的,CommitLog 负责将消息存储在真正的物理存储文件,而 ConsumeQueue 则是消息的逻辑队列,存储对应消息指向的物理存储的地址。
Producer 写入 RocketMQ 的时候,支持两种写磁盘方式:同步刷盘和异步刷盘
12、消息主从复制
一个 Broker 组有一个 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步复制和异步复制两种方式。
在实际应用中,由于同步刷盘方式会频繁触发磁盘写操作,明显降低性能,故通常配置为:
- 刷盘方式:ASYNC_FLUSH(异步刷盘)
- 主从复制:SYNC_MASTER(同步复制)
异步刷盘能够避免频繁触发磁盘写操作,除非服务器宕机,否则不会造成消息丢失。
主从同步复制能够保证消息不丢失,即使 Master 节点异常,也能保证 Slave 节点存储所有消息并被正常消费掉。
更多推荐
所有评论(0)