[[332032]]
关于延迟任务,在业务场景中太常见了。例如,下单后xx分钟内未支付订单,该订单将被关闭。例如,如果在XX分钟内没有抢到红包,则该红包将失效。
所以说起延迟任务的执行计划,很多人可能第一时间想到的是轮询,即设置定时任务,稍有经验的开发者都会知道这一点。轮询机制会给数据库带来很大的压力,当然对于小企业来说无所谓。如果是需要处理大量数据的业务,轮询肯定不行。而如果要保证高可用,就得涉及到分布式计划任务。怎么看都很麻烦。
很多小聪明都知道可以使用消息队列来实现。确实,MQ的异步、解耦特性在这种任务延迟的场景下能够爆发出强大的战斗力。既然RabbitMQ应用广泛,那么对于如何实现延迟任务自然有自己的解决方案。
用文字和UML活动图来谈谈RabbitMQ所谓的“死信”机制是如何实现延迟消息的需求及其功能缺陷
1、死信是什么说到死信,巴拉巴拉的术语死信队列、死信交换机就出来了。
这个词有点抽象,但并不难理解。不管你信不信,就当他死了吧~
例如,如果您的生产者向MQ Broker 发送一条消息,由于各种原因该消息没有被消费,并且该消息最终挂起/消亡。你可以把他视为一纸空文
那么死信队列呢?死信开关怎么样?其实这两个东西和普通的队列、交换机是一样的,没有本质的区别。
这种设置和处理在RabbitMQ中是点对点的,即普通队列可以绑定死信开关。
指定队列的死信开关需要设置队列的属性参数(arguments)
具体参数名称:
绑定死信交换: x-dead-letter-exchange
routingkey : x-dead-letter-routing-key 路由死信时使用
2、什么情况会产生死信在RabbitMQ中,有几种情况会产生死信:
1.队列长度已满
2. 消费者拒绝消费消息(丢弃)
3. 消息TTL过期
这里提到了TTL,那么我们需要解释一下它是什么。
TTL是time to live的缩写,即生存时间。
在RabbitMQ 中,可以在队列和单个消息上设置TTL。如果设置在队列上,则可以认为队列中所有消息的TTL为设置值。
队列TTL属性参数: x-message-ttl
单条消息TTL参数:过期
如果设置了TTL值,并且消息在队列中保留超过TTL而没有被消费,消息队列就会丢弃该消息,从而产生“死信”。
死信生成后,如果队列配置了死信交换机,则消息会流向绑定的死信交换机,然后死信交换机将消息路由到死信队列。
然后,死信队列被推送给该队列的使用者。
3、基于死信机制的延时任务实现方案那么,根据上述知识点1和2,相应的延迟任务执行方案自然就出来了。
具体计划:
1、创建一个没有消费者的队列,设置TTL值,绑定死信开关
2.所有需要延迟的消息都会被发送到这个队列中。
3、死信开关绑定对应的死信队列,它的消费者是处理延迟消息的服务。
根据上述方案的逻辑,向队列发送消息后,会等到消息过期后的——,即指定的延迟时间,消费者才能处理该消息。
可以满足延迟任务的需要。
活动图如下:
3、Spring 中 RabbitMQ 死信实现方式了解了原理和机制之后,我们就开始实际玩一下吧。
依赖的配置以及具体application.yml文件的编写这里不再解释。如果你想了解更多,可以阅读我之前的文章。
最重要、最核心的就是RabbitMQ的队列和交换机配置。
根据以上知识点可以得出,只要配置TTL和死信开关就可以实现该功能。
所以这里直接贴出我写的配置类:
@Configuration`publicclassRabbitBindConfig{`ublicfinalstaticStringSKYPYB_ORDINARY_EXCHANGE='skypyb-普通交换';`ublicfinalstaticStringSKYPYB_DEAD_EXCHANGE='skypyb-dead-exchange';`ublicfinalstaticStringSKYPYB_ORDINARY_QUEUE_1='skypyb-普通队列';`publicfinal staticStringSKYPYB _DEAD_QUEUE='skypyb-死队列'; ` publicfinalstaticStringSKYPYB_ORDINARY_KEY='skypyb.key.ordinary.one';`publicfinalstaticStringSKYPYB_DEAD_KEY='skypyb.key.dead';`@Bean`publicDirectExchangeordinaryExchange(){`returnnewDirectExchange(SKYPYB_ORDINARY_EXCHANGE,false,true);`}`@Bean`publicDirectExchangedeadExchange( ) {`returnnewDirectExchange(SKYPYB_DEAD_EXCHANGE,false,true);`}`@Bean`publicQueueordinaryQueue(){`Maparguments=newHashMap();`//TTL5s`arguments.put('x-message-ttl',1000*5) ; `//绑定死信队列和死信交换`arguments.put('x-dead-letter-exchange',SKYPYB_DEAD_EXCHANGE);`arguments.put('x-dead-letter-routing-key',SKYPYB_DEAD_KEY); `returnnewQueue(SKYPYB_ORDINARY_QUEUE_1,false,false,true,arguments);`}`@Bean`publicQueuedeadQueue(){`returnnewQueue(SKYPYB_DEAD_QUEUE,false,false,true);`}`@Bean`publicBindingbindingOrdinaryExchangeAndQueue(){`returnBindingBuilder.bind (ordinaryQueue()).to(ordinaryExchange()).with(SKYPYB_ORDINARY_KEY);`}`@Bean`publicBindingbindingDeadExchangeAndQueue(){`returnBindingBuilder.bind(deadQueue()).to(deadExchange()).with(SKYPYB_DEAD_KEY); `}`}`可以看到我定义了几个与普通队列和死信队列相关的常量。
并根据这些常量实例化相应的交换机和队列,并设置绑定关系。
实例化普通队列时进行特殊处理;普通队列绑定死信开关,并指定死信路由键。在实例化之前指定其TTL 值(5 秒到期)。
那么现在有了这样的配置,延迟消息所需要的所有条件都已经达到了。
编写一个消费者和发送者来测试它。
消费者:
@RabbitListener(queues={RabbitBindConfig.SKYPYB_DEAD_QUEUE})`@Component`publicclassDeadReceiver{`privateLoggerlogger=LoggerFactory.getLogger(DeadReceiver.class);`@RabbitHandler`publicvoidonDeadMessage(@PayloadStringmessage,`@HeadersMapheaders,`Channelchannel) throwsIOException{`logger. info('死信队列消费者收到消息:{}',message);`//可以从headers中获取deliverytag`LongdeliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);`try{`channel.basicAck(deliveryTag) ,false);`}catch(Exceptione){`System.err.println(e.getMessage());`booleanredelivered=(boolean)headers.get(AmqpHeaders.REDELIVERED);`channel.basicNack(deliveryTag,false ,已重新发送);`}`}`}`发件人:
`@RunWith(SpringRunner.class)`@SpringBootTest(classes=Application.class)`publicclassRabbitmqTest{`@Autowired`privateRabbitTemplaterabbitTemplate;`privateLoggerlogger=LoggerFactory.getLogger(RabbitmqTest.class);`@Test`publicvoidtestDead(){`rabbitTemplate. ConvertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,'消息正文');`rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,'消息正文');`log ger.info('-----消息发送完成-----');`}`}`最终控制台结果,延迟队列功能确实已经实现了:
2020-01-12 11:14:17.582 INFO 12032 — [main] com.skypyb.test.RabbitmqTest : —消息发送完成
2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-2] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者收到消息: 消息正文
2020-01-12 11:14:22.599 INFO 10576 — [cTaskExecutor-1] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者收到消息: 消息正文
除了队列TTL之外,还可以设置消息级TTL。
SpringAMQP对单个消息的TTL设置需要在MessageProperties类中完成,每个消息都有一个内置的类。
为了方便起见,SpringAMQP在消息发送过程中提供了一个钩子,允许我们设置Message的属性,即MessagePostProcessor
@FunctionalInterface`publicinterfaceMessagePostProcessor{`MessagepostProcessMessage(Messagemessage)throwsAmqpException;`defaultMessagepostProcessMessage(Messagemessage,Correlationcorrelation){`returnpostProcessMessage(message);`}`}`由于他使用了@FunctionalInterface注解,为了方便我就用lambda表达式来写一个,设置单条消息的TTL为3秒:
@RunWith(SpringRunner.class)@SpringBootTest(classes=Application.class)publicclassRabbitmqTest{@AutowiredprivateRabbitTemplaterabbitTemplate;`privateLoggerlogger=LoggerFactory.getLogger(RabbitmqTest.class);`@Test`publicvoidtestDead(){`rabbitTemplate.convertAndSend(`RabbitBindConfig.SKY) PYB_ORDINARY_EXCHANGE ,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,`'消息body',`(msg)-{`msg.getMessageProperties().setExpiration('3000');`returnmsg;`});`rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig .SKYPYB_ORDINARY_KEY,'消息正文'); `logger.info('-----消息已发送Completed-----');`}`}` 修改代码,再次发送,控制台输出为:
2020-01-12 11:51:22.788 INFO 26232 — [main] com.skypyb.test.RabbitmqTest : —消息发送完成
2020-01-12 11:51:25.787 INFO 10576 — [cTaskExecutor-4] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者收到消息: 消息正文
2020-01-12 11:51:27.784 INFO 10576 — [cTaskExecutor-5] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者收到消息: 消息正文
可以看到,嘿嘿,果然不出所料,消息接收的时间是有差别的,正好符合设置的消息TTL 3s和队列TTL 5s。
然而这个功能是有缺陷的
这是使用RabbitMQ死信机制作为延迟任务时不可避免会出现的一个缺点。
下面解释一下
4、RabbitMQ 死信实现方式缺陷将上述消息代码的顺序颠倒过来,如下所示:
@RunWith(SpringRunner.class)`@SpringBootTest(classes=Application.class)`publicclassRabbitmqTest{`@Autowired`privateRabbitTemplaterabbitTemplate;`privateLoggerlogger=LoggerFactory.getLogger(RabbitmqTest.class);`@Test`publicvoidtestDead(){`rabbitTemplate.convertAndSend (RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,'消息正文');`rabbitTemplate.convertAndSend(`RabbitBindConfig.SKYPYB_ORDINARY_EXCHANGE,`RabbitBindConfig.SKYPYB_ORDINARY_KEY,`'消息正文',`(msg )-{`msg.get消息属性()。 setExpiration('3000');`returnmsg;`});`logger.info('-----消息发送完成-----');`}`}`运行代码,结果是执行结果偏离了预期。 …控制台打印:
2020-01-12 15:00:19.371 INFO 9680 — [main] com.skypyb.test.RabbitmqTest : —消息发送完成
2020-01-12 15:00:24.380 INFO 10576 — [cTaskExecutor-1] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者收到消息: 消息正文
2020-01-12 15:00:24.380 INFO 10576 — [cTaskExecutor-3] c.s.rabbitmq.controller.DeadReceiver : 死信队列消费者收到消息: 消息正文
可以看到,消费者消费消息时,足足等了5秒!
这是为什么?这是由于RabbitMQ的特性造成的。
RabbitMQ的队列是一个先进先出(FIFO)有序队列,输入消息按顺序推送到MQ中。
而RabbitMQ只会对队列尾部的消息进行超时处理,所以会出现上述情况。
也就是说,即使第二条消息在3 秒后过期,因为第一条消息在5 秒后过期,RabbitMQ 也会等到第一条消息被丢弃后再判断第二条消息。最后的结果是,第一个过期后,第二个也过期了。
结语其实就日常生活中可能遇到的场景而言,使用RabbitMQ的死信机制就足够了。
毕竟大多数延迟任务都是固定时间的,比如下单后半小时没有付款就关闭订单。
只要场景是固定时间的延迟任务,RabbitMQ无疑可以满足这个需求。
要回答标题中的问题,您可以说:
RabbitMQ死信机制可以作为任务延迟场景的解决方案
但是,因为RabbitMQ消息死亡不是异步的,而是阻塞的。因此,它不能用作复杂延迟场景——的解决方案,其中每个消息的死亡需要彼此独立。
关于RabbitMQ 死信机制真的可以作为延时任务这个场景的解决方案吗?-51CTO.COM到此分享完毕,希望能帮助到您。
本文采摘于网络,不代表本站立场,转载联系作者并注明出处:https://www.iotsj.com//kuaixun/7443.html
用户评论
我一直都在想用rabbitmq做延时任务,看到这篇文章好期待结果!
有10位网友表示赞同!
死信队列确实是个很好的方案,但不知道能不能完全替代专门的延时任务机制。
有10位网友表示赞同!
这篇分析是不是比较学术化?希望能看看实际应用场景下的案例分享更直观.
有13位网友表示赞同!
我之前还真遇到过类似问题,希望本文能给我一些思路解决。
有10位网友表示赞同!
死信队列真的很有用,在流量高峰也能够保证消息能及时处理.
有6位网友表示赞同!
有没有其他解决方案可以替代?死信机制还是有些复杂感觉。
有8位网友表示赞同!
如果要追求极高的可靠性,使用死信机制是不是 overkill?
有11位网友表示赞同!
实际情况下死信队列的性能如何?会不会影响系统整体速度?
有18位网友表示赞同!
这篇文章让我对RabbitMQ有了更深入了解,之前只知道它可以做消息队列,没想到还可以用来延迟任务.
有14位网友表示赞同!
学习一下死信机制的设计原理,以后应用起来才更有底气.
有8位网友表示赞同!
在实际项目中如何配置死信队列的参数呢?有没有一些技巧分享?
有13位网友表示赞同!
这种方案适合所有类型的延时任务吗?不同类型任务需要的配置是否也不同?
有12位网友表示赞同!
这篇文章给我感觉就是理论性的东西,希望看到更多案例分析.
有13位网友表示赞同!
关于RabbitMQ的监控和报警机制,有什么建议吗?死信队列异常需要及时发现.
有13位网友表示赞同!
使用死信队列需要考虑哪些成本?比如资源消耗以及维护复杂度等等。
有9位网友表示赞同!
有没有更简单易用的替代方案?死信机制是不是太过于繁琐了?
有9位网友表示赞同!
学习一下RabbitMQ的官方文档,更全面地了解死信机制的使用方法.
有13位网友表示赞同!
如果系统规模比较庞大,使用这种方案可扩展性如何?会不会遇到瓶颈?
有10位网友表示赞同!
对于一些高并发场景,死信队列是否能够满足需求呢?
有19位网友表示赞同!
和其他的延时任务解决方案相比,RabbitMQ的优势在哪里?
有17位网友表示赞同!