Skip to content

Test suite now uses socket pairs instead of memory streams #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from Feb 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 69 additions & 56 deletions tests/AbstractLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,36 @@ public function setUp()

abstract public function createLoop();

public function createStream()
{
return fopen('php://temp', 'r+');
}

public function writeToStream($stream, $content)
{
fwrite($stream, $content);
rewind($stream);
public function createSocketPair()
{
$domain = (DIRECTORY_SEPARATOR === '\\') ? STREAM_PF_INET : STREAM_PF_UNIX;
$sockets = stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

foreach ($sockets as $socket) {
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($socket, 0);
}
}

return $sockets;
}

public function testAddReadStream()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableExactly(2));

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");
$this->loop->tick();

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->loop->tick();
}

public function testAddWriteStream()
{
$input = $this->createStream();
list ($input) = $this->createSocketPair();

$this->loop->addWriteStream($input, $this->expectCallableExactly(2));
$this->loop->tick();
Expand All @@ -55,33 +58,33 @@ public function testAddWriteStream()

public function testRemoveReadStreamInstantly()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableNever());
$this->loop->removeReadStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->loop->tick();
}

public function testRemoveReadStreamAfterReading()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableOnce());

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");
$this->loop->tick();

$this->loop->removeReadStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->loop->tick();
}

public function testRemoveWriteStreamInstantly()
{
$input = $this->createStream();
list ($input) = $this->createSocketPair();

$this->loop->addWriteStream($input, $this->expectCallableNever());
$this->loop->removeWriteStream($input);
Expand All @@ -90,7 +93,7 @@ public function testRemoveWriteStreamInstantly()

public function testRemoveWriteStreamAfterWriting()
{
$input = $this->createStream();
list ($input) = $this->createSocketPair();

$this->loop->addWriteStream($input, $this->expectCallableOnce());
$this->loop->tick();
Expand All @@ -101,60 +104,60 @@ public function testRemoveWriteStreamAfterWriting()

public function testRemoveStreamInstantly()
{
$input = $this->createStream();

list ($input, $output) = $this->createSocketPair();
$this->loop->addReadStream($input, $this->expectCallableNever());
$this->loop->addWriteStream($input, $this->expectCallableNever());
$this->loop->removeStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->loop->tick();
}

public function testRemoveStreamForReadOnly()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableNever());
$this->loop->addWriteStream($input, $this->expectCallableOnce());
$this->loop->addWriteStream($output, $this->expectCallableOnce());
$this->loop->removeReadStream($input);

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");
$this->loop->tick();
}

public function testRemoveStreamForWriteOnly()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");

$this->loop->addReadStream($input, $this->expectCallableOnce());
$this->loop->addWriteStream($input, $this->expectCallableNever());
$this->loop->removeWriteStream($input);
$this->loop->addWriteStream($output, $this->expectCallableNever());
$this->loop->removeWriteStream($output);

$this->loop->tick();
}

public function testRemoveStream()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$this->loop->addReadStream($input, $this->expectCallableOnce());
$this->loop->addWriteStream($input, $this->expectCallableOnce());

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->loop->tick();

$this->loop->removeStream($input);

$this->writeToStream($input, "bar\n");
fwrite($output, "bar\n");
$this->loop->tick();
}

public function testRemoveInvalid()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

// remove a valid stream from the event loop that was never added in the first place
$this->loop->removeReadStream($stream);
Expand All @@ -171,29 +174,29 @@ public function emptyRunShouldSimplyReturn()
/** @test */
public function runShouldReturnWhenNoMoreFds()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$loop = $this->loop;
$this->loop->addReadStream($input, function ($stream) use ($loop) {
$loop->removeStream($stream);
});

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");

$this->assertRunFasterThan($this->tickTimeout * 2);
}

/** @test */
public function stopShouldStopRunningLoop()
{
$input = $this->createStream();
list ($input, $output) = $this->createSocketPair();

$loop = $this->loop;
$this->loop->addReadStream($input, function ($stream) use ($loop) {
$loop->stop();
});

$this->writeToStream($input, "foo\n");
fwrite($output, "foo\n");

$this->assertRunFasterThan($this->tickTimeout * 2);
}
Expand All @@ -219,23 +222,33 @@ function () {
public function testIgnoreRemovedCallback()
{
// two independent streams, both should be readable right away
$stream1 = $this->createStream();
$stream2 = $this->createStream();
list ($input1, $output1) = $this->createSocketPair();
list ($input2, $output2) = $this->createSocketPair();

$called = false;

$loop = $this->loop;
$loop->addReadStream($stream1, function ($stream) use ($loop, $stream2) {
$loop->addReadStream($input1, function ($stream) use (& $called, $loop, $input2) {
// stream1 is readable, remove stream2 as well => this will invalidate its callback
$loop->removeReadStream($stream);
$loop->removeReadStream($stream2);
$loop->removeReadStream($input2);

$called = true;
});

// this callback would have to be called as well, but the first stream already removed us
$loop->addReadStream($stream2, $this->expectCallableNever());

$this->writeToStream($stream1, "foo\n");
$this->writeToStream($stream2, "foo\n");

$loop->addReadStream($input2, function () use (& $called) {
if ($called) {
$this->fail('Callback 2 must not be called after callback 1 was called');
}
});

fwrite($output1, "foo\n");
fwrite($output2, "foo\n");

$loop->run();

$this->assertTrue($called);
}

public function testNextTick()
Expand All @@ -258,7 +271,7 @@ public function testNextTick()

public function testNextTickFiresBeforeIO()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand All @@ -280,7 +293,7 @@ function () {

public function testRecursiveNextTick()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand All @@ -306,7 +319,7 @@ function () {

public function testRunWaitsForNextTickEvents()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand All @@ -327,7 +340,7 @@ function () {

public function testNextTickEventGeneratedByFutureTick()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->futureTick(
function () {
Expand Down Expand Up @@ -382,7 +395,7 @@ public function testFutureTick()

public function testFutureTickFiresBeforeIO()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand All @@ -404,7 +417,7 @@ function () {

public function testRecursiveFutureTick()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand Down Expand Up @@ -432,7 +445,7 @@ function () {

public function testRunWaitsForFutureTickEvents()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->addWriteStream(
$stream,
Expand All @@ -453,7 +466,7 @@ function () {

public function testFutureTickEventGeneratedByNextTick()
{
$stream = $this->createStream();
list ($stream) = $this->createSocketPair();

$this->loop->nextTick(
function () {
Expand Down
4 changes: 2 additions & 2 deletions tests/ExtEventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public function testCanUseReadableStreamWithFeatureFds()

$this->loop->addReadStream($input, $this->expectCallableExactly(2));

$this->writeToStream($input, "foo\n");
fwrite($input, "foo\n");
$this->loop->tick();

$this->writeToStream($input, "bar\n");
fwrite($input, "bar\n");
$this->loop->tick();
}
}
2 changes: 1 addition & 1 deletion tests/StreamSelectLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public function testSignalInterruptWithStream($sigName, $signal)
$this->loop->addPeriodicTimer(0.01, function() { pcntl_signal_dispatch(); });

// add stream to the loop
list($writeStream, $readStream) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
list($writeStream, $readStream) = $this->createSocketPair();
$this->loop->addReadStream($readStream, function($stream, $loop) {
/** @var $loop LoopInterface */
$read = fgets($stream);
Expand Down