Skip to content

Support throttling streaming result sets from queryStream() #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ like this:
$connection->queryStream('SELECT * FROM user')->pipe($formatter)->pipe($logger);
```

Note that as per the underlying stream definition, calling `pause()` and
`resume()` on this stream is advisory-only, i.e. the stream MAY continue
emitting some data until the underlying network buffer is drained. Also
notice that the server side limits how long a connection is allowed to be
in a state that has outgoing data. Special care should be taken to ensure
the stream is resumed in time. This implies that using `pipe()` with a
slow destination stream may cause the connection to abort after a while.

The given `$sql` parameter MUST contain a single statement. Support
for multiple statements is disabled for security reasons because it
could allow for possible SQL injection attacks and this API is not
Expand Down
5 changes: 4 additions & 1 deletion examples/02-query-stream.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

// $ php examples/02-query-stream.php "SHOW VARIABLES"

use React\MySQL\ConnectionInterface;
use React\MySQL\Factory;

Expand All @@ -14,8 +16,9 @@
//create a mysql connection for executing query
$factory->createConnection($uri)->then(function (ConnectionInterface $connection) use ($query) {
$stream = $connection->queryStream($query);

$stream->on('data', function ($row) {
var_dump($row);
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;
});

$stream->on('error', function (Exception $e) {
Expand Down
77 changes: 77 additions & 0 deletions examples/12-slow-stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

// $ php examples/12-slow-stream.php "SHOW VARIABLES"

use React\MySQL\ConnectionInterface;
use React\MySQL\Factory;

require __DIR__ . '/../vendor/autoload.php';

$loop = React\EventLoop\Factory::create();
$factory = new Factory($loop);

$uri = 'test:test@localhost/test';
$query = isset($argv[1]) ? $argv[1] : 'select * from book';

//create a mysql connection for executing query
$factory->createConnection($uri)->then(function (ConnectionInterface $connection) use ($query, $loop) {
// The protocol parser reads rather large chunked from the underlying connection
// and as such can yield multiple (dozens to hundreds) rows from a single data
// chunk. We try to artifically limit the stream chunk size here to try to
// only ever read a single row so we can demonstrate throttling this stream.
// It goes without saying this is only a hack! Real world applications rarely
// have the need to limit the chunk size. As an alternative, consider using
// a stream decorator that rate-limits and buffers the resulting flow.
try {
// accept private "stream" (instanceof React\Socket\ConnectionInterface)
$ref = new ReflectionProperty($connection, 'stream');
$ref->setAccessible(true);
$conn = $ref->getValue($connection);

// access private "input" (instanceof React\Stream\DuplexStreamInterface)
$ref = new ReflectionProperty($conn, 'input');
$ref->setAccessible(true);
$stream = $ref->getValue($conn);

// reduce private bufferSize to just a few bytes to slow things down
$ref = new ReflectionProperty($stream, 'bufferSize');
$ref->setAccessible(true);
$ref->setValue($stream, 8);
} catch (Exception $e) {
echo 'Warning: Unable to reduce buffer size: ' . $e->getMessage() . PHP_EOL;
}

$stream = $connection->queryStream($query);

$throttle = null;
$stream->on('data', function ($row) use ($loop, &$throttle, $stream) {
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;

// simple throttle mechanism: explicitly pause the result stream and
// resume it again after some time.
if ($throttle === null) {
$throttle = $loop->addTimer(1.0, function () use ($stream, &$throttle) {
$throttle = null;
$stream->resume();
});
$stream->pause();
}
});

$stream->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$stream->on('close', function () use ($loop, &$throttle) {
echo 'CLOSED' . PHP_EOL;

if ($throttle) {
$loop->cancelTimer($throttle);
$throttle = null;
}
});

$connection->quit();
}, 'printf');

$loop->run();
8 changes: 8 additions & 0 deletions src/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ public function query($sql, array $params = array());
* $connection->queryStream('SELECT * FROM user')->pipe($formatter)->pipe($logger);
* ```
*
* Note that as per the underlying stream definition, calling `pause()` and
* `resume()` on this stream is advisory-only, i.e. the stream MAY continue
* emitting some data until the underlying network buffer is drained. Also
* notice that the server side limits how long a connection is allowed to be
* in a state that has outgoing data. Special care should be taken to ensure
* the stream is resumed in time. This implies that using `pipe()` with a
* slow destination stream may cause the connection to abort after a while.
*
* The given `$sql` parameter MUST contain a single statement. Support
* for multiple statements is disabled for security reasons because it
* could allow for possible SQL injection attacks and this API is not
Expand Down
22 changes: 1 addition & 21 deletions src/Io/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Socket\ConnectionInterface as SocketConnectionInterface;
use React\Stream\ThroughStream;

/**
* @internal
Expand Down Expand Up @@ -115,26 +114,7 @@ public function queryStream($sql, $params = array())
$command->setQuery($query);
$this->_doCommand($command);

$stream = new ThroughStream();

// forward result set rows until result set end
$command->on('result', function ($row) use ($stream) {
$stream->write($row);
});
$command->on('end', function () use ($stream) {
$stream->end();
});

// status reply (response without result set) ends stream without data
$command->on('success', function () use ($stream) {
$stream->end();
});
$command->on('error', function ($err) use ($stream) {
$stream->emit('error', array($err));
$stream->close();
});

return $stream;
return new QueryStream($command, $this->stream);
}

public function ping()
Expand Down
94 changes: 94 additions & 0 deletions src/Io/QueryStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

namespace React\MySQL\Io;

use Evenement\EventEmitter;
use React\MySQL\Commands\QueryCommand;
use React\Socket\ConnectionInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* @internal
* @see Connection::queryStream()
*/
class QueryStream extends EventEmitter implements ReadableStreamInterface
{
private $query;
private $connection;
private $started = false;
private $closed = false;
private $paused = false;

public function __construct(QueryCommand $command, ConnectionInterface $connection)
{
$this->command = $command;
$this->connection = $connection;

// forward result set rows until result set end
$command->on('result', function ($row) {
if (!$this->started && $this->paused) {
$this->connection->pause();
}
$this->started = true;

$this->emit('data', array($row));
});
$command->on('end', function () {
$this->emit('end');
$this->close();
});

// status reply (response without result set) ends stream without data
$command->on('success', function () {
$this->emit('end');
$this->close();
});
$command->on('error', function ($err) {
$this->emit('error', array($err));
$this->close();
});
}

public function isReadable()
{
return !$this->closed;
}

public function pause()
{
$this->paused = true;
if ($this->started && !$this->closed) {
$this->connection->pause();
}
}

public function resume()
{
$this->paused = false;
if ($this->started && !$this->closed) {
$this->connection->resume();
}
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;
if ($this->started && $this->paused) {
$this->connection->resume();
}

$this->emit('close');
$this->removeAllListeners();
}

public function pipe(WritableStreamInterface $dest, array $options = [])
{
return Util::pipe($this, $dest, $options);
}
}
Loading