From 72dcd04f3f5730bb8e90e1209d84a2aab9876f14 Mon Sep 17 00:00:00 2001 From: Matthias Pigulla Date: Wed, 12 Sep 2018 13:22:31 +0200 Subject: [PATCH] Fix that MysqliStatement cannot handle streams The blob type maps BLOB (and also TEXT) columns to PHP streams. Internally, it uses the ParameterType::LARGE_OBJECT (i.e. \PDO::PARAM_LOB) binding type, which suggests that efficient handling of PHP stream resources was intended. However, at least when using the mysqli driver, stream resources passed into insert() or update() are simply cast to strings. As a result, a literal string like "Resource id #126" will end up in the database. This PR fixes the issue by correctly processing streams in the MysqliStatement when they are passed with the ParameterType::LARGE_OBJECT binding type. It uses the mysqli_stmt::send_long_data() method to pass stream data in chunks to the MySQL server, thus keeping the memory footprint low. This method does not (despite claims to the contrary) allow bypassing the max_allowed_packet limit! The pdo_mysql driver was already capable of handling streams this way. Now this is covered by tests. Helpful documentation: - http://php.net/manual/en/mysqli-stmt.send-long-data.php - http://php.net/manual/en/mysqli-stmt.bind-param.php - see first "Note" - http://php.net/manual/en/pdo.lobs.php - https://blogs.oracle.com/oswald/phps-mysqli-extension:-storing-and-retrieving-blobs Additional notes on MySQL's max_allowed_packet: This change does not intend to work around the max_allowed_packet setting, and quick tests show that this is not possible: When MySQL is configured to use a low max_allowed_packet value, an error will be triggered stating "Parameter of prepared statement which is set through mysql_send_long_data() is longer than 'max_allowed_packet' bytes." Documentation for the underlying mysql_stmt_send_long_data() C API function suggests that max_allowed_packet is always a hard limit.
References: - https://dev.mysql.com/doc/refman/8.0/en/mysql-stmt-send-long-data.html - https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_max_allowed_packet - https://bugs.mysql.com/bug.php?id=83958 What mysqli_stmt::send_long_data() seems to do is immediately send every chunk of data passed to it out over the network. I have confirmed this using tcpdump, and so the advantage might be that we can keep the memory footprint low on the PHP side while processing streams. --- .../DBAL/Driver/Mysqli/MysqliStatement.php | 68 ++++++++++++++- .../Tests/DBAL/Functional/BlobTest.php | 83 +++++++++++++++++-- 2 files changed, 144 insertions(+), 7 deletions(-) diff --git a/lib/Doctrine/DBAL/Driver/Mysqli/MysqliStatement.php b/lib/Doctrine/DBAL/Driver/Mysqli/MysqliStatement.php index 8b4f9a58006..fe7d4c1297f 100644 --- a/lib/Doctrine/DBAL/Driver/Mysqli/MysqliStatement.php +++ b/lib/Doctrine/DBAL/Driver/Mysqli/MysqliStatement.php @@ -21,11 +21,16 @@ use Doctrine\DBAL\Driver\Statement; use Doctrine\DBAL\Driver\StatementIterator; +use Doctrine\DBAL\Exception\InvalidArgumentException; use Doctrine\DBAL\FetchMode; use Doctrine\DBAL\ParameterType; use function array_combine; use function array_fill; use function count; +use function feof; +use function fread; +use function get_resource_type; +use function is_resource; use function str_repeat; /** @@ -42,7 +47,7 @@ class MysqliStatement implements \IteratorAggregate, Statement ParameterType::BOOLEAN => 'i', ParameterType::NULL => 's', ParameterType::INTEGER => 'i', - ParameterType::LARGE_OBJECT => 's', + ParameterType::LARGE_OBJECT => 'b', ]; /** @@ -169,9 +174,11 @@ public function execute($params = null) throw new MysqliException($this->_stmt->error, $this->_stmt->errno); } } else { - if (! $this->_stmt->bind_param($this->types, ...$this->_bindedValues)) { + list($types, $values, $streams) = $this->separateBoundValues(); + if (! 
$this->_stmt->bind_param($types, ...$values)) { throw new MysqliException($this->_stmt->error, $this->_stmt->sqlstate, $this->_stmt->errno); } + $this->sendLongData($streams); } } @@ -228,6 +235,63 @@ public function execute($params = null) return true; } + /** + * Splits $this->_bindedValues into stream resources that must be sent using the statement's send_long_data() + * and values that can be bound the usual way; LARGE_OBJECT values that are not resources are re-typed as strings. + * + * @return array A three-element list: the parameter types, the values to bind, and the streams (keyed like $this->_bindedValues). + */ + private function separateBoundValues() + { + $streams = $values = []; + $types = $this->types; + + foreach ($this->_bindedValues as $parameter => $value) { + if (! isset($types[$parameter - 1])) { + $types[$parameter - 1] = static::$_paramTypeMap[ParameterType::STRING]; + } + + if ($types[$parameter - 1] === static::$_paramTypeMap[ParameterType::LARGE_OBJECT]) { + if (is_resource($value)) { + if (get_resource_type($value) !== 'stream') { + throw new InvalidArgumentException('Resources passed with the LARGE_OBJECT parameter type must be stream resources.'); + } + $streams[$parameter] = $value; + $values[$parameter] = null; + continue; + } else { + $types[$parameter - 1] = static::$_paramTypeMap[ParameterType::STRING]; + } + } + + $values[$parameter] = $value; + } + + return [$types, $values, $streams]; + } + + /** + * Reads each given stream in chunks of up to 8192 bytes and passes them to send_long_data(); execute() calls this after bind_param(). + * + * @throws MysqliException + */ + private function sendLongData($streams) + { + foreach ($streams as $paramNr => $stream) { + while (! feof($stream)) { + $chunk = fread($stream, 8192); + + if ($chunk === false) { + throw new MysqliException("Failed reading the stream resource for parameter offset ${paramNr}."); + } + + if (! $this->_stmt->send_long_data($paramNr - 1, $chunk)) { + throw new MysqliException($this->_stmt->error, $this->_stmt->sqlstate, $this->_stmt->errno); + } + } + } + } + /** * Binds a array of values to bound parameters. 
* diff --git a/tests/Doctrine/Tests/DBAL/Functional/BlobTest.php b/tests/Doctrine/Tests/DBAL/Functional/BlobTest.php index c8249d050ee..e409f5f5f5e 100644 --- a/tests/Doctrine/Tests/DBAL/Functional/BlobTest.php +++ b/tests/Doctrine/Tests/DBAL/Functional/BlobTest.php @@ -3,11 +3,13 @@ namespace Doctrine\Tests\DBAL\Functional; use Doctrine\DBAL\Driver\PDOSqlsrv\Driver as PDOSQLSrvDriver; +use Doctrine\DBAL\FetchMode; use Doctrine\DBAL\ParameterType; use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Type; -use const CASE_LOWER; -use function array_change_key_case; +use function fopen; +use function in_array; +use function str_repeat; use function stream_get_contents; /** @@ -49,6 +51,28 @@ public function testInsert() self::assertEquals(1, $ret); } + public function testInsertProcessesStream() + { + if (in_array($this->_conn->getDatabasePlatform()->getName(), ['oracle', 'db2'], true)) { + // https://github.com/doctrine/dbal/issues/3288 for DB2 + // https://github.com/doctrine/dbal/issues/3290 for Oracle + $this->markTestIncomplete('Platform does not support stream resources as parameters'); + } + + $longBlob = str_repeat('x', 4 * 8192); // send 4 chunks + $this->_conn->insert('blob_table', [ + 'id' => 1, + 'clobfield' => 'ignored', + 'blobfield' => fopen('data://text/plain,' . 
$longBlob, 'r'), + ], [ + ParameterType::INTEGER, + ParameterType::STRING, + ParameterType::LARGE_OBJECT, + ]); + + $this->assertBlobContains($longBlob); + } + public function testSelect() { $this->_conn->insert('blob_table', [ @@ -86,14 +110,63 @@ public function testUpdate() $this->assertBlobContains('test2'); } + public function testUpdateProcessesStream() + { + if (in_array($this->_conn->getDatabasePlatform()->getName(), ['oracle', 'db2'], true)) { + // https://github.com/doctrine/dbal/issues/3288 for DB2 + // https://github.com/doctrine/dbal/issues/3290 for Oracle + $this->markTestIncomplete('Platform does not support stream resources as parameters'); + } + + $this->_conn->insert('blob_table', [ + 'id' => 1, + 'clobfield' => 'ignored', + 'blobfield' => 'test', + ], [ + ParameterType::INTEGER, + ParameterType::STRING, + ParameterType::LARGE_OBJECT, + ]); + + $this->_conn->update('blob_table', [ + 'id' => 1, + 'blobfield' => fopen('data://text/plain,test2', 'r'), + ], ['id' => 1], [ + ParameterType::INTEGER, + ParameterType::LARGE_OBJECT, + ]); + + $this->assertBlobContains('test2'); + } + + public function testBindParamProcessesStream() + { + if (in_array($this->_conn->getDatabasePlatform()->getName(), ['oracle', 'db2'], true)) { + // https://github.com/doctrine/dbal/issues/3288 for DB2 + // https://github.com/doctrine/dbal/issues/3290 for Oracle + $this->markTestIncomplete('Platform does not support stream resources as parameters'); + } + + $stmt = $this->_conn->prepare("INSERT INTO blob_table(id, clobfield, blobfield) VALUES (1, 'ignored', ?)"); + + $stream = null; + $stmt->bindParam(1, $stream, ParameterType::LARGE_OBJECT); + + // Bind param does late binding (bind by reference), so create the stream only now: + $stream = fopen('data://text/plain,test', 'r'); + + $stmt->execute(); + + $this->assertBlobContains('test'); + } + private function assertBlobContains($text) { - $rows = $this->_conn->fetchAll('SELECT * FROM blob_table'); + $rows = 
$this->_conn->query('SELECT blobfield FROM blob_table')->fetchAll(FetchMode::COLUMN); self::assertCount(1, $rows); - $row = array_change_key_case($rows[0], CASE_LOWER); - $blobValue = Type::getType('blob')->convertToPHPValue($row['blobfield'], $this->_conn->getDatabasePlatform()); + $blobValue = Type::getType('blob')->convertToPHPValue($rows[0], $this->_conn->getDatabasePlatform()); self::assertInternalType('resource', $blobValue); self::assertEquals($text, stream_get_contents($blobValue));