Skip to content

Commit

Permalink
[Enhancement] spark/flink connector export data without using column pool (StarRocks#30855)

Browse files Browse the repository at this point in the history

Signed-off-by: trueeyu <lxhhust350@qq.com>
  • Loading branch information
trueeyu committed Sep 18, 2023
1 parent 8d4c683 commit 16bf1ef
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 7 deletions.
3 changes: 2 additions & 1 deletion be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ void LakeDataSource::close(RuntimeState* state) {
}

Status LakeDataSource::get_next(RuntimeState* state, ChunkPtr* chunk) {
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size(), true));
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size(),
_runtime_state->use_column_pool()));
auto* chunk_ptr = chunk->get();

do {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Status OlapScanNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool* eos) {
// is the first time of calling `get_next`, pass the second argument of `_fill_chunk_pool` as
// true to ensure that the newly allocated column objects will be returned back into the column
// pool.
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(1, first_call));
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(1, first_call && state->use_column_pool()));
eval_join_runtime_filters(chunk);
_num_rows_returned += (*chunk)->num_rows();
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
Expand Down Expand Up @@ -676,7 +676,7 @@ Status OlapScanNode::_start_scan_thread(RuntimeState* state) {
COUNTER_SET(_task_concurrency, (int64_t)concurrency);
int chunks = _chunks_per_scanner * concurrency;
_chunk_pool.reserve(chunks);
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(chunks, true));
TRY_CATCH_BAD_ALLOC(_fill_chunk_pool(chunks, state->use_column_pool()));
std::lock_guard<std::mutex> l(_mtx);
for (int i = 0; i < concurrency; i++) {
CHECK(_submit_scanner(_pending_scanners.pop(), true));
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
}

Status OlapChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) {
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size(), true));
chunk->reset(ChunkHelper::new_chunk_pooled(_prj_iter->output_schema(), _runtime_state->chunk_size(),
_runtime_state->use_column_pool()));
return _read_chunk_from_storage(_runtime_state, (*chunk).get());
}

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, c
query_options.query_type = TQueryType::EXTERNAL;
// For spark sql / flink sql, we don't use page cache.
query_options.use_page_cache = false;
query_options.use_column_pool = false;
exec_fragment_params.__set_query_options(query_options);
VLOG_ROW << "external exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
Expand Down
11 changes: 11 additions & 0 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ bool RuntimeState::use_page_cache() {
return true;
}

// Whether chunk columns for this query may be taken from the global column pool.
// The BE-wide config switch wins over the per-query option; when the client did
// not set the option, pooling defaults to enabled.
bool RuntimeState::use_column_pool() const {
    // A globally disabled pool overrides any per-query request.
    if (config::disable_column_pool) {
        return false;
    }
    // Honor the per-query Thrift option only if the caller actually set it.
    return _query_options.__isset.use_column_pool ? _query_options.use_column_pool : true;
}

Status RuntimeState::set_mem_limit_exceeded(MemTracker* tracker, int64_t failed_allocation_size,
const std::string* msg) {
DCHECK_GE(failed_allocation_size, 0);
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class RuntimeState {
void set_desc_tbl(DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
int chunk_size() const { return _query_options.batch_size; }
void set_chunk_size(int chunk_size) { _query_options.batch_size = chunk_size; }
bool use_column_pool() const;
bool abort_on_default_limit_exceeded() const { return _query_options.abort_on_default_limit_exceeded; }
int64_t timestamp_ms() const { return _timestamp_ms; }
const std::string& timezone() const { return _timezone; }
Expand Down
3 changes: 1 addition & 2 deletions be/src/storage/chunk_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ Chunk* ChunkHelper::new_chunk_pooled(const Schema& schema, size_t chunk_size, bo
columns.reserve(schema.num_fields());
for (size_t i = 0; i < schema.num_fields(); i++) {
const FieldPtr& f = schema.field(i);
auto column = (force && !config::disable_column_pool) ? column_from_pool<true>(*f, chunk_size)
: column_from_pool<false>(*f, chunk_size);
auto column = force ? column_from_pool<true>(*f, chunk_size) : column_from_pool<false>(*f, chunk_size);
column->reserve(chunk_size);
columns.emplace_back(std::move(column));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/chunk_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ChunkHelper {
// Create an empty chunk according to the |slots| and reserve it of size |n|.
static ChunkUniquePtr new_chunk(const std::vector<SlotDescriptor*>& slots, size_t n);

static Chunk* new_chunk_pooled(const Schema& schema, size_t n, bool force = true);
static Chunk* new_chunk_pooled(const Schema& schema, size_t n, bool force);

// Create a vectorized column from field .
// REQUIRE: |type| must be scalar type.
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ struct TQueryOptions {
102: optional bool enable_collect_table_level_scan_stats;

104: optional TOverflowMode overflow_mode = TOverflowMode.OUTPUT_NULL;
105: optional bool use_column_pool = true;
}


Expand Down

0 comments on commit 16bf1ef

Please sign in to comment.