Skip to content

Commit

Permalink
Add Elasticsearch scan reader (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
wuyunfeng authored and lide committed Apr 18, 2019
1 parent b9054f5 commit e149a1f
Show file tree
Hide file tree
Showing 11 changed files with 741 additions and 5 deletions.
10 changes: 8 additions & 2 deletions be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,18 @@ size_t HttpClient::on_response_data(const void* data, size_t length) {
// return execute(callback);
// }

Status HttpClient::execute_post_request(const std::string& post_data, std::string* response) {
Status HttpClient::execute_post_request(const std::string& payload, std::string* response) {
set_method(POST);
set_post_body(post_data);
set_payload(payload);
return execute(response);
}

// Sends `payload` as the body of a DELETE request and stores the response
// body in `*response`.
//
// Switches the client method to DELETE, installs the payload with
// set_payload(), then delegates to execute(response), whose Status is
// returned unchanged.
Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) {
set_method(DELETE);
set_payload(payload);
return execute(response);
}

Status HttpClient::execute(const std::function<bool(const void* data, size_t length)>& callback) {
_callback = &callback;
auto code = curl_easy_perform(_curl);
Expand Down
7 changes: 4 additions & 3 deletions be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ class HttpClient {
curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list);
}

// you must set CURLOPT_POSTFIELDSIZE before CURLOPT_COPYPOSTFIELDS options, otherwise will cause request hanging up
void set_post_body(const std::string& post_body) {
// Installs `post_body` as the request body on the underlying curl handle.
//
// The size option is set before CURLOPT_COPYPOSTFIELDS on purpose: libcurl
// copies the buffer at the moment COPYPOSTFIELDS is set, so it must already
// know how many bytes to take.
void set_payload(const std::string& post_body) {
    const long body_size = static_cast<long>(post_body.length());
    curl_easy_setopt(_curl, CURLOPT_POSTFIELDSIZE, body_size);
    curl_easy_setopt(_curl, CURLOPT_COPYPOSTFIELDS, post_body.c_str());
}
Expand Down Expand Up @@ -114,7 +113,9 @@ class HttpClient {
// a file to local_path
Status download(const std::string& local_path);

Status execute_post_request(const std::string& post_data, std::string* response);
Status execute_post_request(const std::string& payload, std::string* response);

Status execute_delete_request(const std::string& payload, std::string* response);

// execute a simple method, and its response is saved in response argument
Status execute(std::string* response);
Expand Down
4 changes: 4 additions & 0 deletions be/src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ add_library(Util STATIC
aes_util.cpp
string_util.cpp
md5.cpp
es_scan_reader.cpp
es_scroll_query.cpp
es_scroll_parser.cpp
)

#ADD_BE_TEST(integer-array-test)
Expand All @@ -87,3 +90,4 @@ add_library(Util STATIC
#ADD_BE_TEST(bit-util-test)
#ADD_BE_TEST(rle-test)
##ADD_BE_TEST(perf-counters-test)
##ADD_BE_TEST(es-scan-reader-test)
132 changes: 132 additions & 0 deletions be/src/util/es_scan_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <string>
#include <sstream>
#include "es_scan_reader.h"
#include "es_scroll_query.h"
#include "common/logging.h"
#include "common/status.h"
#include <map>

namespace doris {
// URL fragments used to assemble the Elasticsearch scroll requests.

// Query-string parameter restricting which fields appear in the response.
// NOTE(review): the identifier is misspelled ("REUQEST"); it is kept as-is
// because other code in this file refers to it by this name.
const std::string REUQEST_SCROLL_FILTER_PATH{
        "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields"};
const std::string REQUEST_SCROLL_PATH{"_scroll"};
// Prefix of the query-string parameter that carries the shard list.
const std::string REQUEST_PREFERENCE_PREFIX{"&preference=shards:"};
// Path used both to fetch the next scroll page and to clear the scroll.
const std::string REQUEST_SEARCH_SCROLL_PATH{"/_search/scroll"};
const std::string REQUEST_SEPARATOR{"/"};
// Value sent as the `scroll` keep-alive parameter.
const std::string REQUEST_SCROLL_TIME{"5m"};

// Builds a reader for one Elasticsearch target.
//
// @param target  base URL of the Elasticsearch node; the index/type paths
//                are appended to it.
// @param size    batch size; forwarded to the scroll parser, which uses it
//                to detect the last batch.
// @param props   connection/query properties. KEY_INDEX and KEY_TYPE are
//                required (std::map::at throws std::out_of_range when they
//                are absent); KEY_USER_NAME, KEY_PASS_WORD, KEY_SHARDS and
//                KEY_QUERY are optional and default to the empty string.
ESScanReader::ESScanReader(const std::string& target, uint16_t size, const std::map<std::string, std::string>& props)
        : _target(target),
          // Initialised here so that get_next() never reads an indeterminate
          // flag if it is invoked before open().
          _is_first(true),
          _eos(false),
          _batch_size(size) {
    LOG(INFO) << "ESScanReader ";
    _index = props.at(KEY_INDEX);
    _type = props.at(KEY_TYPE);

    // Copies an optional property with a single map lookup.
    auto assign_if_present = [&props](const char* key, std::string* out) {
        auto it = props.find(key);
        if (it != props.end()) {
            *out = it->second;
        }
    };
    assign_if_present(KEY_USER_NAME, &_user_name);
    assign_if_present(KEY_PASS_WORD, &_passwd);
    assign_if_present(KEY_SHARDS, &_shards);
    assign_if_present(KEY_QUERY, &_query);

    // <target>/<index>/<type>/_search?scroll=5m&preference=shards:<shards>&filter_path=...
    _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type
            + "/_search?scroll=" + REQUEST_SCROLL_TIME
            + REQUEST_PREFERENCE_PREFIX + _shards
            + "&" + REUQEST_SCROLL_FILTER_PATH;
    // <target>/_search/scroll?filter_path=...
    _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
    _parser.set_batch_size(size);
}

// Nothing to release explicitly; members clean up after themselves.
ESScanReader::~ESScanReader() = default;

// Issues the initial scroll search and caches its response.
//
// The cached response is handed out by the first call to get_next(). On
// success the scroll id reported by Elasticsearch is remembered so that
// get_next() and close() can refer to the scroll context.
//
// @return Status::OK on success; an error Status when the request cannot be
//         executed, the HTTP status is not 200, or the response cannot be
//         parsed.
Status ESScanReader::open() {
    _is_first = true;
    RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
    _network_client.set_basic_auth(_user_name, _passwd);
    _network_client.set_content_type("application/json");
    // phase open, we cached the first response for `get_next` phase.
    // The request status is propagated instead of being discarded, matching
    // how get_next() and close() treat their requests.
    RETURN_IF_ERROR(_network_client.execute_post_request(_query, &_cached_response));
    long status = _network_client.get_http_status();
    if (status != 200) {
        LOG(WARNING) << "invalid response http status for open: " << status;
        return Status(_cached_response);
    }
    VLOG(1) << "open _cached response: " << _cached_response;
    RETURN_IF_ERROR(_parser.parse(_cached_response));
    // Without this the follow-up scroll and clear-scroll requests would be
    // built from an empty scroll id.
    _scroll_id = _parser.get_scroll_id();
    // ScrollParser::has_next() reports its end-of-stream flag.
    _eos = _parser.has_next();
    return Status::OK;
}

// Fetches the next batch of documents.
//
// The first call returns the response cached by open(); later calls issue a
// scroll request using the most recently received scroll id.
//
// @param eos       set to true when the parser reported end of stream.
// @param response  receives the raw JSON response body.
// @return Status::OK on success, otherwise an error describing the failed
//         request or the unparsable response.
Status ESScanReader::get_next(bool* eos, std::string* response) {
    // if is first scroll request, should return the cached response
    if (_is_first) {
        // maybe the index or shard is empty
        // NOTE(review): the parser also raises its end-of-stream flag when the
        // first batch holds fewer hits than the batch size; in that case the
        // cached response is not handed to the caller here. Confirm whether
        // callers expect `*response` to be filled when `*eos` is true.
        if (_eos) {
            *eos = true;
            return Status::OK;
        }
        _is_first = false;
        *eos = _eos;
        *response = _cached_response;
        return Status::OK;
    }

    RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
    _network_client.set_basic_auth(_user_name, _passwd);
    _network_client.set_content_type("application/json");
    _network_client.set_timeout_ms(5 * 1000);
    RETURN_IF_ERROR(_network_client.execute_post_request(
            ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, REQUEST_SCROLL_TIME), response));

    long status = _network_client.get_http_status();
    if (status == 404) {
        LOG(WARNING) << "request scroll search failure 404["
                     << ", response: " << (response->empty() ? "empty response" : *response);
        return Status("No search context found for " + _scroll_id);
    }
    if (status != 200) {
        // 404 was already handled above, so only the generic failure remains.
        LOG(WARNING) << "request scroll search failure["
                     << "http status" << status
                     << ", response: " << (response->empty() ? "empty response" : *response);
        return Status("request scroll search failure: " + (response->empty() ? "empty response" : *response));
    }

    RETURN_IF_ERROR(_parser.parse(*response));
    // Every scroll response carries a scroll id; keep the latest one for the
    // next request and for close().
    _scroll_id = _parser.get_scroll_id();
    *eos = _eos = _parser.has_next();
    return Status::OK;
}

// Asks Elasticsearch to drop the scroll context identified by _scroll_id.
//
// @return Status::OK when the server answers 200; an error Status when the
//         request cannot be executed or any other HTTP status is returned.
Status ESScanReader::close() {
    const std::string clear_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH;
    RETURN_IF_ERROR(_network_client.init(clear_scroll_url));
    _network_client.set_basic_auth(_user_name, _passwd);
    _network_client.set_method(DELETE);
    _network_client.set_content_type("application/json");
    _network_client.set_timeout_ms(5 * 1000);

    // The response body is not inspected; only the HTTP status matters.
    std::string ignored_body;
    RETURN_IF_ERROR(_network_client.execute_delete_request(
            ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &ignored_body));

    if (_network_client.get_http_status() != 200) {
        return Status("es_scan_reader delete scroll context failure");
    }
    return Status::OK;
}
}
73 changes: 73 additions & 0 deletions be/src/util/es_scan_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <map>
#include <string>
#include "http/http_client.h"
#include "es_scroll_parser.h"

using std::string;

namespace doris {

class Status;

// Reads documents from Elasticsearch through the scroll API:
// open() starts the scroll, get_next() returns one batch per call, and
// close() clears the scroll context.
class ESScanReader {

public:
    // Keys recognised in the `props` map passed to the constructor.
    static constexpr const char* KEY_USER_NAME = "user";
    static constexpr const char* KEY_PASS_WORD = "passwd";
    static constexpr const char* KEY_INDEX = "index";
    static constexpr const char* KEY_TYPE = "type";
    static constexpr const char* KEY_SHARDS = "shards";
    static constexpr const char* KEY_QUERY = "query";
    static constexpr const char* KEY_BATCH_SIZE = "batch_size";

    // `target` is the base URL of the Elasticsearch node, `size` the batch
    // size, and `props` the property map keyed by the KEY_* constants above.
    ESScanReader(const std::string& target, uint16_t size, const std::map<std::string, std::string>& props);
    ~ESScanReader();

    // launch the first scroll request; this method caches the first scroll
    // response and returns that cached response when get_next is invoked
    Status open();
    // invoke get_next to get the next batch of documents from elasticsearch
    Status get_next(bool *eos, std::string* response);
    // clear scroll context from elasticsearch
    Status close();

private:
    std::string _target;
    std::string _user_name;
    std::string _passwd;
    // scroll context id used by the follow-up scroll and clear-scroll requests
    std::string _scroll_id;
    HttpClient _network_client;
    std::string _index;
    std::string _type;
    // push down filter
    std::string _query;
    // elasticsearch shards to fetch documents from
    std::string _shards;
    // distinguish the first scroll phase and the following scroll;
    // defaulted so the flag is never read uninitialised before open()
    bool _is_first = true;
    std::string _init_scroll_url;
    std::string _next_scroll_url;
    bool _eos = false;
    uint16_t _batch_size = 0;

    // response of the initial scroll request, handed out by the first get_next
    std::string _cached_response;
    ScrollParser _parser;
};
}

78 changes: 78 additions & 0 deletions be/src/util/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "es_scroll_parser.h"
#include "rapidjson/document.h"
#include "common/logging.h"
#include "common/status.h"

namespace doris {

// JSON member names looked up in a scroll response.
// Declared `static ... const` so the pointers are immutable and local to this
// translation unit instead of being writable globals with external linkage.
static const char* const FIELD_SCROLL_ID = "_scroll_id";
// Outer "hits" object of the response.
static const char* const FIELD_HITS = "hits";
// "hits" array nested inside the outer "hits" object.
static const char* const FIELD_INNER_HITS = "hits";
static const char* const FIELD_SOURCE = "_source";
static const char* const FIELD_TOTAL = "total";

// Creates a parser with no parsed state. Every scalar member is given a
// defined value so that count() and the batch-size comparison in parse()
// never read an indeterminate value.
ScrollParser::ScrollParser()
        : _eos(false),
          _total(0),
          _size(0),
          _batch_size(0) {
}

// No resources to release beyond the members' own destructors.
ScrollParser::~ScrollParser() = default;


// Parses one scroll response and records its scroll id, total hit count and
// the number of hits in this batch.
//
// Expected shape: { "_scroll_id": "...", "hits": { "total": 2, "hits": [ {}, {} ] } }
//
// The end-of-stream flag is raised when the total is 0 or when the batch
// holds fewer hits than the configured batch size.
//
// @param scroll_result  raw JSON body returned by Elasticsearch.
// @return Status::OK on success; an error Status when the body is not valid
//         JSON or does not have the expected shape.
Status ScrollParser::parse(const std::string& scroll_result) {
    rapidjson::Document document_node;
    document_node.Parse<0>(scroll_result.c_str());
    // Member lookups are only valid on a successfully parsed JSON object.
    if (document_node.HasParseError() || !document_node.IsObject()) {
        return Status("invalid response from elasticsearch");
    }
    if (!document_node.HasMember(FIELD_SCROLL_ID)) {
        return Status("maybe not a scroll request");
    }
    const rapidjson::Value& scroll_node = document_node[FIELD_SCROLL_ID];
    if (!scroll_node.IsString()) {
        return Status("invalid response from elasticsearch");
    }
    _scroll_id = scroll_node.GetString();

    // { hits: { total : 2, "hits" : [ {}, {}, {} ]}}
    if (!document_node.HasMember(FIELD_HITS) || !document_node[FIELD_HITS].IsObject()) {
        return Status("invalid response from elasticsearch");
    }
    const rapidjson::Value& outer_hits_node = document_node[FIELD_HITS];

    // `total` lives inside the outer "hits" object, not at the document root.
    if (!outer_hits_node.HasMember(FIELD_TOTAL) || !outer_hits_node[FIELD_TOTAL].IsInt()) {
        return Status("invalid response from elasticsearch");
    }
    _total = outer_hits_node[FIELD_TOTAL].GetInt();
    _size = 0;
    if (_total == 0) {
        _eos = true;
        return Status::OK;
    }
    VLOG(1) << "es_scan_reader total hits: " << _total << " documents";

    // A response without the inner "hits" array is treated as an empty batch
    // rather than dereferencing a member that is not there.
    if (outer_hits_node.HasMember(FIELD_INNER_HITS)) {
        const rapidjson::Value& inner_hits_node = outer_hits_node[FIELD_INNER_HITS];
        if (!inner_hits_node.IsArray()) {
            return Status("invalid response from elasticsearch");
        }
        _size = static_cast<int>(inner_hits_node.Size());
    }
    if (_size < _batch_size) {
        _eos = true;
    }
    return Status::OK;
}

// Returns the end-of-stream flag maintained by parse().
// NOTE(review): despite the name, `true` here means the stream is exhausted
// (the value returned is _eos), not that another batch is available.
bool ScrollParser::has_next() {
return _eos;
}

// Returns _size, the number of hits recorded by the last parse() call,
// converted to bool.
// NOTE(review): because the return type is bool, any non-zero hit count
// collapses to `true`; returning the actual count would require changing the
// declaration in the header as well.
bool ScrollParser::count() {
return _size;
}

// Returns a copy of the scroll id stored by the most recent parse() call
// that found a "_scroll_id" member; empty if none has been stored yet.
std::string ScrollParser::get_scroll_id() {
return _scroll_id;
}
}
44 changes: 44 additions & 0 deletions be/src/util/es_scroll_parser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include<string>

namespace doris {

class Status;
// Extracts the scroll id and hit counts from an Elasticsearch scroll
// response and tracks whether the scroll has reached its end.
class ScrollParser {

public:
    ScrollParser();
    ~ScrollParser();
    // scroll id stored by the last parse() call that found one
    std::string get_scroll_id();
    // NOTE(review): returns bool, so the hit count of the last batch is
    // reduced to "zero / non-zero".
    bool count();
    // total number of hits reported by the last parsed response; defined
    // inline because no out-of-line definition exists
    uint32_t total() {
        return static_cast<uint32_t>(_total);
    }
    // parses one scroll response body; see the definition for the error cases
    Status parse(const std::string& scroll_result);
    // returns the end-of-stream flag (true once the scroll is exhausted)
    bool has_next();
    // batch size used by parse() to detect a short, i.e. final, batch
    void set_batch_size(int batch_size) {
        _batch_size = batch_size;
    }

private:
    std::string _scroll_id;
    bool _eos = false;
    int _total = 0;
    // number of hits in the last parsed batch
    int _size = 0;
    int _batch_size = 0;
};
}
Loading

0 comments on commit e149a1f

Please sign in to comment.