2023-06-11
内卷九股文
0

目录

RocketMQ
使用场景
概念模型
为什么Topic下面还要定义 Message Queue呢?
生产者
消息发送方式
同步发送
异步发送
单向发送
消息定义
普通消息
顺序消息
延迟消息
批量消息
事务消息
消费者
消费模式
推Push
拉Pull
消息模式
集群消费模式
负载均衡策略
广播消费模式
消费点位
消费方式
并发消费
顺序消费
消息过滤
Tag过滤
SQL92过滤
消费重试
重试队列 - RETRY
死信队列 - DLQ
生产必备
Topic 的自动创建
顺序消息的一致性

RocketMQ 是一款我们国人再熟悉不过的消息中间件,由阿里开源(官方文档),本身也是java实现,在中间件中,对于源码原理的学习是不二之选。 本文梳理MQ的功能特性,作为入门学源码的基础。

RocketMQ

使用场景

消息队列在实际应用中常用的使用场景,包含 「应用解耦、异步处理、流量削锋、消息通讯、日志处理」 等。

概念模型

对于基本概念,可以到官网去了解 领域模型

为什么Topic下面还要定义 Message Queue呢?

最核心的点,是因为考虑到并发度,多个 Message Queue 绑定多个消费者,并发度能达到10w级别

生产者

消息发送方式

同步发送

同步发送会阻塞在当前代码上,直到服务端处理结束返回响应结果。

java
// 同步发送消息 Message message = new Message("sync-message-topic", "tag-a", ("Hello RocketMQ").getBytes()); producer.send(message);

异步发送

异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是send方法需要传入 SendCallback 的实现,SendCallback 接口主要有 onSuccess 和 onException 两个方法,表示消息发送成功和消息发送失败。

java
// 异步发送消息, 发送结果通过SendCallback返回给客户端 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } });

单向发送

由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。oneway 一般使用在日志记录的场景。

java
// 单向发送 producer.sendOneway(msg);

消息定义

普通消息

普通消息的发送,即为消息发送方式定义的三种发送方式:同步发送、异步发送、单向发送

顺序消息

顺序消息是一种对消息发送和消费顺序有严格要求的消息。需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。

对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

image.png

实现顺序发送最核心的是调用了 SendResult send(Message msg, MessageQueueSelector selector, Object arg) 方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准(通过 arg 作为选择 Message Queue 的区分标准)。

java
// 局部顺序消息发送 - 本质就是根据参数选择发送队列 for (int i = 0; i < 100; i++) { // 发送消息 message = new Message("local-order-topic", "tag", ("Hello RocketMQ " + i).getBytes()); mqTemplate.getProducer().send(message, new MessageQueueSelector() { /** * 选择发送队列 * @param mqs 队列集合 * @param msg 消息 * @param arg 参数 - producer.send(message, new MessageQueueSelector() { ... }, i); * @return 队列 */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 根据 i 与队列数取模,选择发送队列 int queueIndex = (Integer) arg % mqs.size(); MessageQueue messageQueue = mqs.get(queueIndex); System.out.println("参数 "+ arg + " 使用第 " + queueIndex + " 个队列,队列名称为:" + messageQueue.getQueueId() + ""); return messageQueue; } }, i); }

本质上需要顺序消费的消息,都往一个 Message Queue 中发送。

延迟消息

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

延迟消息在4.0版本,通过18个等级在控制延迟时间

image.png

java
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // 设置等级2,此消息将在 10 秒后消费。 message.setDelayTimeLevel(2); // Send the message producer.send(message);

在5.0版本开始,就不在通过固定等级的延迟时间做处理,可以自定义的延迟时间,以下是延迟5秒消费的3种方式。

java
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // 延迟5秒发送 message.setDelayTimeMs(5 * 1000); // 毫秒 message.setDelayTimeSec(5); // 秒 message.setDeliverTimeMs(System.currentTimeMillis() + 5 * 1000); // 时间戳 // Send the message producer.send(message);

批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

java
public class SimpleBatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //If you just send messages of no more than 1MiB at a time, it is easy to use batch //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support String topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes())); producer.send(messages); } }

批量消息发送有3个条件:

  1. 批量的 Topic 必须相同
  2. list大小在4M以内,否则会报异常
  3. list数据只会进入Topic一个 Message Queue中

事务消息

在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

image.png

使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

image.png

事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

整个事务消息的详细交互流程如下图所示:

image.png

事务消息发送步骤如下:

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
  • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
  • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  1. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  2. :::note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置 :::

事务消息回查步骤如下:

  1. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  2. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

消费者

消费模式

推Push

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

拉Pull

Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

消息模式

集群消费模式

当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。

集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力 image.png

RocketMQ Push Consumer默认为集群模式,同一个消费组内的消费者分担消费。

java
consumer.setMessageModel(MessageModel.CLUSTERING);

负载均衡策略

集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?

Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等,可以通过如下代码进行设置相应负载均衡策略

java
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

默认的分配策略是平均分配,这也是最常见的策略。

  • AllocateMachineRoomNearby 机房优先分配策略。
  • AllocateMessageQueueAveragely 平均分配策略
  • AllocateMessageQueueAveragelyByCircle 周期平均哈希队列算法
  • AllocateMessageQueueByConfig 按配置分配消息队列
  • AllocateMessageQueueByMachineRoom 机房哈希队列算法
  • AllocateMessageQueueConsistentHash 一致性hash分配策略

平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力。

image.png

image.png

但也不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力。

image.png

广播消费模式

当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

广播消费模式适用于每条消息需要被消费组的每个消费者处理的场景,也就是说消费组内的每个消费者都会收到订阅Topic的全量消息,因此即使扩缩消费者数量也无法提升或降低消费能力 image.png

通过以下代码来设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。

java
consumer.setMessageModel(MessageModel.BROADCASTING);

消费点位

在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复。

image.png

消费方式

并发消费

并发消费是通过在注册消费回调接口时传入 MessageListenerConcurrently 接口的实现来完成。 通过 import MessageListenerConcurrently #consumeMessage 实现消息消费状态的处理

  • return ConsumeConcurrentlyStatus.CONSUME_SUCCESS 成功
  • return ConsumeConcurrentlyStatus.RECONSUME_LATER 重试
  • return null 重试
  • throw Exception 重试
java
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });

在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。

顺序消费

想要实现队列中的消息按照FIFO的顺序消费,必须注册 MessageListenerOrderly 接口,调用 #consumeMessage 方法,通过返回的状态来实现消息消费是否成功。

  • return ConsumeOrderlyStatus.SUCCESS; 成功
  • return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; 挂起队列一会

从返回状态中可以看到,顺序消费没有重试的概念,而是挂起当前队列,让队列等一会在投递,可以通过设置挂起等待时间,时间到了以后会再次投递(此配置只在顺序消费中起作用

java
consumer.setSuspendCurrentQueueTimeMillis(5*1000)

如何确保顺序消费

实现队列 FIFO 顺序消费,要保证以下2点:

首先保证生产者的发送消息的顺序:同一个 Topic + 通过 MessageQueueSelector 选择同一个 Message Queue 其次保证消费者的消费消息的顺序:使用 MessageListenerOrderly 监听器监听消息,失败会挂起当前队列

消息过滤

Tag过滤

通过 Tag 对 Topic 进行分类,例如:交易 Trade_Topic,可以区分订单Tag、支付Tag、物流Tag;

代码实现:

java
// 绑定订单消息 consumer.subscribe("TagFilterTest", "订单Tag"); // 绑定全部消息 consumer.subscribe("TagFilterTest", "*"); // 绑定订单消息与支付消息 consumer.subscribe("TagFilterTest", "订单Tag || 支付Tag"); // 如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。 consumer.subscribe("TagFilterTest", "TagA"); consumer.subscribe("TagFilterTest", "TagB");

SQL92过滤

SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。

示例: 消息发送端: 设置消息的自定义属性。

java
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes()); // 设置自定义属性A,属性值为1。 msg.putUserProperties("a", "1");

消息消费端: 使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。

java
consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)"));

消费重试

重试队列 - RETRY

当消息消费失败时,会进入到以Topic名称为后缀的重试队列 %RETRY% TopicName 中,通过定义的消息重试的时间间隔,对消息重新投递,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列

控制台: image.png

默认的重试间隔: image.png

可自定义设置最大投递次数:

java
// 设置最大消费次数,在并发模式下,-1 表示 16; 在有序模式下,-1 表示 Integer.MAX 值。 // 达到最大重试次数,会进入到 DLQ 死信队列。 consumer.setMaxReconsumeTimes(1);

需要注意:

并发消费与顺序消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

image.png

死信队列 - DLQ

当消息一直失败,超过定义的重试次数以后,该消息不会立刻被丢弃,而是放到了死信队列(Dead-Letter Queue)中,这类消息被称为死信消息(Dead-Letter Message)。

如果当某个 TopicNmae 产生对应的死信消息时,会添加到对应 %RDLQ% TopicName 为名称的死信队列中,死信队列的消息不会被消费,可以等人工处理

控制台:

image.png

生产必备

Topic 的自动创建

发送消息前,需要确保目标主题已经被创建和初始化。可以利用 RocketMQ Admin 工具创建目标 Topic 。

RocketMQ 默认开启了 autoCreateTopicEnable 配置,会自动为发送的消息创建 Topic,但该特性仅推荐在初期测试时使用。

生产环境强烈建议管理所有主题的生命周期,关闭自动创建参数,以避免生产集群出现大量无效主题,无法管理和回收,造成集群注册压力增大,影响生产集群的稳定性。

解决方案:在启动 Broker 时,将 autoCreateTopicEnable 属性设置为 false

源码:

image.png

顺序消息的一致性

如果一个Broker掉线,那么此时队列总数是否会发化?

如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(--order)为true,表示顺序消息:

shell
$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876 create topic to 127.0.0.1:10911 success. TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]

源码: image.png

其次要保证NameServer中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!