Skip to content

Commit 9a28ade

Browse files
authored
Merge pull request QHedgeTech#11 from frodegill/scroll-support
Scroll support
2 parents f57154d + 733c1ab commit 9a28ade

File tree

2 files changed

+76
-43
lines changed

2 files changed

+76
-43
lines changed

src/elasticsearch/elasticsearch.cpp

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -99,45 +99,6 @@ bool ElasticSearch::deleteAll(const char* index, const char* type){
9999
return (msg["_indices"].getObject()[index].getObject()["_shards"].getObject()["failed"].getInt() == 0);
100100
}
101101

102-
int ElasticSearch::fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize) {
103-
104-
// Get the scroll id
105-
std::stringstream scrollUrl;
106-
scrollUrl << index << "/" << type << "/_search?search_type=scan&scroll=10m&size=" << scrollSize;
107-
108-
Json::Object scrollObject;
109-
_http.post(scrollUrl.str().c_str(),query.c_str(),&scrollObject);
110-
111-
if(!scrollObject.member("hits"))
112-
EXCEPTION("Result corrupted, no member \"hits\".");
113-
114-
if(!scrollObject.getValue("hits").getObject().member("total"))
115-
EXCEPTION("Result corrupted, no member \"total\" nested in \"hits\".");
116-
117-
int total = scrollObject.getValue("hits").getObject().getValue("total").getInt();
118-
119-
std::string scrollId = scrollObject["_scroll_id"].getString();
120-
int count = 0;
121-
while(count < total) {
122-
123-
Json::Object result;
124-
_http.rawpost("_search/scroll?scroll=10m", scrollId.c_str(), &result);
125-
126-
// Keep the new scroll id we received to inject in the next iteration.
127-
scrollId = result["_scroll_id"].getString();
128-
129-
for(const Json::Value& value : result["hits"].getObject()["hits"].getArray()){
130-
resultArray.addElement(value);
131-
++count;
132-
}
133-
}
134-
135-
if(count != total)
136-
EXCEPTION("Result corrupted, total is different from count.");
137-
138-
return total;
139-
}
140-
141102
// Request the document number of type T in index I.
142103
long unsigned int ElasticSearch::getDocumentCount(const char* index, const char* type){
143104
std::ostringstream oss;
@@ -341,6 +302,65 @@ void ElasticSearch::refresh(const std::string& index){
341302
_http.get(oss.str().c_str(), 0, &msg);
342303
}
343304

305+
bool ElasticSearch::initScroll(std::string& scrollId, const std::string& index, const std::string& type, const std::string& query, int scrollSize) {
306+
std::ostringstream oss;
307+
oss << index << "/" << type << "/_search?scroll=1m&search_type=scan&size=" << scrollSize;
308+
309+
Json::Object msg;
310+
if (200 != _http.post(oss.str().c_str(), query.c_str(), &msg))
311+
return false;
312+
313+
scrollId = msg["_scroll_id"].getString();
314+
return true;
315+
}
316+
317+
bool ElasticSearch::scrollNext(std::string& scrollId, Json::Array& resultArray) {
318+
Json::Object msg;
319+
if (200 != _http.post("/_search/scroll?scroll=1m", scrollId.c_str(), &msg))
320+
return false;
321+
322+
scrollId = msg["_scroll_id"].getString();
323+
324+
appendHitsToArray(msg, resultArray);
325+
return true;
326+
}
327+
328+
/// Ask the server to drop the scroll identified by `scrollId`.
///
/// Issues a remove request on the scroll endpoint with the scroll id as body.
/// No result object is requested and the outcome of the call is not inspected.
void ElasticSearch::clearScroll(const std::string& scrollId) {
    const char* const requestBody = scrollId.c_str();
    _http.remove("/_search/scroll", requestBody, 0);
}
331+
332+
int ElasticSearch::fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize) {
333+
resultArray.clear();
334+
335+
std::string scrollId;
336+
if (!initScroll(scrollId, index, type, query, scrollSize))
337+
return 0;
338+
339+
size_t currentSize=0, newSize;
340+
while (scrollNext(scrollId, resultArray))
341+
{
342+
newSize = resultArray.size();
343+
if (currentSize == newSize)
344+
break;
345+
346+
currentSize = newSize;
347+
}
348+
return currentSize;
349+
}
350+
351+
/// Copy every element of `msg["hits"]["hits"]` to the end of `resultArray`.
///
/// Raises via EXCEPTION when `msg` has no "hits" member, or when that member
/// has no nested "hits" member. Elements already in `resultArray` are kept.
void ElasticSearch::appendHitsToArray(const Json::Object& msg, Json::Array& resultArray) {
    // Validate the outer "hits" object before looking inside it.
    if (!msg.member("hits")) {
        EXCEPTION("Result corrupted, no member \"hits\".");
    }

    // Validate the nested "hits" array.
    if (!msg.getValue("hits").getObject().member("hits")) {
        EXCEPTION("Result corrupted, no member \"hits\" nested in \"hits\".");
    }

    auto&& pageHits = msg["hits"].getObject()["hits"].getArray();
    for (const Json::Value& hit : pageHits) {
        resultArray.addElement(hit);
    }
}
363+
344364
// Bulk API of ES.
345365
bool ElasticSearch::bulk(const char* data, Json::Object& jResult) {
346366
if(_readOnly)

src/elasticsearch/elasticsearch.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,6 @@ class ElasticSearch {
5959
/// Search API of ES. Specify the doc type.
6060
long search(const std::string& index, const std::string& type, const std::string& query, Json::Object& result);
6161

62-
/// Perform a scan to get all results from a query.
63-
int fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize = 1000);
64-
6562
// Bulk API
6663
bool bulk(const char*, Json::Object& jResult);
6764

@@ -81,7 +78,23 @@ class ElasticSearch {
8178

8279
/// Refresh the index.
8380
void refresh(const std::string& index);
84-
81+
82+
public:
83+
/// Initialize a scroll search. Use the returned scroll id when calling scrollNext. scrollSize is applied per shard, not per request. Returns false on error.
84+
bool initScroll(std::string& scrollId, const std::string& index, const std::string& type, const std::string& query, int scrollSize = 1000);
85+
86+
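/// Scroll to the next matches of an initialized scroll search. scrollId may be updated. Hits are appended to resultArray; the end is reached when a call adds no hits (resultArray.empty() stays true for an array passed in empty), in which case the scroll is automatically cleared. Returns false on error.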
87+
bool scrollNext(std::string& scrollId, Json::Array& resultArray);
88+
89+
/// Clear an initialized scroll search before its automatic 1-minute timeout.
90+
void clearScroll(const std::string& scrollId);
91+
92+
/// Perform a scan to get all results from a query.
93+
int fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize = 1000);
94+
95+
private:
96+
void appendHitsToArray(const Json::Object& msg, Json::Array& resultArray);
97+
8598
private:
8699
/// Private constructor.
87100
ElasticSearch();

0 commit comments

Comments
 (0)