RocketMQ 四、rocketMQ应用笔记
第4章 rocketMQ应用
一、普通消息
1.消息发送分类
同步发送消息
Producer在发出一条消息后,在收到MQ返回的ACK后才会发送下一条消息
该方法可靠性最高,但是效率低
异步发送消息
Producer在发出一条消息后,不需要收到MQ返回的ACK后,直接发送下一条消息
该方法可靠性上课,效率也不错
单向发送消息
Producer仅负责发送消息,不等待,不处理MQ的ACK; MQ不返回ACK
该方法效率最高,但是可靠性差
2.代码举例
省去具体代码…
二、顺序消息
1.什么是顺序消息
严格按照消息的发送顺序进行消费的消息
默认方式下生产者会以轮询的方式把消息发送到不同的Queue队列,这样会导致消息消费顺序和发送顺序不一致; 若仅将消息发送到同一个Queue中,同时在消费时也仅从这个Queue上拉取消息,就能保证消息的顺序性
2.有序性分类
全局有序
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,称为全局有序
在创建Topic时指定Queue的数量,有三种指定方式
1)在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
2)在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
3)使用mqadmin命令手动创建Topic时指定Queue数量
分区有序
若有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,称为分区有序
如何实现Queue的选择?在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的。
在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论谁做选择key,都不能重复,都是唯一的。一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。
取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被Consumer获取到。此时使用消息key作为选择key是比较好的做法。
以上做法会不会出现如下新的问题呢?不属于那个Consumer的消息被拉取走了,那么应该消费该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。
三、延时消息
1.什么是延时消息
当消息写入到Broker后,在指定的时长才可被处理的消息,称为延时消息
采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。
2.延时等级
延时消息的延迟市场不支持设置任意时长,是通过特定的延时等级来指定的;延时等级定义在RocketMQ服务端的MessageStoreConfig的messageDelayLevel中
如果需要自定义的延时等级,可以通过在broker加载的配置中新增messageDelayLevel的配置(配置文件在RocketMQ安装目录下的conf目录中)
3.延时消息实现原理
修改消息
Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程:
修改消息的Topic为SCHEDULE_TOPIC_XXXX
根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话)
延迟等级delayLevel与queueId的对应关系为queueId = delayLevel -1
需要注意,在创建queueId目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,而是用到哪个延迟等级创建哪个目录
- 修改消息索引单元内容;
- 索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到Broker时的时间戳
- 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息是如何排序的?
是按照消息投递时间排序的。一个Broker中同一等级的所有延时消息会被写入到consumequeue目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。
投递延时消息
Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中
ScheuleMessageService在Broker启动时,会创建并启动一个定时器Timer,用于执行相应的定时任务。系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消费与投递。每个TimerTask都会检测相应Queue队列的第一条消息是否到期。若第一条消息未到期,则后面的所有消息更不会到期(消息是按照投递时间排序的);若第一条消息到期了,则将该消息投递到目标Topic,即消费该消息。
将消息重新写入commitlog
延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。
这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类ScheuleMessageService。
四、事务消息
1.基础
分布式事务
通俗来讲,分布式事务就是一次操作由若干分支操作组成,这些操作分属不同应用和不同服务器; 分布式事务需要保证这些分支的一致性,即要么全部成功,要么全部失败; 分布式事务与普通事务一样,是为了保证结果的一致性
事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能, 通过事务消息能达到分布式事务的最终一致; XA是一种分布式事务解决方案, 一种分布式事务处理模式
半事务消息
暂不能投递的消息,发送方已经成功地将消息发送到了Broker, 但是Broker未收到最终确认指令, 此时该消息被标记成”暂不能投递”状态,其不会被消费者看到
本地事务状态
Producer回调操作执行的结果为本地事务状态,其会发送给TC, 而TC会再发送给TM; TM会根据TC发送来的本地事务状态来决定全局事务确认指令
消息回查
即重新查询本地事务的执行状态
注意,消息回查不是重新执行回调操作。以银行转账为例, 回调操作是进行预扣款操作,而消息回查则是查看预扣款操作执行的结果。
引发消息回查的原因最常见的有两个:
1)回调操作返回UNKNWON
2)TC没有接收到TM的最终全局事务确认指令
RocketMQ中的消息回查设置
关于消息回查,有三个主要的属性设置,其都在broker加载的配置文件中设置
transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
2.XA模式三剑客
XA协议
XA是一种分布式事务解决方案,它有三个重要组件:TC、TM、RM
TC
Transaction Coordinator 事务协调者; 维护全局和分支事务的状态,驱动全局事务提交或回滚
在RocketMQ中Broker充当TC的角色
TM
Transaction Manager 事务管理器; 定义全局事务的范围: 开始全局事务、提交或回滚全局事务; 它实际上是全局事务的发起者
RocketMQ中事务消息的Producer充当TM的角色
RM
Resource Manager, 资源管理器; 管理分支事务处理的资源, 与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚
RocketMQ中事务消息的Producer和Broker都是RM
3.XA模式架构
XA模式是一个典型的2PC,其执行原理如下
- TM向TC发起指令,开启一个全局事务。
- 根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
- 各个RM在接收到指令后会在进行本地事务预执行。
- RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
- TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指令。
- 若所有结果都是成功响应,则向TC发送Global Commit指令。
- 只要有结果是失败响应,则向TC发送Global Rollback指令。
- TC在接收到指令后再次向RM发送确认指令。
事务消息方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案
中的消息预提交与预扣款操作间是同步的。
4.注意
- 事务消息不支持延迟消息
- 对于事务消息要做好幂等性检查,因为事务消息可能被多次消费(可能在回滚后被再次消费)
五、批量消息
1.批量发送消息
发送限制
生产者在发送消息时可以一次性发送多条消息,这可以大大提升Producer的效率,不过批量发送的消息需要注意以下几点
- Topic必须相同
- 刷盘策略必须相同
- 不能是延时消息和事务消息
批量发送大小
默认情况下,总大小不能超过4MB
若要一次批量发送超过4MB的消息,有两种方法
①将批量消息进行拆分,拆成若干大小小于等于4m的消息集合分多次批量发送
②在Producer端和Broker端修改属性
Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性
生产者发送消息的大小
生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占20字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、要发送的QueueId等。最终写入到Broker中消息单元中的数据都是来自于这些属性。
2.批量消费消息
修改批量属性
Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。
若要使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。不过,该值不能超过32。因为默认情况下消费者每次可以拉取的消息最多是32条。
若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定
问题
Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性如何设置合适
pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。
consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理
六、消息过滤
消费者在可以订阅Topic的同时,对Topic中的消息进行条件过滤
1.Tag过滤
通过consumer的subscribe()方法制定要订阅消息的Tag; 若订阅多个Tag的消息, Tag用”||”连接
2.SQL过滤
SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式; 通过SQL过滤,可以实现对消息的较复杂的过滤; 不过,只有通过Push模式的消费者才能使用SQL过滤
SQL过滤表达式支持多种常量类型与运算符
支持的常量类型 :
- 数值
- 字符: 必须用单引号包裹
- 布尔类型
- NULL
支持的运算符 :
- 数值比较:>,>=,<,<=,BETWEEN,=
- 字符比较:=,<>,IN
- 逻辑运算 :AND,OR,NOT
- NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:enablePropertyFilter = true
在启动Broker时需要指定这个修改过的配置文件。例如对于单机Broker的启动,其修改的配置文件是conf/broker.conf,启动时使用如下命令:sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
七、消息发送重试机制
1.简介
Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,又称消息重投机制
需要注意以下几点
- 生产者若采用同步/异步发送方式, 发送失败后会重试, 但oneway消息发送方式在消息发送失败后是没有重试机制的
- 只有普通消息发送具有重试机制
- 消息重投只保证消息尽可能发送成功,不丢失,但不保证消息不重复;
- 在RocketMQ中无法避免消息重复,但要避免消息的重复消费
- 消息重复在一般情况下不会发生,但在消息量大、网络抖动时,消息重复可能会十分频繁
- Producer主动重发、consumer负载变化时也可能会导致消息重复(Rebalance不会导致消息重复,但可能会产生重复消费)
- 避免消息重复消费的方法: 为消息添加唯一标示(key), 让消费者对消息进行消费判断来避免重复消费
- 消息发送重试有三种策略: 同步发送失败策略、异步发送失败策略、消息刷盘失败策略
2.同步发送失败策略
对于普通消息,消息发送默认采用轮询策略来选择所发送到的队列。如果发送失败,默认重试2次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker。当然,若只有一个Broker其也只能发送到该Broker,但其会尽量发送到该Broker上的其它Queue。
同时,Broker还具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标Broker。其可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。
如果超过重试次数,则抛出异常,由Producer去保证消息不丢。当然当生产者出现RemotingException、MQClientException和MQBrokerException时,Producer会自动重投消息。
3.异步发送失败策略
异步发送失败时,重试时不会选择其他Broker,而是仅在一个Broker上进行重试,所以无法保证消息不丢
4.消息刷盘失败策略
消息刷盘超市或slave不可用(slave在数据同步时向master返回状态不是SEND_OK)时,默认是不会将消息尝试发送到其他Broker的; 不过,对于重要消息可以通过在Broker的配置文件设置retryAnotherBrokerWhenNotStoreOK为true来开启
八、消息消费重试机制
1.顺序消息的消费重试
对于顺序消息,当Consumer消费消息失败后,为保证消息的顺序性,会不断地进行消息重试,直到成功消费; 消费重试默认时间间隔为1000ms; 重试期间应用会出现消息消费被阻塞的情况
因为顺序消费的重试是不间断的,为避免消费被永久阻塞, 要保证应用能及时监控并处理消费失败的情况
顺序消息没有发送失败重试机制, 但有消费失败重试机制
2.无序消息的消费重试
对于无序消息(普通消息、延时消息、事务消息), 当Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。需要注意的是,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息。
3.消费重试次数与间隔
对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如下表:
重试次数 | 重试的时间间隔 | 重试次数 | 重试的时间间隔 |
---|---|---|---|
1 | 10s | 9 | 7m |
2 | 30s | 10 | 8m |
3 | 1m | 11 | 9m |
4 | 2m | 12 | 10m |
5 | 3m | 13 | 20m |
6 | 4m | 14 | 30m |
7 | 5m | 15 | 1h |
8 | 6m | 16 | 2h |
一条消息在消费失败次数达到上限后,会将该消息投递到死信队列
3.重试队列
对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列。
当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称为%RETRY%consumerGroup@consumerGroup 的重试队列。
Broker对于重试消息的处理是通过延时消息实现的。先将消息保存到SCHEDULE_TOPIC_XXXX延迟队列中,延迟时间到后,会将消息投递到%RETRY%consumerGroup@consumerGroup重试队列中。
1)这个重试队列是针对消息组的,而不是针对每个Topic设置的(一个Topic的消息可以让多个消费者组进行消费,所以会为这些消费者组各创建一个重试队列)
2)只有当出现需要进行重试消费的消息时,才会为该消费者组创建重试队列
九、死信队列
1.概念
达到最大重试次数后,消费依然失败的消息会被发送到该消费者对应的特殊队列中, 即死信队列, 其中的消息称为死信消息
2.特征
- 死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的
- 死信存储有效期与正常消息相同,均为 3 天(commitlog文件的过期时间),3 天后会被自动删除
- 死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup ,即每个消费者组都有一个死信队列
- 如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列
3.死信消息的处理
实际上,当⼀条消息进入死信队列,就意味着系统中某些地方出现了问题,从而导致消费者无法正常消费该消息,比如代码中原本就存在Bug。因此,对于死信消息,通常需要开发人员进行特殊处理。最关键的步骤是要排查可疑因素,解决代码中可能存在的Bug,然后再将原来的死信消息再次进行投递消费。