Skip to content

Update master from 0.4 #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/ExtEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ public function run()
break;
}

// @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
@$this->eventBase->loop($flags);
$this->eventBase->loop($flags);
}
}

Expand Down
103 changes: 58 additions & 45 deletions tests/AbstractLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,36 @@ public function setUp()

abstract public function createLoop();

public function createStream()
public function createSocketPair()
{
return fopen('php://temp', 'r+');
}
$domain = (DIRECTORY_SEPARATOR === '\\') ? STREAM_PF_INET : STREAM_PF_UNIX;
$sockets = stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

public function writeToStream($stream, $content)
{
fwrite($stream, $content);
rewind($stream);
foreach ($sockets as $socket) {
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($socket, 0);
}
}

return $sockets;
}

public function testAddReadStream()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableExactly(2));

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");
$this->tickLoop($this->loop);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->tickLoop($this->loop);
}

public function testAddWriteStream()
{
$input = $this->createStream();
list ($input) = $this->createSocketPair();

$this->loop->addWriteStream($input, $this->expectCallableExactly(2));
$this->tickLoop($this->loop);
Expand All @@ -55,33 +58,33 @@ public function testAddWriteStream()

public function testRemoveReadStreamInstantly()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableNever());
$this->loop->removeReadStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->tickLoop($this->loop);
}

public function testRemoveReadStreamAfterReading()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableOnce());

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");
$this->tickLoop($this->loop);

$this->loop->removeReadStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->tickLoop($this->loop);
}

public function testRemoveWriteStreamInstantly()
{
$input = $this->createStream();
list ($input) = $this->createSocketPair();

$this->loop->addWriteStream($input, $this->expectCallableNever());
$this->loop->removeWriteStream($input);
Expand All @@ -90,7 +93,7 @@ public function testRemoveWriteStreamInstantly()

public function testRemoveWriteStreamAfterWriting()
{
$input = $this->createStream();
list ($input) = $this->createSocketPair();

$this->loop->addWriteStream($input, $this->expectCallableOnce());
$this->tickLoop($this->loop);
Expand All @@ -101,60 +104,60 @@ public function testRemoveWriteStreamAfterWriting()

public function testRemoveStreamInstantly()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableNever());
$this->loop->addWriteStream($input, $this->expectCallableNever());
$this->loop->removeStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->tickLoop($this->loop);
}

public function testRemoveStreamForReadOnly()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableNever());
$this->loop->addWriteStream($input, $this->expectCallableOnce());
$this->loop->addWriteStream($output, $this->expectCallableOnce());
$this->loop->removeReadStream($input);

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");
$this->tickLoop($this->loop);
}

public function testRemoveStreamForWriteOnly()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");

$this->loop->addReadStream($input, $this->expectCallableOnce());
$this->loop->addWriteStream($input, $this->expectCallableNever());
$this->loop->removeWriteStream($input);
$this->loop->addWriteStream($output, $this->expectCallableNever());
$this->loop->removeWriteStream($output);

$this->tickLoop($this->loop);
}

public function testRemoveStream()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableOnce());
$this->loop->addWriteStream($input, $this->expectCallableOnce());

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->tickLoop($this->loop);

$this->loop->removeStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->tickLoop($this->loop);
}

public function testRemoveInvalid()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

// remove a valid stream from the event loop that was never added in the first place
$this->loop->removeReadStream($stream);
Expand All @@ -171,29 +174,29 @@ public function emptyRunShouldSimplyReturn()
/** @test */
public function runShouldReturnWhenNoMoreFds()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$loop = $this->loop;
$this->loop->addReadStream($input, function ($stream) use ($loop) {
$loop->removeStream($stream);
});

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");

$this->assertRunFasterThan($this->tickTimeout * 2);
}

/** @test */
public function stopShouldStopRunningLoop()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$loop = $this->loop;
$this->loop->addReadStream($input, function ($stream) use ($loop) {
$loop->stop();
});

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");

$this->assertRunFasterThan($this->tickTimeout * 2);
}
Expand All @@ -219,23 +222,33 @@ function () {
public function testIgnoreRemovedCallback()
{
// two independent streams, both should be readable right away
$stream1 = $this->createStream();
$stream2 = $this->createStream();
list ($input1, $output1) = $this->createSocketPair();
list ($input2, $output2) = $this->createSocketPair();

$called = false;

$loop = $this->loop;
$loop->addReadStream($stream1, function ($stream) use ($loop, $stream2) {
$loop->addReadStream($input1, function ($stream) use (& $called, $loop, $input2) {
// stream1 is readable, remove stream2 as well => this will invalidate its callback
$loop->removeReadStream($stream);
$loop->removeReadStream($stream2);
$loop->removeReadStream($input2);

$called = true;
});

// this callback would have to be called as well, but the first stream already removed us
$loop->addReadStream($stream2, $this->expectCallableNever());
$loop->addReadStream($input2, function () use (& $called) {
if ($called) {
$this->fail('Callback 2 must not be called after callback 1 was called');
}
});

$this->writeToStream($stream1, "foo\n");
$this->writeToStream($stream2, "foo\n");
fwrite($output1, "foo\n");
fwrite($output2, "foo\n");

$loop->run();

$this->assertTrue($called);
}

public function testFutureTickEventGeneratedByFutureTick()
Expand Down Expand Up @@ -275,7 +288,7 @@ public function testFutureTick()

public function testFutureTickFiresBeforeIO()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand All @@ -297,7 +310,7 @@ function () {

public function testRecursiveFutureTick()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand Down Expand Up @@ -325,7 +338,7 @@ function () {

public function testRunWaitsForFutureTickEvents()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand Down
4 changes: 2 additions & 2 deletions tests/ExtEventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public function testCanUseReadableStreamWithFeatureFds()

$this->loop->addReadStream($input, $this->expectCallableExactly(2));

$this->writeToStream($input, "foo\n");
fwrite($input, "foo\n");
$this->tickLoop($this->loop);

$this->writeToStream($input, "bar\n");
fwrite($input, "bar\n");
$this->tickLoop($this->loop);
}
}
18 changes: 9 additions & 9 deletions tests/StreamSelectLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public function testStreamSelectTimeoutEmulation()
public function signalProvider()
{
return [
['SIGUSR1', SIGUSR1],
['SIGHUP', SIGHUP],
['SIGTERM', SIGTERM],
['SIGUSR1'],
['SIGHUP'],
['SIGTERM'],
];
}

Expand All @@ -52,7 +52,7 @@ public function signalProvider()
* Test signal interrupt when no stream is attached to the loop
* @dataProvider signalProvider
*/
public function testSignalInterruptNoStream($sigName, $signal)
public function testSignalInterruptNoStream($signal)
{
if (!extension_loaded('pcntl')) {
$this->markTestSkipped('"pcntl" extension is required to run this test.');
Expand All @@ -78,7 +78,7 @@ public function testSignalInterruptNoStream($sigName, $signal)
* Test signal interrupt when a stream is attached to the loop
* @dataProvider signalProvider
*/
public function testSignalInterruptWithStream($sigName, $signal)
public function testSignalInterruptWithStream($signal)
{
if (!extension_loaded('pcntl')) {
$this->markTestSkipped('"pcntl" extension is required to run this test.');
Expand All @@ -88,7 +88,7 @@ public function testSignalInterruptWithStream($sigName, $signal)
$this->loop->addPeriodicTimer(0.01, function() { pcntl_signal_dispatch(); });

// add stream to the loop
list($writeStream, $readStream) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
list($writeStream, $readStream) = $this->createSocketPair();
$this->loop->addReadStream($readStream, function($stream, $loop) {
/** @var $loop LoopInterface */
$read = fgets($stream);
Expand Down Expand Up @@ -116,7 +116,7 @@ public function testSignalInterruptWithStream($sigName, $signal)
protected function setUpSignalHandler($signal)
{
$this->_signalHandled = false;
$this->assertTrue(pcntl_signal($signal, function() { $this->_signalHandled = true; }));
$this->assertTrue(pcntl_signal(constant($signal), function() { $this->_signalHandled = true; }));
}

/**
Expand All @@ -125,7 +125,7 @@ protected function setUpSignalHandler($signal)
protected function resetSignalHandlers()
{
foreach($this->signalProvider() as $signal) {
pcntl_signal($signal[1], SIG_DFL);
pcntl_signal(constant($signal[0]), SIG_DFL);
}
}

Expand All @@ -141,7 +141,7 @@ protected function forkSendSignal($signal)
} else if ($childPid === 0) {
// this is executed in the child process
usleep(20000);
posix_kill($currentPid, $signal);
posix_kill($currentPid, constant($signal));
die();
}
}
Expand Down