Skip to content

Commit

Permalink
[pipelineX](refactor) Delete subclasses inherited from Dependency (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Feb 22, 2024
1 parent bd78243 commit 83ad7a6
Show file tree
Hide file tree
Showing 79 changed files with 336 additions and 622 deletions.
12 changes: 2 additions & 10 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,12 @@ class AggSinkOperator final : public StreamingOperator<vectorized::AggregationNo
bool can_write() override { return true; }
};

class AggSinkDependency final : public Dependency {
public:
using SharedState = AggSharedState;
AggSinkDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AggSinkDependency", true, query_ctx) {}
~AggSinkDependency() override = default;
};

class AggSinkOperatorX;

class AggSinkLocalState : public PipelineXSinkLocalState<AggSinkDependency> {
class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
public:
ENABLE_FACTORY_CREATOR(AggSinkLocalState);
using Base = PipelineXSinkLocalState<DependencyType>;
using Base = PipelineXSinkLocalState<AggSharedState>;
AggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
~AggSinkLocalState() override = default;

Expand Down
12 changes: 2 additions & 10 deletions be/src/pipeline/exec/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,11 @@ class AggSourceOperator final : public SourceOperator<vectorized::AggregationNod
Status open(RuntimeState*) override { return Status::OK(); }
};

class AggSourceDependency final : public Dependency {
public:
using SharedState = AggSharedState;
AggSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AggSourceDependency", query_ctx) {}
~AggSourceDependency() override = default;
};

class AggSourceOperatorX;

class AggLocalState final : public PipelineXLocalState<AggSourceDependency> {
class AggLocalState final : public PipelineXLocalState<AggSharedState> {
public:
using Base = PipelineXLocalState<AggSourceDependency>;
using Base = PipelineXLocalState<AggSharedState>;
ENABLE_FACTORY_CREATOR(AggLocalState);
AggLocalState(RuntimeState* state, OperatorXBase* parent);
~AggLocalState() override = default;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator)

Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSinkDependency>::init(state, info));
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<AnalyticSinkOperatorX>();
Expand Down
12 changes: 2 additions & 10 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,14 @@ class AnalyticSinkOperator final : public StreamingOperator<vectorized::VAnalyti
bool can_write() override { return _node->can_write(); }
};

class AnalyticSinkDependency final : public Dependency {
public:
using SharedState = AnalyticSharedState;
AnalyticSinkDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AnalyticSinkDependency", true, query_ctx) {}
~AnalyticSinkDependency() override = default;
};

class AnalyticSinkOperatorX;

class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSinkDependency> {
class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedState> {
ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);

public:
AnalyticSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}
: PipelineXSinkLocalState<AnalyticSharedState>(parent, state) {}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(AnalyticSourceOperator, SourceOperator)

AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<AnalyticSourceDependency>(state, parent),
: PipelineXLocalState<AnalyticSharedState>(state, parent),
_output_block_index(0),
_window_end_position(0),
_next_partition(false),
Expand Down Expand Up @@ -159,7 +159,7 @@ bool AnalyticLocalState::_whether_need_next_partition(
}

Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<AnalyticSourceDependency>::init(state, info));
RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_agg_arena_pool = std::make_unique<vectorized::Arena>();
Expand Down Expand Up @@ -564,7 +564,7 @@ Status AnalyticLocalState::close(RuntimeState* state) {

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
return PipelineXLocalState<AnalyticSourceDependency>::close(state);
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
Expand Down
10 changes: 1 addition & 9 deletions be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,8 @@ class AnalyticSourceOperator final : public SourceOperator<vectorized::VAnalytic
Status open(RuntimeState*) override { return Status::OK(); }
};

class AnalyticSourceDependency final : public Dependency {
public:
using SharedState = AnalyticSharedState;
AnalyticSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AnalyticSourceDependency", query_ctx) {}
~AnalyticSourceDependency() override = default;
};

class AnalyticSourceOperatorX;
class AnalyticLocalState final : public PipelineXLocalState<AnalyticSourceDependency> {
class AnalyticLocalState final : public PipelineXLocalState<AnalyticSharedState> {
public:
ENABLE_FACTORY_CREATOR(AnalyticLocalState);
AnalyticLocalState(RuntimeState* state, OperatorXBase* parent);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/assert_num_rows_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ class AssertNumRowsOperator final : public StreamingOperator<vectorized::VAssert
: StreamingOperator(operator_builder, node) {}
};

class AssertNumRowsLocalState final : public PipelineXLocalState<FakeDependency> {
class AssertNumRowsLocalState final : public PipelineXLocalState<FakeSharedState> {
public:
ENABLE_FACTORY_CREATOR(AssertNumRowsLocalState);

AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<FakeDependency>(state, parent) {}
: PipelineXLocalState<FakeSharedState>(state, parent) {}
~AssertNumRowsLocalState() = default;
};

Expand Down
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ class DataQueue {
void set_source_block();

private:
friend class AggSourceDependency;
friend class UnionSourceDependency;
friend class AggSinkDependency;
friend class UnionSinkDependency;

std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace doris::pipeline {

DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: PipelineXLocalState<FakeDependency>(state, parent),
: PipelineXLocalState<FakeSharedState>(state, parent),
dummy_mapped_data(std::make_shared<char>('A')),
_agg_arena_pool(std::make_unique<vectorized::Arena>()),
_agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ namespace pipeline {

class DistinctStreamingAggOperatorX;

class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeDependency> {
class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeSharedState> {
public:
using Parent = DistinctStreamingAggOperatorX;
using Base = PipelineXLocalState<FakeDependency>;
using Base = PipelineXLocalState<FakeSharedState>;
ENABLE_FACTORY_CREATOR(DistinctStreamingAggLocalState);
DistinctStreamingAggLocalState(RuntimeState* state, OperatorXBase* parent);

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/empty_set_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ class EmptySetSourceOperator final : public SourceOperator<vectorized::VEmptySet
bool can_read() override { return true; }
};

class EmptySetLocalState final : public PipelineXLocalState<FakeDependency> {
class EmptySetLocalState final : public PipelineXLocalState<FakeSharedState> {
public:
ENABLE_FACTORY_CREATOR(EmptySetLocalState);

EmptySetLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<FakeDependency>(state, parent) {}
: PipelineXLocalState<FakeSharedState>(state, parent) {}
~EmptySetLocalState() = default;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/es_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status EsScanLocalState::_process_conjuncts() {
Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
_scan_dependency->set_ready();
_dependency->set_ready();
return Status::OK();
}

Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class TUniqueId;
using InstanceLoId = int64_t;

namespace pipeline {
class ExchangeSinkQueueDependency;
class Dependency;
} // namespace pipeline

Expand Down Expand Up @@ -211,7 +210,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_profile(RuntimeProfile* profile);

void set_dependency(std::shared_ptr<ExchangeSinkQueueDependency> queue_dependency,
void set_dependency(std::shared_ptr<Dependency> queue_dependency,
std::shared_ptr<Dependency> finish_dependency) {
_queue_dependency = queue_dependency;
_finish_dependency = finish_dependency;
Expand Down Expand Up @@ -272,7 +271,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {

std::atomic<int> _total_queue_size = 0;
static constexpr int QUEUE_CAPACITY_FACTOR = 64;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<Dependency> _queue_dependency;
std::shared_ptr<Dependency> _finish_dependency;
std::atomic<bool> _should_stop {false};
};
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf

register_channels(_sink_buffer.get());
auto* _exchange_sink_dependency = _dependency;
_queue_dependency = ExchangeSinkQueueDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
_queue_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true, state->get_query_ctx());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
Expand Down
62 changes: 23 additions & 39 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,44 +64,9 @@ class ExchangeSinkOperator final : public DataSinkOperator<vectorized::VDataStre
int _mult_cast_id = -1;
};

class ExchangeSinkQueueDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
ExchangeSinkQueueDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "ResultQueueDependency", true, query_ctx) {}
~ExchangeSinkQueueDependency() override = default;
};

/**
* We use this to control the execution for local exchange.
* +---------------+ +---------------+ +---------------+
* | ExchangeSink1 | | ExchangeSink2 | | ExchangeSink3 |
* +---------------+ +---------------+ +---------------+
* | | |
* | +----------------------------+----------------------------------+ |
* +----+------------------|------------------------------------------+ | |
* | | +------------------------|--------------------|------------+-----+
* Dependency 1-1 | Dependency 2-1 | Dependency 3-1 | Dependency 1-2 | Dependency 2-2 | Dependency 3-2 |
* +----------------------------------------------+ +----------------------------------------------+
* | queue1 queue2 queue3 | | queue1 queue2 queue3 |
* | LocalRecvr | | LocalRecvr |
* +----------------------------------------------+ +----------------------------------------------+
* +-----------------+ +------------------+
* | ExchangeSource1 | | ExchangeSource2 |
* +-----------------+ +------------------+
*/
class LocalExchangeChannelDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {}
~LocalExchangeChannelDependency() override = default;
// TODO(gabriel): blocked by memory
};

class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndSharedState> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;
using Base = PipelineXSinkLocalState<AndSharedState>;

public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
Expand Down Expand Up @@ -189,9 +154,28 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependenc

vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;

std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<Dependency> _queue_dependency;
std::shared_ptr<Dependency> _broadcast_dependency;
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;

/**
* We use this to control the execution for local exchange.
* +---------------+ +---------------+ +---------------+
* | ExchangeSink1 | | ExchangeSink2 | | ExchangeSink3 |
* +---------------+ +---------------+ +---------------+
* | | |
* | +----------------------------+----------------------------------+ |
* +----+------------------|------------------------------------------+ | |
* | | +------------------------|--------------------|------------+-----+
* Dependency 1-1 | Dependency 2-1 | Dependency 3-1 | Dependency 1-2 | Dependency 2-2 | Dependency 3-2 |
* +----------------------------------------------+ +----------------------------------------------+
* | queue1 queue2 queue3 | | queue1 queue2 queue3 |
* | LocalRecvr | | LocalRecvr |
* +----------------------------------------------+ +----------------------------------------------+
* +-----------------+ +------------------+
* | ExchangeSource1 | | ExchangeSource2 |
* +-----------------+ +------------------+
*/
std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
deps.resize(queues.size());
metrics.resize(queues.size());
for (size_t i = 0; i < queues.size(); i++) {
deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), _parent->node_id(),
state->get_query_ctx(), queues[i]);
deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"SHUFFLE_DATA_DEPENDENCY", state->get_query_ctx());
queues[i]->set_dependency(deps[i]);
source_dependency->add_child(deps[i]);
}
Expand Down
26 changes: 3 additions & 23 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,12 @@ class ExchangeSourceOperator final : public SourceOperator<vectorized::VExchange
bool is_pending_finish() const override;
};

struct ExchangeDataDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
ExchangeDataDependency(int id, int node_id, QueryContext* query_ctx,
vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
: Dependency(id, node_id, "DataDependency", query_ctx), _queue(sender_queue) {}

std::string debug_string(int indentation_level) override {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _is_cancelled = {}, _block_queue size = {},_num_remaining_senders = {}",
Dependency::debug_string(indentation_level), _queue->_is_cancelled,
_queue->_block_queue.size(), _queue->_num_remaining_senders);
return fmt::to_string(debug_string_buffer);
}

private:
vectorized::VDataStreamRecvr::SenderQueue* _queue;
};

class ExchangeSourceOperatorX;
class ExchangeLocalState final : public PipelineXLocalState<AndDependency> {
class ExchangeLocalState final : public PipelineXLocalState<AndSharedState> {
ENABLE_FACTORY_CREATOR(ExchangeLocalState);

public:
using Base = PipelineXLocalState<AndDependency>;
using Base = PipelineXLocalState<AndSharedState>;
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);

Status init(RuntimeState* state, LocalStateInfo& info) override;
Expand All @@ -87,7 +67,7 @@ class ExchangeLocalState final : public PipelineXLocalState<AndDependency> {
int64_t num_rows_skipped;
bool is_ready;

std::vector<std::shared_ptr<ExchangeDataDependency>> deps;
std::vector<std::shared_ptr<Dependency>> deps;

std::vector<RuntimeProfile::Counter*> metrics;
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace doris::pipeline {
Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
_scan_dependency->set_ready();
_dependency->set_ready();
return Status::OK();
}

Expand Down
Loading

0 comments on commit 83ad7a6

Please sign in to comment.