合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
>[info] topic 通配符模式 按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。 Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ \* ”与“#”,用于做模糊匹配,其中“\*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。 ![](https://img.kancloud.cn/d5/8a/d58a1571e87e677f57dc584eb590a299_967x441.png) ![](https://img.kancloud.cn/37/ec/37eccf9f1c43019dfd821f8d8129fde0_1002x396.png) 1. `rabbitmq.php` 配置文件信息 ~~~ <?php // rabbitmq 配置信息 return [ # 连接信息 'amqp' => [ 'host' => '127.0.0.1', 'port'=>'5672', 'user'=>'guest', 'password'=>'guest', 'vhost'=>'/' ], # 通配符队列 'topic_queue' => [ 'exchange_name' => 'topic_exchange', 'exchange_type'=>'topic',# 通配符模式 'queue_name' => 'topic_queue', 'route_key' => 'topic.goods.order',# 模糊匹配 'consumer_tag' => 'topic' ], # 商品队列 'goods_queue' => [ 'exchange_name' => 'goods_topic_exchange', 'exchange_type'=>'topic',# 通配符模式 'queue_name' => 'goods_queue', 'route_key' => '*.goods.*',# 模糊匹配 'consumer_tag' => 'goods' ], # 订单队列 'order_queue' => [ 'exchange_name' => 'order_topic_exchange', 'exchange_type'=>'topic',# 通配符模式 'queue_name' => 'order_queue', 'route_key' => '#.order',# 模糊匹配 'consumer_tag' => 'order' ], ]; ~~~ 2. `Consumer` 消费者 ~~~ <?php declare (strict_types = 1); namespace app\common\service\rabbitmq; //use app\common\model\test\MessageModel; use PhpAmqpLib\Connection\AMQPStreamConnection; use think\facade\Log; /** * 消费者 */ class Consumer { /** * 商品消费者 * @return \think\Response */ public function goods() { $mqConfig = config('rabbitmq.amqp'); $goodsConfig = config('rabbitmq.goods_queue'); // 创建连接 $connection = new AMQPStreamConnection( $mqConfig['host'], $mqConfig['port'], $mqConfig['user'], $mqConfig['password'], $mqConfig['vhost'] ); // 连接信道 $channel = $connection->channel(); // 设置消费者(Consumer)客户端同时只处理一条队列 // 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer), // 直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。 // 消费者端要把自动确认autoAck设置为false,basic_qos才有效。 // 流量控制 $channel->basic_qos(0, 1, false); // 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致 // 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理 // 创建交换机(Exchange) // $channel->exchange_declare($goodsConfig['exchange_name'], $goodsConfig['exchange_type'], false, true, false); // 创建队列 $channel->queue_declare($goodsConfig['queue_name'], false, true, false, false); // 绑定队列和交换机 // $channel->queue_bind($goodsConfig['queue_name'], $goodsConfig['exchange_name'], $goodsConfig['route_key']); /** * 消费消息 * * queue: queue_name 被消费的队列名称 * consumer_tag: consumer_tag 消费者客户端身份标识,用于区分多个客户端 * no_local: false 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现 * no_ack: true 收到消息后,是否不需要回复确认即被认为被消费 * exclusive: false 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下 * nowait: false 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错 * callback: $callback 回调逻辑处理函数 */ $channel->basic_consume($goodsConfig['queue_name'], $goodsConfig['consumer_tag'], false, false, false, false, array($this, 'process_message')); // 退出,执行shutdown来关闭通道与连接 register_shutdown_function(array($this, 'shutdown'), $channel, $connection); // 阻塞队列监听事件 while (count($channel->callbacks)) { $channel->wait(); } } /** * 订单消费者 * @return \think\Response */ public function order() { $mqConfig = config('rabbitmq.amqp'); $orderConfig = config('rabbitmq.order_queue'); // 创建连接 $connection = new AMQPStreamConnection( $mqConfig['host'], $mqConfig['port'], $mqConfig['user'], $mqConfig['password'], $mqConfig['vhost'] ); // 连接信道 $channel = $connection->channel(); // 设置消费者(Consumer)客户端同时只处理一条队列 // 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer), // 直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。 // 消费者端要把自动确认autoAck设置为false,basic_qos才有效。 // 流量控制 $channel->basic_qos(0, 1, false); // 同样是创建路由和队列,以及绑定路由队列,注意要跟producer(生产者)的一致 // 这里其实可以不用设置,但是为了防止队列没有被创建所以做的容错处理 // 创建交换机(Exchange) // $channel->exchange_declare($orderConfig['exchange_name'], $orderConfig['exchange_type'], false, true, false); // 创建队列 $channel->queue_declare($orderConfig['queue_name'], false, true, false, false); // 绑定队列和交换机 // $channel->queue_bind($orderConfig['queue_name'], $orderConfig['exchange_name'], $orderConfig['route_key']); /** * 消费消息 * * queue: queue_name 被消费的队列名称 * consumer_tag: consumer_tag 消费者客户端身份标识,用于区分多个客户端 * no_local: false 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现 * no_ack: true 收到消息后,是否不需要回复确认即被认为被消费 * exclusive: false 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下 * nowait: false 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错 * callback: $callback 回调逻辑处理函数 */ $channel->basic_consume($orderConfig['queue_name'], $orderConfig['consumer_tag'], false, false, false, false, array($this, 'process_message')); // 退出,执行shutdown来关闭通道与连接 register_shutdown_function(array($this, 'shutdown'), $channel, $connection); // 阻塞队列监听事件 while (count($channel->callbacks)) { $channel->wait(); } } /** * 消息处理 * @param $message */ public function process_message($message) { if ($message->body !== 'quit') { $messageBody = json_decode($message->body); // 自定义的消息类型 if (!isset($messageBody->message_type)) { Log::write("error data:" . $message->body, 2); } else { // $messageModel = new MessageModel(); try { // 消息 Log::write("message_data:" . json_encode($message, JSON_UNESCAPED_UNICODE)); $body = json_decode($message->body, true); dump("消费消息如下:"); dump($body); // $messageModel->test($body); } catch (\Think\Exception $e) { Log::write($e->getMessage(), 2); Log::write(json_encode($message), 2); } catch (\PDOException $pe) { Log::write($pe->getMessage(), 2); Log::write(json_encode($message), 2); } } } // 手动确认ack,确保消息已经处理 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); // Send a message with the string "quit" to cancel the consumer. if ($message->body === 'quit') { $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']); } } /** * 关闭进程 * @param $channel * @param $connection */ public function shutdown($channel, $connection) { // 关闭信道 $channel->close(); // 关闭连接 $connection->close(); } } ~~~ 3. `Producer` 生产者 ~~~ <?php declare (strict_types = 1); namespace app\common\service\rabbitmq; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * rabbitmq 生产者 */ class Producer { // 连接 protected $connection; // 管道 protected $channel; // 配置内容(基本) protected $mqConfig; // 配置内容(通配符) protected $topicConfig; protected $goodsConfig; protected $orderConfig; public function __construct() { $this->mqConfig = config('rabbitmq.amqp'); $this->topicConfig = config('rabbitmq.topic_queue'); $this->goodsConfig = config('rabbitmq.goods_queue'); $this->orderConfig = config('rabbitmq.order_queue'); // 创建连接 $this->connection = new AMQPStreamConnection( $this->mqConfig['host'], $this->mqConfig['port'], $this->mqConfig['user'], $this->mqConfig['password'], $this->mqConfig['vhost'] // 虚拟主机(起到消息隔离的作用) ); // 创建通道 $this->channel = $this->connection->channel(); } /** * 发送消息 * @param $data 消息内容 */ public function send($data) { /* * 流量控制 Specifies QoS * 消费者在开启acknowledge的情况下,对接收到的消息需要异步对消息进行确认 * 由于消费者自身处理能力有限,从rabbitmq获取一定数量的消息后,希望rabbitmq不再将队列中的消息推送过来, * 当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接收来自队列的消息 * @param int $prefetch_size 最大unacked消息的字节数 * @param int $prefetch_count 最大unacked消息的条数 * @param bool $a_global 上述限制的限定对象,false限制单个消费者,true限制整个通道 * @return mixed */ $this->channel->basic_qos(0, 1, false); /** * creatingAQueue * name: hello 队列名称 * passive: false 如果设置true存在则返回OK,否则就报错。设置false存在返回OK,不存在则自动创建 * durable: true 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列,服务重启后也会存在,因为服务会把持久化的queue存放到磁盘上当服务重启的时候,会重新加载之前被持久化的queue * exclusive: false 是否排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除 * auto_delete: false 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除 */ $this->channel->queue_declare($this->topicConfig['queue_name'], false, true, false, false); /** * 创建交换机(Exchange) * name: hello 交换机名称 * type: direct 交换机类型,分别为direct/topic/topic,参考另外文章的Exchange Type说明。 * passive: false Return OK if set true exists, otherwise an error is reported. If false exists, OK is returned. If it does not exist, it is created automatically * durable: false 是否持久化,设置false是存放到内存中的,RabbitMQ重启后会丢失 * auto_delete: false 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除 */ $this->channel->exchange_declare($this->topicConfig['exchange_name'], $this->topicConfig['exchange_type'], false, true, false); // 绑定消息交换机和队列 $this->channel->queue_bind($this->topicConfig['queue_name'], $this->topicConfig['exchange_name'], $this->topicConfig['route_key']); $this->channel->queue_bind($this->goodsConfig['queue_name'], $this->topicConfig['exchange_name'], $this->goodsConfig['route_key']); $this->channel->queue_bind($this->orderConfig['queue_name'], $this->topicConfig['exchange_name'], $this->orderConfig['route_key']); // 将要发送数据变为json字符串 $messageBody = json_encode($data, JSON_UNESCAPED_UNICODE); /** * 创建AMQP消息类型 * delivery_mode 消息是否持久化 * AMQPMessage::DELIVERY_MODE_NON_PERSISTENT 不持久化 * AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化 */ $message = new AMQPMessage($messageBody, [ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); /** * 发送消息 * msg: $message AMQP消息内容 * exchange: vckai_exchange 交换机名称 * routing_key: hello 路由key */ $this->channel->basic_publish($message, $this->topicConfig['exchange_name'], $this->topicConfig['route_key']); // 关闭连接 $this->stop(); } // 关闭连接 public function stop() { $this->channel->close(); $this->connection->close(); } } ~~~ 4. 创建 `goods` 和 `order` 自定义命令,作为消费者。 同 fanout 订阅/广播模式 5. 配置 `config/console.php` 命令行 同 fanout 订阅/广播模式 6. 路由发送消息 `route/app.php` 同 fanout 订阅/广播模式 7. 启动消费者 同 fanout 订阅/广播模式 8. 生产者发送消息 同 fanout 订阅/广播模式 ![](https://img.kancloud.cn/24/01/24015d8b7d220ef3ed9cdc9e500a02a1_694x287.png) ![](https://img.kancloud.cn/8e/86/8e86ce4f1d6efe5ee09a25e9eeed4f4d_707x283.png)