From 491bf690fa90080c0c3d9bda880e637d447171e4 Mon Sep 17 00:00:00 2001 From: pandaLIU <563883861@qq.com> Date: Mon, 1 Nov 2021 00:09:09 +0800 Subject: [PATCH] update --- src/Core/Rpc/Runtime/Swoole/SocketChannel.php | 14 +++++++++++--- src/Core/Rpc/Runtime/Swow/SocketChannel.php | 3 --- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/Core/Rpc/Runtime/Swoole/SocketChannel.php b/src/Core/Rpc/Runtime/Swoole/SocketChannel.php index ff57d9b..08b0ee1 100644 --- a/src/Core/Rpc/Runtime/Swoole/SocketChannel.php +++ b/src/Core/Rpc/Runtime/Swoole/SocketChannel.php @@ -15,6 +15,7 @@ use Hyperf\Seata\Core\Protocol\MessageType; use Hyperf\Seata\Core\Protocol\RpcMessage; use Hyperf\Seata\Core\Rpc\Address; +use Hyperf\Seata\Core\Rpc\Runtime\ProcessorManager; use Hyperf\Seata\Core\Rpc\Runtime\SocketChannelInterface; use Hyperf\Seata\Core\Rpc\Runtime\V1\ProtocolV1Decoder; use Hyperf\Seata\Core\Rpc\Runtime\V1\ProtocolV1Encoder; @@ -75,22 +76,29 @@ public function getAddress(): Address protected function createRecvLoop() { Coroutine::create(function () { + $processorManger = ApplicationContext::getContainer()->get(ProcessorManager::class); while (true) { try { $data = $this->socket->recvAll(); if (! $data) { + // Coroutines give up + usleep(1); continue; } $byteBuffer = ByteBuffer::wrapBinary($data); $rpcMessage = $this->protocolDecoder->decode($byteBuffer); + + $processorManger->dispatch($this, $rpcMessage); + if (isset($this->responses[$rpcMessage->getId()])) { $responseChannel = $this->responses[$rpcMessage->getId()]; $responseChannel->push($rpcMessage); - } elseif ($rpcMessage->getMessageType() === MessageType::TYPE_HEARTBEAT_MSG) { - var_dump('heartbeat', $rpcMessage); - } else { + } else { var_dump('else', $rpcMessage); } +// elseif ($rpcMessage->getMessageType() === MessageType::TYPE_HEARTBEAT_MSG) { +// var_dump('heartbeat', $rpcMessage); +// } } catch (\InvalidArgumentException $exception) { var_dump($exception->getMessage()); } diff --git a/src/Core/Rpc/Runtime/Swow/SocketChannel.php b/src/Core/Rpc/Runtime/Swow/SocketChannel.php index 8587cf0..0e30d0a 100644 --- a/src/Core/Rpc/Runtime/Swow/SocketChannel.php +++ b/src/Core/Rpc/Runtime/Swow/SocketChannel.php @@ -42,8 +42,6 @@ class SocketChannel implements SocketChannelInterface protected Channel $sendChannel; - protected ProcessorManager $processorManger; - public function __construct(Socket $socket, Address $address) { $this->socket = $socket; @@ -51,7 +49,6 @@ public function __construct(Socket $socket, Address $address) $container = ApplicationContext::getContainer(); $this->protocolEncoder = $container->get(ProtocolV1Encoder::class); $this->protocolDecoder = $container->get(ProtocolV1Decoder::class); -// $this->processorManger = $container->get(ProcessorManager::class); $this->sendChannel = new Channel(); $this->createRecvLoop(); //$this->createSendLoop();