Skip to content

Commit 59e95a8

Browse files
committed
Merge pull request #3 from csoutherland/master
Added bulk queue support.
2 parents d1889dc + fd2263e commit 59e95a8

9 files changed

+454
-9
lines changed

ElasticSearchClient.php

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
require_once 'lib/builder/ElasticSearchDSLBuilder.php';
66

7+
require_once 'lib/ElasticSearchBulkQueue.php';
8+
79
require_once 'lib/transport/ElasticSearchTransport.php';
810
require_once 'lib/transport/ElasticSearchTransportHTTP.php';
911
require_once 'lib/transport/ElasticSearchTransportMemcached.php';
@@ -14,8 +16,8 @@
1416

1517
class ElasticSearchClient {
1618

17-
private $transport, $index, $type;
18-
19+
private $transport, $index, $type, $bulk_queue;
20+
1921
/**
2022
* Construct search client
2123
*
@@ -30,8 +32,9 @@ public function __construct($transport, $index, $type) {
3032
$this->transport = $transport;
3133
$this->setIndex($index);
3234
$this->setType($type);
35+
$this->bulk_queue = new ElasticSearchBulkQueue();
3336
}
34-
37+
3538
/**
3639
* Change what index to go against
3740
* @return void
@@ -43,7 +46,7 @@ public function setIndex($index) {
4346
$this->index = $index;
4447
$this->transport->setIndex($index);
4548
}
46-
49+
4750
/**
4851
* Change what types to act against
4952
* @return void
@@ -55,7 +58,7 @@ public function setType($type) {
5558
$this->type = $type;
5659
$this->transport->setType($type);
5760
}
58-
61+
5962
/**
6063
* Fetch a document by its id
6164
*
@@ -70,7 +73,7 @@ public function get($id, $verbose=false) {
7073
? $response
7174
: $response['_source'];
7275
}
73-
76+
7477
/**
7578
* Perform a request
7679
*
@@ -111,7 +114,7 @@ public function search($query) {
111114
$result['time'] = $this->getMicroTime() - $start;
112115
return $result;
113116
}
114-
117+
115118
/**
116119
* Flush this index/type combination
117120
*
@@ -123,7 +126,7 @@ public function search($query) {
123126
public function delete($id=false, array $options = array()) {
124127
return $this->transport->delete($id, $options);
125128
}
126-
129+
127130
/**
128131
* Flush this index/type combination
129132
*
@@ -135,6 +138,52 @@ public function deleteByQuery($query, array $options = array()) {
135138
return $this->transport->deleteByQuery($query, $options);
136139
}
137140

141+
/**
 * Add an index operation to the bulk queue.
 *
 * Use just as you would the regular index, but be sure to submit the
 * queue with bulkSubmit() afterwards; nothing is sent until then.
 *
 * The client's index and type are each split on commas, and one
 * operation is queued for every index/type combination.
 *
 * @param mixed $document Document to index
 * @param mixed $id       Optional document id (defaults to false)
 * @param array $params   Extra per-operation values, handed to the queue
 *                        together with the index, type and id
 * @return void
 */
public function bulkIndex($document,
                          $id = false,
                          array $params = array()) {

    foreach (explode(',', $this->index) as $index) {
        foreach (explode(',', $this->type) as $type) {
            // Pass the single type from this iteration, not the whole
            // comma-separated $this->type, so each queued operation
            // targets exactly one index/type pair.
            $this->bulk_queue->index($document,
                                     $index,
                                     $type,
                                     $id,
                                     $params);
        }
    }
}
160+
161+
/**
 * Add a delete operation to the bulk queue.
 *
 * Use just as you would the regular delete, but be sure to submit the
 * queue with bulkSubmit() afterwards; nothing is sent until then.
 *
 * The client's index and type are each split on commas, and one
 * operation is queued for every index/type combination.
 *
 * @param mixed $id     Id of the document to delete
 * @param array $params Extra per-operation values, handed to the queue
 *                      together with the index, type and id
 * @return void
 */
public function bulkDelete($id, array $params = array()) {
    foreach (explode(',', $this->index) as $index) {
        foreach (explode(',', $this->type) as $type) {
            // Pass the single type from this iteration, not the whole
            // comma-separated $this->type, so each queued operation
            // targets exactly one index/type pair.
            $this->bulk_queue->delete($index,
                                      $type,
                                      $id,
                                      $params);
        }
    }
}
176+
177+
/**
 * Submit the bulk queue for processing.
 *
 * The given parameters are applied to the current queue, the queue is
 * handed to the transport's bulk() method, and the client then starts
 * over with a fresh, empty queue.
 *
 * @param array $params Parameters set on the queue before it is sent
 * @return mixed Whatever the transport's bulk() call returns
 */
public function bulkSubmit(array $params = array()) {
    $pending = $this->bulk_queue;
    $pending->setParams($params);

    $response = $this->transport->bulk($pending);

    // Only reached when the transport call returned normally.
    $this->bulk_queue = new ElasticSearchBulkQueue();

    return $response;
}
186+
138187
private function getMicroTime() {
139188
list($usec, $sec) = explode(" ", microtime());
140189
return ((float)$usec + (float)$sec);

lib/ElasticSearchBulkQueue.php

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
3+
/**
 * In-memory queue of index/delete operations that can be rendered as a
 * newline-delimited bulk request body via getPayload().
 */
class ElasticSearchBulkQueue
{
    /** @var array Queued operations, in the order they were added. */
    private $_bulk_queue = array();

    /** @var array Request-level parameters, keyed by parameter name. */
    private $_params = array();

    /**
     * Merge request-level parameters into the queue.
     *
     * Existing keys are overwritten; keys not mentioned are kept.
     *
     * @param array $params Parameter name => value
     * @return void
     */
    public function setParams(array $params) {
        foreach ($params as $key => $value) {
            $this->_params[$key] = $value;
        }
    }

    /**
     * @return array The request-level parameters set so far
     */
    public function getParams() {
        return $this->_params;
    }

    /**
     * @return string The request-level parameters as a URL query string
     */
    public function getHTTPParamString() {
        return http_build_query($this->_params);
    }

    /**
     * Queue an index operation.
     *
     * @param mixed $document Document to index
     * @param mixed $index    Target index name
     * @param mixed $type     Target type name
     * @param mixed $id       Optional document id; when falsy no "_id"
     *                        entry is added to the action metadata
     * @param array $metadata Additional action metadata; "_index",
     *                        "_type" and "_id" entries are overwritten
     * @return void
     * @throws ElasticSearchException When the index or type is missing
     */
    public function index($document,
                          $index,
                          $type,
                          $id = false,
                          array $metadata = array()) {

        if (($index == null) || ($type == null)) {
            $msg = "An index and a type must be specified in ";
            $msg .= "order to index a document.";
            throw new ElasticSearchException($msg);
        }

        $metadata['_index'] = $index;
        $metadata['_type'] = $type;
        if ($id) {
            $metadata['_id'] = $id;
        }

        $this->_bulk_queue[] = array('action' => 'index',
                                     'metadata' => $metadata,
                                     'document' => $document);
    }

    /**
     * Queue a delete operation.
     *
     * @param mixed $index    Target index name
     * @param mixed $type     Target type name
     * @param mixed $id       Id of the document to delete
     * @param array $metadata Additional action metadata; "_index",
     *                        "_type" and "_id" entries are overwritten
     * @return void
     * @throws ElasticSearchException When the index, type or id is missing
     */
    public function delete($index,
                           $type,
                           $id,
                           array $metadata = array()) {

        if (($index == null) || ($type == null) || ($id == null)) {
            $msg = "An index, a type, and an ID must be specified";
            $msg .= " in order to delete a document.";
            throw new ElasticSearchException($msg);
        }

        $metadata['_index'] = $index;
        $metadata['_type'] = $type;
        $metadata['_id'] = $id;

        $this->_bulk_queue[] = array('action' => 'delete',
                                     'metadata' => $metadata);
    }

    /**
     * Render the queue as a bulk request body.
     *
     * Every operation produces one action line of the form
     * {"<action>":{<metadata>}} followed by "\n"; index operations are
     * followed by the JSON-encoded document and another "\n". An empty
     * queue yields an empty string.
     *
     * @return string Newline-delimited JSON payload
     */
    public function getPayload() {
        $bulk_doc = '';

        foreach ($this->_bulk_queue as $bulk_item) {
            $metadata = array();
            foreach ($bulk_item['metadata'] as $var => $val) {
                // Values are still emitted as JSON strings, as before,
                // but json_encode() now escapes quotes, backslashes and
                // control characters so the action line stays valid.
                $metadata[] = json_encode((string) $var)
                            . ':'
                            . json_encode((string) $val);
            }

            $bulk_doc .= '{' . json_encode($bulk_item['action']) . ':{';
            $bulk_doc .= implode(',', $metadata);
            $bulk_doc .= "}}\n";

            if ($bulk_item['action'] == 'index') {
                // NOTE(review): JSON_FORCE_OBJECT also turns list-style
                // arrays inside the document into objects; kept as-is
                // for compatibility -- confirm this is intended.
                $bulk_doc .= json_encode($bulk_item['document'],
                                         JSON_FORCE_OBJECT);
                $bulk_doc .= "\n";
            }
        }

        return $bulk_doc;
    }
}
91+
92+
?>

lib/transport/ElasticSearchTransport.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ abstract public function request($path,
1212
array $params = array());
1313
abstract public function delete($id = false, array $params = array());
1414
abstract public function search($query, array $params = array());
15+
/**
 * Send a queue of bulk operations to the server.
 *
 * @param ElasticSearchBulkQueue $bulk_queue Queue of operations to submit
 * @return mixed Transport-specific result of the bulk request
 */
abstract public function bulk($bulk_queue);
1516

1617
public function setIndex($index) {
1718
$this->index = $index;

lib/transport/ElasticSearchTransportHTTP.php

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,83 @@ protected function call($url, $method="GET", $payload=false) {
327327
return $data;
328328
}
329329

330+
/**
 * POST a bulk queue to the "/_bulk" endpoint using the transport's
 * existing cURL handle.
 *
 * The queue's payload is sent as the request body and its parameters,
 * if any, are appended to the URL as a query string.
 *
 * @param ElasticSearchBulkQueue $bulk_queue Queue providing getPayload()
 *                                           and getParams()
 * @return mixed The JSON-decoded response; null when the response body
 *               could not be decoded as JSON
 * @throws ElasticSearchTransportHTTPException When curl_exec() fails
 */
public function bulk($bulk_queue) {
    $url = "/_bulk";
    $method = 'POST';
    $payload = $bulk_queue->getPayload();
    $params = $bulk_queue->getParams();
    if ($params) {
        $url .= '?' . http_build_query($params);
    }

    $conn = $this->ch;
    $protocol = "http";
    $requestURL = $protocol . "://" . $this->host . $url;

    curl_setopt($conn, CURLOPT_URL, $requestURL);
    curl_setopt($conn, CURLOPT_TIMEOUT, $this->timeout);
    curl_setopt($conn, CURLOPT_PORT, $this->port);
    curl_setopt($conn, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($conn, CURLOPT_CUSTOMREQUEST, strtoupper($method));
    curl_setopt($conn, CURLOPT_FORBID_REUSE, 0);

    curl_setopt($conn, CURLOPT_POSTFIELDS, $payload);

    $data = curl_exec($conn);

    if ($data === false) {
        /**
         * cUrl error code reference can be found here:
         * http://curl.haxx.se/libcurl/c/libcurl-errors.html
         */
        $errno = curl_errno($conn);
        switch ($errno) {
            case CURLE_UNSUPPORTED_PROTOCOL:
                $error = "Unsupported protocol [$protocol]";
                break;
            case CURLE_FAILED_INIT:
                $error = "Internal cUrl error?";
                break;
            case CURLE_URL_MALFORMAT:
                $error = "Malformed URL [$requestURL] -d " . json_encode($payload);
                break;
            case CURLE_COULDNT_RESOLVE_PROXY:
                $error = "Couldnt resolve proxy";
                break;
            case CURLE_COULDNT_RESOLVE_HOST:
                $error = "Couldnt resolve host";
                break;
            case CURLE_COULDNT_CONNECT:
                $error = "Couldnt connect to host [{$this->host}], ElasticSearch down?";
                break;
            case CURLE_OPERATION_TIMEDOUT:
                $error = "Operation timed out on [$requestURL]";
                break;
            default:
                $error = "Unknown error";
                if ($errno == 0) {
                    $error .= ". Non-cUrl error";
                }
                break;
        }
        throw new ElasticSearchTransportHTTPException($requestURL,
                                                      $method,
                                                      $payload,
                                                      null,
                                                      $this->host,
                                                      $this->port,
                                                      $protocol,
                                                      $error);
    }

    $data = json_decode($data, true);

    // json_decode() yields null for a body that is not valid JSON (and a
    // scalar for scalar JSON); only an array can carry an 'error' key, so
    // guard before calling array_key_exists().
    if (is_array($data) && array_key_exists('error', $data)) {
        $this->handleError($url, $method, $payload, $data);
    }

    return $data;
}
406+
330407
protected function handleError($url,
331408
$method,
332409
$payload,

lib/transport/ElasticSearchTransportMemcached.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,36 @@ private function call($url, $method="GET", $payload=false) {
161161
return $data;
162162
}
163163

164+
/**
 * POST a bulk queue to the "/_bulk" endpoint.
 *
 * This method talks plain HTTP through a cURL handle created for the
 * call. The queue's payload is sent as the request body and its
 * parameters, if any, are appended to the URL as a query string.
 *
 * @param ElasticSearchBulkQueue $bulk_queue Queue providing getPayload()
 *                                           and getParams()
 * @return mixed The JSON-decoded response; null when the response body
 *               could not be decoded as JSON
 * @throws Exception When curl_exec() fails
 */
public function bulk($bulk_queue) {
    $url = '/_bulk';
    $method = 'POST';
    $payload = $bulk_queue->getPayload();
    $params = $bulk_queue->getParams();
    if ($params) {
        $url .= '?' . http_build_query($params);
    }

    $conn = curl_init();
    curl_setopt($conn, CURLOPT_URL, "http://" . $this->host . $url);
    curl_setopt($conn, CURLOPT_PORT, $this->port);
    curl_setopt($conn, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($conn, CURLOPT_CUSTOMREQUEST, strtoupper($method));

    curl_setopt($conn, CURLOPT_POSTFIELDS, $payload);

    $data = curl_exec($conn);

    // The handle is created per call, so release it on both the
    // success and the failure path instead of leaking it.
    curl_close($conn);

    if ($data === false) {
        throw new Exception("Transport call to API failed");
    }

    $data = json_decode($data, true);

    // json_decode() yields null for a body that is not valid JSON; only
    // an array can carry an 'error' key, so guard before checking it.
    if (is_array($data) && array_key_exists('error', $data)) {
        $this->handleError($url, $method, $payload, $data);
    }

    return $data;
}
193+
164194
private function handleError($url, $method, $payload, $response) {
165195
$err = "Request: \n";
166196
$err .= "curl -X$method http://{$this->host}:{$this->port}$url";

0 commit comments

Comments
 (0)