[Feature] Unload data via table function (StarRocks#30544)
Signed-off-by: Letian Jiang <letian.jiang@outlook.com>
letian-jiang authored Sep 13, 2023
1 parent 07f5efd commit 4e053d3
Showing 32 changed files with 1,224 additions and 112 deletions.
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
@@ -188,6 +188,7 @@ set(EXEC_FILES
    pipeline/scan/schema_scan_context.cpp
    pipeline/sink/iceberg_table_sink_operator.cpp
    pipeline/sink/hive_table_sink_operator.cpp
    pipeline/sink/table_function_table_sink_operator.cpp
    pipeline/scan/morsel.cpp
    pipeline/scan/chunk_buffer_limiter.cpp
    pipeline/sink/file_sink_operator.cpp
8 changes: 8 additions & 0 deletions be/src/exec/data_sink.cpp
@@ -54,6 +54,7 @@
#include "runtime/result_sink.h"
#include "runtime/runtime_state.h"
#include "runtime/schema_table_sink.h"
#include "runtime/table_function_table_sink.h"

namespace starrocks {

@@ -160,6 +161,13 @@ Status DataSink::create_data_sink(RuntimeState* state, const TDataSink& thrift_s
        *sink = std::make_unique<HiveTableSink>(state->obj_pool(), output_exprs);
        break;
    }
    case TDataSinkType::TABLE_FUNCTION_TABLE_SINK: {
        if (!thrift_sink.__isset.table_function_table_sink) {
            return Status::InternalError("Missing table function table sink");
        }
        *sink = std::make_unique<TableFunctionTableSink>(state->obj_pool(), output_exprs);
        break;
    }

    default:
        std::stringstream error_msg;
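
For orientation: create_data_sink dispatches on the thrift sink type, and each case guards its optional payload with __isset before constructing the concrete sink. A minimal standalone sketch of that shape; the enum, structs, and factory below are simplified stand-ins, not the StarRocks definitions:

#include <memory>
#include <optional>
#include <stdexcept>

enum class SinkType { RESULT_SINK, TABLE_FUNCTION_TABLE_SINK };

struct Sink { virtual ~Sink() = default; };
struct TableFunctionTableSink : Sink {};

struct ThriftSink {
    SinkType type;
    // stands in for the optional thrift payload plus its __isset flag
    std::optional<int> table_function_table_sink;
};

std::unique_ptr<Sink> create_sink(const ThriftSink& t) {
    switch (t.type) {
    case SinkType::TABLE_FUNCTION_TABLE_SINK:
        // mirrors the __isset guard above: a plan that names this sink type
        // but never filled in its payload is malformed
        if (!t.table_function_table_sink.has_value()) {
            throw std::runtime_error("Missing table function table sink");
        }
        return std::make_unique<TableFunctionTableSink>();
    default:
        throw std::runtime_error("unknown sink type");
    }
}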
20 changes: 15 additions & 5 deletions be/src/exec/parquet_writer.cpp
@@ -29,6 +29,7 @@ RollingAsyncParquetWriter::RollingAsyncParquetWriter(
        std::function<void(starrocks::parquet::AsyncFileWriter*, RuntimeState*)> commit_func, RuntimeState* state,
        int32_t driver_id)
        : _table_info(std::move(tableInfo)),
          _max_file_size(_table_info.max_file_size),
          _output_expr_ctxs(output_expr_ctxs),
          _parent_profile(parent_profile),
          _commit_func(std::move(commit_func)),
@@ -75,11 +76,13 @@ Status RollingAsyncParquetWriter::_new_file_writer() {
}

Status RollingAsyncParquetWriter::append_chunk(Chunk* chunk, RuntimeState* state) {
    // re-raise any I/O error deferred by the asynchronous writers
    RETURN_IF_ERROR(get_io_status());

    if (_writer == nullptr) {
        RETURN_IF_ERROR(_new_file_writer());
    }
    // roll to a new file once the size cap is exceeded; -1 means unlimited
    if (_max_file_size != -1 && _writer->file_size() > _max_file_size) {
        RETURN_IF_ERROR(close_current_writer(state));
        RETURN_IF_ERROR(_new_file_writer());
    }
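
The rollover rule above is the core of the rolling writer: a chunk is appended to the current file, and once the file outgrows the cap the writer is sealed and a fresh file is opened; a cap of -1 disables rolling. A self-contained sketch of the same rule, using toy types rather than the StarRocks classes:

#include <cstdint>
#include <memory>

// Illustrative stand-in for an output file that tracks its size.
struct ToyFileWriter {
    int64_t size = 0;
    void append(int64_t bytes) { size += bytes; }
};

struct ToyRollingWriter {
    int64_t max_file_size;  // -1 means "never roll", as in append_chunk above
    int file_count = 0;
    std::unique_ptr<ToyFileWriter> writer;

    void append(int64_t bytes) {
        if (!writer) {
            writer = std::make_unique<ToyFileWriter>();  // lazily open the first file
            ++file_count;
        }
        if (max_file_size != -1 && writer->size > max_file_size) {
            // seal the full file and start the next one
            writer = std::make_unique<ToyFileWriter>();
            ++file_count;
        }
        writer->append(bytes);
    }
};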
Expand Down Expand Up @@ -109,18 +112,25 @@ Status RollingAsyncParquetWriter::close(RuntimeState* state) {

bool RollingAsyncParquetWriter::closed() {
    for (auto& writer : _pending_commits) {
        if (writer == nullptr) {
            continue;
        }
        if (!writer->closed()) {
            return false;
        }
        // the writer has finished: harvest any deferred I/O error before releasing it
        auto st = writer->get_io_status();
        if (!st.ok()) {
            set_io_status(st);
        }
        writer = nullptr;
    }

    if (_writer != nullptr) {
        if (!_writer->closed()) {
            return false;
        }
        auto st = _writer->get_io_status();
        if (!st.ok()) {
            set_io_status(st);
        }
    }

    return true;
}
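
Because the parquet writers flush asynchronously, an I/O failure can surface only after the append that triggered it has returned OK. The code above therefore latches the first error via set_io_status when harvesting finished writers, and append_chunk re-raises it through get_io_status on the next call. A minimal sketch of that first-error-wins latch, with a simplified Status standing in for starrocks::Status:

#include <string>

// Simplified stand-in for starrocks::Status: an empty message means OK.
struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
};

class IoStatusLatch {
public:
    // Keep only the first error; a later failure would otherwise
    // overwrite the root cause before anyone observed it.
    void set_io_status(const Status& status) {
        if (_io_status.ok()) {
            _io_status = status;
        }
    }

    Status get_io_status() const { return _io_status; }

private:
    Status _io_status;
};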

12 changes: 11 additions & 1 deletion be/src/exec/parquet_writer.h
@@ -42,6 +42,7 @@ struct TableInfo {
    bool enable_dictionary = true;
    std::string partition_location = "";
    std::shared_ptr<::parquet::schema::GroupNode> schema;
    int64_t max_file_size = 1024 * 1024 * 1024; // 1GB
    TCloudConfiguration cloud_conf;
};
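
With max_file_size now carried in TableInfo (and copied into the writer's _max_file_size in the constructor change above), the rolling threshold becomes per-table configuration. A hypothetical helper showing how a caller might populate the struct; the helper itself is not part of this commit, and only fields visible in this diff are touched:

#include <cstdint>
#include <string>

// Hypothetical setup code, reusing the TableInfo struct defined above.
TableInfo make_unload_table_info(const std::string& location, int64_t max_file_size) {
    TableInfo info;
    info.enable_dictionary = true;
    info.partition_location = location;
    info.max_file_size = max_file_size;  // e.g. 256LL * 1024 * 1024, or -1 to never roll
    return info;
}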

@@ -60,6 +61,14 @@ class RollingAsyncParquetWriter {
    bool writable() const { return _writer == nullptr || _writer->writable(); }
    bool closed();

    void set_io_status(const Status& status) {
        if (_io_status.ok()) {
            _io_status = status;
        }
    }

    Status get_io_status() const { return _io_status; }

private:
    std::string _new_file_location();

@@ -75,8 +84,9 @@ class RollingAsyncParquetWriter {
    TableInfo _table_info;
    int32_t _file_cnt = 0;
    std::string _outfile_location;
    Status _io_status;
    std::vector<std::shared_ptr<starrocks::parquet::AsyncFileWriter>> _pending_commits;
    int64_t _max_file_size;
    std::vector<ExprContext*> _output_expr_ctxs;
    RuntimeProfile* _parent_profile;
    std::function<void(starrocks::parquet::AsyncFileWriter*, RuntimeState*)> _commit_func;
60 changes: 60 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -39,6 +39,7 @@
#include "exec/pipeline/sink/iceberg_table_sink_operator.h"
#include "exec/pipeline/sink/memory_scratch_sink_operator.h"
#include "exec/pipeline/sink/mysql_table_sink_operator.h"
#include "exec/pipeline/sink/table_function_table_sink_operator.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/scan_node.h"
#include "exec/tablet_sink.h"
@@ -59,6 +60,7 @@
#include "runtime/result_sink.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/transaction_mgr.h"
#include "runtime/table_function_table_sink.h"
#include "util/debug/query_trace.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
@@ -969,6 +971,64 @@ Status FragmentExecutor::_decompose_data_sink_to_operator(RuntimeState* runtime_
                    runtime_state, Operator::s_pseudo_plan_node_id_for_final_sink, hive_table_sink_op,
                    partition_expr_ctxs, source_operator_dop, desired_hive_sink_dop);
        }
    } else if (typeid(*datasink) == typeid(starrocks::TableFunctionTableSink)) {
        DCHECK(thrift_sink.table_function_table_sink.__isset.target_table);
        DCHECK(thrift_sink.table_function_table_sink.__isset.cloud_configuration);

        const auto& target_table = thrift_sink.table_function_table_sink.target_table;
        DCHECK(target_table.__isset.path);
        DCHECK(target_table.__isset.file_format);
        DCHECK(target_table.__isset.columns);
        DCHECK(target_table.__isset.write_single_file);
        DCHECK(target_table.columns.size() == output_exprs.size());

        std::vector<std::string> column_names;
        for (const auto& column : target_table.columns) {
            column_names.push_back(column.column_name);
        }

        std::vector<TExpr> partition_exprs;
        std::vector<std::string> partition_column_names;
        if (target_table.__isset.partition_column_ids) {
            for (auto id : target_table.partition_column_ids) {
                partition_exprs.push_back(output_exprs[id]);
                partition_column_names.push_back(target_table.columns[id].column_name);
            }
        }
        std::vector<ExprContext*> partition_expr_ctxs;
        RETURN_IF_ERROR(Expr::create_expr_trees(runtime_state->obj_pool(), partition_exprs, &partition_expr_ctxs,
                                                runtime_state));

        std::vector<ExprContext*> output_expr_ctxs;
        RETURN_IF_ERROR(
                Expr::create_expr_trees(runtime_state->obj_pool(), output_exprs, &output_expr_ctxs, runtime_state));

        auto op = std::make_shared<TableFunctionTableSinkOperatorFactory>(
                context->next_operator_id(), target_table.path, target_table.file_format,
                target_table.compression_type, output_expr_ctxs, partition_expr_ctxs, column_names,
                partition_column_names, target_table.write_single_file,
                thrift_sink.table_function_table_sink.cloud_configuration, fragment_ctx);

        size_t source_dop = fragment_ctx->pipelines().back()->source_operator_factory()->degree_of_parallelism();
        size_t sink_dop = request.pipeline_sink_dop();

        if (target_table.write_single_file) {
            sink_dop = 1;
        }

        if (partition_expr_ctxs.empty()) {
            if (sink_dop != source_dop) {
                context->maybe_interpolate_local_passthrough_exchange_for_sink(
                        runtime_state, Operator::s_pseudo_plan_node_id_for_final_sink, std::move(op), source_dop,
                        sink_dop);
            } else {
                fragment_ctx->pipelines().back()->get_op_factories().emplace_back(std::move(op));
            }
        } else {
            context->maybe_interpolate_local_key_partition_exchange_for_sink(
                    runtime_state, Operator::s_pseudo_plan_node_id_for_final_sink, op, partition_expr_ctxs,
                    source_dop, sink_dop);
        }
    }

    return Status::OK();
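
The tail of this branch decides how rows travel from the source drivers to the sink drivers: write_single_file forces a single sink driver so exactly one file is produced; an unpartitioned sink whose parallelism differs from the source gets a local passthrough exchange; a partitioned sink gets a local key-partition exchange so all rows sharing a partition value reach the same driver, and hence the same output file. A standalone sketch of that decision; the enum and function are illustrative, not StarRocks APIs:

#include <cstddef>

enum class LocalExchange {
    kNone,          // parallelism already matches; append the sink to the pipeline
    kPassthrough,   // rebalance only: any row may go to any sink driver
    kKeyPartition,  // hash on partition exprs: one partition value -> one driver
};

LocalExchange pick_sink_exchange(bool has_partition_exprs, bool write_single_file,
                                 size_t source_dop, size_t* sink_dop) {
    if (write_single_file) {
        *sink_dop = 1;  // a single driver guarantees a single output file
    }
    if (has_partition_exprs) {
        return LocalExchange::kKeyPartition;
    }
    return (*sink_dop == source_dop) ? LocalExchange::kNone : LocalExchange::kPassthrough;
}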
