Skip to content

Commit

Permalink
Merge branch 'branch-2.4' of https://github.com/StarRocks/starrocks i…
Browse files Browse the repository at this point in the history
…nto branch-2.4-tq

* 'branch-2.4' of https://github.com/StarRocks/starrocks: (84 commits)
  Upgrade opencsv due to CVE-2022-42889 (StarRocks#12526)
  [Cherry-pick][BugFix] Fix complex window + agg prune bug
  [BugFix] Fix transaction stream load empty label return message (StarRocks#12372)
  [Enhancement] Fix log printing in be (StarRocks#12497)
  [BugFix] Fix use-after-free in when TabletScanner call destructor (StarRocks#12453)
  [Doc]routine bugfix (StarRocks#12464)
  Update SHOW ALTER.md (StarRocks#12423)
  Update the link for Coloate join (StarRocks#12431)
  [Bugfix] Publish timeout in commitPreparedTransaction should raise error (StarRocks#12217) (StarRocks#12300)
  [Docs] Remove /bin/sh when starting binary (StarRocks#12434)
  [BugFix] fix alter table in new publish; add detail error message for load/insert publish timeout (StarRocks#12237)
  [BugFix] fix incorrect memory metrics of jemalloc (StarRocks#12365)
  Update Data_model.md (StarRocks#12420)
  [Doc] Update StreamLoad.md in Branch 2.4 (StarRocks#12410)
  [Doc]Update filemanager.md (StarRocks#12401)
  [Doc] correctify the parameter name of sink.properties.columns  (StarRocks#12389)
  Cherry-pick MAXVALUE bugfixs to branch-2.4
  [BugFix] Fix fencing failed when a new leader is elected (StarRocks#12138)
  [cherry-pick][branch-2.4][BugFix] Support authentication for StarRocks external table (StarRocks#11871) (StarRocks#12011)
  [Doc] Update Stream_Load_transaction_interface.md in Branch 2.4 (StarRocks#12371)
  ...
  • Loading branch information
guangxuCheng committed Oct 27, 2022
2 parents c64892e + c49df01 commit 3868a8f
Show file tree
Hide file tree
Showing 184 changed files with 3,359 additions and 999 deletions.
4 changes: 2 additions & 2 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ StatusOr<HeartbeatServer::CmpResult> HeartbeatServer::compare_master_info(const

if (master_info.__isset.backend_ip) {
if (master_info.backend_ip != BackendOptions::get_localhost()) {
LOG(INFO) << master_info.backend_ip << " not equal to to backend localhost "
<< BackendOptions::get_localhost();
LOG(WARNING) << master_info.backend_ip << " not equal to to backend localhost "
<< BackendOptions::get_localhost();
bool fe_saved_is_valid_ip = is_valid_ip(master_info.backend_ip);
if (fe_saved_is_valid_ip && is_valid_ip(BackendOptions::get_localhost())) {
return Status::InternalError("FE saved address not match backend address");
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) {
if (priority_tasks.empty() || finish_task_requests.size() > PUBLISH_VERSION_BATCH_SIZE ||
batch_publish_latency > config::max_batch_publish_latency_ms) {
int64_t t0 = MonotonicMillis();
TxnManager::flush_dirs(affected_dirs);
StorageEngine::instance()->txn_manager()->flush_dirs(affected_dirs);
int64_t t1 = MonotonicMillis();
// notify FE when all tasks of group have been finished.
for (auto& finish_task_request : finish_task_requests) {
Expand Down
12 changes: 12 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ CONF_mInt64(tc_free_memory_rate, "0");
// tcmalloc gc period, default 60, it should be between [1, 180]
CONF_mInt64(tc_gc_period, "60");

CONF_mBool(enable_auto_adjust_pagecache, "true");
// Memory urget water level, if the memory usage exceeds this level, reduce the size of
// the Pagecache immediately, it should be between (memory_high_level, 100].
CONF_mInt64(memory_urgent_level, "85");
// Memory high water level, if the memory usage exceeds this level, reduce the size of
// the Pagecache slowly, it should be between [1, memory_urgent_level).
CONF_mInt64(memory_high_level, "75");
// Pagecache size adjust period, default 20, it should be between [1, 180].
CONF_mInt64(pagecache_adjust_period, "20");
// Sleep time in seconds between pagecache adjust iterations.
CONF_mInt64(auto_adjust_pagecache_interval_seconds, "10");

// Bound on the total amount of bytes allocated to thread caches.
// This bound is not strict, so it is possible for the cache to go over this bound
// in certain circumstances. The maximum value of this flag is capped to 1GB.
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Status ExchangeNode::open(RuntimeState* state) {

Status ExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
statistics->merge(_sub_plan_query_statistics_recvr.get());
_sub_plan_query_statistics_recvr->aggregate(statistics);
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
namespace starrocks::pipeline {
Status ExchangeMergeSortSourceOperator::prepare(RuntimeState* state) {
SourceOperator::prepare(state);
auto query_statistic_recv = state->query_recv();
_stream_recvr = state->exec_env()->stream_mgr()->create_recvr(
state, _row_desc, state->fragment_instance_id(), _plan_node_id, _num_sender,
config::exchg_node_buffer_size_bytes, _unique_metrics, true, nullptr, true,
config::exchg_node_buffer_size_bytes, _unique_metrics, true, query_statistic_recv, true,
// ExchangeMergeSort will never perform pipeline level shuffle
DataStreamRecvr::INVALID_DOP_FOR_NON_PIPELINE_LEVEL_SHUFFLE, true);
return _stream_recvr->create_merger_for_pipeline(state, _sort_exec_exprs, &_is_asc_order, &_nulls_first);
Expand Down
42 changes: 25 additions & 17 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ class ExchangeSinkOperator::Channel {
Status init(RuntimeState* state);

// Send one chunk to remote, this chunk may be batched in this channel.
Status send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos);
Status send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos);

// Send one chunk to remote, this chunk may be batched in this channel.
// When the chunk is sent really rather than bachend, *is_real_sent will
// be set to true.
Status send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos, bool* is_real_sent);
Status send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos,
bool* is_real_sent);

// Channel will sent input request directly without batch it.
// This function is only used when broadcast, because request can be reused
// by all the channels.
Status send_chunk_request(PTransmitChunkParamsPtr chunk_request, const butil::IOBuf& attachment,
int64_t attachment_physical_bytes);
Status send_chunk_request(RuntimeState* state, PTransmitChunkParamsPtr chunk_request,
const butil::IOBuf& attachment, int64_t attachment_physical_bytes);

// Used when doing shuffle.
// This function will copy selective rows in chunks to batch.
Expand Down Expand Up @@ -171,7 +172,7 @@ Status ExchangeSinkOperator::Channel::add_rows_selective(vectorized::Chunk* chun
}

if (_chunks[driver_sequence]->num_rows() + size > state->chunk_size()) {
RETURN_IF_ERROR(send_one_chunk(_chunks[driver_sequence].get(), driver_sequence, false));
RETURN_IF_ERROR(send_one_chunk(state, _chunks[driver_sequence].get(), driver_sequence, false));
// we only clear column data, because we need to reuse column schema
_chunks[driver_sequence]->set_num_rows(0);
}
Expand All @@ -180,14 +181,14 @@ Status ExchangeSinkOperator::Channel::add_rows_selective(vectorized::Chunk* chun
return Status::OK();
}

Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence,
bool eos) {
Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk,
int32_t driver_sequence, bool eos) {
bool is_real_sent = false;
return send_one_chunk(chunk, driver_sequence, eos, &is_real_sent);
return send_one_chunk(state, chunk, driver_sequence, eos, &is_real_sent);
}

Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* chunk, int32_t driver_sequence, bool eos,
bool* is_real_sent) {
Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const vectorized::Chunk* chunk,
int32_t driver_sequence, bool eos, bool* is_real_sent) {
*is_real_sent = false;
if (_chunk_request == nullptr) {
_chunk_request = std::make_shared<PTransmitChunkParams>();
Expand Down Expand Up @@ -224,6 +225,9 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* ch
if (_current_request_bytes > config::max_transmit_batched_bytes || eos) {
_chunk_request->set_eos(eos);
_chunk_request->set_use_pass_through(_use_pass_through);
if (auto delta_statistic = state->intermediate_query_statistic()) {
delta_statistic->to_pb(_chunk_request->mutable_query_statistics());
}
butil::IOBuf attachment;
int64_t attachment_physical_bytes = _parent->construct_brpc_attachment(_chunk_request, attachment);
TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(_chunk_request), attachment,
Expand All @@ -237,7 +241,7 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(const vectorized::Chunk* ch
return Status::OK();
}

Status ExchangeSinkOperator::Channel::send_chunk_request(PTransmitChunkParamsPtr chunk_request,
Status ExchangeSinkOperator::Channel::send_chunk_request(RuntimeState* state, PTransmitChunkParamsPtr chunk_request,
const butil::IOBuf& attachment,
int64_t attachment_physical_bytes) {
chunk_request->set_node_id(_dest_node_id);
Expand All @@ -246,6 +250,10 @@ Status ExchangeSinkOperator::Channel::send_chunk_request(PTransmitChunkParamsPtr
chunk_request->set_eos(false);
chunk_request->set_use_pass_through(_use_pass_through);

if (auto delta_statistic = state->intermediate_query_statistic()) {
delta_statistic->to_pb(chunk_request->mutable_query_statistics());
}

TransmitChunkInfo info = {this->_fragment_instance_id, _brpc_stub, std::move(chunk_request), attachment,
attachment_physical_bytes};
_parent->_buffer->add_request(info);
Expand All @@ -262,10 +270,10 @@ Status ExchangeSinkOperator::Channel::_close_internal(RuntimeState* state, Fragm
if (!fragment_ctx->is_canceled()) {
for (auto driver_sequence = 0; driver_sequence < _chunks.size(); ++driver_sequence) {
if (_chunks[driver_sequence] != nullptr) {
RETURN_IF_ERROR(send_one_chunk(_chunks[driver_sequence].get(), driver_sequence, false));
RETURN_IF_ERROR(send_one_chunk(state, _chunks[driver_sequence].get(), driver_sequence, false));
}
}
RETURN_IF_ERROR(send_one_chunk(nullptr, ExchangeSinkOperator::DEFAULT_DRIVER_SEQUENCE, true));
RETURN_IF_ERROR(send_one_chunk(state, nullptr, ExchangeSinkOperator::DEFAULT_DRIVER_SEQUENCE, true));
}

return Status::OK();
Expand Down Expand Up @@ -441,7 +449,7 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C
int has_not_pass_through = false;
for (auto idx : _channel_indices) {
if (_channels[idx]->use_pass_through()) {
RETURN_IF_ERROR(_channels[idx]->send_one_chunk(send_chunk, DEFAULT_DRIVER_SEQUENCE, false));
RETURN_IF_ERROR(_channels[idx]->send_one_chunk(state, send_chunk, DEFAULT_DRIVER_SEQUENCE, false));
} else {
has_not_pass_through = true;
}
Expand All @@ -465,7 +473,7 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C
if (!_channels[idx]->use_pass_through()) {
PTransmitChunkParamsPtr copy = std::make_shared<PTransmitChunkParams>(*_chunk_request);
RETURN_IF_ERROR(
_channels[idx]->send_chunk_request(copy, attachment, attachment_physical_bytes));
_channels[idx]->send_chunk_request(state, copy, attachment, attachment_physical_bytes));
}
}
_current_request_bytes = 0;
Expand All @@ -489,7 +497,7 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C

auto& channel = local_channels[_curr_random_channel_idx];
bool real_sent = false;
RETURN_IF_ERROR(channel->send_one_chunk(send_chunk, DEFAULT_DRIVER_SEQUENCE, false, &real_sent));
RETURN_IF_ERROR(channel->send_one_chunk(state, send_chunk, DEFAULT_DRIVER_SEQUENCE, false, &real_sent));
if (real_sent) {
_curr_random_channel_idx = (_curr_random_channel_idx + 1) % local_channels.size();
}
Expand Down Expand Up @@ -581,7 +589,7 @@ Status ExchangeSinkOperator::set_finishing(RuntimeState* state) {
int64_t attachment_physical_bytes = construct_brpc_attachment(_chunk_request, attachment);
for (const auto& [_, channel] : _instance_id2channel) {
PTransmitChunkParamsPtr copy = std::make_shared<PTransmitChunkParams>(*_chunk_request);
channel->send_chunk_request(copy, attachment, attachment_physical_bytes);
channel->send_chunk_request(state, copy, attachment, attachment_physical_bytes);
}
_current_request_bytes = 0;
_chunk_request.reset();
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/pipeline/exchange/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ std::shared_ptr<DataStreamRecvr> ExchangeSourceOperatorFactory::create_stream_re
if (_stream_recvr != nullptr) {
return _stream_recvr;
}
auto query_statistic_recv = state->query_recv();
_stream_recvr = state->exec_env()->stream_mgr()->create_recvr(
state, _row_desc, state->fragment_instance_id(), _plan_node_id, _num_sender,
config::exchg_node_buffer_size_bytes, profile, false, nullptr, true, _degree_of_parallelism, false);
config::exchg_node_buffer_size_bytes, profile, false, query_statistic_recv, true, _degree_of_parallelism,
false);
return _stream_recvr;
}

Expand Down
14 changes: 12 additions & 2 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ Status FragmentExecutor::_prepare_pipeline_driver(ExecEnv* exec_env, const Unifi
std::unique_ptr<DataSink> datasink;
if (request.isset_output_sink()) {
const auto& tsink = request.output_sink();
if (tsink.type == TDataSinkType::RESULT_SINK) {
_query_ctx->set_result_sink(true);
}
RowDescriptor row_desc;
RETURN_IF_ERROR(DataSink::create_data_sink(runtime_state, tsink, fragment.output_exprs, params,
request.sender_id(), row_desc, &datasink));
Expand Down Expand Up @@ -481,11 +484,18 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
UnifiedExecPlanFragmentParams request(common_request, unique_request);

bool prepare_success = false;
DeferOp defer([this, &prepare_success]() {
if (!prepare_success) {
int64_t prepare_time = 0;
DeferOp defer([this, &request, &prepare_success, &prepare_time]() {
if (prepare_success) {
auto fragment_ctx = _query_ctx->fragment_mgr()->get(request.fragment_instance_id());
auto* prepare_timer = fragment_ctx->runtime_state()->runtime_profile()->add_counter(
"FragmentInstancePrepareTime", TUnit::TIME_NS);
COUNTER_SET(prepare_timer, prepare_time);
} else {
_fail_cleanup();
}
});
SCOPED_RAW_TIMER(&prepare_time);
RETURN_IF_ERROR(exec_env->query_pool_mem_tracker()->check_mem_limit("Start execute plan fragment."));

RETURN_IF_ERROR(_prepare_query_ctx(exec_env, request));
Expand Down
5 changes: 2 additions & 3 deletions be/src/exec/pipeline/nljoin/nljoin_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Status NLJoinContext::_init_runtime_filter(RuntimeState* state) {
DCHECK(one_row_chunk != nullptr);
auto* pool = state->obj_pool();
ASSIGN_OR_RETURN(auto rfs, vectorized::CrossJoinNode::rewrite_runtime_filter(
pool, _rf_descs, one_row_chunk.get(), _conjuncts_ctx));
pool, _rf_descs, one_row_chunk.get(), _rf_conjuncts_ctx));
_rf_hub->set_collector(_plan_node_id,
std::make_unique<RuntimeFilterCollector>(std::move(rfs), RuntimeBloomFilterList{}));
} else {
Expand Down Expand Up @@ -80,8 +80,6 @@ void NLJoinContext::append_build_chunk(int32_t sinker_id, vectorized::ChunkPtr c

Status NLJoinContext::finish_one_right_sinker(RuntimeState* state) {
if (_num_right_sinkers - 1 == _num_finished_right_sinkers.fetch_add(1)) {
RETURN_IF_ERROR(_init_runtime_filter(state));

// Accumulate chunks
ChunkAccumulator accumulator(state->chunk_size());
for (auto& sink_chunks : _input_chunks) {
Expand All @@ -99,6 +97,7 @@ Status NLJoinContext::finish_one_right_sinker(RuntimeState* state) {
_input_chunks.clear();
_input_chunks.shrink_to_fit();

RETURN_IF_ERROR(_init_runtime_filter(state));
_build_chunk_desired_size = state->chunk_size();
_all_right_finished = true;
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/exec/pipeline/nljoin/nljoin_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class NLJoinContext final : public ContextWithDependency {
_num_right_sinkers(params.num_right_sinkers),
_plan_node_id(params.plan_node_id),
_input_chunks(_num_right_sinkers),
_conjuncts_ctx(std::move(params.filters)),
_rf_conjuncts_ctx(std::move(params.filters)),
_rf_hub(params.rf_hub),
_rf_descs(std::move(params.rf_descs)) {}

Expand Down Expand Up @@ -81,8 +81,7 @@ class NLJoinContext final : public ContextWithDependency {
std::vector<uint8_t> _shared_build_match_flag;

// conjuncts in cross join, used for generate runtime_filter
std::vector<ExprContext*> _conjuncts_ctx;

std::vector<ExprContext*> _rf_conjuncts_ctx;
RuntimeFilterHub* _rf_hub;
std::vector<vectorized::RuntimeFilterBuildDescriptor*> _rf_descs;
};
Expand Down
41 changes: 24 additions & 17 deletions be/src/exec/pipeline/nljoin/nljoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,6 @@ ChunkPtr NLJoinProbeOperator::_init_output_chunk(RuntimeState* state) const {
}

Status NLJoinProbeOperator::_probe(RuntimeState* state, ChunkPtr chunk) {
// If join conjuncts are empty, most join type do not need to filter data
// Except left join and the right table is empty, in which it could not permute any chunk
// So here we need to permute_left_join for this case
vectorized::FilterPtr filter;
if (!_join_conjuncts.empty() && chunk && !chunk->is_empty()) {
size_t rows = chunk->num_rows();
Expand All @@ -205,6 +202,9 @@ Status NLJoinProbeOperator::_probe(RuntimeState* state, ChunkPtr chunk) {
}

if (_is_left_join()) {
// If join conjuncts are empty, most join type do not need to filter data
// Except left join and the right table is empty, in which it could not permute any chunk
// So here we need to permute_left_join for this case
if (_num_build_chunks() == 0) {
// Empty right table
DCHECK_EQ(_probe_row_current, _probe_chunk->num_rows());
Expand Down Expand Up @@ -233,22 +233,29 @@ Status NLJoinProbeOperator::_probe(RuntimeState* state, ChunkPtr chunk) {
}
}

if (_is_right_join() && filter) {
bool multi_probe_rows = _num_build_chunks() == 1;
if (multi_probe_rows) {
size_t num_build_rows = _cross_join_context->num_build_rows();
DCHECK_GE(filter->size(), num_build_rows);
for (size_t i = 0; i < filter->size(); i += num_build_rows) {
vectorized::ColumnHelper::or_two_filters(&_self_build_match_flag, filter->data() + i);
if (_is_right_join()) {
// If the filter and join_conjuncts are empty, it means join conjunct is always true
// So we need to mark the build_match_flag for all rows
if (_join_conjuncts.empty()) {
DCHECK(!filter);
_self_build_match_flag.assign(_self_build_match_flag.size(), 1);
} else if (filter) {
bool multi_probe_rows = _num_build_chunks() == 1;
if (multi_probe_rows) {
size_t num_build_rows = _cross_join_context->num_build_rows();
DCHECK_GE(filter->size(), num_build_rows);
for (size_t i = 0; i < filter->size(); i += num_build_rows) {
vectorized::ColumnHelper::or_two_filters(&_self_build_match_flag, filter->data() + i);
}
} else {
DCHECK_LE(_prev_chunk_size + _prev_chunk_start, _self_build_match_flag.size());
DCHECK_EQ(_prev_chunk_size, filter->size());
vectorized::ColumnHelper::or_two_filters(
_prev_chunk_size, _self_build_match_flag.data() + _prev_chunk_start, filter->data());
}
} else {
DCHECK_LE(_prev_chunk_size + _prev_chunk_start, _self_build_match_flag.size());
DCHECK_EQ(_prev_chunk_size, filter->size());
vectorized::ColumnHelper::or_two_filters(_prev_chunk_size,
_self_build_match_flag.data() + _prev_chunk_start, filter->data());
VLOG(3) << fmt::format("NLJoin operator {} set build_flags for right join, filter={}, flags={}",
_driver_sequence, fmt::join(*filter, ","), fmt::join(_self_build_match_flag, ","));
}
VLOG(3) << fmt::format("NLJoin operator {} set build_flags for right join, filter={}, flags={}",
_driver_sequence, fmt::join(*filter, ","), fmt::join(_self_build_match_flag, ","));
}

return Status::OK();
Expand Down
Loading

0 comments on commit 3868a8f

Please sign in to comment.