详细说明使用RabbitMQ延迟任务的方法
在开发过程中,我们经常会遇到这样的需求:
淘宝订单业务:下单后30分钟内未付款,订单自动取消。饿了吗?订购通知:订购成功后60秒向用户发送短信通知。关闭空闲连接:服务器中有很多客户端连接,空闲一段时间后需要关闭。高速缓存:高速缓存中的对象在其空闲时间后被移出高速缓存。任务超时处理:当网络协议滑动窗口请求响应交互时,处理超时无响应的请求。失败重试机制:业务操作失败后,每隔一定时间重试一次失败。这种服务的特点是需要延迟工作,失败后再重试。一个相当愚蠢的方法是使用一个后台线程遍历所有对象并逐个检查它们。这种方法简单易用。但是,当对象太多时,可能会出现性能问题。检查间隔很难设定。间隔太大,影响精度。如果太小,就会出现效率问题。而且不能按照超时顺序处理。
另一个例子是常见的场景:
场景一:物联网系统经常遇到向终端发布命令的情况。如果命令在一段时间内没有得到响应,则需要将其设置为超时。
场景二:下单30分钟后,如果用户未付款,系统自动取消订单。
上述类似的需求是我们经常遇到的问题。最常见的方法是定期轮换数据库并设置状态。数据量小的时候没有大问题,但是数据量大的时候,数据库轮换的方式就会变得特别耗费资源。当面对几千万的数据和上亿的数据时,自己写的IO相对较高,导致长期查询或者根本找不到,更别说数据库分表后了。此外,还有优先级队列、基于优先级队列的JDK延迟队列、时间轮等等。但是如果系统架构中有rabbtmq,那么选择rabbtmq来实现类似的功能也是一种选择。
要利用rabbtmq实现延迟任务,首先要了解rabbtmq的两个概念:消息的TTL和死信交换,并结合起来实现上述需求。
消息的Ttl(生存时间)
消息的TTL是消息的生存时间。RabbitMQ可以分别为队列和消息设置TTL。设置队列意味着队列没有消费者连接的保留时间,也可以单独设置每个消息。这一次之后,我们认为这个消息会消亡,这叫做一纸空文。如果设置了队列并且设置了消息,将采用较小的队列。因此,如果消息被路由到不同的队列,消息死亡的时间可能不同(不同的队列设置)。这里我们将讨论单个消息的TTL,因为它是延迟任务的关键。
您可以通过设置消息的过期字段或x-message-ttl属性来设置时间,这具有相同的效果。只有到期字段是字符串参数,所以写一个int类型的字符串:
byte[] messageBodyBytes='你好,世界!'。getBytes();AMQP。基本属性属性=新AMQP。basic properties();properties . setexpire(' 60000 ');channel . basicpublish(' my-exchange ',' routing-key ',属性,messageBodyBytes);上面的消息被扔进队列后,如果没有被消耗掉,它会在60秒后死亡。不会被消费者消费。这个新闻背后,没有“死”的新闻,是被消费者消费的。死信不会在队列中删除和释放,但会计入队列中的消息数。单靠死信是无法实现延迟任务的,还要靠死信交换。
死信交换
交换的概念在这里不再重复。如果满足以下条件,消息将进入死信路由。请记住,这是一条路由,而不是一个队列,一条路由可以对应多个队列。
1.消费者拒绝了一条消息,拒绝方法的参数中的requeue为false。也就是说,它不会再次被放入队列,被其他消费者使用。
2.上述消息的TTL已到达,消息已过期。
3.队列的长度限制已满。最上面的消息被丢弃或扔在死信路由上。
死信交易所其实就是一个普通的交易所,和创建其他交易所没什么区别。只有当某个带有死信交换的队列中的消息过期时,它才会自动触发该消息的转发,并将其发送到死信交换。
实现延迟队列
延迟任务是通过消息的TTL和死信交换来实现的。我们需要设置两个队列,一个用于发送消息,另一个用于在消息过期后转发目标队列。
生产者向队列1输出一条消息,该消息被设置为有效时间,例如60秒。消息将在队列1中等待60秒。如果没有消费者收到,它将被转发到队列2。如果Queue2有一个使用者,它将接收并处理延迟的任务。
具体实施步骤如下:
第一步是创建两个队列。队列1和队列2。Queue1是一个消息缓冲队列,其中实现了消息的过期转发。如下所示,设置死信交换和死信路由密钥。设置这两个属性是在消息在该队列中过期后发送的路由。dlx的交换需要提前创建,即普通交换。由于我们仍然需要向队列1发送消息,我们还需要创建一个交换并绑定到队列1。在本例中,exchange也称为queue1。
我们还需要构建一个Queue2,当消息在Queue1中过期后,它被用作转发消息的目的队列。因此,在构建Queue2队列后,有必要绑定Queue1: dlx设置的死信路由。队列2的绑定完成后,环境就构建好了。
第二步是实现消息的生产者。由于我们的目标是使进入队列1的消息过期,然后自动将它们转移到队列2,因此我们需要在发送它们时设置过期时间。
ConnectionFactory factory=new ConnectionFactory();factory . setusername(' bsp ');factory . setpassword(' 123456 ');factory . setvirtualhost('/');factory . sethost(' 10 . 23 . 22 . 42 ');factory . setport(5672);conn=factory . new connection();channel=conn . create channel();byte[] messageBodyBytes='你好,世界!'。getBytes();字节i=10while(I-0){ channel . basicpublish(' queue1 ',' queue 1 ',新AMQP。基本属性. Builder()。过期时间(String.valueOf(i * 1000))。build(),new byte[]{ I });}我模拟了上面代码中的1-10号消息,消息的内容是1-10。到期时间为10-1秒。这里需要注意的是,虽然10是第一次传输,但是它具有最长的到期时间。
第三步是实现消息消费者。消费者是延迟任务的实现者。因为特定的任务通常是耗时的任务,一般来说,任务通常在异步线程中执行。
ConnectionFactory factory=new ConnectionFactory();factory . setusername(' bsp ');factory . setpassword(' 123456 ');factory . setvirtualhost('/');factory . sethost(' 10 . 23 . 22 . 42 ');factory . setport(5672);conn=factory . new connection();channel=conn . create channel();channel.basicConsume('queue2 ',true,' consumer ',new DefaultConsumer(channel){ @ override public void handleDelivery(String consumer tag,信封信封,AMQP。BasicProperties属性,byte[] body)引发IOException { long deliveryTag=envelope . getdeliverytag();//做点工作async system . out . println(body[0]);}});如上运行程序后,10s后,消费者开始收到数据,但只收到一次以下结果:
10、9 、8 、7 、6、5 、4 、3 、2 、1
消费者收到的第一个还是10。虽然10是第一个放入队列的,但它的过期时间最长。因此可以看出,即使一条消息比同一队列中的其他消息更早过期,那些更早过期的消息也不会优先进入死信队列,仍然会被消费者按照仓单进行消费。如果第一条传入消息的过期时间为1小时,则死信队列中的使用者可能会等待1小时才能收到第一条消息。根据官方文件,发现“只有当过期的消息到达队列的头部时,才会被实际丢弃(或死信)。”只有当过期的消息到达队列的顶部(队列的头部)时,它们才会被真正丢弃或进入死信队列。
因此,在考虑使用RabbitMQ实现延迟任务队列时,需要保证业务中每个任务的延迟时间一致。如果不同的任务类型需要不同的延迟,则需要为每个延迟时间不同的消息建立单独的消息队列。
摘要
以上就是本文的全部内容。希望本文的内容对大家的学习或工作有一定的参考价值。有问题可以留言交流。谢谢你的支持。
版权声明:详细说明使用RabbitMQ延迟任务的方法是由宝哥软件园云端程序自动收集整理而来。如果本文侵犯了你的权益,请联系本站底部QQ或者邮箱删除。