Skip to content

Commit 4d628d2

Browse files
committed
CrateDB bulk operations: Add support for improved DML efficiency
https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations - In order to use the bulk operations interface, a `PDOStatement` needs to be prepared using the `bulkMode` option, like `->prepare($sql, ["bulkMode" => true])`. - The interface of `BulkResponse` has been made compatible with `Collection`, specifically wrt. the `getRows()` method, in order to return data from the driver without needing other proprietary methods. - In order to propagate the non-standard bulk response shape back, the user has to select the `PDO::FETCH_NUM` fetch style. - Documentation: Add two example programs about insert operations
1 parent 7fe0a6e commit 4d628d2

File tree

15 files changed

+687
-53
lines changed

15 files changed

+687
-53
lines changed

CHANGES.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ Changelog for crate-pdo
55
Unreleased
66
==========
77

8+
- Added support for `CrateDB bulk operations`_, for improved efficiency on
9+
DML operations.
10+
11+
.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
12+
813
2022/11/29 2.1.4
914
================
1015

DEVELOP.rst

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ If the environment is outdated, you can upgrade it::
4848

4949
composer update
5050

51+
If you see messages like ``Warning: No code coverage driver available`` when running
52+
the tests, you will need to install the ``xdebug`` extension::
53+
54+
pecl install xdebug
55+
56+
It may happen that you will have to re-install it, for example after your PHP
57+
version has been upgraded by your package manager.
58+
5159

5260
Running the Tests
5361
=================
@@ -60,16 +68,26 @@ You can run the tests like::
6068
# Run test suite
6169
composer run test
6270

63-
# Run code style checks
64-
composer run style
65-
6671
# Output coverage report as HTML
6772
composer run -- test --coverage-html ./report
73+
open report/index.html
6874

6975
# Run specific tests
7076
composer run -- test --filter "testFetchColumn"
7177

7278

79+
Invoke code style checks
80+
========================
81+
82+
::
83+
84+
# Run code style checks
85+
composer run check-style
86+
87+
# Some code style quirks can be automatically fixed
88+
composer run fix-style
89+
90+
7391

7492
************
7593
Using Docker
@@ -121,19 +139,6 @@ Running the Tests
121139
open build/multicover/html/index.html
122140

123141

124-
Invoke code style checks
125-
========================
126-
127-
::
128-
129-
# Run code style checks
130-
composer run check-style
131-
132-
# Some code style quirks can be automatically fixed
133-
composer run fix-style
134-
135-
136-
137142
****************************
138143
Working on the documentation
139144
****************************

docs/connect.rst

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,40 @@ method used.
221221
| | ``PDO::FETCH_OBJ`` |
222222
+----------------------------+-----------------------+
223223

224+
Bulk operations
225+
===============
226+
227+
With CrateDB :ref:`crate-reference:http-bulk-ops`, suitable for ``INSERT``,
228+
``UPDATE``, and ``DELETE`` statements, you can submit multiple records, aka.
229+
batches, to CrateDB within a single operation. By using this way of communication,
230+
both the client and the server will not waste resources on building and decoding
231+
huge SQL statements, and data will also propagate more efficiently between CrateDB
232+
cluster nodes.
233+
234+
To use this mode, the ``PDOStatement`` offers a corresponding ``bulkMode`` option.
235+
When creating a statement instance with it, the ``$parameters`` data will be
236+
obtained as a **list of records**, like demonstrated in the example below.
237+
238+
Please note that you **must** use ``PDO::FETCH_NUM`` on the fetch operation,
239+
because the response object type ``BulkResponse`` is different than the regular
240+
response type ``Collection``.
241+
242+
.. code-block:: php
243+
244+
// Run insert operation.
245+
$parameters = [[5, 'foo', 1], [6, 'bar', 2], [7, 'foo', 3], [8, 'bar', 4]];
246+
$statement = $connection->prepare(
247+
'INSERT INTO test_table (id, name, int_type) VALUES (?, ?, ?)',
248+
array("bulkMode" => true));
249+
$statement->execute($parameters);
250+
251+
// Evaluate response.
252+
// MUST use `PDO::FETCH_NUM` for returning bulk operation responses.
253+
print("Total count: {$statement->rowCount()}\n");
254+
$response = $statement->fetchAll(PDO::FETCH_NUM);
255+
print_r($response);
256+
257+
224258
Next steps
225259
==========
226260

examples/insert_basic.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
/*
3+
* Basic example demonstrating how to connect to CrateDB using PHP PDO.
4+
*
5+
* Prerequisites:
6+
*
7+
* docker run --rm -it --publish=4200:4200 crate
8+
*
9+
* Synopsis:
10+
*
11+
* php examples/insert_basic.php
12+
*/
13+
include("./vendor/autoload.php");
14+
15+
error_reporting(E_ALL ^ E_DEPRECATED);
16+
17+
// Connect to CrateDB.
18+
use Crate\PDO\PDO;
19+
$connection = new PDO("crate:localhost:4200", "crate");
20+
21+
// Create database table.
22+
$connection->exec("DROP TABLE IF EXISTS test_table;");
23+
$connection->exec("CREATE TABLE test_table (id INTEGER, name STRING, int_type INTEGER);");
24+
25+
// Run insert operation.
26+
$statement = $connection->prepare('INSERT INTO test_table (id, name, int_type) VALUES (?, ?, ?)');
27+
$statement->execute([5, 'foo', 1]);
28+
$statement->execute([6, 'bar', 2]);
29+
30+
// Evaluate response.
31+
print("Total count: {$statement->rowCount()}\n");
32+
$response = $statement->fetchAll(PDO::FETCH_NUM);
33+
print_r($response);
34+
35+
// Disconnect from database.
36+
// https://www.php.net/manual/en/pdo.connections.php
37+
// https://stackoverflow.com/questions/18277233/pdo-closing-connection
38+
$statement = null;
39+
$connection = null;

examples/insert_bulk.php

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
/*
3+
* Example demonstrating how to use CrateDB's bulk operations interface for
4+
* inserting large amounts of data efficiently, using PHP PDO.
5+
*
6+
* Prerequisites:
7+
*
8+
* docker run --rm -it --publish=4200:4200 crate
9+
*
10+
* Synopsis:
11+
*
12+
* php examples/insert_bulk.php
13+
*/
14+
include("./vendor/autoload.php");
15+
16+
error_reporting(E_ALL ^ E_DEPRECATED);
17+
18+
// Connect to CrateDB.
19+
use Crate\PDO\PDO;
20+
$connection = new PDO("crate:localhost:4200", "crate");
21+
22+
// Create database table.
23+
$connection->exec("DROP TABLE IF EXISTS test_table;");
24+
$connection->exec("CREATE TABLE test_table (id INTEGER, name STRING, int_type INTEGER);");
25+
26+
// Run insert operation.
27+
$parameters = [[5, 'foo', 1], [6, 'bar', 2], [7, 'foo', 3], [8, 'bar', 4]];
28+
$statement = $connection->prepare(
29+
'INSERT INTO test_table (id, name, int_type) VALUES (?, ?, ?)',
30+
array("bulkMode" => true));
31+
$statement->execute($parameters);
32+
33+
// Evaluate response.
34+
// MUST use `PDO::FETCH_NUM` for returning bulk operation responses.
35+
print("Total count: {$statement->rowCount()}\n");
36+
$response = $statement->fetchAll(PDO::FETCH_NUM);
37+
print_r($response);
38+
39+
// Disconnect from database.
40+
// https://www.php.net/manual/en/pdo.connections.php
41+
// https://stackoverflow.com/questions/18277233/pdo-closing-connection
42+
$statement = null;
43+
$connection = null;

src/Crate/PDO/Http/ServerInterface.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
namespace Crate\PDO\Http;
2626

2727
use Crate\PDO\PDOInterface;
28+
use Crate\Stdlib\BulkResponseInterface;
2829
use Crate\Stdlib\CollectionInterface;
2930

3031
interface ServerInterface
@@ -51,6 +52,21 @@ public function configure(PDOInterface $PDO): void;
5152
*/
5253
public function execute(string $queryString, array $parameters): CollectionInterface;
5354

55+
/**
56+
* Execute the PDOStatement in "bulk operations" and return the response from server
57+
* wrapped inside a BulkResponseInterface.
58+
*
59+
* Bulk operations are only supported for `INSERT`, `UPDATE`, and `DELETE` statements.
60+
*
61+
* https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
62+
*
63+
* @param string $queryString
64+
* @param array $parameters
65+
*
66+
* @return BulkResponseInterface
67+
*/
68+
public function executeBulk(string $queryString, array $parameters): BulkResponseInterface;
69+
5470
/**
5571
* @return array
5672
*/

src/Crate/PDO/Http/ServerPool.php

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
use Crate\PDO\Exception\RuntimeException;
2828
use Crate\PDO\PDO;
2929
use Crate\PDO\PDOInterface;
30+
use Crate\Stdlib\BulkResponse;
31+
use Crate\Stdlib\BulkResponseInterface;
3032
use Crate\Stdlib\Collection;
3133
use Crate\Stdlib\CollectionInterface;
3234
use GuzzleHttp\Client;
@@ -67,7 +69,7 @@ final class ServerPool implements ServerInterface
6769
/**
6870
* Client constructor.
6971
*
70-
* @param array $servers
72+
* @param array $servers
7173
* @param ClientInterface|null $client
7274
*/
7375
public function __construct(array $servers, ClientInterface $client = null)
@@ -92,6 +94,25 @@ public function __construct(array $servers, ClientInterface $client = null)
9294
*/
9395
public function execute(string $query, array $parameters): CollectionInterface
9496
{
97+
return $this->executeGeneric($query, $parameters, false);
98+
}
99+
100+
/**
101+
* {@Inheritdoc}
102+
* @throws \GuzzleHttp\Exception\ConnectException
103+
*/
104+
public function executeBulk(string $query, array $parameters): BulkResponseInterface
105+
{
106+
return $this->executeGeneric($query, $parameters, true);
107+
}
108+
109+
/**
110+
* {@Inheritdoc}
111+
* @throws \GuzzleHttp\Exception\ConnectException
112+
*/
113+
private function executeGeneric(string $query, array $parameters, bool $bulk_mode = false)
114+
{
115+
$exception = null;
95116
$numServers = count($this->availableServers) - 1;
96117

97118
for ($i = 0; $i <= $numServers; $i++) {
@@ -101,24 +122,8 @@ public function execute(string $query, array $parameters): CollectionInterface
101122
// Move the selected server to the end of the stack
102123
$this->availableServers[] = array_shift($this->availableServers);
103124

104-
$options = array_merge($this->httpOptions, [
105-
'base_uri' => sprintf('%s://%s', $this->protocol, $server),
106-
'json' => [
107-
'stmt' => $query,
108-
'args' => $parameters,
109-
],
110-
]);
111-
112125
try {
113-
$response = $this->httpClient->request('POST', '/_sql', $options);
114-
$responseBody = json_decode((string)$response->getBody(), true);
115-
116-
return new Collection(
117-
$responseBody['rows'],
118-
$responseBody['cols'],
119-
$responseBody['duration'],
120-
$responseBody['rowcount']
121-
);
126+
return $this->sendRequest($server, $query, $parameters, $bulk_mode);
122127
} catch (ConnectException $exception) {
123128
// Catch it before the BadResponseException but do nothing.
124129
continue;
@@ -130,18 +135,53 @@ public function execute(string $query, array $parameters): CollectionInterface
130135
throw new RuntimeException(sprintf('Server returned non-JSON response: %s', $body), 0, $exception);
131136
}
132137

133-
$errorCode = $json['error']['code'];
138+
$errorCode = $json['error']['code'];
134139
$errorMessage = $json['error']['message'];
135140

136141
throw new RuntimeException($errorMessage, $errorCode, $exception);
137142
}
138143
}
139144

140-
throw new ConnectException(
141-
sprintf('No more servers available, exception from last server: %s', $exception->getMessage()),
142-
$exception->getRequest(),
143-
$exception
144-
);
145+
if ($exception !== null) {
146+
throw new ConnectException(
147+
sprintf('No more servers available, exception from last server: %s', $exception->getMessage()),
148+
$exception->getRequest(),
149+
$exception
150+
);
151+
}
152+
}
153+
154+
private function sendRequest(string $server, string $query, array $parameters, bool $bulk_mode = false)
155+
{
156+
$args_name = 'args';
157+
if ($bulk_mode) {
158+
$args_name = 'bulk_args';
159+
}
160+
$options = array_merge($this->httpOptions, [
161+
'base_uri' => sprintf('%s://%s', $this->protocol, $server),
162+
'json' => [
163+
'stmt' => $query,
164+
$args_name => $parameters,
165+
],
166+
]);
167+
168+
$response = $this->httpClient->request('POST', '/_sql', $options);
169+
$responseBody = json_decode((string)$response->getBody(), true);
170+
171+
if ($bulk_mode) {
172+
return new BulkResponse(
173+
$responseBody['results'],
174+
$responseBody['cols'],
175+
$responseBody['duration']
176+
);
177+
} else {
178+
return new Collection(
179+
$responseBody['rows'],
180+
$responseBody['cols'],
181+
$responseBody['duration'],
182+
$responseBody['rowcount']
183+
);
184+
}
145185
}
146186

147187
/**

src/Crate/PDO/PDO.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class PDO extends BasePDO implements PDOInterface
7474
'timeout' => 0.0,
7575
'auth' => [],
7676
'defaultSchema' => 'doc',
77+
'bulkMode' => false,
7778
];
7879

7980
/**
@@ -125,7 +126,11 @@ public function __construct($dsn, $username = null, $passwd = null, $options = [
125126
$this->lastStatement = $statement;
126127

127128
try {
128-
return $this->server->execute($sql, $parameters);
129+
if ($statement->isBulkMode()) {
130+
return $this->server->executeBulk($sql, $parameters);
131+
} else {
132+
return $this->server->execute($sql, $parameters);
133+
}
129134
} catch (Exception\RuntimeException $e) {
130135
if ($this->getAttribute(self::ATTR_ERRMODE) === self::ERRMODE_EXCEPTION) {
131136
throw new Exception\PDOException($e->getMessage(), $e->getCode());

0 commit comments

Comments
 (0)