合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
``` 让Workerman作为客户端向远程服务端发起异步连接,并通过send接口和onMessage回调异步发送和处理连接上的数据。 ``` >[danger]AsyncTcpConnection继承至TcpConnection,所以TcpConnection的方法合属性也可以用在AsyncTcpConnection中使用 目前AsyncTcpConnection支持的协议有tcp、ssl、ws、frame、text、自定义协议。目前不支持http协议 其中ssl要求Workerman>=3.3.4,并安装openssl扩展 [TOC] 常量: ``` { "READ_BUFFER_SIZE": 65535, "STATUS_INITIAL": 0, "STATUS_CONNECTING": 1, "STATUS_ESTABLISHED": 2, "STATUS_CLOSING": 4, "STATUS_CLOSED": 8 } ``` 属性: json_encode(get_object_vars($connection), JSON_UNESCAPED_UNICODE); ``` { "onMessage": { }, "onClose": { }, "onError": { }, "onBufferFull": { }, "onBufferDrain": { }, "protocol": null, "transport": "tcp", "worker": null, "bytesRead": 0, "bytesWritten": 0, "id": 1, "maxSendBufferSize": 1024, "context": { }, "maxPackageSize": 10485760, "onConnect": { } } ``` json_encode(get_class_vars(get_class($connection)), JSON_UNESCAPED_UNICODE); ``` { "onConnect": null, "transport": "tcp", "onMessage": null, "onClose": null, "onError": null, "onBufferFull": null, "onBufferDrain": null, "protocol": null, "worker": null, "bytesRead": 0, "bytesWritten": 0, "id": 0, "maxSendBufferSize": 1048576, "context": null, "maxPackageSize": 1048576, "defaultMaxSendBufferSize": 1048576, "defaultMaxPackageSize": 10485760, "connections": [ ], "_statusToString": { "0": "INITIAL", "1": "CONNECTING", "2": "ESTABLISHED", "4": "CLOSING", "8": "CLOSED" }, "statistics": { "connection_count": 0, "total_request": 0, "throw_exception": 0, "send_fail": 0 } } ``` json_encode($r->getProperties(\ReflectionMethod::IS_PUBLIC), JSON_UNESCAPED_UNICODE); ``` [ { "name": "onConnect", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "transport", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "onMessage", "class": "Workerman\Connection\TcpConnection" }, { "name": "onClose", "class": "Workerman\Connection\TcpConnection" }, { "name": "onError", "class": "Workerman\Connection\TcpConnection" }, { "name": "onBufferFull", "class": "Workerman\Connection\TcpConnection" }, { "name": "onBufferDrain", "class": "Workerman\Connection\TcpConnection" }, { "name": "protocol", "class": "Workerman\Connection\TcpConnection" }, { "name": "worker", "class": "Workerman\Connection\TcpConnection" }, { "name": "bytesRead", "class": "Workerman\Connection\TcpConnection" }, { "name": "bytesWritten", "class": "Workerman\Connection\TcpConnection" }, { "name": "id", "class": "Workerman\Connection\TcpConnection" }, { "name": "maxSendBufferSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "context", "class": "Workerman\Connection\TcpConnection" }, { "name": "defaultMaxSendBufferSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "maxPackageSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "defaultMaxPackageSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "connections", "class": "Workerman\Connection\TcpConnection" }, { "name": "_statusToString", "class": "Workerman\Connection\TcpConnection" }, { "name": "statistics", "class": "Workerman\Connection\ConnectionInterface" } ] ``` 方法 json_encode(get_class_methods($connection), JSON_UNESCAPED_UNICODE); ``` [ "__construct", "connect", "reconnect", "cancelReconnect", "getRemoteHost", "getRemoteURI", "checkConnection", "getStatus", "send", "getRemoteIp", "getRemotePort", "getRemoteAddress", "getLocalIp", "getLocalPort", "getLocalAddress", "getSendBufferQueueSize", "getRecvBufferQueueSize", "isIpV4", "isIpV6", "pauseRecv", "resumeRecv", "baseRead", "baseWrite", "doSslHandshake", "pipe", "consumeRecvBuffer", "close", "getSocket", "bufferIsEmpty", "destroy", "__destruct" ] ``` //$r = new \ReflectionObject($connection); json_encode($r->getMethods(\ReflectionMethod::IS_PUBLIC), JSON_UNESCAPED_UNICODE); ``` [ { "name": "__construct", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "connect", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "reconnect", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "cancelReconnect", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "getRemoteHost", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "getRemoteURI", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "checkConnection", "class": "Workerman\Connection\AsyncTcpConnection" }, { "name": "getStatus", "class": "Workerman\Connection\TcpConnection" }, { "name": "send", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRemoteIp", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRemotePort", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRemoteAddress", "class": "Workerman\Connection\TcpConnection" }, { "name": "getLocalIp", "class": "Workerman\Connection\TcpConnection" }, { "name": "getLocalPort", "class": "Workerman\Connection\TcpConnection" }, { "name": "getLocalAddress", "class": "Workerman\Connection\TcpConnection" }, { "name": "getSendBufferQueueSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "getRecvBufferQueueSize", "class": "Workerman\Connection\TcpConnection" }, { "name": "isIpV4", "class": "Workerman\Connection\TcpConnection" }, { "name": "isIpV6", "class": "Workerman\Connection\TcpConnection" }, { "name": "pauseRecv", "class": "Workerman\Connection\TcpConnection" }, { "name": "resumeRecv", "class": "Workerman\Connection\TcpConnection" }, { "name": "baseRead", "class": "Workerman\Connection\TcpConnection" }, { "name": "baseWrite", "class": "Workerman\Connection\TcpConnection" }, { "name": "doSslHandshake", "class": "Workerman\Connection\TcpConnection" }, { "name": "pipe", "class": "Workerman\Connection\TcpConnection" }, { "name": "consumeRecvBuffer", "class": "Workerman\Connection\TcpConnection" }, { "name": "close", "class": "Workerman\Connection\TcpConnection" }, { "name": "getSocket", "class": "Workerman\Connection\TcpConnection" }, { "name": "bufferIsEmpty", "class": "Workerman\Connection\TcpConnection" }, { "name": "destroy", "class": "Workerman\Connection\TcpConnection" }, { "name": "__destruct", "class": "Workerman\Connection\TcpConnection" } ] ``` ## **异步访问外部http服务** ``` require_once __DIR__ . '/vendor/autoload.php'; use Workerman\Worker; use Workerman\Connection\TcpConnection; use Workerman\Connection\AsyncTcpConnection; $task = new Worker(); // 进程启动时异步建立一个到www.baidu.com连接对象,并发送数据获取数据 $task->onWorkerStart = function($task){ // 不支持直接指定http,但是可以用tcp模拟http协议发送数据 $connection_to_baidu = new AsyncTcpConnection('tcp://www.baidu.com:80'); // 当连接建立成功时,发送http请求数据 $connection_to_baidu->onConnect = function(AsyncTcpConnection $connection_to_baidu){ echo "connect success\n"; $connection_to_baidu->send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: keep-alive\r\n\r\n"); }; $connection_to_baidu->onMessage = function(AsyncTcpConnection $connection_to_baidu, $http_buffer){ echo $http_buffer; }; $connection_to_baidu->onClose = function(AsyncTcpConnection $connection_to_baidu){ echo "connection closed\n"; }; $connection_to_baidu->onError = function(AsyncTcpConnection $connection_to_baidu, $code, $msg){ echo "Error code:$code msg:$msg\n"; }; $connection_to_baidu->connect(); }; Worker::runAll(); ``` ## **示例 2、异步访问外部websocket服务,并设置以哪个本地ip及端口访问** ``` require_once __DIR__ . '/vendor/autoload.php'; use Workerman\Worker; use Workerman\Connection\TcpConnection; use Workerman\Connection\AsyncTcpConnection; $worker = new Worker(); $worker->onWorkerStart = function($worker) { // 如果你想只在一个进程里发起连接,可以通过判断$worker->id来做到,例如下面是只在0号进程发起连接 if ($worker->id != 0) { return; } // 设置访问对方主机的本地ip及端口(每个socket连接都会占用一个本地端口) $context_option = array( 'socket' => array( // ip必须是本机网卡ip,并且能访问对方主机,否则无效 'bindto' => '114.215.84.87:2333', ), ); $con = new AsyncTcpConnection('ws://echo.websocket.org:80', $context_option); $con->onConnect = function(AsyncTcpConnection $con) { $con->send('hello'); }; $con->onMessage = function(AsyncTcpConnection $con, $data) { echo $data; }; $con->connect(); }; Worker::runAll(); ``` 示例 3、异步访问外部wss端口,并设置本地ssl证书 ``` require_once __DIR__ . '/vendor/autoload.php'; use Workerman\Worker; use Workerman\Connection\TcpConnection; use Workerman\Connection\AsyncTcpConnection; $worker = new Worker(); $worker->onWorkerStart = function($worker){ // 设置访问对方主机的本地ip及端口以及ssl证书 $context_option = array( 'socket' => array( // ip必须是本机网卡ip,并且能访问对方主机,否则无效 'bindto' => '114.215.84.87:2333', ), // ssl选项,参考https://php.net/manual/zh/context.ssl.php 'ssl' => array( // 本地证书路径。 必须是 PEM 格式,并且包含本地的证书及私钥。 'local_cert' => '/your/path/to/pemfile', // local_cert 文件的密码。 'passphrase' => 'your_pem_passphrase', // 是否允许自签名证书。 'allow_self_signed' => true, // 是否需要验证 SSL 证书。 'verify_peer' => false ) ); // 发起异步连接 $con = new AsyncTcpConnection('ws://echo.websocket.org:443', $context_option); // 设置以ssl加密方式访问 $con->transport = 'ssl'; $con->onConnect = function(AsyncTcpConnection $con) { $con->send('hello'); }; $con->onMessage = function(AsyncTcpConnection $con, $data) { echo $data; }; $con->connect(); }; Worker::runAll(); ``` ## **Mysql代理** `mysql -uroot -P4406 -h127.0.0.1 -p` 代理后后台链接数据库时将3306换成4406也能成功登陆了 ``` require_once __DIR__ . '/vendor/autoload.php'; use Workerman\Worker; use Workerman\Connection\TcpConnection; use Workerman\Connection\AsyncTcpConnection; // 真实的mysql地址,假设这里是本机3306端口 $REAL_MYSQL_ADDRESS = 'tcp://127.0.0.1:3306'; // 代理监听本地4406端口 $proxy = new Worker('tcp://0.0.0.0:4406'); $proxy->onConnect = function(TcpConnection $connection){ global $REAL_MYSQL_ADDRESS; // 异步建立一个到实际mysql服务器的连接 $connection_to_mysql = new AsyncTcpConnection($REAL_MYSQL_ADDRESS); // mysql连接发来数据时,转发给对应客户端的连接 $connection_to_mysql->onMessage = function(AsyncTcpConnection $connection_to_mysql, $buffer)use($connection){ $connection->send($buffer); }; // mysql连接关闭时,关闭对应的代理到客户端的连接 $connection_to_mysql->onClose = function(AsyncTcpConnection $connection_to_mysql)use($connection){ $connection->close(); }; // mysql连接上发生错误时,关闭对应的代理到客户端的连接 $connection_to_mysql->onError = function(AsyncTcpConnection $connection_to_mysql)use($connection){ $connection->close(); }; // 执行异步连接 $connection_to_mysql->connect(); // 客户端发来数据时,转发给对应的mysql连接 $connection->onMessage = function(TcpConnection $connection, $buffer)use($connection_to_mysql) { $connection_to_mysql->send($buffer); }; // 客户端连接断开时,断开对应的mysql连接 $connection->onClose = function(TcpConnection $connection)use($connection_to_mysql) { $connection_to_mysql->close(); }; // 客户端连接发生错误时,断开对应的mysql连接 $connection->onError = function(TcpConnection $connection)use($connection_to_mysql) { $connection_to_mysql->close(); }; }; // 运行worker Worker::runAll(); ``` ## **reConnect重新连接** ~~~php use Workerman\Worker; use Workerman\Connection\AsyncTcpConnection; use Workerman\Connection\TcpConnection; require_once __DIR__ . '/vendor/autoload.php'; $worker = new Worker(); $worker->onWorkerStart = function($worker){ $con = new AsyncTcpConnection('ws://echo.websocket.org:80'); $con->onConnect = function(AsyncTcpConnection $con) { $con->send('hello'); }; $con->onMessage = function(AsyncTcpConnection $con, $msg) { echo "recv $msg\n"; }; $con->onClose = function(AsyncTcpConnection $con) { // 如果连接断开,则在1秒后重连 $con->reConnect(1); }; $con->connect(); }; Worker::runAll(); ~~~ 调用reconnect成功重连后,$con的onConnect方法会再次被调用(如果有设置的话)。有时候我们想让onConnect方法只执行一次,重连的时候不再执行,参考如下例子: ~~~php use Workerman\Worker; use Workerman\Connection\AsyncTcpConnection; use Workerman\Connection\TcpConnection; require_once __DIR__ . '/vendor/autoload.php'; $worker = new Worker(); $worker->onWorkerStart = function($worker){ $con = new AsyncTcpConnection('ws://echo.websocket.org:80'); $con->onConnect = function(AsyncTcpConnection $con) { static $is_first_connect = true; if (!$is_first_connect) return; $is_first_connect = false; $con->send('hello'); }; $con->onMessage = function(AsyncTcpConnection $con, $msg) { echo "recv $msg\n"; }; $con->onClose = function(AsyncTcpConnection $con) { // 如果连接断开,则在1秒后重连 $con->reConnect(1); }; $con->connect(); }; Worker::runAll(); ~~~ ## **transport:设置传输属性,可选值为tcp和ssl,默认是tcp** ``` require_once __DIR__ . '/vendor/autoload.php'; use Workerman\Worker; use Workerman\Connection\TcpConnection; use Workerman\Connection\AsyncTcpConnection; $task = new Worker(); // 进程启动时异步建立一个到www.baidu.com连接对象,并发送数据获取数据 $task->onWorkerStart = function($task){ $connection_to_baidu = new AsyncTcpConnection('tcp://www.baidu.com:443'); // 设置为ssl加密连接 $connection_to_baidu->transport = 'ssl'; $connection_to_baidu->onConnect = function(AsyncTcpConnection $connection_to_baidu){ echo "connect success\n"; $connection_to_baidu->send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: keep-alive\r\n\r\n"); }; $connection_to_baidu->onMessage = function(AsyncTcpConnection $connection_to_baidu, $http_buffer){ echo $http_buffer; }; $connection_to_baidu->onClose = function(AsyncTcpConnection $connection_to_baidu){ echo "connection closed\n"; }; $connection_to_baidu->onError = function(AsyncTcpConnection $connection_to_baidu, $code, $msg){ echo "Error code:$code msg:$msg\n"; }; $connection_to_baidu->connect(); }; // 运行worker Worker::runAll(); ```