使用Rabbitmq实现一个简单的延迟队列

使用延迟队列的场景很多,比较常见的就是:下单30分钟未支付,取消订单。市场上流行的MQ中目前好像只有RocketMQ原生支持延迟队列,但是它的开源版本,延迟消息,延迟时间是固定的,只有几个选项,非常不灵活。

得益于Rabbitmq的TTL队列以及死信交换机机制,可以实现一个延迟队列。

TTL队列 & 死信交换机

TTL队列

TTL队列,顾名思义,在创建一个队列的时候可以给队列设置一个TTL时间。这个时间表示这个队列中消息的最大存活时间。如果消息在队列中存活的时间超过了设置值,还没被消费,就会被丢弃。

Map<String, Object> properties = new HashMap<>();
properties.put("x-message-ttl", TimeUnit.HOURS.toMillis(1));  // ttl 设置为 1小时
channel.queueDeclare("myqueue", false, false, false, properties);

死信交换机

死信交换机,当一个消息变成死信的时候,就会尝试把它投递到指定的交换机。这个指定的交换机就叫做:死信交换机

消息变成死信的场景

  1. 消息被消费者拒绝消费,并且设置了 requeue 为 false
  2. 消息TTL到期
  3. 队列达到了最大长度
Map<String, Object> properties = new HashMap<>();
properties.put("x-dead-letter-exchange", "my-queue-dead-exchange"); // 指定队列的死信交换机
properties.put("x-dead-letter-routing-key" , "dlx-routing-key");  // 把死信投递到交换机的时候,设置的路由KEY(这个配置不是必须的)
channel.queueDeclare("myqueue", false, false, false, properties);

如果理解了以上2点,那么就很好理解这么实现一个死信队列了。

  1. 创建一个TTL队列,给这个队列设置一个死信交换机,并且不要给这个队列设置任何消费者
  2. 创建一个正常队列,设置消费者,专门用于消费TTL到期的消息
  3. 以上2个队列都可以绑定在同一个交换机,通过不同的route_key来路由消息

消息投递到TTL队列,不同延迟的TTL队列通过不同的route_key区分,因为没有消费者,就会等待过期,一旦过期就变成了死信,那么就会进如死信交换机,并且添加上指定的route_key。专门监听消费到期消息的消费者就会消费到这条延迟消息

消息生产者

创建2个TTL队列,一个延迟5秒,一个延迟10秒。分别投递5条消息。

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
	
	public static final String DELAY_EXCHANGE = "delay_exchange";
	
	public static final String DELAY_05_SECONDS_QUEUE = "delay_05_seconds_queue";
	
	public static final String DELAY_10_SECONDS_QUEUE = "delay_10_seconds_queue";
	
	public static final String EXPIRATION_QUEUE = "expiration_queue";
	
	public static final String EXPIRATION_QUQUE_ROUTE = "expiration";
	
	public static void main(String[] args) throws Exception {

		// 创建连接工厂
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		
		Connection connection = connectionFactory.newConnection();
		
		Channel channel = connection.createChannel();
		
		// 声明交换机
		channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, false, false, false, null);
		
		// 声明延迟消费队列
		channel.queueDeclare(EXPIRATION_QUEUE, false, false, false, null);
		channel.queueBind(EXPIRATION_QUEUE, DELAY_EXCHANGE, EXPIRATION_QUQUE_ROUTE);
		
		// 声明05秒延迟队列
		Map<String, Object> properties = new HashMap<>();
		properties.put("x-dead-letter-exchange", DELAY_EXCHANGE);
		properties.put("x-message-ttl", TimeUnit.SECONDS.toMillis(5));
		properties.put("x-dead-letter-routing-key" , EXPIRATION_QUQUE_ROUTE);
		channel.queueDeclare(DELAY_05_SECONDS_QUEUE, false, false, false, properties);
		channel.queueBind(DELAY_05_SECONDS_QUEUE, DELAY_EXCHANGE, "05"); // 5 秒 
		
		// 声明10秒延迟队列
		properties = new HashMap<>();
		properties.put("x-dead-letter-exchange", DELAY_EXCHANGE);
		properties.put("x-message-ttl", TimeUnit.SECONDS.toMillis(10));
		properties.put("x-dead-letter-routing-key" , EXPIRATION_QUQUE_ROUTE);
		channel.queueDeclare(DELAY_10_SECONDS_QUEUE, false, false, false, properties);
		channel.queueBind(DELAY_10_SECONDS_QUEUE, DELAY_EXCHANGE, "10"); // 10 秒 
		
		for (int i = 0; i < 5; i ++ ) {
			channel.basicPublish(DELAY_EXCHANGE, "05", true, false, null, 
					("[05秒]我是第" + (i + 1) + "条消息[" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) + "]").getBytes());
		}
		for (int i = 0; i < 5; i ++ ) {
			channel.basicPublish(DELAY_EXCHANGE, "10", true, false, null, 
					("[10秒]我是第" + (i + 1) + "条消息[" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) + "]").getBytes());
		}
		channel.close();
		connection.close();
	}
}

消息消费者

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CountDownLatch;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {
	
	public static final String DELAY_EXCHANGE = "delay_exchange";
	
	public static final String EXPIRATION_QUEUE = "expiration_queue";
	
	public static final String EXPIRATION_QUQUE_ROUTE = "expiration";
	
	public static void main(String[] args) throws Exception {
		
		CountDownLatch countDownLatch = new CountDownLatch(10);
		
		// 创建连接工厂
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("admin");
		
		Connection connection = connectionFactory.newConnection();
		
		Channel channel = connection.createChannel();
		
		// 声明交换机
		channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, false, false, false, null);
		
		// 声明延迟消费队列
		channel.queueDeclare(EXPIRATION_QUEUE, false, false, false, null);
		channel.queueBind(EXPIRATION_QUEUE, DELAY_EXCHANGE, EXPIRATION_QUQUE_ROUTE);
		
		channel.basicConsume(EXPIRATION_QUEUE, false, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
				
				System.out.println("[" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) +"] 消费消息:"
						+ new String(body, StandardCharsets.UTF_8));
				
				channel.basicAck(envelope.getDeliveryTag(), false);
				
				countDownLatch.countDown();
			}
		});
		
		countDownLatch.await();

		channel.close();
		connection.close();
	}
}

测试

先启动消费者,阻塞,进入等待消费状态。然后启动消费生产者,基本是秒发送完毕10条消息,然后就会退出。在短暂延迟后就可以看到消费者的控制台输出。

一共10条日志,延迟5秒消费的5条,以及延迟10秒消费的10条。准确无误。

[2021-07-13 00:42:27] 消费消息:[05秒]我是第1条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第2条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第3条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第4条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:27] 消费消息:[05秒]我是第5条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第1条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第2条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第3条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第4条消息[2021-07-13 00:42:22]
[2021-07-13 00:42:32] 消费消息:[10秒]我是第5条消息[2021-07-13 00:42:22]

1 个赞