Skip to content

Commit

Permalink
pipeline: Support pipeline model framework and simple operator (pingc…
Browse files Browse the repository at this point in the history
…ap#6520)

ref pingcap#6518

Signed-off-by: ywqzzy <592838129@qq.com>
  • Loading branch information
SeaRise authored and ywqzzy committed Feb 13, 2023
1 parent af9a930 commit d77dd16
Show file tree
Hide file tree
Showing 128 changed files with 6,356 additions and 302 deletions.
3 changes: 2 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 PingCAP, Ltd.
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,6 +62,7 @@ add_headers_and_sources(clickhouse_common_io src/IO)
add_headers_and_sources(dbms src/Analyzers)
add_headers_and_sources(dbms src/Core)
add_headers_and_sources(dbms src/DataStreams)
add_headers_and_sources(dbms src/Operators)
add_headers_and_sources(dbms src/DataTypes)
add_headers_and_sources(dbms src/Databases)
add_headers_and_sources(dbms src/Debug)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 PingCAP, Ltd.
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@ add_subdirectory (Columns)
add_subdirectory (Common)
add_subdirectory (Core)
add_subdirectory (DataStreams)
add_subdirectory (Operators)
add_subdirectory (DataTypes)
add_subdirectory (Dictionaries)
add_subdirectory (Storages)
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,8 +24,7 @@ LimitBlockInputStream::LimitBlockInputStream(
size_t limit_,
const String & req_id)
: log(Logger::get(req_id))
, limit_transform_action(input->getHeader(), limit_)

, action(input->getHeader(), limit_)
{
children.push_back(input);
}
Expand All @@ -35,7 +34,7 @@ Block LimitBlockInputStream::readImpl()
{
Block res = children.back()->read();

if (limit_transform_action.transform(res))
if (action.transform(res))
{
return res;
}
Expand All @@ -47,6 +46,6 @@ Block LimitBlockInputStream::readImpl()

void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit_transform_action.getLimit());
buffer.fmtAppend(", limit = {}", action.getLimit());
}
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/LimitBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream

private:
LoggerPtr log;
LimitTransformAction limit_transform_action;
LocalLimitTransformAction action;
};

} // namespace DB
68 changes: 37 additions & 31 deletions dbms/src/DataStreams/LimitTransformAction.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,51 +17,57 @@

namespace DB
{
LimitTransformAction::LimitTransformAction(
const Block & header_,
size_t limit_)
: header(header_)
, limit(limit_)
namespace
{
}

Block LimitTransformAction::getHeader() const
// Removes all rows outside of specified range of Block.
void cut(Block & block, size_t rows, size_t limit, size_t pos)
{
return header;
assert(rows + limit > pos);
size_t pop_back_cnt = pos - limit;
for (auto & col : block)
{
auto mutate_col = (*std::move(col.column)).mutate();
mutate_col->popBack(pop_back_cnt);
col.column = std::move(mutate_col);
}
}
} // namespace

size_t LimitTransformAction::getLimit() const
bool LocalLimitTransformAction::transform(Block & block)
{
return limit;
if (unlikely(!block))
return true;

/// pos - how many lines were read, including the last read block
if (pos >= limit)
return false;

auto rows = block.rows();
pos += rows;
if (pos > limit)
cut(block, rows, limit, pos);
// for pos <= limit, give away the whole block
return true;
}

bool LimitTransformAction::transform(Block & block)
bool GlobalLimitTransformAction::transform(Block & block)
{
if (unlikely(!block))
return true;

/// pos - how many lines were read, including the last read block
if (pos >= limit)
{
return false;
}

auto rows = block.rows();
pos += rows;
if (pos >= rows && pos <= limit)
{
// give away the whole block
return true;
}
else
{
// pos > limit
// give away a piece of the block
assert(rows + limit > pos);
size_t length = rows + limit - pos;
for (size_t i = 0; i < block.columns(); ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->cut(0, length);
return true;
}
size_t prev_pos = pos.fetch_add(rows);
if (prev_pos >= limit)
return false;

size_t cur_pos = prev_pos + rows;
if (cur_pos > limit)
cut(block, rows, limit, cur_pos);
// for pos <= limit, give away the whole block
return true;
}
} // namespace DB
48 changes: 40 additions & 8 deletions dbms/src/DataStreams/LimitTransformAction.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,22 +15,54 @@

#include <Core/Block.h>

#include <atomic>
#include <memory>

namespace DB
{
struct LimitTransformAction
struct LocalLimitTransformAction
{
public:
LimitTransformAction(
LocalLimitTransformAction(
const Block & header_,
size_t limit_);
size_t limit_)
: header(header_)
, limit(limit_)
{
}

bool transform(Block & block);
Block getHeader() const;
size_t getLimit() const;

Block getHeader() const { return header; }
size_t getLimit() const { return limit; }

private:
Block header;
size_t limit;
const Block header;
const size_t limit;
size_t pos = 0;
};

struct GlobalLimitTransformAction
{
public:
GlobalLimitTransformAction(
const Block & header_,
size_t limit_)
: header(header_)
, limit(limit_)
{
}

bool transform(Block & block);

Block getHeader() const { return header; }
size_t getLimit() const { return limit; }

private:
const Block header;
const size_t limit;
std::atomic_size_t pos{0};
};

using GlobalLimitPtr = std::shared_ptr<GlobalLimitTransformAction>;
} // namespace DB
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
16 changes: 14 additions & 2 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 PingCAP, Ltd.
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,14 @@ add_headers_and_sources(flash_service .)
add_headers_and_sources(flash_service ./Coprocessor)
add_headers_and_sources(flash_service ./Mpp)
add_headers_and_sources(flash_service ./Executor)
add_headers_and_sources(flash_service ./Pipeline)
add_headers_and_sources(flash_service ./Pipeline/Exec)
add_headers_and_sources(flash_service ./Pipeline/Schedule)
add_headers_and_sources(flash_service ./Pipeline/Schedule/Events)
add_headers_and_sources(flash_service ./Pipeline/Schedule/Tasks)
add_headers_and_sources(flash_service ./Pipeline/Schedule/TaskQueues)
add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/plans)
add_headers_and_sources(flash_service ./Planner/Plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)

Expand All @@ -28,6 +34,12 @@ target_link_libraries(flash_service dbms)

if (ENABLE_TESTS)
add_subdirectory(Coprocessor/tests)
add_subdirectory(Executor/tests)
add_subdirectory(Pipeline/tests)
add_subdirectory(Pipeline/Exec/tests)
add_subdirectory(Pipeline/Schedule/tests)
add_subdirectory(Pipeline/Schedule/Events/tests)
add_subdirectory(Pipeline/Schedule/TaskQueues/tests)
add_subdirectory(Planner/tests)
add_subdirectory(tests)
endif ()
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,7 +92,7 @@ try
auto start_time = Clock::now();
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(context, internal);
BlockIO streams = executeAsBlockIO(context, internal);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Executor/DataStreamExecutor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,7 @@ void DataStreamExecutor::cancel()
p_stream->cancel(/*kill=*/false);
}

String DataStreamExecutor::dump() const
String DataStreamExecutor::toString() const
{
FmtBuffer fb;
data_stream->dumpTree(fb);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Executor/DataStreamExecutor.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,7 @@ class DataStreamExecutor : public QueryExecutor
assert(data_stream);
}

String dump() const override;
String toString() const override;

void cancel() override;

Expand Down
43 changes: 43 additions & 0 deletions dbms/src/Flash/Executor/ExecutionResult.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <common/types.h>

namespace DB
{
struct ExecutionResult
{
bool is_success;
String err_msg;

void verify()
{
RUNTIME_CHECK(is_success, err_msg);
}

static ExecutionResult success()
{
return {true, ""};
}

static ExecutionResult fail(const String & err_msg)
{
RUNTIME_CHECK(!err_msg.empty());
return {false, err_msg};
}
};
} // namespace DB
Loading

0 comments on commit d77dd16

Please sign in to comment.