Skip to content

Commit

Permalink
Reformat code + fix Thread!
Browse files Browse the repository at this point in the history
  • Loading branch information
VennDev authored Aug 21, 2024
1 parent ab2e83c commit 915afe3
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 48 deletions.
4 changes: 2 additions & 2 deletions src/vennv/vapm/ClosureThread.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public function onRun(): void
} elseif ($callback instanceof \Generator) {
$callback = json_encode(iterator_to_array($callback));
} else {
$callback = (string) $callback;
$callback = (string)$callback;
}
if (is_bool($callback)) $callback = (string) $callback;
if (is_bool($callback)) $callback = (string)$callback;
self::post($callback);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/vennv/vapm/PHPUtils.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ final class PHPUtils
{

/**
* @throws Throwable
* @param array<int|float|string|object> $array
* @param callable $callback
* @return Async
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*/
public static function forEach(array $array, callable $callback): Async
{
Expand All @@ -47,12 +47,12 @@ public static function forEach(array $array, callable $callback): Async
}

/**
* @throws Throwable
* @param array<int|float|string|object> $array
* @param callable $callback
* @return Async
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*/
public static function arrayMap(array $array, callable $callback): Async
{
Expand All @@ -67,12 +67,12 @@ public static function arrayMap(array $array, callable $callback): Async
}

/**
* @throws Throwable
* @param array<int|float|string|object> $array
* @param callable $callback
* @return Async
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*/
public static function arrayFilter(array $array, callable $callback): Async
{
Expand Down
12 changes: 5 additions & 7 deletions src/vennv/vapm/Promise.php
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ public function __construct(callable $callback, bool $justGetResult = false)
return $result;
};

$this->callbackFinally = function (): void {};
$this->callbackFinally = function (): void {
};

EventLoop::addQueue($this);
}
Expand Down Expand Up @@ -396,7 +397,6 @@ public function finally(callable $callback): Promise
*/
public function useCallbacks(): void
{
$hasCallbacks = false;
$result = $this->result;

if ($this->isResolved()) {
Expand All @@ -411,18 +411,16 @@ public function useCallbacks(): void
}

if (count($callbacks) > 0) {
$hasCallbacks = true;
/** @var callable $callback */
$callback = $callbacks[0];
$resultFirstCallback = call_user_func($callback, $this->result);
$this->result = $resultFirstCallback;
$this->return = $resultFirstCallback;
$this->checkStatus($callbacks, $this->return);
}
} else if ($this->isRejected()) {
} elseif ($this->isRejected()) {
if (is_callable($this->callbackReject)) $this->result = call_user_func($this->callbackReject, $result);
}
if (!$hasCallbacks && is_callable($this->callbackFinally)) call_user_func($this->callbackFinally);
}

/**
Expand Down Expand Up @@ -455,7 +453,7 @@ private function checkStatus(ArrayObject $callbacks, mixed $return): void
$queue1->then($callback);
if (is_callable($this->callbackReject)) $queue1->catch($this->callbackReject);
$lastPromise = $queue1;
} else if (!is_null($queue2)) {
} elseif (!is_null($queue2)) {
$queue2->then($callback);
if (is_callable($this->callbackReject)) $queue2->catch($this->callbackReject);
$lastPromise = $queue2;
Expand Down Expand Up @@ -653,4 +651,4 @@ public static function race(array $promises): Promise
return $promise;
}

}
}
75 changes: 48 additions & 27 deletions src/vennv/vapm/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -411,57 +411,78 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise

$process = proc_open($command, $mode, $pipes);

$output = '';
$error = '';
if (is_resource($process)) {
$data = json_encode(self::getDataMainThread());

if (is_string($data)) fwrite($pipes[0], $data);
fclose($pipes[0]);

stream_set_blocking($pipes[1], false);
stream_set_blocking($pipes[2], false);

$data = json_encode(self::getDataMainThread());

if (is_string($data)) {
fwrite($pipes[0], $data);
fclose($pipes[0]);
$status = proc_get_status($process);
$pid = $status['pid'];
if (!isset(self::$threads[$pid])) {
$this->setPid($pid);
self::$threads[$pid] = $this;
}

while (proc_get_status($process)['running']) {
$thread = self::$threads[$pid];
$thread->setExitCode($status['exitcode']);
$thread->setRunning($status['running']);
$thread->setSignaled($status['signaled']);
$thread->setStopped($status['stopped']);
while ($thread->isRunning()) {
$status = proc_get_status($process);

if (!isset(self::$threads[$status['pid']])) {
$this->setPid($status['pid']);
self::$threads[$status['pid']] = $this;
}

$thread = self::$threads[$status['pid']];

$thread->setExitCode($status['exitcode']);
$thread->setRunning($status['running']);
$thread->setSignaled($status['signaled']);
$thread->setStopped($status['stopped']);

if ($thread->isStopped()) {
if ($thread->isRunning()) {
$read = [$pipes[1], $pipes[2]];
$write = null;
$except = null;
$timeout = 0;

$n = stream_select($read, $write, $except, $timeout);
if ($n === false) break;
if ($n > 0) {
foreach ($read as $stream) {
if (!feof($stream)) {
$data = stream_get_contents($stream, 1024);
if ($data === false || $data === '') continue;
$stream === $pipes[1] ? $output .= $data : $error .= $data;
}
FiberManager::wait();
}
}
} elseif ($thread->isStopped() || $thread->isSignaled()) {
proc_terminate($process);
break;
} else {
proc_terminate($process);
break;
}

FiberManager::wait();
}

$output = stream_get_contents($pipes[1]);
$error = stream_get_contents($pipes[2]);
$outputStream = stream_get_contents($pipes[1]);
$errorStream = stream_get_contents($pipes[2]);
if (!is_bool($outputStream)) $output .= str_contains($output, $outputStream) ? '' : $outputStream;
if (!is_bool($errorStream)) $error .= str_contains($error, $errorStream) ? '' : $errorStream;

fclose($pipes[1]);
fclose($pipes[2]);

if ($error !== '' && is_string($error)) {
if ($error !== '') {
return $reject(new ThreadException($error));
} else {
if (!is_bool($output)) {
$explode = explode(PHP_EOL, $output);
foreach ($explode as $item) {
if ($item !== '' && self::isPostMainThread($item)) self::loadSharedData($item);
elseif ($item !== '' && self::isPostThread($item)) {
$output = Utils::getStringAfterSign($item, self::POST_THREAD . '=>');
}
}
if ($output !== '' && self::isPostMainThread($output)) self::loadSharedData($output);
elseif ($output !== '' && self::isPostThread($output)) {
$output = Utils::getStringAfterSign($output, self::POST_THREAD . '=>');
}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/vennv/vapm/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public function run(callable $callback): Async
if (!$this->isLocked()) {
if (count($promises) < $threads && $work->count() > 0) {
$callbackQueue = $work->dequeue();

if (!is_callable($callbackQueue)) continue;

$thread = new ClosureThread($callbackQueue);
Expand Down
3 changes: 2 additions & 1 deletion src/vennv/vapm/utils/Property.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public function update(object $data, array $options): object
* @var mixed $value
*/
foreach ($options as $key => $value) {
if (property_exists($data, $key)) $data->{$key} = $value; /* @phpstan-ignore-line */
if (property_exists($data, $key)) $data->{$key} = $value;
/* @phpstan-ignore-line */
}

return $data;
Expand Down
14 changes: 7 additions & 7 deletions src/vennv/vapm/utils/Utils.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public static function isClass(string $class): bool;

/**
* @return string
*
*
* Get string after sign
*/
public static function getStringAfterSign(string $string, string $sign): string;
Expand Down Expand Up @@ -213,9 +213,10 @@ public static function fixInputCommand(string $text): array|string
*/
public static function removeComments(string $text): null|string|array
{
$text = preg_replace('/\/\/.*?(\r\n|\n|$)/', '', $text);
$text = preg_replace('/(?<!:)\/\/.*?(\r\n|\n|$)/', '', $text);
if ($text === null || is_array($text)) return null;
return preg_replace('/\/\*.*?\*\//ms', '', $text);
$text = preg_replace('/\/\*[\s\S]*?\*\//', '', $text);
return $text;
}

/**
Expand Down Expand Up @@ -326,14 +327,13 @@ public static function isClass(string $class): bool

/**
* @return string
*
*
* Get string after sign
*/
public static function getStringAfterSign(string $string, string $sign): string
{
$position = strpos($string, $sign);
if ($position === false) return '';
return substr($string, $position + strlen($sign));
if (preg_match('/' . preg_quote($sign, '/') . '(.*)/s', $string, $matches)) return $matches[1];
return '';
}

}

0 comments on commit 915afe3

Please sign in to comment.