Confirm确认消息

理解Confirm消息确认机制

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答
  • 生产者接收该应答,用来确定这条消息是否正常地发送到了Broker,这种方式也是消息可靠性投递的核心保障

 

Confirm确认消息流程

 

Confirm确认消息实现

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或者记录日志等后续处理。

ReceiverConfirmListener.java

package com.bfxy.rabbitmq.api.confirmlistener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * Consumer side of the publisher-confirm demo.
 *
 * <p>Declares the topic exchange {@code test_confirmlistener_exchange}, binds
 * {@code test_confirmlistener_queue} to it with the pattern {@code confirm.#}, then prints
 * and manually acknowledges every message delivered to that queue.
 */
public class ReceiverConfirmListener {

	/**
	 * Connects to the broker and consumes messages in an endless loop; the method only
	 * returns by throwing.
	 *
	 * @param args unused
	 * @throws Exception if connecting, declaring, consuming or acknowledging fails
	 */
	public static void main(String[] args) throws Exception {

		// 1 Configure the connection factory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("10.0.0.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");

		// Let the client re-establish a lost connection, retrying every 3000 ms
		connectionFactory.setAutomaticRecoveryEnabled(true);
		connectionFactory.setNetworkRecoveryInterval(3000);

		// 2 Open the connection and 3 create a channel on it
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		// 4 Declare the exchange and queue, then bind them
		String exchangeName = "test_confirmlistener_exchange";
		String exchangeType = "topic";
		String queueName = "test_confirmlistener_queue";
		String routingKey = "confirm.#";
		// Exchange arguments: durable = true, autoDelete = false, internal = false
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		// Queue arguments: durable = false, exclusive = false, autoDelete = false
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);

		// NOTE(review): QueueingConsumer is deprecated in amqp-client 4.x and removed in 5.x;
		// confirm the client version before upgrading.
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// Arguments: queue name, autoAck (false: every delivery is acked by hand below), consumer
		channel.basicConsume(queueName, false, consumer);

		// Consume forever
		while (true) {
			// Blocks until a message is available
			Delivery delivery = consumer.nextDelivery();
			// Decode explicitly as UTF-8 instead of relying on the platform default charset
			String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
			System.out.println("收到消息:" + msg);
			// Manually acknowledge this single delivery (multiple = false)
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
}

 

SenderConfirmListener.java

package com.bfxy.rabbitmq.api.confirmlistener;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Producer side of the publisher-confirm demo.
 *
 * <p>Puts the channel into confirm mode, registers a {@link ConfirmListener} that reports
 * whether the broker acked or nacked the message, publishes one message to
 * {@code test_confirmlistener_exchange} and then shuts the connection down.
 */
public class SenderConfirmListener {

	/** Upper bound, in milliseconds, on how long to wait for the broker's confirm. */
	private static final long CONFIRM_TIMEOUT_MS = 5000L;

	/**
	 * Publishes a single message and waits for the broker to confirm it.
	 *
	 * @param args unused
	 * @throws Exception if connecting or publishing fails, or if no confirm arrives within
	 *         {@link #CONFIRM_TIMEOUT_MS}
	 */
	public static void main(String[] args) throws Exception {

		// 1 Create the ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("10.0.0.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");

		// 2 Create the Connection
		Connection connection = connectionFactory.newConnection();
		try {
			// 3 Create the Channel
			Channel channel = connection.createChannel();

			// 4 Names used for publishing
			String exchangeName = "test_confirmlistener_exchange";
			String routingKey1 = "confirm.save";

			// 5 Send
			String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";

			// Confirm mode must be enabled before publishing for the broker to send ack/nack
			channel.confirmSelect();
			channel.addConfirmListener(new ConfirmListener() {
				@Override
				public void handleNack(long deliveryTag, boolean multiple) throws IOException {
					System.err.println("------- error ---------");
				}
				@Override
				public void handleAck(long deliveryTag, boolean multiple) throws IOException {
					System.err.println("------- ok ---------");
				}
			});

			// Encode explicitly as UTF-8 instead of relying on the platform default charset
			channel.basicPublish(exchangeName, routingKey1, null,
					msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));

			// Block until the broker has answered so the listener above has run before the
			// connection is closed. The boolean result is ignored on purpose: the listener
			// already reports ack vs. nack.
			channel.waitForConfirms(CONFIRM_TIMEOUT_MS);
		} finally {
			// Closing the connection also closes its channels and lets the JVM exit
			if (connection.isOpen()) {
				connection.close();
			}
		}
	}

}

 

Return消息机制

  • Return Listener用于处理一些不可路由的消息
  • 我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中,然后我们的消费者监听队列,进行消费处理操作
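  • 但是在某些情况下,如果我们在发送消息的时候,当前exchange不存在或者指定的路由key找不到对应的队列,这个时候我们需要监听这种不可达的消息,就要使用Return Listener。(注:exchange不存在时,Broker会以404 NOT_FOUND关闭channel,而不会退回消息;Return Listener实际处理的是exchange存在、但路由key匹配不到任何队列的情况。)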
  • 在基础API中有一个关键的配置项:Mandatory。
  • Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端会自动删除消息。



ReceiverReturnListener.java

package com.bfxy.rabbitmq.api.returnlistener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

/**
 * Consumer side of the return-listener demo.
 *
 * <p>Declares the topic exchange {@code test_returnlistener_exchange}, binds
 * {@code test_returnlistener_queue} to it with the pattern {@code return.#}, then prints
 * every message delivered to that queue. Deliveries are auto-acknowledged.
 */
public class ReceiverReturnListener {

	/**
	 * Connects to the broker and consumes messages in an endless loop; the method only
	 * returns by throwing.
	 *
	 * @param args unused
	 * @throws Exception if connecting, declaring or consuming fails
	 */
	public static void main(String[] args) throws Exception {

		// 1 Configure the connection factory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("10.0.0.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");

		// Let the client re-establish a lost connection, retrying every 3000 ms
		connectionFactory.setAutomaticRecoveryEnabled(true);
		connectionFactory.setNetworkRecoveryInterval(3000);

		// 2 Open the connection and 3 create a channel on it
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		// 4 Declare the exchange and queue, then bind them
		String exchangeName = "test_returnlistener_exchange";
		String exchangeType = "topic";
		String queueName = "test_returnlistener_queue";
		String routingKey = "return.#";
		// Exchange arguments: durable = true, autoDelete = false, internal = false
		channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
		// Queue arguments: durable = false, exclusive = false, autoDelete = false
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);

		// NOTE(review): QueueingConsumer is deprecated in amqp-client 4.x and removed in 5.x;
		// confirm the client version before upgrading.
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// Arguments: queue name, autoAck (true: no manual ack is sent), consumer
		channel.basicConsume(queueName, true, consumer);

		// Consume forever
		while (true) {
			// Blocks until a message is available
			Delivery delivery = consumer.nextDelivery();
			// Decode explicitly as UTF-8 instead of relying on the platform default charset
			String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
			System.out.println("收到消息:" + msg);
		}
	}
}


SenderReturnListener.java

package com.bfxy.rabbitmq.api.returnlistener;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;

public class SenderReturnListener {

	
	public static void main(String[] args) throws Exception {
		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("10.0.0.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 创建Connection
		Connection connection = connectionFactory.newConnection();
		//3 创建Channel
		Channel channel = connection.createChannel();  
		
		//4 声明
		String exchangeName = "test_returnlistener_exchange";
		String routingKey1 = "abcd.save";
		String routingKey2 = "return.save";
		String routingKey3 = "return.delete.abc";
		
		//5 监听
    	channel.addReturnListener(new ReturnListener() {
			public void handleReturn(int replyCode,
						            String replyText,
						            String exchange,
						            String routingKey,
						            AMQP.BasicProperties properties,
						            byte[] body)
					throws IOException {
				System.out.println("**************handleReturn**********");
				System.out.println("replyCode: " + replyCode);
				System.out.println("replyText: " + replyText);
				System.out.println("exchange: " + exchange);
				System.out.println("routingKey: " + routingKey);
				System.out.println("body: " + new String(body));
			}
    	});
    	
    	//6 发送
		String msg = "Hello World RabbitMQ 4 Return Listener Message ...";
		
		boolean mandatory = true;
		channel.basicPublish(exchangeName, routingKey1 , mandatory, null , msg.getBytes()); 
//		channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); 	
///		channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
		
 
	}
	
}

 

最后修改于 2020-07-13 11:07:47
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇