消费端限流
什么是消费端限流?
假设一个场景,首先,我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:
- 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时出来这么多数据。
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(uint prefetchSize ,unshort prefetchCount,bool global);
###
prefetchSize:默认为1000,Broker批量发送prefetchSize条消息给consumer
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block,直到有消息ack
global:true/false是否将上面设置应用于channel
注意:prefetchSize和global这两项,rabbitMQ没有实现,暂且不研究prefetchSize在no_ask=false的情况下,即在自动应答的情况两个值是不生效的。
package com.bfxy.rabbitmq.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test001";
// durable 是否持久化消息
channel.queueDeclare(queueName, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
//流控,只需这一行代码,表示每次消费一条消息
channel.basicQos(0, 1, false);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费端ACK于重回队列
消费端的手工ACK和NACK
- 消费端进行消费的时候,如果由于业务异常,我们可以进行日志记录,然后进行补偿。(可以选择NACK,不要重回队列)
- 如果由于服务器宕机等严重问题,那我们就需要手动进行ACK保障消费端消费成功。
消费端的重回队列
- 消费端重回队列是为了对没有处理成功的消息,把消息重新会给Broker!
- 一般我们在实际应用中,都会关闭重回队列,也就是设置为false;
Receiver.java
package com.bfxy.rabbitmq.api.requeue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("10.0.0.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test001";
//durable 是否持久化消息
channel.queueDeclare(queueName, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 循环获取消息
while(true){
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
Thread.sleep(1000);
if((Integer)delivery.getProperties().getHeaders().get("flag") == 0) {
//throw new RuntimeException("异常");
//最后false表示不重回队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
} else {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
}
package com.bfxy.rabbitmq.api.requeue;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.0.0.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 创建Connection
Connection connection = connectionFactory.newConnection();
//3 创建Channel
Channel channel = connection.createChannel();
//4 声明
String queueName = "test001";
//参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数
channel.queueDeclare(queueName, true, false, false, null);
for(int i = 0; i < 5;i++) {
String msg = "Hello World RabbitMQ " + i;
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("flag", i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
channel.basicPublish("", queueName , props , msg.getBytes());
}
}
}
无论NACK还是ACK,rabbitMQ中的offset都会更新,都表示接到客户端反馈,只是nack后面有一个参数,来设置是否重回队里。
最后修改于 2020-07-13 12:09:16
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

