合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
>[info] 限流机制 **Con - 限流机制** 为何要限流: * 假设我们有这样的场景 Rabbitmq服务器有上万条未处理的消息,我们随便打开一个Con - Client,会造成:巨量的消息瞬间全部推送过来,然而我们单个客户端无法同时处理这么多数据!此时很有可能导致服务器崩溃,严 重的可能导致线上的故障。 * 还有一些其他的场景,比如说单个Pro一分钟产生了几百条数据,但是单个Con一分钟可能只能处理60条,这个时候Pro-Con肯定是不平衡的。通常Pro是没办法做限制的。所以Con肯定需要做一些限流措施,否则如果 超出最大负载,可能导致Con性能下降,服务器卡顿甚至崩溃等一系列严重后果 RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于Con或者channel设置Qos的值) 未被确认前,不消费新的消息 >[] 不能设置自动签收功能(autoAck = false) 如果消息未被确认,就不会到达Con,目的就是给Pro减压 ***** **场景:** * 假设当前生产者已经推送了上万条信息给MQ,那么消费者一下子需要消费上万条,负载大。 * 生产者秒推几百条消息给MQ,消费者只能消费几十条,超过负载。 ***** **限流设置API:** ``` // 消费者代码:一次只拿20条并处理 $channel->basic_qos($prefetchSize, 20, $global); ``` * prefetchSize: 单条消息的大小限制,Con通常设置为0,表示不做限制 * prefetchCount: 一次最多能处理多少条消息 * global: 是否将上面设置true应用于channel级别还是取false代表Con级别 >[] prefetchSize和global 这两项,RabbitMQ没有实现,暂且不研究 prefetchCount在 <span style='color:red'>autoAck=false</span> 的情况下生效,即在自动应答的情况下该值无效 **手工ACK:(应答MQ已经处理完成)** ``` $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); ``` 调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple表示是否批量签收,由于我们是一次处理一条消息,所以设置为false ***** **代码示例:basic_qos.php** ~~~ <?php include(__DIR__ . '/config.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); $channel->queue_declare('qos_queue', false, true, false, false); //第二个参数代表:每次只消费100条 $channel->basic_qos(null, 109, null); function process_message($message) { /*业务逻辑*/ //消费完消息之后进行应答,告诉rabbit我已经消费了,可以发送下一组了 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } $channel->basic_consume('qos_queue', '', false, false, false, false, 'process_message'); while ($channel->is_consuming()) { // After 10 seconds there will be a timeout exception. $channel->wait(null, false, 30000); } ~~~ 1. 运行代码生成队列 ![](https://img.kancloud.cn/5e/1f/5e1f2c0af620b7119502b9e1ca88055c_1093x328.png) 2. 推送5条消息 ![](https://img.kancloud.cn/e6/94/e6949c8e3ba01a903bb92096d6e28e19_1026x414.png) 3. 消费 ![](https://img.kancloud.cn/f2/e6/f2e6f3119ee416d7261d5775ccffb437_1009x246.png) * 我们之前是注释掉手工ACK方法,然后启动消费端和生产端,当时Con只消费了1条消息,这是因为我们设置了手工签收,并且设置了一次只处理一条消息,当我们没有回送ACK应答时,Broker端就认为Con还没有处理完 这条消息,基于这种限流机制就不会给Con发送新的消息了,所以Con那时只打印了一条消息