企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持知识库和私有化部署方案 广告
[分享: 使用workerman实现基于UDP的异步SIP服务器,服务器端可主动发送UDP数据给客户端-workerman社区](https://www.workerman.net/a/1656) 自从使用workerman实现物联网终端接入以来,我工作中的所有网络场景(TCP\\UDP\\HTTP)等均使用workerman+channel以微服务方式实现,开发速度快,性能超级高。(几十万台设备同时接入都轻轻松松承受住) 之前多次关注过workerman的UDP服务器,但一没有实现我想要的结果 由于近期的业务需求,外加HTTP3 QUIC协议的广泛使用,workerman作为一个广泛使用的高性能PHP网络开发框架,支持持久化的UDP通信是很有必要的。 一直以来想通过workerman编写个基于UDP的SIP服务器和实现GB28181的国标协议,搭配SRS、ZLMediaKit或者monibuca,满足摄像头、硬盘录像机设备的接入,也可配合FreeSwitch实现基于SIP的语音通话或视频会议系统 workerman 主动发送udp数据 `https://www.workerman.net/q/2688` UDP服务器主动向客户端发送消息 `https://www.workerman.net/q/4284` 直到今天终于使用workerman 实现单进程或多进程方式监听某个UDP端口,主动从平台向客户端发送数据 并且所有功能均使用workerman的loop功能,能够发挥平台最大化性能 当进程只有一个时使用 socket 函数实现端口监听,当进程大于1个时使用stream\_socket实现端口监听(各有利弊,请酌情使用,大部分场景,推荐将进程数保持与CPU数量一致,自动使用 stream\_socket ) 经过我初步测试: 当使用stream\_socket时,服务器首次收到客户端发送的数据后,能够稳定的向客户端发送约5分钟的数据报文,直到该通信会话被Linux内核丢弃,因此使用UDP进行通信,**建议至少60秒进行一次双向心跳交互保活**。 当使用socket时,服务器首次收到客户端发送的数据后,能够稳定的向客户端长期发送数据报文(如果网络中的防火墙或NAT路由器没有将会话过期,应该可以一直使用) 下面直接发布代码 ~~~php <?php chdir(dirname($_SERVER['SCRIPT_FILENAME'])); include_once __DIR__ . '/vendor/autoload.php'; use Workerman\Worker; use Workerman\Lib\Timer; $worker = new Worker(); $worker->count = 1; //开启进程数量 $processName = "sip_server_udp5060"; $worker->transport = "udp"; $worker->name = $processName; $worker->reusePort = true; //开启均衡负载模式 $date = date("Y-m-d"); Worker::$pidFile = "var/{$processName}.pid"; Worker::$logFile = "var/{$processName}_logFile.log"; Worker::$stdoutFile = "var/{$processName}_stdout.log"; $socket = null; $workerId = null; define("UNAUTHORIZED_KICKOFF_SECOND" , 1); define("UNAUTHORIZED_ALLOW_TRYTIME" , 10); define("UDP_SOCKET_TYPE_IS_STREAM" , $worker->count > 1 ? 1 : 0 ); $worker->onWorkerStart = function() { global $worker , $workerId , $socket , $processName; //定期与数据库握手,避免被断掉,该动作每个进程都得执行 $workerId = $worker->id ; $worker->connections = []; $worker->connections_ur = []; //根据daemon顺序延时,这是确保系统正常运行的关键 usleep(1000 * 10 * ($worker->id+1) ); echo date("Y-m-d H:i:s")." 服务进程{$worker->id}已经启动!\n"; if( $workerId >= 0 ){ //计划监听的UDP端口 if(UDP_SOCKET_TYPE_IS_STREAM == 0){ //这种模式只能运行一个进程 $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); socket_bind($socket , "0.0.0.0" , 5060); }else{ //这种模式可以多个进程共同监听同一端口 $context = stream_context_create(); $socket = stream_socket_server("udp://0.0.0.0:5060" , $error_code , $error_message , STREAM_SERVER_BIND , $context); } Worker::$globalEvent->add($socket, 1 , "acceptUdpConnection"); //加入全局事件loop /* //如果需要从服务器主动发数据给客户端,可以通过channel方式调用,具体的业务实现可以参照官方channel使用介绍 Channel\Client::connect(CHANNEL_SIP_IP , CHANNEL_SIP_PORT); $event_name = $processName; //UDP的数据回复跟TCP不一样,只要知道对方端口即可,理论上任意一个进程均可从服务器端发送数据给客户端 //到公网环境查询本机公网IP,便于生成日志 Channel\Client::on($event_name, function($event_data)use($worker , $event_name) { }); */ //每隔一段时间对长时间未进行通信的临时会话进行清理 //不宜太大,太大容易遭受DDOS攻击,也不宜太小,太小的话有可能还没完成业务逻辑就被踢掉 Timer::add( UNAUTHORIZED_KICKOFF_SECOND , function (){ global $worker , $socket; $dateTime = date("Y-m-d H:i:s"); $timeNow = time(); foreach ($worker->connections_ur as $remote_address => $remote_arr){ if( $remote_arr['lastMsgTime'] < $timeNow - UNAUTHORIZED_KICKOFF_SECOND ){ //非法接入,直接踢掉 unset($worker->connections_ur[$remote_address]); //print("{$dateTime} {$remote_address} 踢掉非法接入\n"); //sendto($remote_address ,"illegal connection!\n"); } //平台主动发送数据给终端, sendto($remote_address ,"{$dateTime} test send data!\n"); } }); //每隔一段时间对已经认证过的会话进行检查,对于长时间未通信的需要进行清理 Timer::add( 60 , function (){ global $worker , $socket; $dateTime = date("Y-m-d H:i:s"); $timeNow = time(); foreach ($worker->connections as $remote_address => $remote_arr){ if( $remote_arr['lastMsgTime'] < $timeNow - 300 ){ //超时未通信,直接踢掉 unset($worker->connections[$remote_address]); //这里编写远端会话离线的内容 //print("{$dateTime} {$remote_address} 会话超时掉线\n"); //sendto($remote_address ,"connection timeout!\n"); } } }); } }; function sendto($remote_address , $send_buffer ){ global $socket; if(UDP_SOCKET_TYPE_IS_STREAM == 0){ list($host, $port) = explode(":" , $remote_address ); socket_sendto($socket , $send_buffer , strlen($send_buffer), 0, $host, $port); }else{ stream_socket_sendto( $socket, $send_buffer, 0, $remote_address); } } function acceptUdpConnection($socket){ global $worker; $dateTime = date("Y-m-d H:i:s"); $timeNow = time(); set_error_handler(function(){}); if(UDP_SOCKET_TYPE_IS_STREAM == 0){ socket_recvfrom($socket, $recv_buffer, 65535, 0, $from, $port); $remote_address = "{$from}:{$port}"; }else{ $recv_buffer = stream_socket_recvfrom($socket, 65535 , 0, $remote_address); } restore_error_handler(); if (false === $recv_buffer || empty($remote_address)) { return false; } if( isset($worker->connections[$remote_address]['lastMsgTime']) && $worker->connections[$remote_address]['lastMsgTime'] >= $timeNow - 300 ){ //已注册成功 $worker->connections[$remote_address]['lastMsgTime'] = $timeNow; //处理业务逻辑 针对已经认证的合法连接 //针对已经认证过的连接,建议将收到的数据通过channel发布到其他服务端进行轮询处理,以最大化提升系统处理性能,此时,本程序仅仅充当gateway功能 }else{ //未注册成功 未认证的可以进行几次通信 $tryCount = $worker->connections_ur[$remote_address]['tryCount']??1; if( $tryCount < UNAUTHORIZED_ALLOW_TRYTIME ){ $worker->connections_ur[$remote_address]['lastMsgTime'] = $timeNow; $worker->connections_ur[$remote_address]['tryCount'] = $tryCount + 1; //sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,try time {$tryCount} \n"); //处理业务逻辑 对于未认证的连接,必须在超时前完成认证 完成认证后需要将连接从 connections_ur 里面移出,并且加入到 connections 里面去 }else{ //超过一定交互次数但仍未完成认证的会话将被忽视,直到被踢下线 //sendTo( $remote_address ,"{$dateTime} {$remote_address}:\nunauthorized,max try \n"); } } //收到数据,打印日志 print( "{$dateTime} {$remote_address}:\n{$recv_buffer}\n"); }; Worker::runAll(); /* 以下是程序输出样例数据 2024-03-27 01:15:52 39.129.72.95:50574: REGISTER sip:34020000002000000001@3402000000 SIP/2.0 Via: SIP/2.0/UDP 192.168.1.4:5060;rport;branch=z9hG4bK650883237 From: <sip:34020000001180870005@3402000000>;tag=698431213 To: <sip:34020000001180870005@3402000000> Call-ID: 1146222786 CSeq: 1 REGISTER Contact: <sip:34020000001180870005@192.168.1.4:5060> Max-Forwards: 70 User-Agent: Embedded Net DVR/NVR/DVS Expires: 86400 Content-Length: 0 2024-03-27 01:15:53 39.129.72.95:15363: REGISTER sip:34020000002000000001@3402000000 SIP/2.0 Via: SIP/2.0/UDP 192.168.1.2:5060;rport;branch=z9hG4bK1890963109 From: <sip:34020000001180870007@3402000000>;tag=1955740825 To: <sip:34020000001180870007@3402000000> Call-ID: 254089720 CSeq: 1 REGISTER Contact: <sip:34020000001180870007@192.168.1.2:5060> Max-Forwards: 70 User-Agent: Embedded Net DVR/NVR/DVS Expires: 86400 Content-Length: 0 2024-03-27 01:15:54 39.129.72.95:31137: REGISTER sip:34020000002000000001@3402000000 SIP/2.0 Via: SIP/2.0/UDP 192.168.1.8:5060;rport;branch=z9hG4bK1552460892 From: <sip:34020000001180870002@3402000000>;tag=1271059450 To: <sip:34020000001180870002@3402000000> Call-ID: 108704431 CSeq: 1 REGISTER Contact: <sip:34020000001180870002@192.168.1.8:5060> Max-Forwards: 70 User-Agent: Embedded Net DVR/NVR/DVS Expires: 86400 Content-Length: 0 01:38:09.293144 IP (tos 0x68, ttl 48, id 44466, offset 0, flags [DF], proto UDP (17), length 436) 39.129.72.95.64959 > 192.168.27.21.5060: [udp sum ok] SIP, length: 408 REGISTER sip:34020000002000000001@3402000000 SIP/2.0 Via: SIP/2.0/UDP 192.168.1.7:5060;rport;branch=z9hG4bK1838031091 From: <sip:34020000001180870003@3402000000>;tag=793428652 To: <sip:34020000001180870003@3402000000> Call-ID: 1815643450 CSeq: 1 REGISTER Contact: <sip:34020000001180870003@192.168.1.7:5060> Max-Forwards: 70 User-Agent: Embedded Net DVR/NVR/DVS Expires: 86400 Content-Length: 0 01:38:09.540771 IP (tos 0x0, ttl 64, id 30488, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.129.72.95.50574: [udp sum ok] SIP 01:38:09.540806 IP (tos 0x0, ttl 64, id 30489, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.129.72.95.31137: [udp sum ok] SIP 01:38:09.540815 IP (tos 0x0, ttl 64, id 30490, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.129.72.95.15363: [udp sum ok] SIP 01:38:09.540822 IP (tos 0x0, ttl 64, id 45413, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.130.87.71.34950: [udp sum ok] SIP 01:38:09.540830 IP (tos 0x0, ttl 64, id 27774, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.128.225.81.35329: [udp sum ok] SIP 01:38:09.540838 IP (tos 0x0, ttl 64, id 30491, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.129.72.95.26505: [udp sum ok] SIP 01:38:09.540845 IP (tos 0x0, ttl 64, id 2845, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.129.72.95.64959: [udp sum ok] SIP 01:38:09.540860 IP (tos 0x0, ttl 64, id 30493, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.129.72.95.36321: [udp sum ok] SIP 01:38:09.540867 IP (tos 0x0, ttl 64, id 27775, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.128.225.81.35342: [udp sum ok] SIP 01:38:09.540873 IP (tos 0x0, ttl 64, id 30494, offset 0, flags [DF], proto UDP (17), length 64) 192.168.27.21.5060 > 39.129.72.95.23047: [udp sum ok] SIP * */ ~~~ >[danger]特别说明一下,本人对网络有一定的了解,据我所知大部分网络环境中的NAT防火墙UDP会话默认超时时间大概为5分钟-10分钟左右。 因此只要UDP客户端与服务器端每隔60秒进行一次**双向的**心跳交互,可以确保通信的持续进行(当有业务需求时,任意一端可向另外一端发送所需要的数据报文),就像SIP一样,哪怕你只发送个“hello”也是可以的。