RocketMQ 是一款我们国人再熟悉不过的消息中间件,由阿里开源(官方文档),本身也是java实现,在中间件中,对于源码原理的学习是不二之选。 本文梳理MQ的功能特性,作为入门学源码的基础。
消息队列在实际应用中常用的使用场景,包含 「应用解耦、异步处理、流量削锋、消息通讯、日志处理」 等。
对于基本概念,可以到官网去了解 领域模型
最核心的点,是因为考虑到并发度,多个 Message Queue 绑定多个消费者,并发度能达到10w级别
同步发送会阻塞在当前代码上,直到服务端处理结束返回响应结果。
java// 同步发送消息
Message message = new Message("sync-message-topic", "tag-a", ("Hello RocketMQ").getBytes());
producer.send(message);
异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是send方法需要传入 SendCallback 的实现,SendCallback 接口主要有 onSuccess 和 onException 两个方法,表示消息发送成功和消息发送失败。
java// 异步发送消息, 发送结果通过SendCallback返回给客户端
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。oneway 一般使用在日志记录的场景。
java// 单向发送
producer.sendOneway(msg);
普通消息的发送,即为消息发送方式定义的三种发送方式:同步发送、异步发送、单向发送
顺序消息是一种对消息发送和消费顺序有严格要求的消息。需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。
实现顺序发送最核心的是调用了 SendResult send(Message msg, MessageQueueSelector selector, Object arg)
方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准(通过 arg 作为选择 Message Queue 的区分标准)。
java// 局部顺序消息发送 - 本质就是根据参数选择发送队列
for (int i = 0; i < 100; i++) {
// 发送消息
message = new Message("local-order-topic", "tag", ("Hello RocketMQ " + i).getBytes());
mqTemplate.getProducer().send(message, new MessageQueueSelector() {
/**
* 选择发送队列
* @param mqs 队列集合
* @param msg 消息
* @param arg 参数 - producer.send(message, new MessageQueueSelector() { ... }, i);
* @return 队列
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据 i 与队列数取模,选择发送队列
int queueIndex = (Integer) arg % mqs.size();
MessageQueue messageQueue = mqs.get(queueIndex);
System.out.println("参数 "+ arg + " 使用第 " + queueIndex + " 个队列,队列名称为:" + messageQueue.getQueueId() + "");
return messageQueue;
}
}, i);
}
本质上需要顺序消费的消息,都往一个 Message Queue 中发送。
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
延迟消息在4.0版本,通过18个等级在控制延迟时间
javaMessage message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置等级2,此消息将在 10 秒后消费。
message.setDelayTimeLevel(2);
// Send the message
producer.send(message);
在5.0版本开始,就不在通过固定等级的延迟时间做处理,可以自定义的延迟时间,以下是延迟5秒消费的3种方式。
javaMessage message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 延迟5秒发送
message.setDelayTimeMs(5 * 1000); // 毫秒
message.setDelayTimeSec(5); // 秒
message.setDeliverTimeMs(System.currentTimeMillis() + 5 * 1000); // 时间戳
// Send the message
producer.send(message);
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
javapublic class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
}
批量消息发送有3个条件:
在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:
使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。
整个事务消息的详细交互流程如下图所示:
事务消息发送步骤如下:
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
:::note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置 :::
事务消息回查步骤如下:
Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
集群消费模式适用于每条消息只需要被处理一次的场景,也就是说整个消费组会Topic收到全量的消息,而消费组内的消费分担消费这些消息,因此可以通过扩缩消费者数量,来提升或降低消费能力
RocketMQ Push Consumer默认为集群模式,同一个消费组内的消费者分担消费。
javaconsumer.setMessageModel(MessageModel.CLUSTERING);
集群模式下,同一个消费组内的消费者会分担收到的全量消息,这里的分配策略是怎样的?如果扩容消费者是否一定能提升消费能力?
Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等,可以通过如下代码进行设置相应负载均衡策略
java consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
默认的分配策略是平均分配,这也是最常见的策略。
平均分配策略下消费组内的消费者会按照类似分页的策略均摊消费。在平均分配的算法下,可以通过增加消费者的数量来提高消费的并行度。比如下图中,通过增加消费者来提高消费能力。
但也不是一味地增加消费者就能提升消费能力的,比如下图中Topic的总队列数小于消费者的数量时,消费者将分配不到队列,即使消费者再多也无法提升消费能力。
当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。
广播消费模式适用于每条消息需要被消费组的每个消费者处理的场景,也就是说消费组内的每个消费者都会收到订阅Topic的全量消息,因此即使扩缩消费者数量也无法提升或降低消费能力
通过以下代码来设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。
javaconsumer.setMessageModel(MessageModel.BROADCASTING);
在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复。
并发消费是通过在注册消费回调接口时传入 MessageListenerConcurrently
接口的实现来完成。
通过 import MessageListenerConcurrently #consumeMessage
实现消息消费状态的处理
javaconsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。
想要实现队列中的消息按照FIFO的顺序消费,必须注册 MessageListenerOrderly
接口,调用 #consumeMessage 方法,通过返回的状态来实现消息消费是否成功。
从返回状态中可以看到,顺序消费没有重试的概念,而是挂起当前队列,让队列等一会在投递,可以通过设置挂起等待时间,时间到了以后会再次投递(此配置只在顺序消费中起作用)
javaconsumer.setSuspendCurrentQueueTimeMillis(5*1000)
如何确保顺序消费
实现队列 FIFO 顺序消费,要保证以下2点:
首先保证生产者的发送消息的顺序:同一个 Topic + 通过 MessageQueueSelector 选择同一个 Message Queue 其次保证消费者的消费消息的顺序:使用 MessageListenerOrderly 监听器监听消息,失败会挂起当前队列
通过 Tag 对 Topic 进行分类,例如:交易 Trade_Topic,可以区分订单Tag、支付Tag、物流Tag;
代码实现:
java// 绑定订单消息
consumer.subscribe("TagFilterTest", "订单Tag");
// 绑定全部消息
consumer.subscribe("TagFilterTest", "*");
// 绑定订单消息与支付消息
consumer.subscribe("TagFilterTest", "订单Tag || 支付Tag");
// 如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。
consumer.subscribe("TagFilterTest", "TagA");
consumer.subscribe("TagFilterTest", "TagB");
SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。
示例: 消息发送端: 设置消息的自定义属性。
javaMessage msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// 设置自定义属性A,属性值为1。
msg.putUserProperties("a", "1");
消息消费端: 使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。
javaconsumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)"));
当消息消费失败时,会进入到以Topic名称为后缀的重试队列 %RETRY% TopicName
中,通过定义的消息重试的时间间隔,对消息重新投递,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列
控制台:
默认的重试间隔:
可自定义设置最大投递次数:
java// 设置最大消费次数,在并发模式下,-1 表示 16; 在有序模式下,-1 表示 Integer.MAX 值。
// 达到最大重试次数,会进入到 DLQ 死信队列。
consumer.setMaxReconsumeTimes(1);
需要注意:
并发消费与顺序消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
当消息一直失败,超过定义的重试次数以后,该消息不会立刻被丢弃,而是放到了死信队列(Dead-Letter Queue)中,这类消息被称为死信消息(Dead-Letter Message)。
如果当某个 TopicNmae 产生对应的死信消息时,会添加到对应 %RDLQ% TopicName
为名称的死信队列中,死信队列的消息不会被消费,可以等人工处理
控制台:
发送消息前,需要确保目标主题已经被创建和初始化。可以利用 RocketMQ Admin 工具创建目标 Topic 。
RocketMQ 默认开启了 autoCreateTopicEnable 配置,会自动为发送的消息创建 Topic,但该特性仅推荐在初期测试时使用。
生产环境强烈建议管理所有主题的生命周期,关闭自动创建参数,以避免生产集群出现大量无效主题,无法管理和回收,造成集群注册压力增大,影响生产集群的稳定性。
解决方案:在启动 Broker 时,将 autoCreateTopicEnable 属性设置为 false
源码:
如果一个Broker掉线,那么此时队列总数是否会发化?
如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(--order)为true,表示顺序消息:
shell$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]
源码:
其次要保证NameServer中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!