Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: Support pipeline table scan in UT #6831

Merged
merged 19 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Int64 MockStorage::addTableDataForDeltaMerge(Context & context, const String & n

// write data to DeltaMergeStorage
ASTPtr insertptr(new ASTInsertQuery());
BlockOutputStreamPtr output = storage->write(insertptr, context.getSettingsRef());
BlockOutputStreamPtr output = storage->write(insertptr, context.getGlobalContext().getSettingsRef());
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

Block insert_block{columns};

Expand All @@ -141,7 +141,7 @@ Int64 MockStorage::addTableDataForDeltaMerge(Context & context, const String & n
return table_id;
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions)
std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> MockStorage::prepareForRead(Context & context, Int64 table_id)
{
assert(tableExistsForDeltaMerge(table_id));
auto storage = storage_delta_merge_map[table_id];
Expand All @@ -153,10 +153,18 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
column_names.push_back(column_info.first);

auto scan_context = std::make_shared<DM::ScanContext>();
QueryProcessingStage::Enum stage;

SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.keep_order = false;
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(context.getSettingsRef().resolve_locks, std::numeric_limits<UInt64>::max(), scan_context);
return {storage, column_names, query_info};
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions)
{
QueryProcessingStage::Enum stage;
auto [storage, column_names, query_info] = prepareForRead(context, table_id);
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
Expand All @@ -166,6 +174,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*filter_conditions, *analyzer);

BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams
// TODO: set num_streams, then ins.size() != 1
BlockInputStreamPtr in = ins[0];
Expand All @@ -183,6 +192,14 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
}
}


// Build pipeline source operators that read the mock DeltaMerge table.
// Shares the read setup (storage handle, column names, query info) with
// getStreamFromDeltaMerge via prepareForRead.
SourceOps MockStorage::getSourceOpsFromDeltaMerge(PipelineExecutorStatus & exec_status_, Context & context, Int64 table_id, size_t concurrency)
{
    auto read_setup = prepareForRead(context, table_id);
    // Currently don't support test for late materialization
    return std::get<0>(read_setup)->readSourceOps(
        exec_status_,
        std::get<1>(read_setup),
        std::get<2>(read_setup),
        context,
        context.getSettingsRef().max_block_size,
        concurrency);
}

void MockStorage::addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns)
{
TableInfo table_info;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Operators/Operator.h>
#include <Storages/Transaction/TiDB.h>
#include <common/types.h>

Expand All @@ -26,7 +27,9 @@
namespace DB
{
class StorageDeltaMerge;
using StorageDeltaMergePtr = std::shared_ptr<StorageDeltaMerge>;
class Context;
struct SelectQueryInfo;

using MockColumnInfo = std::pair<String, TiDB::TP>;
using MockColumnInfoVec = std::vector<MockColumnInfo>;
Expand Down Expand Up @@ -82,7 +85,9 @@ class MockStorage

NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id);

std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> prepareForRead(Context & context, Int64 table_id);
BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions = nullptr);
SourceOps getSourceOpsFromDeltaMerge(PipelineExecutorStatus & exec_status_, Context & context, Int64 table_id, size_t concurrency = 1);

bool tableExistsForDeltaMerge(Int64 table_id);

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Interpreters/Context.h>
#include <Operators/UnorderedSourceOp.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/IManageableStorage.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ bool pushDownSelection(Context & context, const PhysicalPlanNodePtr & plan, cons
auto physical_table_scan = std::static_pointer_cast<PhysicalTableScan>(plan);
return physical_table_scan->setFilterConditions(executor_id, selection);
}
if (unlikely(plan->tp() == PlanType::MockTableScan && context.isExecutorTest()))
if (unlikely(plan->tp() == PlanType::MockTableScan && context.isExecutorTest() && !context.getSettingsRef().enable_pipeline))
{
auto physical_mock_table_scan = std::static_pointer_cast<PhysicalMockTableScan>(plan);
if (context.mockStorage()->useDeltaMerge() && context.mockStorage()->tableExistsForDeltaMerge(physical_mock_table_scan->getLogicalTableID()))
Expand Down
24 changes: 18 additions & 6 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,25 @@ void PhysicalMockTableScan::buildBlockInputStreamImpl(DAGPipeline & pipeline, Co
pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end());
}

void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/)
void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & context, size_t concurrency)
{
group_builder.init(mock_streams.size());
size_t i = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<BlockInputStreamSourceOp>(group_builder.exec_status, mock_streams[i++]));
});
if (context.mockStorage()->useDeltaMerge())
{
auto source_ops = context.mockStorage()->getSourceOpsFromDeltaMerge(group_builder.exec_status, context, table_id, concurrency);
group_builder.init(source_ops.size());
size_t i = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::move(source_ops[i++]));
});
}
else
{
group_builder.init(mock_streams.size());
size_t i = 0;
group_builder.transform([&](auto & builder) {
builder.setSourceOp(std::make_unique<BlockInputStreamSourceOp>(group_builder.exec_status, mock_streams[i++]));
});
}
}

void PhysicalMockTableScan::finalize(const Names & parent_require)
Expand Down
40 changes: 40 additions & 0 deletions dbms/src/Flash/tests/gtest_executors_with_dm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class ExecutorsWithDMTestRunner : public DB::tests::ExecutorTest
{
ExecutorTest::initializeContext();
context.mockStorage()->setUseDeltaMerge(true);
context.context.getGlobalContext().getSettingsRef().dt_enable_read_thread = true;
context.context.getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = 1;
context.context.getGlobalContext().getSettingsRef().dt_segment_limit_rows = 1;
context.context.getGlobalContext().getSettingsRef().dt_segment_delta_cache_limit_rows = 1;
// note that
// 1. the first column is pk.
// 2. The decimal type is not supported.
Expand Down Expand Up @@ -62,6 +66,25 @@ class ExecutorsWithDMTestRunner : public DB::tests::ExecutorTest
toNullableVec<MyDate>("col7", col_mydate),
toNullableVec<MyDateTime>("col8", col_mydatetime),
toNullableVec<String>("col9", col_string)});

// with 200 rows.
std::vector<TypeTraits<Int64>::FieldType> key(200);
std::vector<std::optional<String>> value(200);
for (size_t i = 0; i < 200; ++i)
{
key[i] = i % 15;
value[i] = {fmt::format("val_{}", i)};
}
context.addMockDeltaMerge(
{"test_db", "big_table"},
{{"key", TiDB::TP::TypeLongLong},
{"value", TiDB::TP::TypeString}},
{toVec<Int64>("key", key), toNullableVec<String>("value", value)});

context.addMockDeltaMerge(
{"test_db", "empty_table"},
{{"col0", TiDB::TP::TypeLongLong}},
{toVec<Int32>("col0", {})});
}

ColumnWithInt64 col_id{1, 2, 3, 4, 5, 6, 7, 8, 9};
Expand Down Expand Up @@ -112,6 +135,23 @@ try
toNullableVec<MyDateTime>(col_mydatetime),
toNullableVec<String>(col_string)});

request = context
.scan("test_db", "big_table")
.build(context);
enablePlanner(false);
auto expect = executeStreams(request, 1);

executeAndAssertColumnsEqual(
request,
expect);

request = context
.scan("test_db", "empty_table")
.build(context);
executeAndAssertColumnsEqual(
request,
{});

// projection
request = context
.scan("test_db", "t1")
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Operators/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class SourceOp : public Operator
OperatorStatus awaitImpl() override { return OperatorStatus::HAS_OUTPUT; }
};
using SourceOpPtr = std::unique_ptr<SourceOp>;
using SourceOps = std::vector<SourceOpPtr>;

class TransformOp : public Operator
{
Expand Down
53 changes: 53 additions & 0 deletions dbms/src/Operators/UnorderedSourceOp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Operators/UnorderedSourceOp.h>

namespace DB
{
// Pull one block for the pipeline: poll the task pool via awaitImpl and,
// when a block is available, hand the cached block to the caller after
// running the extra-table-id transform on it.
OperatorStatus UnorderedSourceOp::readImpl(Block & block)
{
    auto await_status = awaitImpl();
    if (await_status == OperatorStatus::HAS_OUTPUT)
    {
        // Move the cached block out and clear the cache slot so the next
        // awaitImpl poll fetches a fresh block from the pool.
        std::swap(block, t_block.value());
        t_block.reset();
        // NOTE(review): assumes a false return from action.transform means
        // reading should stop — confirm against SegmentReadTransformAction.
        return action.transform(block)
            ? OperatorStatus::HAS_OUTPUT
            : OperatorStatus::FINISHED;
    }
    // Propagate WAITING / FINISHED from the poll unchanged.
    return await_status;
}

// Non-blocking poll of the shared segment read task pool.
// Caches a popped block in t_block for the subsequent readImpl call.
// Returns WAITING while no block is ready, HAS_OUTPUT once a non-empty
// block has been cached, and FINISHED when the pool yields an invalid
// (end-of-stream) block.
OperatorStatus UnorderedSourceOp::awaitImpl()
{
    // A block cached by a previous poll is still pending consumption.
    if (t_block.has_value())
        return OperatorStatus::HAS_OUTPUT;
    Block res;
    if (!task_pool->tryPopBlock(res))
        return OperatorStatus::WAITING;
    if (res)
    {
        // NOTE(review): zero-row blocks are treated as "not ready yet"
        // rather than end-of-stream — confirm this matches the task pool's
        // contract.
        if (res.rows() == 0)
            return OperatorStatus::WAITING;
        t_block = std::move(res);
        return OperatorStatus::HAS_OUTPUT;
    }
    return OperatorStatus::FINISHED;
}
} // namespace DB
78 changes: 78 additions & 0 deletions dbms/src/Operators/UnorderedSourceOp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <DataStreams/SegmentReadTransformAction.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>

namespace DB
{
class UnorderedSourceOp : public SourceOp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding some comments here?

{
public:
UnorderedSourceOp(
PipelineExecutorStatus & exec_status_,
const DM::SegmentReadTaskPoolPtr & task_pool_,
const DM::ColumnDefines & columns_to_read_,
const int extra_table_id_index,
const TableID physical_table_id,
const String & req_id)
: SourceOp(exec_status_)
, task_pool(task_pool_)
, action(header, extra_table_id_index, physical_table_id)
, log(Logger::get(req_id))
, ref_no(0)
, task_pool_added(false)
{
setHeader(toEmptyBlock(columns_to_read_));
if (extra_table_id_index != InvalidColumnID)
{
const auto & extra_table_id_col_define = DM::getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value};
header.insert(extra_table_id_index, col);
}
ref_no = task_pool->increaseUnorderedInputStreamRefCount();
LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
addReadTaskPoolToScheduler();
}

String getName() const override
{
return "UnorderedSourceOp";
}

ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
OperatorStatus readImpl(Block & block) override;

OperatorStatus awaitImpl() override;

ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
void addReadTaskPoolToScheduler()
{
std::call_once(task_pool->addToSchedulerFlag(), [&]() { DM::SegmentReadTaskScheduler::instance().add(task_pool); });
task_pool_added = true;
}

private:
DM::SegmentReadTaskPoolPtr task_pool;
SegmentReadTransformAction action;
std::optional<Block> t_block;

const LoggerPtr log;
int64_t ref_no;
bool task_pool_added;
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace DB
Loading