Skip to content

Commit

Permalink
Update Channel.php
Browse files Browse the repository at this point in the history
  • Loading branch information
VennDev committed Aug 31, 2024
1 parent 733be2e commit 553029e
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions src/vennv/vapm/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,31 @@ interface ChannelInterface
*
* This function is used to send a message to the channel.
*/
public function send($message): Generator;
public function sendGen($message): Generator;

/**
* @param mixed $message
* @return void
*
* This function is used to send a message to the channel.
*/
public function send($message): void;

/**
* @param callable $callback
* @return Generator
*
* This function is used to receive a message from the channel.
*/
public function receiveGen(): Generator;
public function receiveGen(callable $callback): Generator;

/**
* @return mixed
* @param callable $callback
* @return void
*
* This function is used to receive a message from the channel.
*/
public function receive(): mixed;
public function receive(callable $callback): void;

/**
* @return bool
Expand Down Expand Up @@ -87,7 +97,7 @@ final class Channel implements ChannelInterface

private bool $closed = false;

public function send($message): Generator
public function sendGen($message): Generator
{
$this->exceptionIfClosed();
while ($this->locked) yield;
Expand All @@ -96,29 +106,33 @@ public function send($message): Generator
$this->locked = false;
}

public function receiveGen(): Generator
public function send($message): void
{
$this->exceptionIfClosed();
while ($this->locked) {
CoroutineGen::run();
yield;
}
$this->locked = true;
$message = array_shift($this->queue);
$this->queue[] = $message;
$this->locked = false;
return yield $message;
}

public function receive(): mixed
public function receiveGen(callable $callback): Generator
{
$this->exceptionIfClosed();
while ($this->locked) {
while (!$this->closed || !empty($this->queue)) {
$message = array_shift($this->queue);
if ($message !== null) $callback($message);
yield;
}
}

public function receive(callable $callback): void
{
while (!$this->closed || !empty($this->queue)) {
$message = array_shift($this->queue);
if ($message !== null) $callback($message);
CoroutineGen::run();
}
$this->locked = true;
$message = array_shift($this->queue);
$this->locked = false;
return $message;
}

public function isEmpty(): bool
Expand Down

0 comments on commit 553029e

Please sign in to comment.