Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support protocol packages larger than 16 MiB #47 #166

Closed
wants to merge 12 commits into from
Closed
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
php-version: ${{ matrix.php }}
coverage: xdebug
- run: composer install
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 --max-allowed-packet=20M
- run: bash tests/wait-for-mysql.sh
- run: MYSQL_USER=test MYSQL_PASSWORD=test vendor/bin/phpunit --coverage-text
if: ${{ matrix.php >= 7.3 }}
Expand All @@ -47,6 +47,6 @@ jobs:
version: lts-3.30
- run: composer self-update --2.2 # downgrade Composer for HHVM
- run: hhvm $(which composer) require phpunit/phpunit:^5 --dev --no-interaction
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5
- run: docker run -d --name mysql --net=host -e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test -e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 --max-allowed-packet=20M
- run: bash tests/wait-for-mysql.sh
- run: MYSQL_USER=test MYSQL_PASSWORD=test hhvm vendor/bin/phpunit
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ For example, to create an empty test database, you can also use a temporary
```bash
docker run -it --rm --net=host \
-e MYSQL_RANDOM_ROOT_PASSWORD=yes -e MYSQL_DATABASE=test \
-e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5
-e MYSQL_USER=test -e MYSQL_PASSWORD=test mysql:5 --max-allowed-packet=20M
```

To run the test suite, go to the project root and run:
Expand Down
85 changes: 69 additions & 16 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class Parser
* @var Executor
*/
protected $executor;
/**
* Current packet for split packet paring
*/
protected $packet = null;

public function __construct(DuplexStreamInterface $stream, Executor $executor)
{
Expand Down Expand Up @@ -150,26 +154,51 @@ public function handleData($data)
$len = $this->buffer->length();
if ($len < $this->pctSize) {
$this->debug('Waiting for complete packet with ' . $len . '/' . $this->pctSize . ' bytes');

return;
Comment on lines 156 to 157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should leave this blank line here, improves the readability and keeps the changelog a bit smaller

}

$packet = $this->buffer->readBuffer($this->pctSize);
if ($this->packet !== null) {
/**
* We are in packet splitting
* Append data
*/
$packet = null;
$this->packet->append($this->buffer->read($this->pctSize));
if ($this->pctSize < 0xffffff) {
/**
* We're done
*/
$packet = $this->packet;
$this->packet = null;
}
} else {
$packet = $this->buffer->readBuffer($this->pctSize);
}
/**
* Remember last packet size as split packets may have ended with 0 length packet.
*/
$lastPctSize = $this->pctSize;
$this->state = self::STATE_STANDBY;
$this->pctSize = self::PACKET_SIZE_HEADER;

try {
$this->parsePacket($packet);
} catch (\UnderflowException $e) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e));
$this->stream->close();
return;
}

if ($packet->length() !== 0) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)'));
$this->stream->close();
return;
if ($this->packet === null && $packet->length() === 0xffffff && $lastPctSize > 0) {
/**
* Start reading split packets
*/
$this->packet = $packet;
} elseif ($packet !== null) {
try {
$this->parsePacket($packet);
} catch (\UnderflowException $e) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet: ' . $e->getMessage(), 0, $e));
$this->stream->close();
return;
}
if ($packet->length() !== 0) {
$this->onError(new \UnexpectedValueException('Unexpected protocol error, received malformed packet with ' . $packet->length() . ' unknown byte(s)'));
$this->stream->close();
return;
}
}
}
}
Expand Down Expand Up @@ -251,7 +280,7 @@ private function parsePacket(Buffer $packet)
$this->debug(sprintf("AffectedRows: %d, InsertId: %d, WarningCount:%d", $this->affectedRows, $this->insertId, $this->warningCount));
$this->onSuccess();
$this->nextRequest();
} elseif ($fieldCount === 0xFE) {
} elseif ($fieldCount === 0xFE && $packet->length() < 0xfffffe) {
// EOF Packet
$packet->skip(4); // warn, status
if ($this->rsState === self::RS_STATE_ROW) {
Expand Down Expand Up @@ -377,7 +406,31 @@ public function onClose()

public function sendPacket($packet)
{
return $this->stream->write($this->buffer->buildInt3(\strlen($packet)) . $this->buffer->buildInt1($this->seq++) . $packet);
/**
* If packet is longer than 0xffffff, we should split and send many packets
*
*/
$packet_len = \strlen($packet);
if ($packet_len >= 0xffffff) {
$ret = null;
while ($packet_len > 0) {
$part = \substr($packet, 0, 0xffffff);
$part_len = \strlen($part);
$ret = $this->stream->write($this->buffer->buildInt3($part_len) . $this->buffer->buildInt1($this->seq++) . $part);
$packet = \substr($packet, $part_len);
$packet_len = \strlen($packet);
/**
* If last part was exactly 0xffffff in size, we need to send an empty packet to signal end
* of packet splitting.
*/
if (\strlen($packet) === 0 && $part_len === 0xffffff) {
$ret = $this->stream->write($this->buffer->buildInt3(0) . $this->buffer->buildInt1($this->seq++));
}
}
return $ret;
} else {
return $this->stream->write($this->buffer->buildInt3($packet_len) . $this->buffer->buildInt1($this->seq++) . $packet);
}
}

protected function nextRequest($isHandshake = false)
Expand Down
160 changes: 160 additions & 0 deletions tests/ResultQueryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -588,4 +588,164 @@ public function testQueryStreamFromLazyConnectionWillErrorWhenConnectionIsClosed

$connection->close();
}

protected function checkMaxAllowedPacket($connection, $min = 0x1100000)
{
return $connection->query('SHOW VARIABLES LIKE \'max_allowed_packet\'')->then(
function ($res) use ($min, $connection) {
$current = $res->resultRows[0]['Value'];
if ($current < $min) {
throw new \Exception('max_allowed_packet too low: current: ' . $current . ' min: ' . $min);
}
return \React\Promise\resolve();
}
)->then(
function () {
return true;
}
);
}

/**
* This should not trigger splitted packets sending
*/
public function testSelectStaticTextSplitPacketsExactlyBelow16MiB()
{
$connection = $this->createConnection(Loop::get());

$promise = $this->checkMaxAllowedPacket($connection, 0x1000000); // 16MiB

$promise->then(
function () use ($connection) {
/**
* This should be exactly below 16MiB packet
*
* x03 + "select ''" = len(10)
*/
$text = str_repeat('A', 0xffffff - 11);
$connection->query('select \'' . $text . '\'')->then(function (QueryResult $command) use ($text) {
$this->assertCount(1, $command->resultRows);
$this->assertCount(1, $command->resultRows[0]);
$this->assertSame($text, reset($command->resultRows[0]));
})->done();
}
)->otherwise(
function (\Throwable $e) {
$this->markTestIncomplete('checkMaxAllowedPacket: ' . $e->getMessage());
}
Comment on lines +633 to +635
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should skip these tests here instead of marking them as incomplete. As the documentation of PHPUnit says to mark tests as incomplete:

PHPUnit\Framework\IncompleteTest is a marker interface for marking an exception that is raised by a test method as the result of the test being incomplete or currently not implemented.

The problem here isn't that some implementation is missing, it's more of a "Your configuration doesn't fit our test" case here. This fits the example for skipping tests with PHPUnit as described inside the docs:

In the test case class’ setUp() template method we check whether the MySQLi extension is available and use the markTestSkipped() method to skip the test if it is not.

Makes more sense, what do you think?

)->always(
function () use ($connection) {
$connection->quit();
}
)->done();

Loop::run();
}

/**
* This should trigger split packets sending and
* will send additional empty packet to signal to the server that split packets has ended.
*/
public function testSelectStaticTextSplitPacketsExactly16MiB()
{
$connection = $this->createConnection(Loop::get());

$promise = $this->checkMaxAllowedPacket($connection);

Comment on lines +652 to +654
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this test behaves the way you intended to. As I see it, this test will be marked as skipped if someone uses a max_allowed_packet size of 16 MiB which should be allowed in this case. This is caused by the $min default of your checkMaxAllowedPacket() function, as it will set the minimum to 17MiB and throws an exception for configurations with 16 MiB.

$promise->then(
function () use ($connection) {
/**
* This should be exactly at 16MiB packet
*
* x03 + "select ''" = len(10)
*/
$text = str_repeat('A', 0xffffff - 10);
$connection->query('select \'' . $text . '\'')->then(function (QueryResult $command) use ($text) {
$this->assertCount(1, $command->resultRows);
$this->assertCount(1, $command->resultRows[0]);
$this->assertSame($text, reset($command->resultRows[0]));
})->done();
}
)->otherwise(
function (\Throwable $e) {
$this->markTestIncomplete('checkMaxAllowedPacket: ' . $e->getMessage());
}
)->always(
function () use ($connection) {
$connection->quit();
Comment on lines +669 to +675
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PromiseInterface::always() and PromiseInterface::otherwise() functions are marked as deprecated as described in reactphp/promise, I don't think it's a good idea to use them in this context as we want to avoid using deprecated functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only works on react-promise:3.x ... not react-promise:2.7.

Will only react-promise:3.x be supported? In current package.json both are supported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that both versions are supported and we already know that v3 will be the way forward. I agree that it should be compatible with both versions at this point, but looking into the future here. If we already know that this functionality will be removed at a later point in time, it makes sense to avoid using these functions, so we don't have to remove them again.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but 3.x introduced that, 2.7 does not work with catch and finally. So, when 2.7 version is removed from this package, it will be time to also change those methods.

}
)->done();

Loop::run();
}

public function testSelectStaticTextSplitPacketsAbove16MiB()
{
$connection = $this->createConnection(Loop::get());

$promise = $this->checkMaxAllowedPacket($connection);

$promise->then(
function () use ($connection) {
/**
* This should be exactly at 16MiB + 10 packet
*
* x03 + "select ''" = len(10)
*/
$text = str_repeat('A', 0xffffff);
$connection->query('select \'' . $text . '\'')->then(function (QueryResult $command) use ($text) {
$this->assertCount(1, $command->resultRows);
$this->assertCount(1, $command->resultRows[0]);
$this->assertSame($text, reset($command->resultRows[0]));
})->done();
}
)->otherwise(
function (\Throwable $e) {
$this->markTestIncomplete('checkMaxAllowedPacket: ' . $e->getMessage());
}
)->always(
function () use ($connection) {
$connection->quit();
}
)->done();

Loop::run();
}

/**
* Here we force the server to send us an empty packet when split packets are to be ended.
*/
public function testSelectStaticTextSplitPacketsExactly16MiBResponse()
{
$connection = $this->createConnection(Loop::get());

$promise = $this->checkMaxAllowedPacket($connection);

$promise->then(
function () use ($connection) {
/**
* Server response will be exatctly 16MiB, so server will send another empty packet
* to signal end of split packets.
*
* x03 + "select ''" = len(10)
*/
$text = str_repeat('A', 0xffffff - 4);
$connection->query('select \'' . $text . '\'')->then(function (QueryResult $command) use ($text) {
$this->assertCount(1, $command->resultRows);
$this->assertCount(1, $command->resultRows[0]);
$this->assertSame($text, reset($command->resultRows[0]));
})->done();
}
)->otherwise(
function (\Throwable $e) {
$this->markTestIncomplete('checkMaxAllowedPacket: ' . $e->getMessage());
}
)->always(
function () use ($connection) {
$connection->quit();
}
)->done();

Loop::run();
}
}