TTL消息
- TTL是指Time To Live的缩写,也就是生存时间
- RabbitMQ支持消息的国企时间,在消息发送时可以进行指定
- RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过队列的超时时间配置,那么消息会自动清除。
- 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列上。
- 可以监听这个队列中消息做相应的处理,这个特性可以补偿RabbitMA3.0之前支持的immediate参数的功能。
死信队列:DLX,Dead-Letter-Exchange
- 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能重新publish到另一个Exchange,这个Exchange就是DLX
- DLX也是一个正常的Exchange,和一般的Exchange没什么区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。
- 设置死信队列,首先需要设置死信队列的Exchange和queue,然后进行绑定:
Exchange:dlx:exchange
Queue:dlx.queue
RoutingKey:# - 正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
消息变成死信队列有以下几种情况:
- 消息被拒绝(basic.reject/basic.nack),并且requeue=false
- 消息TTL过期
- 队列达到最大长度
ReceiverDLXtExchange.java
package com.bfxy.rabbitmq.api.dlx;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class ReceiverDLXtExchange {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("10.0.0.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 声明正常的 exchange queue 路由规则
String queueName = "test_dlx_queue";
String exchangeName = "test_dlx_exchange";
String exchangeType = "topic";
String routingKey = "group.*";
// 声明 exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 注意在这里要加一个特殊的属性arguments: x-dead-letter-exchange
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
//arguments.put("x-dead-letter-routing-key", "dlx.*");
//arguments.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, false, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);
//dlx declare:
channel.exchangeDeclare("dlx.exchange", exchangeType, true, false, false, null);
channel.queueDeclare("dlx.queue", false, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
// durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
}
}
SenderDLXExchange.java
package com.bfxy.rabbitmq.api.dlx;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SenderDLXExchange {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.0.0.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String exchangeName = "test_dlx_exchange";
String routingKey = "group.bfxy";
//5 发送
Map<String, Object> headers = new HashMap<String, Object>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
// TTL
.expiration("6000")
.headers(headers).build();
String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , props , msg.getBytes());
}
}
最后修改于 2020-07-13 14:20:05
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

