消费端限流

什么是消费端限流?

假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:

  • 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时出来这么多数据。

 

RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。

void BasicQos(uint prefetchSize ,unshort prefetchCount,bool global);

###
prefetchSize:默认为1000,Broker批量发送prefetchSize条消息给consumer

prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block,直到有消息ack

global:true/false是否将上面设置应用于channel

注意:prefetchSize和global这两项,rabbitMQ没有实现,暂且不研究prefetchSize在no_ask=false的情况下,即在自动应答的情况两个值是不生效的。

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Receiver {

	public static void main(String[] args) throws Exception {
		
		
                ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
                connectionFactory.setHost("192.168.11.76");
                connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
                connectionFactory.setAutomaticRecoveryEnabled(true);
                connectionFactory.setNetworkRecoveryInterval(3000);
                Connection connection = connectionFactory.newConnection();
        
                Channel channel = connection.createChannel();  
        
                String queueName = "test001";  
                //	durable 是否持久化消息
                channel.queueDeclare(queueName, true, false, false, null);  
                QueueingConsumer consumer = new QueueingConsumer(channel);
        //流控,只需这一行代码,表示每次消费一条消息
                channel.basicQos(0, 1, false);
                //	参数:队列名称、是否自动ACK、Consumer
                channel.basicConsume(queueName, false, consumer);  
                //	循环获取消息  
                while(true){  
                    //	获取消息,如果没有消息,这一步将会一直阻塞  
                    Delivery delivery = consumer.nextDelivery();  
                    String msg = new String(delivery.getBody());    
                    System.out.println("收到消息:" + msg);  
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } 
	}
}

 

消费端ACK于重回队列

消费端的手工ACK和NACK

  • 消费端进行消费的时候,如果由于业务异常,我们可以进行日志记录,然后进行补偿。(可以选择NACK,不要重回队列)
  • 如果由于服务器宕机等严重问题,那我们就需要手动进行ACK保障消费端消费成功。

消费端的重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息重新会给Broker!
  • 一般我们在实际应用中,都会关闭重回队列,也就是设置为false;

Receiver.java

package com.bfxy.rabbitmq.api.requeue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Receiver {

	public static void main(String[] args) throws Exception {
		
		
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("10.0.0.11");
        connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        
        String queueName = "test001";  
        //durable 是否持久化消息
        channel.queueDeclare(queueName, true, false, false, null);  
        QueueingConsumer consumer = new QueueingConsumer(channel);
        
        //	参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, false, consumer);  
        //	循环获取消息  
        while(true){  
            //	获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
            Thread.sleep(1000);
            
            if((Integer)delivery.getProperties().getHeaders().get("flag") == 0) {
            	//throw new RuntimeException("异常");
                //最后false表示不重回队列
            	channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            } else {
            	channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } 
	}
}

 

package com.bfxy.rabbitmq.api.requeue;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {

	
	public static void main(String[] args) throws Exception {
		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("10.0.0.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 创建Connection
		Connection connection = connectionFactory.newConnection();
		//3 创建Channel
		Channel channel = connection.createChannel();  
		//4 声明
		String queueName = "test001";  
        //参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数
		channel.queueDeclare(queueName, true, false, false, null);
		
		for(int i = 0; i < 5;i++) {
			String msg = "Hello World RabbitMQ " + i;
			Map<String, Object> headers = new HashMap<String, Object>();
			headers.put("flag", i);
			AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
			.deliveryMode(2)
			.contentEncoding("UTF-8")
			.headers(headers).build();
			channel.basicPublish("", queueName , props , msg.getBytes()); 			
		}
	}
	
}

无论NACK还是ACK,rabbitMQ中的offset都会更新,都表示接到客户端反馈,只是nack后面有一个参数,来设置是否重回队里。

最后修改于 2020-07-13 12:09:16
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇