Merged

57 commits
ba170aa
Fix NPE of DataDescription (#1735)
morningman Sep 2, 2019
81ca3e3
Free olap scanner out of lock (#1733)
imay Sep 2, 2019
6f4feca
Add rowset id generator to FE and BE (#1678)
yiguolei Sep 2, 2019
8034d83
Add scroll keepalive and http timeout configuration (#1731)
wuyunfeng Sep 2, 2019
b4f6f75
Add exchange in MemPool to reduce alloc/free operation (#1732)
imay Sep 2, 2019
a80e999
Move version to high 8 bit (#1736)
yiguolei Sep 2, 2019
9f5e571
Unify the msg of 'Memory exceed limit' (#1737)
EmmyMiao87 Sep 3, 2019
f76dad2
Basic implementation for BetaRowsetReader (#1718)
Sep 3, 2019
03b3991
Not add alter task to tablet in alter tablet request v2 (#1741)
yiguolei Sep 3, 2019
fddfffe
Fix bug that failed to create a new partition when no partition in a …
worker24h Sep 4, 2019
a63989c
Use RowsetFactory to create and init RowsetWriter (#1740)
Sep 4, 2019
726509e
Add MIN/MAX aggregate function compatible with char/varchar (#1739)
lxqfy Sep 4, 2019
a84c647
Shuffle partitioned instance to avoid skew (#1744)
imay Sep 4, 2019
0dc0dad
Reduce unnecessary memory allocat and copy in OlapScanNode (#1742)
imay Sep 4, 2019
85940a2
RowsetFactory as a single entry for Rowset creation (#1748)
Sep 5, 2019
3f22238
Add check for to_bitmap function argument (#1747)
kangkaisen Sep 5, 2019
da69812
Fix compile error (#1749)
imay Sep 5, 2019
54fd365
Fix bug in BetaRowsetReader which results in empty result (#1754)
Sep 6, 2019
65dcabf
Use crc32c checksum for segment v2 (#1753)
Sep 6, 2019
981e0fe
Check rowset is useful atomicly (#1750)
yiguolei Sep 6, 2019
2f52ae7
Add PreAgg Hint (#1617)
worker24h Sep 6, 2019
f23ac0e
Planner support push down predicates past agg, win and sort (#1471)
chenhao7253886 Sep 8, 2019
b85cb00
Bug-fix: error result of union stmt (#1758)
EmmyMiao87 Sep 8, 2019
fd29373
Get rid of external_sorting when rowsets have already been filtered (…
Sep 8, 2019
5acdeee
Assign schema_size from other Schema (#1768)
imay Sep 9, 2019
ca23b7a
Should create init rowset for alter task v2 (#1767)
yiguolei Sep 9, 2019
a349409
Move compare from RowCursor to row (#1764)
imay Sep 9, 2019
cd5cfea
Encapsulate HLL logic (#1756)
kangkaisen Sep 9, 2019
0f44ce9
Fix segment v2 comment (#1769)
kangkaisen Sep 9, 2019
8b663bf
Fix bug: unknown column from the inline view (#1770)
EmmyMiao87 Sep 9, 2019
044489b
Optimize some kinds of load jobs (#1762)
morningman Sep 9, 2019
5653822
Writer magic number in footer instead of header (#1771)
Sep 10, 2019
40a11c4
Fix BE crash when schema changing with HLL column (#1772)
imay Sep 10, 2019
235cdb0
Commit kafka offset (#1734)
HangyuanLiu Sep 10, 2019
bf37375
Make CpuInfo::get_current_core work (#1773)
imay Sep 10, 2019
dcdfc5f
Update .gitignore: ignore cmake dir (#1779)
kangkaisen Sep 10, 2019
5a12a1d
Fix compile error (#1780)
kangkaisen Sep 10, 2019
b327643
Fix bug that failed to limit the mem usage of HLL column when loading…
morningman Sep 11, 2019
a85ffa1
Fix FE log error (#1785)
kangkaisen Sep 11, 2019
afa9b6e
Add meta store service thrift definition (#1783)
yiguolei Sep 12, 2019
348e212
Initialize tablet uid not using default constructor for performance r…
yiguolei Sep 12, 2019
c354f30
Fix mistake in docs (#1796)
EmmyMiao87 Sep 12, 2019
f58a222
Fix bug that the calculation of disk usage percent is wrong (#1791)
morningman Sep 12, 2019
dad4def
Support estimate size for v2 segment writer (#1787)
wubiaoi Sep 12, 2019
9aa2045
Refactor alter job (#1695)
morningman Sep 12, 2019
11eafe5
Add ChunkAllocator to accelerate chunk allocation (#1792)
imay Sep 13, 2019
dcea6da
Fix Cluster meta write error (#1802)
kangkaisen Sep 13, 2019
86feddb
Fix bug that dead lock may happen when drop table during alter table …
morningman Sep 15, 2019
a232a56
Add parallel_exchange_instance_num to set parallel after exchange (#1…
xionglei0 Sep 16, 2019
973eff2
Fix tablet meta tool command argument bug (#1810)
WingsGo Sep 16, 2019
ede51da
Resolve reduce/reduce conflict in our syntax (#1811)
imay Sep 16, 2019
054a3f4
Add where expr in broker load (#1812)
EmmyMiao87 Sep 17, 2019
dc813e6
Limit the max version to cumulative compaction (#1813)
Sep 17, 2019
c4e28f0
Update FeConstants meta version to VERSION_62 (#1822)
morningman Sep 17, 2019
3f63bde
Fix 'Invalid Column Name' error when loading parquet file (#1820)
morningman Sep 17, 2019
714dca8
Support table comment and column comment for view (#1799)
morningman Sep 18, 2019
e70e48c
Add a ALTER operation to change distribution type from RANDOM to HASH…
morningman Sep 18, 2019
3 changes: 3 additions & 0 deletions .gitignore
@@ -17,3 +17,6 @@ thirdparty/src
.settings/
.idea/
/Default/
be/cmake-build
be/cmake-build-debug
be/cmake-build-release
21 changes: 14 additions & 7 deletions be/CMakeLists.txt
@@ -287,6 +287,14 @@ execute_process(
OUTPUT_VARIABLE LLVM_MODULE_LIBS
OUTPUT_STRIP_TRAILING_WHITESPACE
)

# Check whether functions are supported on this platform. All flags are
# generated into gensrc/build/common/env_config.h.
# Platform-dependent functions can be checked here. Don't forget to add each
# flag to be/src/common/env_config.h.in as well.
include(CheckFunctionExists)
check_function_exists(sched_getcpu HAVE_SCHED_GETCPU)

# compiler flags that are common across debug/release builds
# -Wall: Enable all warnings.
# -Wno-sign-compare: suppress warnings for comparison between signed and unsigned
@@ -461,7 +469,6 @@ include_directories(
${THIRDPARTY_DIR}/include/event/
)


set(WL_START_GROUP "-Wl,--start-group")
set(WL_END_GROUP "-Wl,--end-group")

@@ -488,12 +495,6 @@ set(DORIS_LINK_LIBS
${WL_END_GROUP}
)

if (WITH_MYSQL)
set(DORIS_DEPENDENCIES
mysql
)
endif()

# Set thirdparty libraries
set(DORIS_DEPENDENCIES
${DORIS_DEPENDENCIES}
@@ -540,6 +541,12 @@ if(WITH_LZO)
)
endif()

if (WITH_MYSQL)
set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES}
mysql
)
endif()

# Add all external dependencies. They should come after the palo libs.
# static link gcc's lib
set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS}
14 changes: 11 additions & 3 deletions be/src/agent/agent_server.cpp
@@ -329,8 +329,8 @@ void AgentServer::submit_tasks(
break;
case TTaskType::ROLLUP:
case TTaskType::SCHEMA_CHANGE:
case TTaskType::ALTER_TASK:
if (task.__isset.alter_tablet_req) {
case TTaskType::ALTER:
if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
_alter_tablet_workers->submit_task(task);
} else {
status_code = TStatusCode::ANALYSIS_ERROR;
@@ -426,7 +426,15 @@ void AgentServer::make_snapshot(TAgentResult& return_value,
TStatus status;
vector<string> error_msgs;
TStatusCode::type status_code = TStatusCode::OK;
return_value.__set_snapshot_version(PREFERRED_SNAPSHOT_VERSION);
int32_t return_snapshot_version = PREFERRED_SNAPSHOT_VERSION;
// If the request's snapshot version is less than the current BE's preferred
// snapshot version, the requesting BE is running an old version, so answer
// with version 1. The current BE will then generate snapshot files in the
// tabletid_schemahash_startversion_endversion format, which every BE can parse.
if (snapshot_request.preferred_snapshot_version < PREFERRED_SNAPSHOT_VERSION) {
return_snapshot_version = 1;
}
return_value.__set_snapshot_version(return_snapshot_version);
string snapshot_path;
OLAPStatus make_snapshot_status =
SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path);
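The snapshot-version negotiation added above can be sketched as a pure function. The constant's value and all names here are hypothetical stand-ins; only the rule (answer an older requester with format 1, which every BE can parse) comes from the change:

```cpp
#include <cstdint>

// Hypothetical stand-in for PREFERRED_SNAPSHOT_VERSION; the real value
// lives in the BE sources.
constexpr int32_t kPreferredSnapshotVersion = 3;

// If the requesting BE advertises an older preferred snapshot version,
// fall back to snapshot format 1 (files named
// tabletid_schemahash_startversion_endversion), which every BE can parse.
// Otherwise answer with our own preferred version.
int32_t choose_snapshot_version(int32_t requested_preferred) {
    if (requested_preferred < kPreferredSnapshotVersion) {
        return 1;
    }
    return kPreferredSnapshotVersion;
}
```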
10 changes: 5 additions & 5 deletions be/src/agent/task_worker_pool.cpp
@@ -554,7 +554,7 @@ void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) {
int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
if (time_elapsed > config::report_task_interval_seconds * 20) {
LOG(INFO) << "task elapsed " << time_elapsed
<< " since it is inserted to queue, it is timeout";
<< " seconds since it is inserted to queue, it is timeout";
is_task_timeout = true;
}
}
@@ -564,7 +564,7 @@ void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) {
switch (task_type) {
case TTaskType::SCHEMA_CHANGE:
case TTaskType::ROLLUP:
case TTaskType::ALTER_TASK:
case TTaskType::ALTER:
worker_pool_this->_alter_tablet(worker_pool_this,
agent_task_req,
signatrue,
@@ -602,8 +602,8 @@ void TaskWorkerPool::_alter_tablet(
case TTaskType::SCHEMA_CHANGE:
process_name = "schema change";
break;
case TTaskType::ALTER_TASK:
process_name = "alter table";
case TTaskType::ALTER:
process_name = "alter";
break;
default:
std::string task_name;
@@ -621,7 +621,7 @@ void TaskWorkerPool::_alter_tablet(
TSchemaHash new_schema_hash = 0;
if (status == DORIS_SUCCESS) {
OLAPStatus sc_status = OLAP_SUCCESS;
if (task_type == TTaskType::ALTER_TASK) {
if (task_type == TTaskType::ALTER) {
new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2, signature, task_type, &error_msgs, process_name);
3 changes: 2 additions & 1 deletion be/src/common/CMakeLists.txt
@@ -26,4 +26,5 @@ add_library(Common STATIC
configbase.cpp
)

#ADD_BE_TEST(resource_tls_test)
# Generate env_config.h according to env_config.h.in
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/env_config.h.in ${GENSRC_DIR}/common/env_config.h)
27 changes: 24 additions & 3 deletions be/src/common/config.h
@@ -240,7 +240,8 @@ namespace config {

// cumulative compaction policy: max delta file's size unit:B
CONF_Int32(cumulative_compaction_check_interval_seconds, "10");
CONF_Int64(cumulative_compaction_num_singleton_deltas, "5");
CONF_Int64(min_cumulative_compaction_num_singleton_deltas, "5");
CONF_Int64(max_cumulative_compaction_num_singleton_deltas, "1000");
CONF_Int32(cumulative_compaction_num_threads, "1");
CONF_Int32(cumulative_compaction_num_threads_per_disk, "1");
CONF_Int64(cumulative_compaction_budgeted_bytes, "104857600");
@@ -303,6 +304,19 @@ namespace config {

CONF_Bool(disable_mem_pools, "false");

// Whether to allocate chunks using mmap. If you enable this, you should
// increase vm.max_map_count, whose default value is 65530.
// You can do so as root via "sysctl -w vm.max_map_count=262144" or
// "echo 262144 > /proc/sys/vm/max_map_count".
// NOTE: When this is set to true, you must set chunk_reserved_bytes_limit
// to a relatively large number, or performance will be very poor.
CONF_Bool(use_mmap_allocate_chunk, "false");

// Chunk allocator's reserved bytes limit.
// The default is 2GB. Increasing this value can improve performance, but
// reserves more free memory that cannot be used by other modules.
CONF_Int64(chunk_reserved_bytes_limit, "2147483648");

// The probing algorithm of partitioned hash table.
// Enable quadratic probing hash table
CONF_Bool(enable_quadratic_probing, "false");
@@ -344,8 +358,9 @@ namespace config {

CONF_Bool(enable_prefetch, "true");

// cpu count
CONF_Int32(flags_num_cores, "32");
// Number of cores Doris will use; this takes effect only when it is greater than 0.
// Otherwise, Doris uses all cores reported in "/proc/cpuinfo".
CONF_Int32(num_cores, "0");

CONF_Bool(thread_creation_fault_injection, "false");

@@ -424,6 +439,12 @@
// note: unit is minute, default is 5min
CONF_Int32(scan_context_gc_interval_min, "5");

// es scroll keep-alive
CONF_String(es_scroll_keepalive, "5m");

// HTTP connection timeout for es
CONF_Int32(es_http_timeout_ms, "5000");

// the max client cache number per each host
// There are variety of client cache in BE, but currently we use the
// same cache size configuration.
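The `use_mmap_allocate_chunk` option and the `vm.max_map_count` advice above relate because each live anonymous mapping counts toward that kernel limit. A minimal Linux/POSIX sketch of mmap-backed chunk allocation follows; the real ChunkAllocator additionally keeps per-core free lists bounded by `chunk_reserved_bytes_limit`, which is omitted here:

```cpp
#include <cstddef>
#include <sys/mman.h>

// Allocate one chunk as a private anonymous mapping. Each successful call
// consumes one entry against vm.max_map_count until the chunk is unmapped.
void* mmap_chunk(size_t size) {
    void* ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    return ptr == MAP_FAILED ? nullptr : ptr;
}

// Return a chunk's pages to the kernel.
void munmap_chunk(void* ptr, size_t size) {
    munmap(ptr, size);
}
```

This also illustrates why a small reserved-bytes limit hurts with mmap enabled: without a free list to recycle chunks, every allocation pays a syscall and a page-fault warm-up.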
5 changes: 5 additions & 0 deletions be/src/common/daemon.cpp
@@ -31,6 +31,7 @@
#include "util/doris_metrics.h"
#include "runtime/bufferpool/buffer_pool.h"
#include "runtime/exec_env.h"
#include "runtime/memory/chunk_allocator.h"
#include "runtime/mem_tracker.h"
#include "runtime/user_function_cache.h"
#include "exprs/operators.h"
@@ -52,6 +53,7 @@
#include "exprs/hll_hash_function.h"
#include "exprs/timezone_db.h"
#include "exprs/bitmap_function.h"
#include "exprs/hll_function.h"
#include "geo/geo_functions.h"
#include "olap/options.h"
#include "util/time.h"
@@ -272,6 +274,7 @@ void init_daemon(int argc, char** argv, const std::vector<StorePath>& paths) {
GeoFunctions::init();
TimezoneDatabase::init();
BitmapFunctions::init();
HllFunctions::init();

pthread_t tc_malloc_pid;
pthread_create(&tc_malloc_pid, NULL, tcmalloc_gc_thread, NULL);
@@ -284,6 +287,8 @@ void init_daemon(int argc, char** argv, const std::vector<StorePath>& paths) {
LOG(INFO) << MemInfo::debug_string();
init_doris_metrics(paths);
init_signals();

ChunkAllocator::init_instance(config::chunk_reserved_bytes_limit);
}

}
24 changes: 24 additions & 0 deletions be/src/common/env_config.h.in
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

namespace doris {

#cmakedefine HAVE_SCHED_GETCPU @HAVE_SCHED_GETCPU@

}
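At configure time, `check_function_exists(sched_getcpu HAVE_SCHED_GETCPU)` turns the `#cmakedefine` line in this template into either a `#define` or a commented-out line in the generated `gensrc/common/env_config.h`. A sketch of how a call site would then guard the platform-specific function — the helper name is hypothetical, and without the generated header the macro is simply undefined here:

```cpp
// Normally supplied by the generated env_config.h; undefined in this sketch.
#ifdef HAVE_SCHED_GETCPU
#include <sched.h>
#endif

// Return the core the calling thread runs on, or 0 where the platform
// lacks sched_getcpu (or the call fails).
int current_core() {
#ifdef HAVE_SCHED_GETCPU
    int cpu = sched_getcpu();
    return cpu >= 0 ? cpu : 0;
#else
    return 0;  // fallback when sched_getcpu is unavailable
#endif
}
```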
1 change: 0 additions & 1 deletion be/src/exec/CMakeLists.txt
@@ -54,7 +54,6 @@ set(EXEC_FILES
olap_rewrite_node.cpp
olap_scan_node.cpp
olap_scanner.cpp
olap_meta_reader.cpp
olap_common.cpp
tablet_info.cpp
tablet_sink.cpp
10 changes: 5 additions & 5 deletions be/src/exec/aggregation_node.cpp
@@ -199,7 +199,7 @@ Status AggregationNode::open(RuntimeState* state) {
while (true) {
bool eos = false;
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, before getting next from child 0."));
RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
// SCOPED_TIMER(_build_timer);
if (VLOG_ROW_IS_ON) {
@@ -227,7 +227,7 @@ Status AggregationNode::open(RuntimeState* state) {
}

// RETURN_IF_LIMIT_EXCEEDED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, after hashing the child 0."));

COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets());
COUNTER_SET(memory_used_counter(),
@@ -238,7 +238,7 @@ Status AggregationNode::open(RuntimeState* state) {

batch.reset();

RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, after setting the counter."));
if (eos) {
break;
}
@@ -262,7 +262,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, before evaluating conjuncts."));
SCOPED_TIMER(_get_results_timer);

if (reached_limit()) {
@@ -280,7 +280,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
// maintenance every N iterations.
if (count++ % N == 0) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(state->check_query_state("Aggregation, while evaluating conjuncts."));
}
int row_idx = row_batch->add_row();
TupleRow* row = row_batch->get_row(row_idx);
9 changes: 9 additions & 0 deletions be/src/exec/base_scanner.cpp
@@ -145,6 +145,15 @@ bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool*
ExprContext* ctx = _dest_expr_ctx[dest_index];
void* value = ctx->get_value(_src_tuple_row);
if (value == nullptr) {
// Only when the expr return value is null, we will check the error message.
std::string expr_error = ctx->get_error_msg();
if (!expr_error.empty()) {
_state->append_error_msg_to_file(_src_tuple_row->to_string(*(_row_desc.get())), expr_error);
_counter->num_rows_filtered++;
// The ctx is reused, so must clear the error state and message.
ctx->clear_error_msg();
return false;
}
SlotDescriptor* slot_descriptor = _src_slot_descs_order_by_dest[dest_index];
if (_strict_mode && (slot_descriptor != nullptr)&& !_src_tuple->is_null(slot_descriptor->null_indicator_offset())) {
//Type of the slot is must be Varchar in _src_tuple.
3 changes: 1 addition & 2 deletions be/src/exec/cross_join_node.cpp
@@ -66,8 +66,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));

// to prevent using too much memory
RETURN_IF_LIMIT_EXCEEDED(state,
"Cross join was getting next from the child.");
RETURN_IF_LIMIT_EXCEEDED(state, "Cross join, while getting next from the child 1.");

SCOPED_TIMER(_build_timer);
_build_batches.add_row_batch(batch);
4 changes: 2 additions & 2 deletions be/src/exec/csv_scan_node.cpp
@@ -672,14 +672,14 @@ void CsvScanNode::hll_hash(const char* src, int len, std::string* result) {
std::string str(src, len);
if (str != "\\N") {
uint64_t hash = HashUtil::murmur_hash64A(src, len, HashUtil::MURMUR_SEED);
char buf[HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE];
char buf[10];
// explicit set
buf[0] = HLL_DATA_EXPLICIT;
buf[1] = 1;
*((uint64_t*)(buf + 2)) = hash;
*result = std::string(buf, sizeof(buf));
} else {
char buf[HllHashFunctions::HLL_EMPTY_SET_SIZE];
char buf[1];
// empty set
buf[0] = HLL_DATA_EMPTY;
*result = std::string(buf, sizeof(buf));
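The buffers above encode an HLL "explicit set": byte 0 is the set-type tag, byte 1 the element count, and bytes 2..9 one 64-bit hash, for 10 bytes total (an empty set is the single tag byte). A sketch of that layout, with hypothetical tag values standing in for `HLL_DATA_EXPLICIT` / `HLL_DATA_EMPTY` and `memcpy` replacing the unaligned pointer store:

```cpp
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical tag values; the real constants live in the olap headers.
const char kHllDataEmpty = 0;
const char kHllDataExplicit = 1;

// Explicit set with one element: [tag:1][count:1][hash:8] = 10 bytes.
std::string encode_explicit_hll(uint64_t hash) {
    char buf[10];
    buf[0] = kHllDataExplicit;
    buf[1] = 1;                            // one element
    memcpy(buf + 2, &hash, sizeof(hash));  // avoids the unaligned store
    return std::string(buf, sizeof(buf));
}

// Empty set: just the tag byte.
std::string encode_empty_hll() {
    char buf[1] = {kHllDataEmpty};
    return std::string(buf, sizeof(buf));
}
```

Replacing the named size constants with the literals `10` and `1`, as the diff does, keeps the sizes in lockstep with this fixed layout rather than with a header that no longer exists.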
10 changes: 5 additions & 5 deletions be/src/exec/es/es_scan_reader.cpp
@@ -21,6 +21,7 @@
#include <string>
#include <sstream>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "exec/es/es_scroll_query.h"
@@ -31,9 +32,8 @@ const std::string REQUEST_SCROLL_PATH = "_scroll";
const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:";
const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
const std::string REQUEST_SEPARATOR = "/";
const std::string REQUEST_SCROLL_TIME = "5m";

ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) {
ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) : _scroll_keep_alive(config::es_scroll_keepalive), _http_timeout_ms(config::es_http_timeout_ms) {
_target = target;
_index = props.at(KEY_INDEX);
_type = props.at(KEY_TYPE);
@@ -51,7 +51,7 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string
}
std::string batch_size_str = props.at(KEY_BATCH_SIZE);
_batch_size = atoi(batch_size_str.c_str());
_init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
_init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + _scroll_keep_alive + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
_eos = false;
}
@@ -91,9 +91,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
_network_client.set_timeout_ms(5 * 1000);
_network_client.set_timeout_ms(_http_timeout_ms);
RETURN_IF_ERROR(_network_client.execute_post_request(
ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, REQUEST_SCROLL_TIME), &response));
ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive), &response));
long status = _network_client.get_http_status();
if (status == 404) {
LOG(WARNING) << "request scroll search failure 404["
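The change threads the configurable `es_scroll_keepalive` into the initial scroll URL in place of the hard-coded `"5m"`. A sketch of that URL assembly as a standalone helper — the helper itself is hypothetical, and the trailing `&` + filter-path suffix from the real constructor is omitted:

```cpp
#include <string>

// Build the initial ES scroll-search URL from the target endpoint, index,
// type, configurable keep-alive (e.g. "5m"), and shard preference.
std::string build_init_scroll_url(const std::string& target,
                                  const std::string& index,
                                  const std::string& type,
                                  const std::string& scroll_keep_alive,
                                  const std::string& shards) {
    return target + "/" + index + "/" + type +
           "/_search?scroll=" + scroll_keep_alive +
           "&preference=_shards:" + shards;
}
```

The same keep-alive string is reused in the scroll-continuation body, so both requests extend the server-side scroll context by the same configured duration.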
4 changes: 4 additions & 0 deletions be/src/exec/es/es_scan_reader.h
@@ -69,6 +69,10 @@ class ESScanReader {
int _batch_size;

std::string _cached_response;
// keep-alive for es scroll
std::string _scroll_keep_alive;
// timeout for es http connection
int _http_timeout_ms;
};
}
