企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 队列 * * * * * **简介** 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。 实现高性能,高可用,可伸缩和最终一致性架构。 是大型分布式系统不可缺少的中间件。 场景:异步处理、应用解耦、流量削锋、日志处理 * * * 以上就是关于为什么要使用队列的大致说明 下面介绍在OneBase使用TP官方开发的队列think-queue。 首先需要仔细理解下面这张图 ![](https://img.kancloud.cn/be/3c/be3c0c9e5690c586ebb6e9c3a74c9005_1200x533.png) 执行流程 1. 命令行`Command`开始监听队列`queue:work` 2. 执行进程`Worker`获取新消息`Queue::pop()` 3. 消息队列`Queue`返回一个可用的`Job`实例`$job` 3.1 生产者推送`Queue::push()`新消息到消息队列`Queue` 3.2 消息队列`Queue`返回是否推送成功给生产者 4. 执行进程`Worker`调用`$job`的`fire()`方法 5. 消息`Job`解析`$job`的`payload`,实例化一个消费者,并调用消费者实例的`fire($job, $data)`方法。 6. 消费者读取消息内容`$data`,处理业务逻辑,删除或重发该消息 `$job->delete()` 或 `$job->release()`。 7. 消息`Job`从Database或Redis中删除消息或重发消息 8. 消息`Job`返回消息处理结果给执行进程`Worker` 9. 执行进程`Worker`在终端输出响应或结束运行 **一、队列配置** 队列配置在app目录下config.php配置文件中,可以配置驱动,建议Redis。 此处为演示方便使用的数据库驱动。 ``` // +---------------------------------------------------------------------- // | 队列配置 // +---------------------------------------------------------------------- 'queue' => [ // sync驱动表示取消消息队列还原为同步执行 // 'connector' => 'Sync', // Redis驱动 // 'connector' => 'redis', // "expire"=>60,//任务过期时间默认为秒,禁用为null // "default"=>"default",//默认队列名称 // "host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址 // "port"=>Env::get("redis.port", 6379),//Redis端口 // "password"=>Env::get("redis.password", "123456"),//Redis密码 // "select"=>5,//Redis数据库索引 // "timeout"=>0,//Redis连接超时时间 // "persistent"=>false,//是否长连接 // Database驱动 "type"=>"Database",//数据库驱动 "expire"=>60,//任务过期时间,单位为秒,禁用为null "default"=>"default",//默认队列名称 "table"=>"jobs",//存储消息的表明,不带前缀 "dsn"=>[], ] ``` **二、消息创建与推送** ``` // 将任务加入队列 public function push() { $job_data = []; $job_data["member_id"] = time(); $job_data["to_member_id"] = time(); $job_data["params"] = ['xx' => 'cc', 'vv' => 'bb']; $is_pushed = Queue::push("app\queue\controller\Test", $job_data, 'test_job_queue'); if($is_pushed !== false ) { echo date("Y-m-d H:i:s")." a new job is pushed to the message queue"; } else { echo date("Y-m-d H:i:s")." a new job pushed fail"; } } ``` $job_data 是队列中依赖的业务数据 app\queue\controller\Test 是处理队列数据的类路径 test_job_queue 是队列的名称 **三、启动消息队列** ![](https://img.kancloud.cn/c2/46/c2466278f15aa69d7d5e5af3fabeee5a_723x72.png) 此时执行多次消息推送代码。 ![](https://img.kancloud.cn/2f/13/2f130a1e7b1f91e34ecc635373d1dbb9_642x134.png) ![](https://img.kancloud.cn/39/79/3979c08f19032be2ff6a6982108e02af_1287x400.png) 可以看到 消息已经入库,但是没有处理。 **四、消费与删除** 打开 queue 模块下的控制器与逻辑目录,参考Test与TestLogic 来编写相关数据处理业务逻辑代码。 ![](https://img.kancloud.cn/b2/55/b255a4b88ef6cf2fe14bb312603a05cc_741x666.png) **五、启动监听处理** 命令行执行:php think queue:listen --queue test_job_queue ![](https://img.kancloud.cn/03/0c/030c1df55cb583e410d383ad235d6e36_798x627.png) 就能看到 数据表中的任务迅速消失,此时再调用消息创建推送到表中,也会迅速处理。 到此队列基本上算是使用起来了,任务种类不同比如批量下载任务和批量消息发送任务 可以写多个不同的消费者类进行处理。^_^