Skip to content

Commit

Permalink
[Enhancement] Scale writers for connector sink when writing static pa…
Browse files Browse the repository at this point in the history
…rtition tables (#40540)

Signed-off-by: Jiao Mingye <mxdzs0612@gmail.com>
  • Loading branch information
mxdzs0612 authored Mar 27, 2024
1 parent 9b129e7 commit 2245fc3
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 6 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,9 @@ CONF_mInt32(json_flat_column_max, "20");
// Disable when pk_dump_interval_seconds <= 0
CONF_mInt64(pk_dump_interval_seconds, "3600"); // 1 hour

// Min data processed when scaling connector sink writers, default value is the same as Trino
CONF_mInt64(writer_scaling_min_size_mb, "128");

// whether enable query profile for queries initiated by spark or flink
CONF_mBool(enable_profile_for_external_plan, "false");

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/chunk_buffer_memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class ChunkBufferMemoryManager {

bool is_full() const { return _memory_usage >= _max_memory_usage || _buffered_num_rows > _max_buffered_rows; }

bool is_half_full() const { return _memory_usage * 2 >= _max_memory_usage; }

size_t get_max_input_dop() const { return _max_input_dop; }

void update_max_memory_usage(size_t max_memory_usage) {
Expand Down
26 changes: 26 additions & 0 deletions be/src/exec/pipeline/exchange/local_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,32 @@ Status PassthroughExchanger::accept(const ChunkPtr& chunk, const int32_t sink_dr
return Status::OK();
}

Status ConnectorSinkPassthroughExchanger::accept(const ChunkPtr& chunk, const int32_t sink_driver_sequence) {
size_t sources_num = _source->get_sources().size();
if (sources_num == 1) {
_source->get_sources()[0]->add_chunk(chunk);
} else {
// Scale up writers when current buffer memory utilization is more than 50% of the maximum and data processed
// is greater than current writer count * connector_sink_scaling_min_size. This also mean that we won't scale
// local writers if the writing speed can cope up with incoming data. In another word, buffer utilization is
// below 50%.
if (_writer_count < sources_num && _memory_manager->is_half_full() &&
_data_processed > _writer_count * config::writer_scaling_min_size_mb * 1024 * 1024) {
_writer_count++;
}
// set to default value in case of _source vector out of bound in multi thread
if (_writer_count > sources_num) {
_writer_count = sources_num;
}
_source->get_sources()[(_next_accept_source++) % _writer_count.load()]->add_chunk(chunk);
if (_writer_count < sources_num) {
_data_processed += chunk->bytes_usage();
}
}

return Status::OK();
}

bool LocalExchanger::need_input() const {
return !_memory_manager->is_full() && !is_all_sources_finished();
}
Expand Down
19 changes: 18 additions & 1 deletion be/src/exec/pipeline/exchange/local_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class LocalExchanger {

virtual ~LocalExchanger() = default;

enum class PassThroughType { CHUNK = 0, RANDOM = 1, ADPATIVE = 2 };
enum class PassThroughType { CHUNK = 0, RANDOM = 1, ADPATIVE = 2, SCALE = 3 };

virtual Status prepare(RuntimeState* state) { return Status::OK(); }
virtual void close(RuntimeState* state) {}
Expand Down Expand Up @@ -274,6 +274,23 @@ class PassthroughExchanger final : public LocalExchanger {
std::atomic<size_t> _next_accept_source = 0;
};

// Scale local source for connector sink
class ConnectorSinkPassthroughExchanger final : public LocalExchanger {
public:
ConnectorSinkPassthroughExchanger(const std::shared_ptr<ChunkBufferMemoryManager>& memory_manager,
LocalExchangeSourceOperatorFactory* source)
: LocalExchanger("ConnectorSinkPassthrough", memory_manager, source) {}

~ConnectorSinkPassthroughExchanger() override = default;

Status accept(const ChunkPtr& chunk, int32_t sink_driver_sequence) override;

private:
std::atomic<size_t> _next_accept_source = 0;
std::atomic<size_t> _writer_count = 1;
std::atomic<size_t> _data_processed = 0;
};

// Random shuffle for each chunk of source.
class RandomPassthroughExchanger final : public LocalExchanger {
public:
Expand Down
11 changes: 11 additions & 0 deletions be/src/exec/pipeline/pipeline_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ OpFactories PipelineBuilderContext::maybe_interpolate_local_passthrough_exchange
LocalExchanger::PassThroughType::CHUNK);
}

OpFactories PipelineBuilderContext::maybe_interpolate_local_passthrough_exchange(
RuntimeState* state, int32_t plan_node_id, OpFactories& pred_operators, int num_receivers,
LocalExchanger::PassThroughType pass_through_type) {
return _maybe_interpolate_local_passthrough_exchange(state, plan_node_id, pred_operators, num_receivers, false,
pass_through_type);
}

OpFactories PipelineBuilderContext::maybe_interpolate_local_random_passthrough_exchange(RuntimeState* state,
int32_t plan_node_id,
OpFactories& pred_operators,
Expand Down Expand Up @@ -113,6 +120,10 @@ OpFactories PipelineBuilderContext::_maybe_interpolate_local_passthrough_exchang
local_exchange = std::make_shared<AdaptivePassthroughExchanger>(mem_mgr, local_exchange_source.get());
} else if (pass_through_type == LocalExchanger::PassThroughType::RANDOM) {
local_exchange = std::make_shared<RandomPassthroughExchanger>(mem_mgr, local_exchange_source.get());
} else if (state->query_options().__isset.enable_connector_sink_writer_scaling &&
state->query_options().enable_connector_sink_writer_scaling &&
pass_through_type == LocalExchanger::PassThroughType::SCALE) {
local_exchange = std::make_shared<ConnectorSinkPassthroughExchanger>(mem_mgr, local_exchange_source.get());
} else {
local_exchange = std::make_shared<PassthroughExchanger>(mem_mgr, local_exchange_source.get());
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/pipeline_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class PipelineBuilderContext {
OpFactories maybe_interpolate_local_passthrough_exchange(RuntimeState* state, int32_t plan_node_id,
OpFactories& pred_operators, int num_receivers,
bool force = false);
OpFactories maybe_interpolate_local_passthrough_exchange(RuntimeState* state, int32_t plan_node_id,
OpFactories& pred_operators, int num_receivers,
LocalExchanger::PassThroughType pass_through_type);
OpFactories maybe_interpolate_local_random_passthrough_exchange(RuntimeState* state, int32_t plan_node_id,
OpFactories& pred_operators, int num_receivers,
bool force = false);
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/hive_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ Status HiveTableSink::decompose_to_pipeline(pipeline::OpFactories prev_operators
size_t sink_dop = context->data_sink_dop();
if (t_hive_sink.partition_column_names.size() == 0 || t_hive_sink.is_static_partition_sink) {
auto ops = context->maybe_interpolate_local_passthrough_exchange(
runtime_state, pipeline::Operator::s_pseudo_plan_node_id_for_final_sink, prev_operators, sink_dop);
runtime_state, pipeline::Operator::s_pseudo_plan_node_id_for_final_sink, prev_operators, sink_dop,
pipeline::LocalExchanger::PassThroughType::SCALE);
ops.emplace_back(std::move(op));
context->add_pipeline(std::move(ops));
} else {
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/iceberg_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ Status IcebergTableSink::decompose_to_pipeline(pipeline::OpFactories prev_operat

if (iceberg_table_desc->is_unpartitioned_table() || t_iceberg_sink.is_static_partition_sink) {
auto ops = context->maybe_interpolate_local_passthrough_exchange(
runtime_state, pipeline::Operator::s_pseudo_plan_node_id_for_final_sink, prev_operators, sink_dop);
runtime_state, pipeline::Operator::s_pseudo_plan_node_id_for_final_sink, prev_operators, sink_dop,
pipeline::LocalExchanger::PassThroughType::SCALE);
ops.emplace_back(std::move(op));
context->add_pipeline(std::move(ops));
} else {
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/table_function_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ Status TableFunctionTableSink::decompose_to_pipeline(pipeline::OpFactories prev_
size_t sink_dop = target_table.write_single_file ? 1 : context->data_sink_dop();
if (sink_ctx->partition_column_indices.empty()) {
auto ops = context->maybe_interpolate_local_passthrough_exchange(
runtime_state, pipeline::Operator::s_pseudo_plan_node_id_for_final_sink, prev_operators, sink_dop);
runtime_state, pipeline::Operator::s_pseudo_plan_node_id_for_final_sink, prev_operators, sink_dop,
pipeline::LocalExchanger::PassThroughType::SCALE);
ops.emplace_back(std::move(op));
context->add_pipeline(std::move(ops));

Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,8 @@ public static MaterializedViewRewriteMode parse(String str) {
*/
public static final String CONNECTOR_HUGE_FILE_SIZE = "connector_huge_file_size";

public static final String ENABLE_CONNECTOR_SINK_WRITER_SCALING = "enable_connector_sink_writer_scaling";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(MAX_EXECUTION_TIME)
Expand Down Expand Up @@ -1319,7 +1321,7 @@ public static MaterializedViewRewriteMode parse(String str) {

@VarAttr(name = ENABLE_PARTITION_BUCKET_OPTIMIZE, flag = VariableMgr.INVISIBLE)
private boolean enablePartitionBucketOptimize = false;

@VarAttr(name = ENABLE_GROUP_EXECUTION)
private boolean enableGroupExecution = false;

Expand Down Expand Up @@ -1438,7 +1440,7 @@ public boolean isEnablePartitionBucketOptimize() {
public void setEnablePartitionBucketOptimize(boolean enablePartitionBucketOptimize) {
this.enablePartitionBucketOptimize = enablePartitionBucketOptimize;
}

public void setEnableGroupExecution(boolean enableGroupExecution) {
this.enableGroupExecution = enableGroupExecution;
}
Expand Down Expand Up @@ -1789,6 +1791,9 @@ public long getConnectorSinkTargetMaxFileSize() {
@VarAttr(name = CONNECTOR_HUGE_FILE_SIZE)
private long connectorHugeFileSize = 1024L * 1024L * 1024L;

@VarAttr(name = ENABLE_CONNECTOR_SINK_WRITER_SCALING)
private boolean enableConnectorSinkWriterScaling = true;

private int exprChildrenLimit = -1;

@VarAttr(name = CBO_PREDICATE_SUBFIELD_PATH, flag = VariableMgr.INVISIBLE)
Expand Down Expand Up @@ -3652,6 +3657,7 @@ public TQueryOptions toThrift() {
tResult.setEnable_hyperscan_vec(enableHyperscanVec);
tResult.setJit_level(jitLevel);
tResult.setEnable_result_sink_accumulate(enableResultSinkAccumulate);
tResult.setEnable_connector_sink_writer_scaling(enableConnectorSinkWriterScaling);
tResult.setEnable_wait_dependent_event(enableWaitDependentEvent);
tResult.setConnector_max_split_size(connectorMaxSplitSize);
tResult.setOrc_use_column_names(orcUseColumnNames);
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ struct TQueryOptions {
119: optional bool enable_result_sink_accumulate;
120: optional bool enable_connector_split_io_tasks = false;
121: optional i64 connector_max_split_size = 0;
122: optional bool enable_connector_sink_writer_scaling = true;

130: optional bool enable_wait_dependent_event = false;

Expand Down

0 comments on commit 2245fc3

Please sign in to comment.