TTL消息

  • TTL是指Time To Live的缩写,也就是生存时间
  • RabbitMQ支持消息的国企时间,在消息发送时可以进行指定
  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过队列的超时时间配置,那么消息会自动清除。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列上。
  • 可以监听这个队列中消息做相应的处理,这个特性可以补偿RabbitMA3.0之前支持的immediate参数的功能。

 

死信队列:DLX,Dead-Letter-Exchange

  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能重新publish到另一个Exchange,这个Exchange就是DLX
  • DLX也是一个正常的Exchange,和一般的Exchange没什么区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。
  • 设置死信队列,首先需要设置死信队列的Exchange和queue,然后进行绑定:
    Exchange:dlx:exchange
    Queue:dlx.queue
    RoutingKey:#
  • 正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");

 

消息变成死信队列有以下几种情况:

  1. 消息被拒绝(basic.reject/basic.nack),并且requeue=false
  2. 消息TTL过期
  3. 队列达到最大长度

ReceiverDLXtExchange.java

package com.bfxy.rabbitmq.api.dlx;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class ReceiverDLXtExchange {

	public static void main(String[] args) throws Exception {
		
		
                ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
                connectionFactory.setHost("10.0.0.11");
                connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
                connectionFactory.setAutomaticRecoveryEnabled(true);
                connectionFactory.setNetworkRecoveryInterval(3000);
                Connection connection = connectionFactory.newConnection();
        
                Channel channel = connection.createChannel();  
		//4 声明正常的 exchange queue 路由规则
		String queueName = "test_dlx_queue";
		String exchangeName = "test_dlx_exchange";
		String exchangeType = "topic";
		String routingKey = "group.*";
		//	声明 exchange
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		
		
		//	注意在这里要加一个特殊的属性arguments: x-dead-letter-exchange
		Map<String, Object> arguments = new HashMap<String, Object>();
		arguments.put("x-dead-letter-exchange", "dlx.exchange");
		//arguments.put("x-dead-letter-routing-key", "dlx.*");
		//arguments.put("x-message-ttl", 6000);
		channel.queueDeclare(queueName, false, false, false, arguments);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		
		//dlx declare:
		channel.exchangeDeclare("dlx.exchange", exchangeType, true, false, false, null);
		channel.queueDeclare("dlx.queue", false, false, false, null);
		channel.queueBind("dlx.queue", "dlx.exchange", "#");
		
		
                //	durable 是否持久化消息
                QueueingConsumer consumer = new QueueingConsumer(channel);
                //	参数:队列名称、是否自动ACK、Consumer
                channel.basicConsume(queueName, true, consumer);  
                //	循环获取消息  
                while(true){  
                    //	获取消息,如果没有消息,这一步将会一直阻塞  
                    Delivery delivery = consumer.nextDelivery();  
                    String msg = new String(delivery.getBody());    
                    System.out.println("收到消息:" + msg);  
                } 
	}
}

 

SenderDLXExchange.java

package com.bfxy.rabbitmq.api.dlx;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SenderDLXExchange {

	
	public static void main(String[] args) throws Exception {
		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("10.0.0.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 创建Connection
		Connection connection = connectionFactory.newConnection();
		//3 创建Channel
		Channel channel = connection.createChannel();  
		//4 声明
		String exchangeName = "test_dlx_exchange";
		String routingKey = "group.bfxy";
		//5 发送
		
		Map<String, Object> headers = new HashMap<String, Object>();
		
		AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
		.deliveryMode(2)
		.contentEncoding("UTF-8")
		//	TTL
		.expiration("6000")
		.headers(headers).build();
		
		String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
		channel.basicPublish(exchangeName, routingKey , props , msg.getBytes()); 		
		
	}
	
}

 

最后修改于 2020-07-13 14:20:05
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇