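Confirm Messages
Understanding the Confirm mechanism
- Message confirmation means that after a producer publishes a message, the Broker sends an acknowledgement back to the producer once it has received the message.
- The producer receives this acknowledgement and uses it to determine whether the message reached the Broker. This is the core guarantee behind reliable message delivery.
Confirm message flow
Implementing Confirm messages
- Enable confirm mode on the channel: channel.confirmSelect()
- Add a listener on the channel with addConfirmListener. It receives the success (ack) and failure (nack) results; depending on the result, the producer can resend the message, write a log entry, or do other follow-up processing.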
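The listener callbacks carry a deliveryTag and a multiple flag, so deciding what to resend requires some bookkeeping on the producer side. The following is a minimal sketch of that bookkeeping in plain Java, without a broker. The class OutstandingConfirms and its method names are hypothetical; it assumes the sequence number passed to track comes from channel.getNextPublishSeqNo() immediately before each publish, and that ack and nack are called from handleAck and handleNack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers published messages until the broker confirms them. */
final class OutstandingConfirms {
    // sequence number -> message body still waiting for a broker response
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Call right before publishing (assumed seqNo source: channel.getNextPublishSeqNo()). */
    void track(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Mirrors handleAck: drop one entry, or every entry up to and including the tag. */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    /** Mirrors handleNack: returns the bodies to resend or log, and stops tracking them. */
    List<String> nack(long deliveryTag, boolean multiple) {
        List<String> rejected = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = pending.headMap(deliveryTag, true);
            rejected.addAll(head.values());
            head.clear();
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                rejected.add(body);
            }
        }
        return rejected;
    }

    /** Number of messages the broker has not yet confirmed. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seq = 1; seq <= 4; seq++) {
            confirms.track(seq, "msg-" + seq);
        }
        confirms.ack(2, true);   // broker confirms messages 1 and 2 together
        System.out.println("to resend: " + confirms.nack(3, false));
        System.out.println("still pending: " + confirms.size());
    }
}
```

A sorted map is used here because a confirmation with multiple set to true covers every sequence number up to the given tag, which maps naturally onto headMap.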
ReceiverConfirmListener.java
package com.bfxy.rabbitmq.api.confirmlistener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

// Note: QueueingConsumer is available in amqp-client 3.x/4.x; it was removed in 5.x.
public class ReceiverConfirmListener {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.0.0.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // Reconnect automatically after a network failure, retrying every 3000 ms
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange, the queue, and the binding between them
        String exchangeName = "test_confirmlistener_exchange";
        String exchangeType = "topic";
        String queueName = "test_confirmlistener_queue";
        //String routingKey = "user.*";
        String routingKey = "confirm.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // Second argument (durable): whether the queue survives a broker restart
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Arguments: queue name, auto-ack flag (false = manual ack), consumer
        channel.basicConsume(queueName, false, consumer);

        // Receive messages in a loop
        while (true) {
            // Blocks until a message is available
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Received message: " + msg);
            // Acknowledge the message manually
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
SenderConfirmListener.java
package com.bfxy.rabbitmq.api.confirmlistener;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SenderConfirmListener {

    public static void main(String[] args) throws Exception {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.0.0.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Create the Connection
        Connection connection = connectionFactory.newConnection();

        // 3. Create the Channel
        Channel channel = connection.createChannel();

        // 4. Exchange name and routing key (matches the receiver's "confirm.#" binding)
        String exchangeName = "test_confirmlistener_exchange";
        String routingKey1 = "confirm.save";

        // 5. Enable confirm mode, register the listener, then publish
        String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            // Called when the broker reports that it could not handle the message
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("------- error ---------");
            }

            // Called when the broker confirms that it has received the message
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("------- ok ---------");
            }
        });
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
    }
}
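Return Message Mechanism
- A Return Listener handles messages that cannot be routed.
- A producer publishes a message with an Exchange and a routing key so that it is delivered to a queue; a consumer then listens on that queue and processes the message.
- In some cases, however, the routing key used at publish time does not match any queue bound to the exchange. To be notified of such unreachable messages, use a Return Listener. (Publishing to an exchange that does not exist is a different failure: the broker closes the channel with a NOT_FOUND error instead of returning the message.)
- The basic API has one key setting for this: Mandatory.
- Mandatory: if true, the broker returns unroutable messages to the producer and the listener receives them for follow-up processing; if false, the broker silently discards them.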
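The effect of the Mandatory flag can be illustrated with a toy in-memory model. This is only a sketch of the decision described in the list above, not the RabbitMQ client API: the class MandatoryFlagModel, the ReturnHandler interface, and the exact-match routing are all simplifying assumptions. The reply code 312 with the text NO_ROUTE is what AMQP 0-9-1 uses for an unroutable mandatory message.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the mandatory flag (hypothetical; exact-match routing only). */
final class MandatoryFlagModel {

    /** Hypothetical callback standing in for ReturnListener.handleReturn. */
    interface ReturnHandler {
        void onReturn(int replyCode, String replyText, String routingKey, String body);
    }

    private final Set<String> boundKeys = new HashSet<>();
    private final List<String> queued = new ArrayList<>();
    private final List<String> dropped = new ArrayList<>();
    private ReturnHandler returnHandler = (code, text, key, body) -> { };

    /** Binds the single model queue to a routing key. */
    void bind(String routingKey) {
        boundKeys.add(routingKey);
    }

    void setReturnHandler(ReturnHandler handler) {
        this.returnHandler = handler;
    }

    void publish(String routingKey, boolean mandatory, String body) {
        if (boundKeys.contains(routingKey)) {
            queued.add(body);                       // routable: goes to the queue
        } else if (mandatory) {
            // unroutable + mandatory: handed back to the producer
            returnHandler.onReturn(312, "NO_ROUTE", routingKey, body);
        } else {
            dropped.add(body);                      // unroutable + not mandatory: discarded
        }
    }

    List<String> queued() {
        return queued;
    }

    List<String> dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        MandatoryFlagModel broker = new MandatoryFlagModel();
        broker.bind("return.save");
        broker.setReturnHandler((code, text, key, body) ->
                System.out.println("returned " + code + " " + text + " for key " + key));
        broker.publish("return.save", true, "routed");
        broker.publish("abcd.save", true, "returned to producer");
        broker.publish("abcd.save", false, "silently discarded");
        System.out.println("queued=" + broker.queued() + " dropped=" + broker.dropped());
    }
}
```

In the model, as with the real flag, the producer only learns about an unroutable message when it both sets mandatory to true and registers a handler for returns.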
ReceiverReturnListener.java
package com.bfxy.rabbitmq.api.returnlistener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

// Note: QueueingConsumer is available in amqp-client 3.x/4.x; it was removed in 5.x.
public class ReceiverReturnListener {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.0.0.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // Reconnect automatically after a network failure, retrying every 3000 ms
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the exchange, the queue, and the binding between them
        String exchangeName = "test_returnlistener_exchange";
        String exchangeType = "topic";
        String queueName = "test_returnlistener_queue";
        //String routingKey = "user.*";
        String routingKey = "return.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // Second argument (durable): whether the queue survives a broker restart
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Arguments: queue name, auto-ack flag (true = automatic ack), consumer
        channel.basicConsume(queueName, true, consumer);

        // Receive messages in a loop
        while (true) {
            // Blocks until a message is available
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Received message: " + msg);
        }
    }
}
SenderReturnListener.java
package com.bfxy.rabbitmq.api.returnlistener;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;

public class SenderReturnListener {

    public static void main(String[] args) throws Exception {
        // 1. Create the ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.0.0.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. Create the Connection
        Connection connection = connectionFactory.newConnection();

        // 3. Create the Channel
        Channel channel = connection.createChannel();

        // 4. Exchange name and routing keys
        String exchangeName = "test_returnlistener_exchange";
        String routingKey1 = "abcd.save";          // does not match "return.#": unroutable
        String routingKey2 = "return.save";        // matches "return.#"
        String routingKey3 = "return.delete.abc";  // matches "return.#"

        // 5. Register the listener that receives returned (unroutable) messages
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode,
                                     String replyText,
                                     String exchange,
                                     String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body)
                    throws IOException {
                System.out.println("**************handleReturn**********");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("body: " + new String(body));
            }
        });

        // 6. Publish with mandatory = true so that the unroutable message is returned
        String msg = "Hello World RabbitMQ 4 Return Listener Message ...";
        boolean mandatory = true;
        channel.basicPublish(exchangeName, routingKey1, mandatory, null, msg.getBytes());
        // channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        // channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
    }
}
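The three routing keys in SenderReturnListener behave differently against the receiver's binding return.#, which is why only abcd.save triggers handleReturn. The sketch below re-implements topic-exchange matching in plain Java to make that visible. TopicMatcher is a hypothetical illustration, not part of the RabbitMQ client, and it ignores edge cases such as empty words; it assumes the standard topic rules, where * matches exactly one dot-separated word and # matches zero or more words.

```java
/** Hypothetical illustration of topic-exchange pattern matching. */
final class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // pattern exhausted: match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words; try every split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String binding = "return.#";
        for (String key : new String[] {"abcd.save", "return.save", "return.delete.abc"}) {
            System.out.println(key + " -> " + matches(binding, key));
        }
    }
}
```

The commented-out binding user.* in the receivers would behave differently from a # binding: it accepts user.save but not user.save.abc, because * stands for exactly one word.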
Last modified: 2020-07-13 11:07:47

