Skip to content

Commit

Permalink
[pipeline](exec) Refactor the table sink code and remove useless code (a…
Browse files Browse the repository at this point in the history
…pache#23223)

Refactor the table sink code and remove useless code
  • Loading branch information
HappenLee authored Aug 22, 2023
1 parent 2ad46c5 commit 5c2fae7
Show file tree
Hide file tree
Showing 33 changed files with 287 additions and 555 deletions.
5 changes: 1 addition & 4 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,14 @@
#include <memory>
#include <ostream>
#include <string>
#include <utility>

#include "common/config.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vjdbc_table_sink.h"
#include "vec/sink/vmemory_scratch_sink.h"
#include "vec/sink/vmysql_table_sink.h" // IWYU pragma: keep
#include "vec/sink/vodbc_table_sink.h"
#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vresult_sink.h"
#include "vec/sink/vtable_sink.h"
#include "vec/sink/vtablet_sink.h"

namespace doris {
Expand Down
11 changes: 8 additions & 3 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <vector>

#include "common/status.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"

Expand All @@ -36,7 +37,6 @@ namespace doris {
class ObjectPool;
class RuntimeState;
class TPlanFragmentExecParams;
class RowDescriptor;
class DescriptorTbl;
class QueryStatistics;
class TDataSink;
Expand All @@ -50,7 +50,7 @@ class Block;
// Superclass of all data sinks.
class DataSink {
public:
DataSink() : _closed(false) {}
DataSink(const RowDescriptor& desc) : _row_desc(desc) {}
virtual ~DataSink() {}

virtual Status init(const TDataSink& thrift_sink);
Expand Down Expand Up @@ -110,11 +110,16 @@ class DataSink {
_query_statistics = statistics;
}

// Returns the RowDescriptor reference stored in _row_desc (bound in the constructor).
const RowDescriptor& row_desc() { return _row_desc; }

// Base implementation always returns true; virtual so that subclasses can override it.
virtual bool can_write() { return true; }

protected:
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed;
bool _closed = false;
std::string _name;
const RowDescriptor& _row_desc;

// Maybe this will be transferred to BufferControlBlock.
std::shared_ptr<QueryStatistics> _query_statistics;
Expand Down
7 changes: 5 additions & 2 deletions be/src/exec/odbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <sqlext.h>
#include <wchar.h>

#include <algorithm>
#include <ostream>

#include "runtime/define_primitive_type.h"
Expand Down Expand Up @@ -59,7 +58,7 @@ ODBCConnector::ODBCConnector(const ODBCConnectorParam& param)
_dbc(nullptr),
_stmt(nullptr) {}

ODBCConnector::~ODBCConnector() {
Status ODBCConnector::close() {
// do not commit transaction, roll back
if (_is_in_transaction) {
abort_trans();
Expand All @@ -77,6 +76,8 @@ ODBCConnector::~ODBCConnector() {
if (_env != nullptr) {
SQLFreeHandle(SQL_HANDLE_ENV, _env);
}

return Status::OK();
}

Status ODBCConnector::append(vectorized::Block* block,
Expand Down Expand Up @@ -267,6 +268,8 @@ Status ODBCConnector::abort_trans() {
ODBC_DISPOSE(_dbc, SQL_HANDLE_DBC, SQLEndTran(SQL_HANDLE_DBC, _dbc, SQL_ROLLBACK),
"Abort transcation");

_is_in_transaction = false;

return Status::OK();
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/odbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ struct DataBinding {
class ODBCConnector : public TableConnector {
public:
explicit ODBCConnector(const ODBCConnectorParam& param);
~ODBCConnector() override;

Status open(RuntimeState* state, bool read = false);
// query for ODBC table
Expand All @@ -92,12 +91,14 @@ class ODBCConnector : public TableConnector {
TOdbcTableType::type table_type = TOdbcTableType::MYSQL) override;

const DataBinding& get_column_data(int i) const { return *_columns_data.at(i).get(); }
Status init_to_write(RuntimeProfile* profile);
Status init_to_write(RuntimeProfile* profile) override;

// Now we only treat HLL, CHAR, VARCHAR as big column
uint32_t big_column_size_buffer = config::big_column_size_buffer;
uint32_t small_column_size_buffer = config::small_column_size_buffer;

Status close() override;

private:
static Status error_status(const std::string& prefix, const std::string& error_msg);

Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/table_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TableConnector {
std::string_view table_name, const std::string& sql_str);
virtual ~TableConnector() = default;

virtual Status init_to_write(RuntimeProfile*) = 0;
// exec query for table
virtual Status query() = 0;

Expand All @@ -54,6 +55,8 @@ class TableConnector {
virtual Status abort_trans() = 0; // should be called after transaction abort
virtual Status finish_trans() = 0; // should be called after transaction commit

virtual Status close() = 0;

virtual Status exec_stmt_write(vectorized::Block* block,
const vectorized::VExprContextSPtrs& _output_vexpr_ctxs,
uint32_t* num_rows_sent) = 0;
Expand All @@ -75,8 +78,6 @@ class TableConnector {
const vectorized::DataTypePtr& type_ptr, const TypeDescriptor& type,
int row, TOdbcTableType::type table_type);

virtual Status close() { return Status::OK(); }

// Default max buffer size use in insert to: 50MB, normally a batch is smaller than the size
static constexpr uint32_t INSERT_BUFFER_SIZE = 1024l * 1024 * 50;

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

#pragma once

#include "exec/data_sink.h"
#include "operator.h"
#include "vec/sink/vtable_sink.h"

namespace doris {

namespace pipeline {

// used for VMysqlTableSink, VJdbcTableSink and VOdbcTableSink.
class TableSinkOperatorBuilder final : public DataSinkOperatorBuilder<vectorized::VTableSink> {
class TableSinkOperatorBuilder final : public DataSinkOperatorBuilder<DataSink> {
public:
TableSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "TableSinkOperator", sink) {}
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,17 @@ class JdbcConnector : public TableConnector {
Status abort_trans() override; // should be called after transaction abort
Status finish_trans() override; // should be called after transaction commit

// Prepares this connector for writing: forwards `profile` to init_profile()
// and then unconditionally returns Status::OK().
// NOTE(review): init_profile() is not visible here; if it can report a
// failure, that result is not inspected by this method — confirm intended.
Status init_to_write(doris::RuntimeProfile* profile) override {
init_profile(profile);
return Status::OK();
}

JdbcStatistic& get_jdbc_statistic() { return _jdbc_statistic; }

Status close() override;

protected:
const JdbcConnectorParam& _conn_param;
JdbcConnectorParam _conn_param;

private:
Status _register_func_id(JNIEnv* env);
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/sink/multi_cast_data_stream_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace doris::vectorized {
class MultiCastDataStreamSink : public DataSink {
public:
MultiCastDataStreamSink(std::shared_ptr<pipeline::MultiCastDataStreamer>& streamer)
: _multi_cast_data_streamer(streamer) {};
: DataSink(streamer->row_desc()), _multi_cast_data_streamer(streamer) {};

~MultiCastDataStreamSink() override = default;

Expand All @@ -37,12 +37,10 @@ class MultiCastDataStreamSink : public DataSink {
Status open(doris::RuntimeState* state) override { return Status::OK(); };

// use sink to check can_write, now always true after we support spill to disk
bool can_write() { return _multi_cast_data_streamer->can_write(); }
bool can_write() override { return _multi_cast_data_streamer->can_write(); }

RuntimeProfile* profile() override { return _multi_cast_data_streamer->profile(); }

const RowDescriptor& row_desc() { return _multi_cast_data_streamer->row_desc(); }

// Returns a non-const reference to the shared_ptr member holding the
// MultiCastDataStreamer, so callers access the same pointer object as this sink.
std::shared_ptr<pipeline::MultiCastDataStreamer>& get_multi_cast_data_streamer() {
return _multi_cast_data_streamer;
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,10 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
const RowDescriptor& row_desc, const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
: DataSink(row_desc),
_sender_id(sender_id),
_state(state),
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
_part_type(sink.output_partition.type),
_profile(nullptr),
Expand Down Expand Up @@ -358,10 +358,10 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
: DataSink(row_desc),
_sender_id(sender_id),
_state(state),
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
_part_type(TPartitionType::UNPARTITIONED),
_profile(nullptr),
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ class VDataStreamSender : public DataSink {

bool channel_all_can_write();

const RowDescriptor& row_desc() { return _row_desc; }

int sender_id() const { return _sender_id; }

RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
Expand Down Expand Up @@ -173,7 +171,6 @@ class VDataStreamSender : public DataSink {

RuntimeState* _state;
ObjectPool* _pool;
const RowDescriptor& _row_desc;

int _current_channel_idx; // index of current channel to send to if _random == true

Expand Down
99 changes: 0 additions & 99 deletions be/src/vec/sink/vjdbc_table_sink.cpp

This file was deleted.

Loading

0 comments on commit 5c2fae7

Please sign in to comment.