合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
>[info] 重回队列机制 **ACK & NACK** 当我们设置 ``` autoACK=false ``` 时,就可以使用手工ACK方式了,其实手工方式包括了手工ACK与NACK 当我们手工 ACK 时,会发送给Broker一个应答,代表消息处理成功,Broker就可回送响应给Pro. NACK 则表示消息处理失败,如果设置了重回队列,Broker端就 会将没有成功处理的消息重新发送. ***** **使用方式:** * Con消费时,如果由于业务异常,我们可以手工 NACK 记录日志,然后进行补偿API:void basicNack(long deliveryTag, boolean multiple, boolean requeue)API:void basicAck(long deliveryTag, boolean multiple) * 如果由于服务器宕机等严重问题,我们就需要手工 ACK 保障Con消费成功 ***** **重回队列:** * 重回队列是为了对没有处理成功的消息,将消息重新投递给Broker * 重回队列,会把消费失败的消息重新添加到队列的尾端,供Con继续消费 * 一般在实际应用中,都会关闭重回队列,即设置为false ***** **代码实现:basic_nack.php** ~~~ <?php /** * - Start this consumer in one window by calling: php demo/basic_nack.php * - Then on a separate window publish a message like this: php demo/amqp_publisher.php good * that message should be "ack'ed" * - Then publish a message like this: php demo/amqp_publisher.php bad * that message should be "nack'ed" */ include(__DIR__ . '/config.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; $exchange = 'router'; $queue = 'msgs'; $consumerTag = 'consumer'; $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); $channel->queue_declare($queue, false, true, false, false); $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); $channel->queue_bind($queue, $exchange); /** * @param \PhpAmqpLib\Message\AMQPMessage $message */ function process_message($message) { /* if(插入成功){ echo "将消息删除:"; 服务器死掉,相当于exit; $message->ack(true); }else if(插入失败){ echo "将消息不要删除,等着下次消费"; $message->nack(true); }*/ if ($message->body == 'good') { $message->ack(); } else { echo "成功收到消息,消息内容为:".$message->body ; echo "将消息打回,重回队列:"; $message->nack(true); } // Send a message with the string "quit" to cancel the consumer. if ($message->body === 'quit') { $message->getChannel()->basic_cancel($message->getConsumerTag()); } } $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message'); /** * @param \PhpAmqpLib\Channel\AMQPChannel $channel * @param \PhpAmqpLib\Connection\AbstractConnection $connection */ function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); // Loop as long as the channel has callbacks registered while ($channel->is_consuming()) { $channel->wait(); } ~~~ 1. 运行代码 ![](https://img.kancloud.cn/67/90/679061674b8e9b36404dc4c6ed62bc44_557x143.png)