Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExtEvLoop: Add new ExtEvLoop (PECL source ext-ev) #12

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
composer.lock
phpunit.xml
vendor
244 changes: 244 additions & 0 deletions src/ExtEvLoop.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
<?php

namespace React\EventLoop;

use SplObjectStorage;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;

class ExtEvLoop implements LoopInterface
{
private $loop;
private $nextTickQueue;
private $futureTickQueue;
private $timers;
private $readEvents = array();
private $writeEvents = array();

private $running = false;

public function __construct()
{
$this->loop = new \EvLoop();
$this->timers = new SplObjectStorage();
$this->nextTickQueue = new NextTickQueue($this);
$this->futureTickQueue = new FutureTickQueue($this);
$this->timers = new SplObjectStorage();
}

/**
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use @inheritdoc for the phpdoc headers of methods defined in the React\EventLoop\LoopInterface.

{
$this->addStream($stream, $listener, \Ev::READ);
}

/**
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{
$this->addStream($stream, $listener, \Ev::WRITE);
}

/**
* {@inheritdoc}
*/
public function removeReadStream($stream)
{
$key = (int) $stream;
if (isset($this->readEvents[$key])) {
$this->readEvents[$key]->stop();
unset($this->readEvents[$key]);
}
}

/**
* {@inheritdoc}
*/
public function removeWriteStream($stream)
{
$key = (int) $stream;
if (isset($this->writeEvents[$key])) {
$this->writeEvents[$key]->stop();
unset($this->writeEvents[$key]);
}
}

/**
* {@inheritdoc}
*/
public function removeStream($stream)
{
$this->removeReadStream($stream);
$this->removeWriteStream($stream);
}

/**
* Wraps the listener in a callback which will pass the
* stream to the listener then registers the stream with
* the eventloop.
*
* @param resource $stream PHP Stream resource
* @param callable $listener stream callback
* @param int $flags flag bitmask
*/
private function addStream($stream, callable $listener, $flags)
{
$listener = function ($event) use ($stream, $listener) {
call_user_func($listener, $stream, $this);
};

$event = $this->loop->io($stream, $flags, $listener);

if (($flags & \Ev::READ) === $flags) {
$this->readEvents[(int)$stream] = $event;
} elseif (($flags & \Ev::WRITE) === $flags) {
$this->writeEvents[(int)$stream] = $event;
}
}

/**
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, false);
$this->setupTimer($timer);

return $timer;
}

/**
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, true);
$this->setupTimer($timer);

return $timer;
}

/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer)
{
if (isset($this->timers[$timer])) {
/* stop EvTimer */
$this->timers[$timer]->stop();

/* defer timer */
$this->nextTick(function() use ($timer) {
$this->timers->detach($timer);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey, this defer was added to work around a segfault, right? is it still needed?

}
}

/**
* Add timer object as
* @param TimerInterface $timer [description]
* @return [type] [description]
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing something here? :-) Seems incomplete....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks ok to me, what strikes you as incomplete? There are some work arounds necessary with the EV extension in order to avoid segfaults across different versions of PHP, this is what your seeing here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah sorry, I mean the contents of the phpdoc block :-)
See Add timer object as, [description] and type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, missed that - i'll get that updated accordingly

private function setupTimer(TimerInterface $timer)
{
$callback = function () use ($timer) {
call_user_func($timer->getCallback(), $timer);

if (!$timer->isPeriodic()) {
$timer->cancel();
}
};

$interval = $timer->getInterval();

$libevTimer = $this->loop->timer($interval, $interval, $callback);

$this->timers->attach($timer, $libevTimer);

return $timer;
}

/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer)
{
return $this->timers->contains($timer);
}


/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}

/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}

/**
* {@inheritdoc}
*/
public function tick()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing phpdoc headers with @inheritdoc, here and on top of run(), stop() and isTimerActive().

{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();

$flags = \Ev::RUN_ONCE;
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$flags |= \Ev::RUN_NOWAIT;
} elseif (!$this->readEvents && !$this->writeEvents && !$this->timers->count()) {
$this->running = false;
return;
}
$this->loop->run($flags);
}

/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;

while($this->running) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd put an empty line before this while loop.

$this->tick();
}
}

/**
* {@inheritdoc}
*/
public function stop()
{
$this->running = false;
}

public function __destruct()
{
// mannually stop all watchers
foreach ($this->timers as $timer) {
$this->timers[$timer]->stop();
}

foreach ($this->readEvents as $event) {
$event->stop();
}

foreach ($this->writeEvents as $event) {
$event->stop();
}
}
}
8 changes: 4 additions & 4 deletions src/ExtEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ private function scheduleTimer(TimerInterface $timer)
/**
* Create a new ext-event Event object, or update the existing one.
*
* @param stream $stream
* @param integer $flag Event::READ or Event::WRITE
* @param resource $stream
* @param integer $flag Event::READ or Event::WRITE
*/
private function subscribeStreamEvent($stream, $flag)
{
Expand All @@ -263,8 +263,8 @@ private function subscribeStreamEvent($stream, $flag)
* Update the ext-event Event object for this stream to stop listening to
* the given event type, or remove it entirely if it's no longer needed.
*
* @param stream $stream
* @param integer $flag Event::READ or Event::WRITE
* @param resource $stream
* @param integer $flag Event::READ or Event::WRITE
*/
private function unsubscribeStreamEvent($stream, $flag)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ public static function create()
// @codeCoverageIgnoreStart
if (function_exists('event_base_new')) {
return new LibEventLoop();
} else if (class_exists('libev\EventLoop', false)) {
} elseif (class_exists('libev\EventLoop', false)) {
return new LibEvLoop;
} else if (class_exists('EventBase', false)) {
} elseif (class_exists('EventBase', false)) {
return new ExtEventLoop;
}

Expand Down
8 changes: 4 additions & 4 deletions src/LibEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ private function scheduleTimer(TimerInterface $timer)
/**
* Create a new ext-libevent event resource, or update the existing one.
*
* @param stream $stream
* @param integer $flag EV_READ or EV_WRITE
* @param resource $stream
* @param integer $flag EV_READ or EV_WRITE
*/
private function subscribeStreamEvent($stream, $flag)
{
Expand Down Expand Up @@ -267,8 +267,8 @@ private function subscribeStreamEvent($stream, $flag)
* Update the ext-libevent event resource for this stream to stop listening to
* the given event type, or remove it entirely if it's no longer needed.
*
* @param stream $stream
* @param integer $flag EV_READ or EV_WRITE
* @param resource $stream
* @param integer $flag EV_READ or EV_WRITE
*/
private function unsubscribeStreamEvent($stream, $flag)
{
Expand Down
18 changes: 9 additions & 9 deletions src/LoopInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,37 @@ interface LoopInterface
/**
* Register a listener to be notified when a stream is ready to read.
*
* @param stream $stream The PHP stream resource to check.
* @param resource $stream The PHP stream resource to check.
* @param callable $listener Invoked when the stream is ready.
*/
public function addReadStream($stream, callable $listener);

/**
* Register a listener to be notified when a stream is ready to write.
*
* @param stream $stream The PHP stream resource to check.
* @param resource $stream The PHP stream resource to check.
* @param callable $listener Invoked when the stream is ready.
*/
public function addWriteStream($stream, callable $listener);

/**
* Remove the read event listener for the given stream.
*
* @param stream $stream The PHP stream resource.
* @param resource $stream The PHP stream resource.
*/
public function removeReadStream($stream);

/**
* Remove the write event listener for the given stream.
*
* @param stream $stream The PHP stream resource.
* @param resource $stream The PHP stream resource.
*/
public function removeWriteStream($stream);

/**
* Remove all listeners for the given stream.
*
* @param stream $stream The PHP stream resource.
* @param resource $stream The PHP stream resource.
*/
public function removeStream($stream);

Expand All @@ -49,8 +49,8 @@ public function removeStream($stream);
* The execution order of timers scheduled to execute at the same time is
* not guaranteed.
*
* @param numeric $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
* @param int|float $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
*
* @return TimerInterface
*/
Expand All @@ -62,8 +62,8 @@ public function addTimer($interval, callable $callback);
* The execution order of timers scheduled to execute at the same time is
* not guaranteed.
*
* @param numeric $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
* @param int|float $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
*
* @return TimerInterface
*/
Expand Down
Loading