💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、豆包、星火、月之暗面及文生图、文生视频 广告
# [Connection类提供的接口](https://www.workerman.net/doc/workerman/tcp-connection.html#Connection%E7%B1%BB%E6%8F%90%E4%BE%9B%E7%9A%84%E6%8E%A5%E5%8F%A3) Workerman中有两个重要的类Worker与Connection。 每个客户端连接对应一个Connection对象,可以设置对象的onMessage、onClose等回调,同时提供了向客户端发送数据send接口与关闭连接close接口,以及其它一些必要的接口。 可以说Worker是一个监听容器,负责接受客户端连接,并把连接包装成connection对象式提供给开发者操作。· >[danger]TcpConnection没有connect()方法,一般不用来进行手动实例化,一般出现在回调属性里如:`onConnect = function(TcpConnection $connection)` ``` use Workerman\Worker; use Workerman\Connection\TcpConnection; use Workerman\Connection\AsyncTcpConnection; require_once __DIR__ . '/vendor/autoload.php'; $worker = new Worker('tcp://0.0.0.0:8686'); $worker->onMessage = function($connection, $data) { // $c_str=var_export(json_encode($connection,JSON_UNESCAPED_UNICODE),true); // $c_str=json_encode(get_class_methods($connection), JSON_UNESCAPED_UNICODE); $c_str=json_encode(get_object_vars($connection), JSON_UNESCAPED_UNICODE); $c_str=json_encode(get_class_vars(get_class($connection)), JSON_UNESCAPED_UNICODE); // $r = new \ReflectionClass($connection); $r = new \ReflectionObject($connection); $c_str=json_encode($r->getConstants(), JSON_UNESCAPED_UNICODE); $c_str=json_encode($r->getProperties(\ReflectionMethod::IS_PUBLIC), JSON_UNESCAPED_UNICODE); $c_str1=json_encode($r->getMethods(\ReflectionMethod::IS_PUBLIC), JSON_UNESCAPED_UNICODE); // file_put_contents("./aaa.json", $c_str); // $c_str=var_export(json_encode(['a'=>1,'b'=>2],JSON_UNESCAPED_UNICODE),true); $connection->send('接受客户端消息成功'.$c_str); }; ``` **属性** json_encode($r->getProperties(\ReflectionMethod::IS_PUBLIC), JSON_UNESCAPED_UNICODE); ``` [ { "name": "onMessage", "class": "Workerman\Connection\TcpConnection" }, { "name": "onClose", "class": "Workerman\Connection\TcpConnection" }, { "name": "onError", "class": "Workerman\Connection\TcpConnection" }, { "name": "onBufferFull", "class": "Workerman\Connection\TcpConnection" }, { "name": "onBufferDrain", "class": "Workerman\Connection\TcpConnection" }, { "name": "protocol", "class": "Workerman\Connection\TcpConnection" }, { "name": "transport", "class": "Workerman\Connection\TcpConnection" }, { "name": "worker", "class": "Workerman\Connection\TcpConnection" }, { "name": "bytesRead", "class": "Workerman\Connection\TcpConnection" }, { "name": "bytesWritten", "class": "Workerman\Connection\TcpConnection" }, { "name": "id", "class": "Workerman\Connection\TcpConnection" }, { "name": "maxSendBufferSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "context", "class": "Workerman\Connection\TcpConnection" }, { "name": "defaultMaxSendBufferSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "maxPackageSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "defaultMaxPackageSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "connections", "class": "Workerman\Connection\TcpConnection" }, { "name": "_statusToString", "class": "Workerman\Connection\TcpConnection" }, { "name": "statistics", "class": "Workerman\Connection\ConnectionInterface" } ] ``` json_encode(get_object_vars($connection), JSON_UNESCAPED_UNICODE);查询不到$connection数据 json_encode(get_class_vars(get_class($connection)), JSON_UNESCAPED_UNICODE); ``` { "onMessage": null, "onClose": null, "onError": null, "onBufferFull": null, "onBufferDrain": null, "protocol": null, "transport": "tcp", "worker": null, "bytesRead": 0, "bytesWritten": 0, "id": 0, "maxSendBufferSize": 1048576, "context": null, "maxPackageSize": 1048576, "defaultMaxSendBufferSize": 1048576, "defaultMaxPackageSize": 10485760, "connections": [ ], "_statusToString": { "0": "INITIAL", "1": "CONNECTING", "2": "ESTABLISHED", "4": "CLOSING", "8": "CLOSED" }, "statistics": { "connection_count": 0, "total_request": 0, "throw_exception": 0, "send_fail": 0 } } ``` 方法 json_encode($r->getMethods(\ReflectionMethod::IS_PUBLIC), JSON_UNESCAPED_UNICODE); ``` [ { "name": "__construct", "class": "Workerman\Connection\TcpConnection" }, { "name": "getStatus", "class": "Workerman\Connection\TcpConnection" }, { "name": "send", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRemoteIp", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRemotePort", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRemoteAddress", "class": "Workerman\Connection\TcpConnection" }, { "name": "getLocalIp", "class": "Workerman\Connection\TcpConnection" }, { "name": "getLocalPort", "class": "Workerman\Connection\TcpConnection" }, { "name": "getLocalAddress", "class": "Workerman\Connection\TcpConnection" }, { "name": "getSendBufferQueueSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRecvBufferQueueSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "isIpV4", "class": "Workerman\Connection\TcpConnection" }, { "name": "isIpV6", "class": "Workerman\Connection\TcpConnection" }, { "name": "pauseRecv", "class": "Workerman\Connection\TcpConnection" }, { "name": "resumeRecv", "class": "Workerman\Connection\TcpConnection" }, { "name": "baseRead", "class": "Workerman\Connection\TcpConnection" }, { "name": "baseWrite", "class": "Workerman\Connection\TcpConnection" }, { "name": "doSslHandshake", "class": "Workerman\Connection\TcpConnection" }, { "name": "pipe", "class": "Workerman\Connection\TcpConnection" }, { "name": "consumeRecvBuffer", "class": "Workerman\Connection\TcpConnection" }, { "name": "close", "class": "Workerman\Connection\TcpConnection" }, { "name": "getSocket", "class": "Workerman\Connection\TcpConnection" }, { "name": "bufferIsEmpty", "class": "Workerman\Connection\TcpConnection" }, { "name": "destroy", "class": "Workerman\Connection\TcpConnection" }, { "name": "__destruct", "class": "Workerman\Connection\TcpConnection" } ] ``` json_encode(get_class_methods($connection), JSON_UNESCAPED_UNICODE); ``` [ "__construct", "getStatus", "send", "getRemoteIp", "getRemotePort", "getRemoteAddress", "getLocalIp", "getLocalPort", "getLocalAddress", "getSendBufferQueueSize", "getRecvBufferQueueSize", "isIpV4", "isIpV6", "pauseRecv", "resumeRecv", "baseRead", "baseWrite", "doSslHandshake", "pipe", "consumeRecvBuffer", "close", "getSocket", "bufferIsEmpty", "destroy", "__destruct" ] ``` 常量 json_encode($r->getConstants(), JSON_UNESCAPED_UNICODE); ``` { "READ_BUFFER_SIZE": 65535, "STATUS_INITIAL": 0, "STATUS_CONNECTING": 1, "STATUS_ESTABLISHED": 2, "STATUS_CLOSING": 4, "STATUS_CLOSED": 8 } ``` 示例 ``` use Workerman\Worker; use Workerman\Connection\TcpConnection; require_once __DIR__ . '/vendor/autoload.php'; // 设置所有连接的默认应用层发送缓冲区大小默认1024 TcpConnection::$defaultMaxSendBufferSize = 2*1024*1024; // 每个连接能够接收的最大包包长。不设置默认为10MB // 设置每个连接接收的数据包最大为1024000字节 TcpConnection::$defaultMaxPackageSize = 1024000; $worker = new Worker('websocket://0.0.0.0:8484'); #当客户端与Workerman建立连接时(TCP三次握手完成后)触发的回调函数。每个连接只会触发一次onConnect回调. #在onConnect事件里无法确认对方是谁。要想知道对方是谁,需要客户端发送鉴权数据,例如某个token或者用户名密码之类,在onMessage回调里做鉴权. #udp是无连接的,所以当使用udp协议时不会触发onConnect回调,也不会触发onClose回调 $worker->onConnect = function(TcpConnection $connection) { //连接的id 每个进程内部会维护一个自增的connection id,所以多个进程(设置$worker->count的值大于1)之间的connection id会有重复 如果想要不重复的connection id 可以根据需要给connection->id重新赋值,例如加上worker->id前缀 echo $worker->id . $connection->id; //获得该连接的客户端ip echo $connection->getRemoteIp() . "\n"; // 获得该连接的客户端端口 echo $connection->getRemotePort() ."\n"; // 设置当前连接的协议类 $connection->protocol = 'Workerman\\Protocols\\Http'; // 当一个客户端发来数据时,转发给当前进程所维护的其它所有客户端 foreach($connection->worker->connections as $con) { $con->send("111"); } // 设置当前连接的应用层发送缓冲区大小,默认Connection::defaultMaxSendBufferSize(1MB) // 设置当前连接的应用层发送缓冲区大小为102400字节 $connection->maxSendBufferSize = 102400; // 将当前连接的数据流导入到目标连接。内置了流量控制。此方法做TCP代理非常有用 // 建立本地80端口的异步连接 $connection_to_80 = new AsyncTcpConnection('tcp://127.0.0.1:80'); // 设置将当前客户端连接的数据导向80端口的连接 $connection->pipe($connection_to_80); // 设置80端口连接返回的数据导向客户端连接 $connection_to_80->pipe($connection); // 执行异步连接 $connection_to_80->connect(); // 使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用 $connection->pauseRecv(); // 使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用,对于上传流量控制非常有用 // 上面的pauseRecv停止接受数据,这里30秒后恢复接收数据 Timer::add(30, function($connection){ $connection->resumeRecv(); }, array($connection), false); // 可以在这里判断连接来源是否合法,不合法就关掉连接 // $_SERVER['HTTP_ORIGIN']标识来自哪个站点的页面发起的websocket连接 if($_SERVER['HTTP_ORIGIN'] != 'https://www.workerman.net'){ // 安全的关闭连接.调用close会等待发送缓冲区的数据发送完毕后才关闭连接,并触发连接的onClose回调 // 参数为可选参数,要发送的数据(如果有指定协议,则会自动调用协议的encode方法打包$data数据),当数据发送完毕后关闭连接,随后会触发onClose回调 $connection->close("hello\n"); // 立刻关闭连接。与close不同之处是,调用destroy后即使该连接的发送缓冲区还有数据未发送到对端,连接也会立刻被关闭,并立刻触发该连接的onClose回调。 $connection->destroy(); } // 设置连接的onMessage回调,作用与Worker::$onMessage回调相同,区别是只针对当前连接有效,也就是可以针对某个连接的设置onMessage回调 $connection->onMessage = function(TcpConnection $connection, $data) { var_dump($data); //send(mixed $data [,$raw = false]) // 默认第二个参数为false,在发送数据时会根据协议加载协议类库对数据进行转换在发送,如websocket协议时会自动调用\Workerman\Protocols\Websocket::encode打包成websocket协议数据后发送 // 第二个参数为true则不调用协议类转换数据而是直接发送原始数据 // 可以在$connection回调里也可以在$work回调里 $connection->send('receive success'); }; $connection->onError = function(TcpConnection $connection) { echo "connection onError\n"; }; // 设置连接的onClose回调,此回调与Worker::$onClose回调作用相同,区别是只针对当前连接有效,也就是可以针对某个连接的设置onClose回调 $connection->onClose = function(TcpConnection $connection) { echo "connection onClose\n"; }; // 作用与Worker::$onBufferFull回调相同,区别是只针对当前连接起作用,即可以单独设置某个连接的onBufferFull回调 $connection->onBufferFull = function(TcpConnection $connection) { echo "connection onBufferFull\n"; }; // 作用与Worker::$onBufferDrain回调相同,区别是只针对当前连接起作用,即可以单独设置某个连接的onBufferDrain回调 $connection->onBufferDrain = function(TcpConnection $connection) { echo "connection onBufferDrain\n"; }; }; #设置Worker子进程启动时的回调函数,每个子进程启动时都会执行 #注意:onWorkerStart是在子进程启动时运行的,如果开启了多个子进程($worker->count > 1),每个子进程运行一次,则总共会运行$worker->count次 $worker->onWorkerStart = function($worker){ #当前worker进程的id编号,范围为0到$worker->count-1 echo "进程id:{$worker->id} Worker starting...\n"; }; #当客户端通过连接发来数据时(Workerman收到数据时)触发的回调函数 $worker->onMessage = function($connection,$data){ $connection->send("我已收到你发来的数据:".$data); foreach ($connection->worker->connections as $connectionId => $cn) { $cn->send("我已收到客户端的数据:".$data); } }; // 运行worker Worker::runAll(); ```