Skip to content

Commit 192a9b7

Browse files
committed
Merge pull request QHedgeTech#5 from alolis/bulk
Bulk API implementation with the help of a BulkBuilder class
2 parents a35cc3c + 1676142 commit 192a9b7

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

src/elasticsearch/elasticsearch.cpp

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <cstring>
66
#include <cassert>
77
#include <locale>
8+
#include <vector>
89

910
ElasticSearch::ElasticSearch(const std::string& node, bool readOnly): _http(node, true), _readOnly(readOnly) {
1011

@@ -309,7 +310,7 @@ long ElasticSearch::search(const std::string& index, const std::string& type, co
309310
return result.getValue("hits").getObject().getValue("total").getLong();
310311
}
311312

312-
/// Delete given type (and all documents, mappings)
313+
/// Delete given type (and all documents, mappings)
313314
bool ElasticSearch::deleteType(const std::string& index, const std::string& type){
314315
std::ostringstream uri;
315316
uri << index << "/" << type;
@@ -339,3 +340,89 @@ void ElasticSearch::refresh(const std::string& index){
339340
Json::Object msg;
340341
_http.get(oss.str().c_str(), 0, &msg);
341342
}
343+
344+
// Bulk API of ES.
345+
bool ElasticSearch::bulk(const char* data, Json::Object& jResult) {
346+
if(_readOnly)
347+
return false;
348+
349+
return (200 == _http.post("/_bulk", data, &jResult));
350+
}
351+
352+
BulkBuilder::BulkBuilder() {}
353+
354+
void BulkBuilder::createCommand(const std::string &op, const std::string &index, const std::string &type, const std::string &id = "") {
355+
Json::Object command;
356+
Json::Object commandParams;
357+
358+
if (id != "") {
359+
commandParams.addMemberByKey("_id", id);
360+
}
361+
362+
commandParams.addMemberByKey("_index", index);
363+
commandParams.addMemberByKey("_type", type);
364+
365+
command.addMemberByKey(op, commandParams);
366+
operations.push_back(command);
367+
}
368+
369+
void BulkBuilder::index(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
370+
createCommand("index", index, type, id);
371+
operations.push_back(fields);
372+
}
373+
374+
void BulkBuilder::create(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
375+
createCommand("create", index, type, id);
376+
operations.push_back(fields);
377+
}
378+
379+
void BulkBuilder::index(const std::string &index, const std::string &type, const Json::Object &fields) {
380+
createCommand("index", index, type);
381+
operations.push_back(fields);
382+
}
383+
384+
void BulkBuilder::create(const std::string &index, const std::string &type, const Json::Object &fields) {
385+
createCommand("create", index, type);
386+
operations.push_back(fields);
387+
}
388+
389+
void BulkBuilder::update(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
390+
createCommand("update", index, type, id);
391+
392+
Json::Object updateFields;
393+
updateFields.addMemberByKey("doc", fields);
394+
395+
operations.push_back(updateFields);
396+
}
397+
398+
void BulkBuilder::del(const std::string &index, const std::string &type, const std::string &id) {
399+
createCommand("delete", index, type, id);
400+
}
401+
402+
void BulkBuilder::upsert(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
403+
createCommand("update", index, type, id);
404+
405+
Json::Object updateFields;
406+
updateFields.addMemberByKey("doc", fields);
407+
updateFields.addMemberByKey("doc_as_upsert", true);
408+
409+
operations.push_back(updateFields);
410+
}
411+
412+
std::string BulkBuilder::str() {
413+
std::stringstream json;
414+
415+
for(auto &operation : operations) {
416+
json << operation.str() << std::endl;
417+
}
418+
419+
return json.str();
420+
}
421+
422+
void BulkBuilder::clear() {
423+
operations.clear();
424+
}
425+
426+
bool BulkBuilder::isEmpty() {
427+
return operations.empty();
428+
}

src/elasticsearch/elasticsearch.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <sstream>
66
#include <list>
77
#include <mutex>
8+
#include <vector>
89

910
#include "http/http.h"
1011
#include "json/json.h"
@@ -61,6 +62,9 @@ class ElasticSearch {
6162
/// Perform a scan to get all results from a query.
6263
int fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize = 1000);
6364

65+
// Bulk API
66+
bool bulk(const char*, Json::Object& jResult);
67+
6468
public:
6569
/// Delete given type (and all documents, mappings)
6670
bool deleteType(const std::string& index, const std::string& type);
@@ -89,4 +93,24 @@ class ElasticSearch {
8993
bool _readOnly;
9094
};
9195

96+
class BulkBuilder {
97+
private:
98+
std::vector<Json::Object> operations;
99+
100+
void createCommand(const std::string &op, const std::string &index, const std::string &type, const std::string &id);
101+
102+
public:
103+
BulkBuilder();
104+
void index(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
105+
void create(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
106+
void index(const std::string &index, const std::string &type, const Json::Object &fields);
107+
void create(const std::string &index, const std::string &type, const Json::Object &fields);
108+
void update(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
109+
void del(const std::string &index, const std::string &type, const std::string &id);
110+
void upsert(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
111+
void clear();
112+
std::string str();
113+
bool isEmpty();
114+
};
115+
92116
#endif // ELASTICSEARCH_H

0 commit comments

Comments
 (0)