🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
>[info] 幂等性 **什么是幂等性?** * 用户对于同一操作发起的一次请求或者多次请求的结果是一致的 比如数据库的乐观锁,在执行更新操作前,先去数据库查询version,然后执行更新语句,以version作为条件,如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种乐观锁的机 制来保障幂等性 ***** **产生场景:** * 用户注册的时候手抖了多点击了一次注册,导致数据库写入2条数据。 ***** **场景代码示例:basic_nack.php** ~~~ <?php /** * - Start this consumer in one window by calling: php demo/basic_nack.php * - Then on a separate window publish a message like this: php demo/amqp_publisher.php good * that message should be "ack'ed" * - Then publish a message like this: php demo/amqp_publisher.php bad * that message should be "nack'ed" */ include(__DIR__ . '/config.php'); use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; $exchange = 'router'; $queue = 'msgs'; $consumerTag = 'consumer'; $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); $channel = $connection->channel(); $channel->queue_declare($queue, false, true, false, false); $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); $channel->queue_bind($queue, $exchange); /** * @param \PhpAmqpLib\Message\AMQPMessage $message */ function process_message($message) { /* if(插入成功){ echo "将消息删除:"; 服务器死掉,相当于exit; $message->ack(true); }else if(插入失败){ echo "将消息不要删除,等着下次消费"; $message->nack(true); }*/ if ($message->body == 'good') { $message->ack(); } else { echo "成功收到消息,消息内容为:".$message->body ; echo "将消息打回,重回队列:"; $message->nack(true); } // Send a message with the string "quit" to cancel the consumer. if ($message->body === 'quit') { $message->getChannel()->basic_cancel($message->getConsumerTag()); } } $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message'); /** * @param \PhpAmqpLib\Channel\AMQPChannel $channel * @param \PhpAmqpLib\Connection\AbstractConnection $connection */ function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); // Loop as long as the channel has callbacks registered while ($channel->is_consuming()) { $channel->wait(); } ~~~ 1. 运行代码监听 如果监听到 good 那就成功收到并处理 ![](https://img.kancloud.cn/83/90/839029a5747a1d36b537c161995431f2_403x73.png) 2. 在后台往 msg 队列 推送一条 good 消息 ![](https://img.kancloud.cn/22/94/2294abeed987ee006ca4072b31f1862c_863x508.png) 3. 随便推送 ![](https://img.kancloud.cn/2a/a3/2aa3a29fd11a1e7c756be7c58e3374b3_479x159.png) ***** **什么是Con - 幂等性** * 在业务高峰期最容易产生消息重复消费问题,当Con消费完消息时,在给Pro返回ack时由于网络中断,导致Pro未收到确认信息,该条消息就会重新发送并被Con消费,但实际上该消费者已成功消费了该条消息,这就造成了重复消 费. * 而Con - 幂等性,即消息不会被多次消费,即使我们收到了很多一样的消息. ***** **主流幂等性实现方案:** 1. 唯一ID+指纹码 核心:利用数据库主键去重 * 唯一ID:业务表的主键 * 指纹码:为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号或者标志位(具体视业务场景而定) ![](https://img.kancloud.cn/1a/ee/1aeef246a5f20211602f22c211b6fd9c_1673x56.png) * 优势 实现简单 * 弊端 高并发下有数据库写入的性能瓶颈 * 解决方案 根据ID进行分库分表算法路由 ***** **小结:** * 首先我们需要根据消息生成一个全局唯一ID,然后还需要加上一个指纹码。这个指纹码它并不一定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障这次操作是绝对唯一的。 * 将ID + 指纹码拼接好的值作为数据库主键,就可以进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操作,如果有就代表已经被消费了,就不需要管了