Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
PandaLIU-1111 committed Oct 31, 2021
1 parent 3955f87 commit 491bf69
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
14 changes: 11 additions & 3 deletions src/Core/Rpc/Runtime/Swoole/SocketChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Hyperf\Seata\Core\Protocol\MessageType;
use Hyperf\Seata\Core\Protocol\RpcMessage;
use Hyperf\Seata\Core\Rpc\Address;
use Hyperf\Seata\Core\Rpc\Runtime\ProcessorManager;
use Hyperf\Seata\Core\Rpc\Runtime\SocketChannelInterface;
use Hyperf\Seata\Core\Rpc\Runtime\V1\ProtocolV1Decoder;
use Hyperf\Seata\Core\Rpc\Runtime\V1\ProtocolV1Encoder;
Expand Down Expand Up @@ -75,22 +76,29 @@ public function getAddress(): Address
protected function createRecvLoop()
{
Coroutine::create(function () {
$processorManger = ApplicationContext::getContainer()->get(ProcessorManager::class);
while (true) {
try {
$data = $this->socket->recvAll();
if (! $data) {
// Coroutines give up
usleep(1);
continue;
}
$byteBuffer = ByteBuffer::wrapBinary($data);
$rpcMessage = $this->protocolDecoder->decode($byteBuffer);

$processorManger->dispatch($this, $rpcMessage);

if (isset($this->responses[$rpcMessage->getId()])) {
$responseChannel = $this->responses[$rpcMessage->getId()];
$responseChannel->push($rpcMessage);
} elseif ($rpcMessage->getMessageType() === MessageType::TYPE_HEARTBEAT_MSG) {
var_dump('heartbeat', $rpcMessage);
} else {
} else {
var_dump('else', $rpcMessage);
}
// elseif ($rpcMessage->getMessageType() === MessageType::TYPE_HEARTBEAT_MSG) {
// var_dump('heartbeat', $rpcMessage);
// }
} catch (\InvalidArgumentException $exception) {
var_dump($exception->getMessage());
}
Expand Down
3 changes: 0 additions & 3 deletions src/Core/Rpc/Runtime/Swow/SocketChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,13 @@ class SocketChannel implements SocketChannelInterface

protected Channel $sendChannel;

protected ProcessorManager $processorManger;

public function __construct(Socket $socket, Address $address)
{
$this->socket = $socket;
$this->address = $address;
$container = ApplicationContext::getContainer();
$this->protocolEncoder = $container->get(ProtocolV1Encoder::class);
$this->protocolDecoder = $container->get(ProtocolV1Decoder::class);
// $this->processorManger = $container->get(ProcessorManager::class);
$this->sendChannel = new Channel();
$this->createRecvLoop();
//$this->createSendLoop();
Expand Down

0 comments on commit 491bf69

Please sign in to comment.