Skip to content

Commit

Permalink
[Refactor](Sink) convert to tablet sink to tablet writer (apache#24474)
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee authored Sep 20, 2023
1 parent fc12362 commit dc9fa1a
Show file tree
Hide file tree
Showing 43 changed files with 2,600 additions and 2,481 deletions.
23 changes: 10 additions & 13 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

#include "common/config.h"
#include "vec/sink/async_writer_sink.h"
#include "vec/sink/group_commit_vtablet_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmemory_scratch_sink.h"
Expand Down Expand Up @@ -146,21 +145,20 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node) {
sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(
new stream_load::GroupCommitVOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, true));
RETURN_IF_ERROR(status);
break;
}
Expand Down Expand Up @@ -294,12 +292,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node) {
sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
Expand All @@ -313,10 +311,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
Status status;
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(
new stream_load::GroupCommitVOlapTableSink(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, true));
RETURN_IF_ERROR(status);
break;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/odbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ODBCConnector::ODBCConnector(const ODBCConnectorParam& param)
_dbc(nullptr),
_stmt(nullptr) {}

Status ODBCConnector::close() {
Status ODBCConnector::close(Status) {
// do not commit transaction, roll back
if (_is_in_transaction) {
abort_trans();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/odbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ODBCConnector : public TableConnector {
uint32_t big_column_size_buffer = config::big_column_size_buffer;
uint32_t small_column_size_buffer = config::small_column_size_buffer;

Status close() override;
Status close(Status) override;

private:
static Status error_status(const std::string& prefix, const std::string& error_msg);
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/table_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TableConnector {
virtual Status abort_trans() = 0; // should be call after transaction abort
virtual Status finish_trans() = 0; // should be call after transaction commit

virtual Status close() = 0;
virtual Status close(Status) = 0;

virtual Status exec_stmt_write(vectorized::Block* block,
const vectorized::VExprContextSPtrs& _output_vexpr_ctxs,
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace doris {
namespace pipeline {

class OlapTableSinkOperatorBuilder final
: public DataSinkOperatorBuilder<stream_load::VOlapTableSink> {
: public DataSinkOperatorBuilder<vectorized::VOlapTableSink> {
public:
OlapTableSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "OlapTableSinkOperator", sink) {}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_v2_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace doris {
namespace pipeline {

class OlapTableSinkV2OperatorBuilder final
: public DataSinkOperatorBuilder<stream_load::VOlapTableSinkV2> {
: public DataSinkOperatorBuilder<vectorized::VOlapTableSinkV2> {
public:
OlapTableSinkV2OperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "OlapTableSinkV2Operator", sink) {}
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace doris {
namespace vectorized {
class VDataStreamMgr;
class ScannerScheduler;
class DeltaWriterV2Pool;
using ZoneList = std::unordered_map<std::string, cctz::time_zone>;
} // namespace vectorized
namespace pipeline {
Expand All @@ -51,7 +52,6 @@ namespace taskgroup {
class TaskGroupManager;
}
namespace stream_load {
class DeltaWriterV2Pool;
class LoadStreamStubPool;
} // namespace stream_load
namespace io {
Expand Down Expand Up @@ -241,7 +241,8 @@ class ExecEnv {
stream_load::LoadStreamStubPool* load_stream_stub_pool() {
return _load_stream_stub_pool.get();
}
stream_load::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }

vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }

void wait_for_all_tasks_done();

Expand Down Expand Up @@ -344,7 +345,7 @@ class ExecEnv {
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<stream_load::LoadStreamStubPool> _load_stream_stub_pool;
std::unique_ptr<stream_load::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;

std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
std::shared_mutex _zone_cache_rw_lock;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_load_stream_stub_pool = std::make_unique<stream_load::LoadStreamStubPool>();
_delta_writer_v2_pool = std::make_unique<stream_load::DeltaWriterV2Pool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();

_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ResultWriter {

virtual Status init(RuntimeState* state) = 0;

virtual Status close() = 0;
virtual Status close(Status s = Status::OK()) = 0;

[[nodiscard]] virtual int64_t get_written_rows() const { return _written_rows; }

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ JdbcConnector::~JdbcConnector() {

#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz);

Status JdbcConnector::close() {
Status JdbcConnector::close(Status) {
SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer);
_closed = true;
if (!_is_open) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class JdbcConnector : public TableConnector {

JdbcStatistic& get_jdbc_statistic() { return _jdbc_statistic; }

Status close() override;
Status close(Status s = Status::OK()) override;

protected:
JdbcConnectorParam _conn_param;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/async_writer_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ class AsyncWriterSink : public DataSink {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
}
RETURN_IF_ERROR(_writer->close());
RETURN_IF_ERROR(_writer->close(exec_status));
}
return DataSink::close(state, exec_status);
}

Status try_close(RuntimeState* state, Status exec_status) override {
if (state->is_cancelled() || !exec_status.ok()) {
_writer->force_close();
_writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
}
return Status::OK();
}
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
#include "util/thrift_rpc_helper.h"
#include "vec/sink/vtablet_block_convertor.h"

namespace doris {
namespace stream_load {
namespace doris::vectorized {

FetchAutoIncIDExecutor::FetchAutoIncIDExecutor() {
ThreadPoolBuilder("AsyncFetchAutoIncIDExecutor")
Expand Down Expand Up @@ -125,5 +124,4 @@ void AutoIncIDBuffer::_prefetch_ids(size_t length) {
});
}

} // namespace stream_load
} // namespace doris
} // namespace doris::vectorized
6 changes: 2 additions & 4 deletions be/src/vec/sink/autoinc_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
#include "common/status.h"
#include "util/threadpool.h"

namespace doris {
namespace stream_load {
namespace doris::vectorized {

class VOlapTableSink;
class OlapTableBlockConvertor;
Expand Down Expand Up @@ -126,5 +125,4 @@ class GlobalAutoIncBuffers {
std::mutex _mutex;
};

} // namespace stream_load
} // namespace doris
} // namespace doris::vectorized
4 changes: 2 additions & 2 deletions be/src/vec/sink/delta_writer_v2_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace doris {
class TExpr;

namespace stream_load {
namespace vectorized {

DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_cnt(1) {}

Expand Down Expand Up @@ -83,5 +83,5 @@ std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId loa
return map;
}

} // namespace stream_load
} // namespace vectorized
} // namespace doris
4 changes: 2 additions & 2 deletions be/src/vec/sink/delta_writer_v2_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace doris {

class DeltaWriterV2;

namespace stream_load {
namespace vectorized {

class DeltaWriterV2Map {
public:
Expand Down Expand Up @@ -107,5 +107,5 @@ class DeltaWriterV2Pool {
std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
};

} // namespace stream_load
} // namespace vectorized
} // namespace doris
40 changes: 0 additions & 40 deletions be/src/vec/sink/group_commit_vtablet_sink.cpp

This file was deleted.

34 changes: 0 additions & 34 deletions be/src/vec/sink/group_commit_vtablet_sink.h

This file was deleted.

2 changes: 1 addition & 1 deletion be/src/vec/sink/varrow_flight_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ bool VArrowFlightResultWriter::can_sink() {
return _sinker->can_sink();
}

Status VArrowFlightResultWriter::close() {
Status VArrowFlightResultWriter::close(Status) {
COUNTER_SET(_sent_rows_counter, _written_rows);
COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent);
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/varrow_flight_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class VArrowFlightResultWriter final : public ResultWriter {

bool can_sink() override;

Status close() override;
Status close(Status) override;

private:
void _init_profile();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ bool VMysqlResultWriter<is_binary_format>::can_sink() {
}

template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::close() {
Status VMysqlResultWriter<is_binary_format>::close(Status) {
COUNTER_SET(_sent_rows_counter, _written_rows);
COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent);
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vmysql_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class VMysqlResultWriter final : public ResultWriter {

bool can_sink() override;

Status close() override;
Status close(Status status) override;

const ResultList& results() { return _results; }

Expand Down
1 change: 0 additions & 1 deletion be/src/vec/sink/vresult_file_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ Status VResultFileSink::init(const TDataSink& tsink) {
CHECK(sink.__isset.storage_backend_type);
_storage_type = sink.storage_backend_type;

_name = "VResultFileSink";
//for impl csv_with_name and csv_with_names_and_types
_header_type = sink.header_type;
_header = sink.header;
Expand Down
Loading

0 comments on commit dc9fa1a

Please sign in to comment.