From e1939a6853e270f4520b4dcc6f4c17ddf1ee4de7 Mon Sep 17 00:00:00 2001 From: lin <465382251@qq.com> Date: Tue, 13 Apr 2021 13:53:16 +0800 Subject: [PATCH] private channel reconnection --- src/Api/WebSocket/SocketClient.php | 8 ++-- src/Api/WebSocket/SocketFunction.php | 12 +++-- src/Api/WebSocket/SocketServer.php | 69 ++++++++++++++++------------ 3 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/Api/WebSocket/SocketClient.php b/src/Api/WebSocket/SocketClient.php index 6549d60..e715185 100644 --- a/src/Api/WebSocket/SocketClient.php +++ b/src/Api/WebSocket/SocketClient.php @@ -62,7 +62,7 @@ public function subscribe(array $sub=[]){ 'connection'=>0, ]); } - + //print_r($this->resub($sub)); $this->save('add_sub',$this->resub($sub)); } @@ -206,7 +206,9 @@ function test_reconnection(){ ]; } - function test_reconnection2(){ - $this->client->debug2=1; + function test_reconnection2($key){ + $this->client->debug=[ + 'private'=>[$key=>'close'], + ]; } } diff --git a/src/Api/WebSocket/SocketFunction.php b/src/Api/WebSocket/SocketFunction.php index fb6c300..0d846fb 100644 --- a/src/Api/WebSocket/SocketFunction.php +++ b/src/Api/WebSocket/SocketFunction.php @@ -179,7 +179,7 @@ private function channelType($sub){ /** * 重新订阅 */ - private function reconnection($global,$type='public'){ + private function reconnection($global,$type='public',array $keysecret=[]){ $all_sub=$global->get('all_sub'); if(empty($all_sub)) return; @@ -188,10 +188,16 @@ private function reconnection($global,$type='public'){ foreach ($all_sub as $v){ if(!is_array($v)) $temp[]=$v; } - - $global->save('add_sub',$this->resub($temp)); }else{ + $this->keysecret=$keysecret; + $temp=[]; + foreach ($all_sub[$keysecret['key']] as $v){ + $t=explode(self::$USER_DELIMITER,$v); + $temp[]=$t[1]; + } } + + $global->save('add_sub',$this->resub($temp)); } } diff --git a/src/Api/WebSocket/SocketServer.php b/src/Api/WebSocket/SocketServer.php index d33ff8e..a049354 100644 --- a/src/Api/WebSocket/SocketServer.php +++ b/src/Api/WebSocket/SocketServer.php @@ -173,12 +173,12 @@ private function onMessage($global){ $global->saveQueue($table,$data); }else{ $global->save($table,$data); - } - //最后数据更新时间 - $con->tag_data_time=time(); - //成功接收数据重连次数回归0 - $con->tag_reconnection_num=0; + //最后数据更新时间 + $con->tag_data_time=time(); + //成功接收数据重连次数回归0 + $con->tag_reconnection_num=0; + } return; } @@ -230,12 +230,12 @@ private function onMessage($global){ $global->saveQueue($table,$data); }else{ $global->save($table,$data); - } - //最后数据更新时间 - $con->tag_data_time=time(); - //成功接收数据重连次数回归0 - $con->tag_reconnection_num=0; + //最后数据更新时间 + $con->tag_data_time=time(); + //成功接收数据重连次数回归0 + $con->tag_reconnection_num=0; + } return; } @@ -251,13 +251,21 @@ private function onClose($global){ $this->log($con->tag.' reconnection'); $this->reconnection($global,'public'); - - $con->reConnect(10); }else{ - $this->log('connection close '.$con->tag_keysecret['key']); + $this->log('private connection close '.$con->tag_keysecret['key']); - Timer::del($con->timer_other); + //更改为掉线状态 + $this->keysecretInit($con->tag_keysecret,[ + 'connection'=>2, + 'connection_close'=>0, + 'auth'=>0, + ]); + + //重新订阅私有频道 + $this->reconnection($global,'private',$con->tag_keysecret); } + + $con->reConnect(10); }; } @@ -285,23 +293,18 @@ private function other($con,$global){ $this->log('listen '.$con->tag); - /*if(isset($con->tag_data_time)){ - echo time()-$con->tag_data_time; - echo PHP_EOL; - }*/ - //公共数据如果60秒内无数据更新,则断开连接重新订阅,重试次数不超过10次 - if(isset($con->tag_data_time) && time()-$con->tag_data_time>60*($con->tag_reconnection_num+1) && $con->tag_reconnection_num<=10){ - if(in_array($con->tag,$this->public_url)) { - //public + if(in_array($con->tag,$this->public_url)) { + //public + if (isset($con->tag_data_time) && time() - $con->tag_data_time > 60 * ($con->tag_reconnection_num + 1) && $con->tag_reconnection_num <= 10) { $con->close(); - }else{ - //private - } - $con->tag_reconnection_num++; + $con->tag_reconnection_num++; - $this->log('listen '.$con->tag.' reconnection_num:'.$con->tag_reconnection_num.' tag_data_time:'.$con->tag_data_time); + $this->log('listen ' . $con->tag . ' reconnection_num:' . $con->tag_reconnection_num . ' tag_data_time:' . $con->tag_data_time); + } + }else{ + //private } }); } @@ -312,10 +315,10 @@ private function other($con,$global){ * @param $global */ private function debug($con,$global){ + $debug=$global->get('debug'); + if(in_array($con->tag,$this->public_url)) { //public - $debug=$global->get('debug'); - if(isset($debug['public']) && $debug['public'][$con->tag]=='close'){ $this->log($con->tag.' debug '.json_encode($debug)); @@ -326,6 +329,14 @@ private function debug($con,$global){ } }else{ //private + if(isset($debug['private'][$con->tag_keysecret['key']]) && $debug['private'][$con->tag_keysecret['key']]=='close'){ + $this->log($con->tag_keysecret['key'].' debug '.json_encode($debug)); + + $debug['private'][$con->tag_keysecret['key']]='recon'; + $global->save('debug',$debug); + + $con->close(); + } } }