Skip to content

Commit

Permalink
Pipeline: Support pipeline table scan fullstack part 1 (#7225)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
ywqzzy authored Apr 20, 2023
1 parent 3ef3154 commit 68412d1
Show file tree
Hide file tree
Showing 48 changed files with 1,425 additions and 270 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
67 changes: 67 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceHolderTransformAction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/GeneratedColumnPlaceHolderTransformAction.h>

namespace DB
{
/// Builds the action from the upstream header and the generated-column descriptors.
///
/// Each descriptor is a tuple of (insertion index, column name, data type). Both arguments
/// are copied into the action. The stored header is then extended by insertColumns() with
/// insert_data=false, i.e. with columns obtained from `createColumn()` rather than
/// constant default-valued columns.
///
/// The descriptors are not validated here; callers invoke checkColumn() separately.
GeneratedColumnPlaceHolderTransformAction::GeneratedColumnPlaceHolderTransformAction(
    const Block & header_,
    const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_)
    : header(header_)
    , generated_column_infos(generated_column_infos_)
{
    // Both members are initialised at this point, so the header copy can be extended in place.
    insertColumns(header, /*insert_data=*/false);
}

/// Returns a copy of the output header that the constructor built, i.e. the input header
/// after insertColumns() was applied to it with insert_data=false.
Block GeneratedColumnPlaceHolderTransformAction::getHeader() const
{
return header;
}

/// Validates the generated-column descriptors held by this action.
///
/// Two conditions are asserted with RUNTIME_CHECK:
///   1. the descriptor list is not empty;
///   2. the first tuple element of each descriptor (the index that insertColumns() passes
///      to Block::insert) is strictly greater than that of the preceding descriptor.
///
/// NOTE(review): RUNTIME_CHECK is defined outside this file; what happens when a check
/// fails is not visible here — confirm against its definition.
void GeneratedColumnPlaceHolderTransformAction::checkColumn() const
{
RUNTIME_CHECK(!generated_column_infos.empty());
// Indices must be strictly increasing; start at 1 so each entry is compared with its predecessor.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]));
}
}

/// Inserts one placeholder column per generated-column descriptor into `block`, in place.
///
/// @param block        Block to extend. If `!block` holds, it is returned untouched.
/// @param insert_data  When true, each placeholder comes from
///                     `createColumnConstWithDefaultValue(block.rows())`; when false, it
///                     comes from `createColumn()`.
///
/// Descriptors are processed in stored order, and each column is inserted at the index
/// given by the descriptor's first tuple element.
void GeneratedColumnPlaceHolderTransformAction::insertColumns(Block & block, bool insert_data) const
{
    if (!block)
        return;

    for (const auto & [position, name, type] : generated_column_infos)
    {
        ColumnPtr placeholder = nullptr;
        if (!insert_data)
            placeholder = type->createColumn();
        else
            // The row count is queried on every iteration, exactly as the block stands at that moment.
            placeholder = type->createColumnConstWithDefaultValue(block.rows());

        block.insert(position, ColumnWithTypeAndName{placeholder, type, name});
    }
}

/// Adds the generated-column placeholders to `block` in place.
///
/// Delegates to insertColumns() with insert_data=true, so every placeholder is created by
/// `createColumnConstWithDefaultValue(block.rows())`. A block for which `!block` holds is
/// left unchanged.
void GeneratedColumnPlaceHolderTransformAction::transform(Block & block)
{
insertColumns(block, true);
}

} // namespace DB
40 changes: 40 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceHolderTransformAction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <Core/Block.h>

namespace DB
{
/// Transform action that inserts placeholder columns for generated columns into blocks.
///
/// The action is configured with a list of (insertion index, column name, data type)
/// descriptors. It precomputes an output header at construction time and, on each call to
/// transform(), inserts one constant default-valued column per descriptor into the block.
class GeneratedColumnPlaceHolderTransformAction
{
public:
    /// Copies `header_` and `generated_column_infos_`, then extends the copied header with
    /// one column per descriptor (created via `createColumn()`).
    GeneratedColumnPlaceHolderTransformAction(
        const Block & header_,
        const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_);

    /// Returns a copy of the header built by the constructor.
    Block getHeader() const;

    /// Checks that the descriptor list is non-empty and that its insertion indices are
    /// strictly increasing.
    void checkColumn() const;

    /// Inserts the placeholder columns into `block` in place.
    void transform(Block & block);

private:
    /// Shared implementation behind the constructor (insert_data=false) and transform()
    /// (insert_data=true).
    void insertColumns(Block & block, bool insert_data) const;

    // Declaration order matters: `header` is initialised before `generated_column_infos`.
    Block header;
    const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};
} // namespace DB
38 changes: 7 additions & 31 deletions dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/GeneratedColumnPlaceHolderTransformAction.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
Expand All @@ -32,18 +33,17 @@ class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputSt
const BlockInputStreamPtr & input,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_,
const String & req_id_)
: generated_column_infos(generated_column_infos_)
: action(input->getHeader(), generated_column_infos_)
, log(Logger::get(req_id_))
{
children.push_back(input);
}

String getName() const override { return NAME; }

Block getHeader() const override
{
Block block = children.back()->getHeader();
insertColumns(block, /*insert_data=*/false);
return block;
return action.getHeader();
}

static String getColumnName(UInt64 col_index)
Expand All @@ -54,43 +54,19 @@ class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputSt
protected:
void readPrefix() override
{
RUNTIME_CHECK(!generated_column_infos.empty());
// Validation check.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]));
}
action.checkColumn();
}

Block readImpl() override
{
Block block = children.back()->read();
insertColumns(block, /*insert_data=*/true);
action.transform(block);
return block;
}

private:
void insertColumns(Block & block, bool insert_data) const
{
if (!block)
return;

for (const auto & ele : generated_column_infos)
{
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
ColumnPtr column = nullptr;
if (insert_data)
column = data_type->createColumnConstWithDefaultValue(block.rows());
else
column = data_type->createColumn();
block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name});
}
}

static constexpr auto NAME = "GeneratedColumnPlaceholder";
const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
GeneratedColumnPlaceHolderTransformAction action;
const LoggerPtr log;
};

Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Debug/MockExecutor/TableScanBinder.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -88,12 +88,13 @@ void TableScanBinder::buildTable(tipb::Executor * tipb_executor)
tipb_executor->set_tp(tipb::ExecType::TypeTableScan);
tipb_executor->set_executor_id(name);
auto * ts = tipb_executor->mutable_tbl_scan();
ts->set_keep_order(keep_order);
ts->set_table_id(table_info.id);
for (const auto & info : output_schema)
setTipbColumnInfo(ts->add_columns(), info);
}

ExecutorBinderPtr compileTableScan(size_t & executor_index, TableInfo & table_info, const String & db, const String & table_name, bool append_pk_column)
ExecutorBinderPtr compileTableScan(size_t & executor_index, TableInfo & table_info, const String & db, const String & table_name, bool append_pk_column, bool keep_order)
{
DAGSchema ts_output;
for (const auto & column_info : table_info.columns)
Expand Down Expand Up @@ -121,6 +122,6 @@ ExecutorBinderPtr compileTableScan(size_t & executor_index, TableInfo & table_in
ts_output.emplace_back(std::make_pair(MutableSupport::tidb_pk_column_name, std::move(ci)));
}

return std::make_shared<mock::TableScanBinder>(executor_index, ts_output, table_info);
return std::make_shared<mock::TableScanBinder>(executor_index, ts_output, table_info, keep_order);
}
} // namespace DB::mock
12 changes: 9 additions & 3 deletions dbms/src/Debug/MockExecutor/TableScanBinder.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,9 +23,14 @@ using TableInfo = TiDB::TableInfo;
class TableScanBinder : public ExecutorBinder
{
public:
TableScanBinder(size_t & index_, const DAGSchema & output_schema_, const TableInfo & table_info_)
TableScanBinder(
size_t & index_,
const DAGSchema & output_schema_,
const TableInfo & table_info_,
bool keep_order_)
: ExecutorBinder(index_, "table_scan_" + std::to_string(index_), output_schema_)
, table_info(table_info_)
, keep_order(keep_order_)
{}

void columnPrune(std::unordered_set<String> & used_columns) override;
Expand All @@ -40,12 +45,13 @@ class TableScanBinder : public ExecutorBinder

private:
TableInfo table_info; /// used by column pruner
bool keep_order;

private:
void setTipbColumnInfo(tipb::ColumnInfo * ci, const DAGColumnInfo & dag_column_info) const;
void buildPartionTable(tipb::Executor * tipb_executor);
void buildTable(tipb::Executor * tipb_executor);
};

ExecutorBinderPtr compileTableScan(size_t & executor_index, TableInfo & table_info, const String & db, const String & table_name, bool append_pk_column);
ExecutorBinderPtr compileTableScan(size_t & executor_index, TableInfo & table_info, const String & db, const String & table_name, bool append_pk_column, bool keep_order = false);
} // namespace DB::mock
28 changes: 20 additions & 8 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
Expand Down Expand Up @@ -141,7 +142,7 @@ Int64 MockStorage::addTableDataForDeltaMerge(Context & context, const String & n
return table_id;
}

std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> MockStorage::prepareForRead(Context & context, Int64 table_id)
std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> MockStorage::prepareForRead(Context & context, Int64 table_id, bool keep_order)
{
assert(tableExistsForDeltaMerge(table_id));
auto storage = storage_delta_merge_map[table_id];
Expand All @@ -156,15 +157,15 @@ std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> MockStorage::prepareFor
auto scan_context = std::make_shared<DM::ScanContext>();
SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.keep_order = false;
query_info.keep_order = keep_order;
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(context.getSettingsRef().resolve_locks, std::numeric_limits<UInt64>::max(), scan_context);
return {storage, column_names, query_info};
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions)
BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions, bool keep_order)
{
QueryProcessingStage::Enum stage;
auto [storage, column_names, query_info] = prepareForRead(context, table_id);
auto [storage, column_names, query_info] = prepareForRead(context, table_id, keep_order);
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
Expand Down Expand Up @@ -193,11 +194,22 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
}


SourceOps MockStorage::getSourceOpsFromDeltaMerge(PipelineExecutorStatus & exec_status_, Context & context, Int64 table_id, size_t concurrency)
SourceOps MockStorage::getSourceOpsFromDeltaMerge(
PipelineExecutorStatus & exec_status_,
Context & context,
Int64 table_id,
size_t concurrency,
bool keep_order)
{
auto [storage, column_names, query_info] = prepareForRead(context, table_id);
auto [storage, column_names, query_info] = prepareForRead(context, table_id, keep_order);
// Currently don't support test for late materialization
return storage->readSourceOps(exec_status_, column_names, query_info, context, context.getSettingsRef().max_block_size, concurrency);
return storage->readSourceOps(
exec_status_,
column_names,
query_info,
context,
context.getSettingsRef().max_block_size,
concurrency);
}

void MockStorage::addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns)
Expand Down
22 changes: 18 additions & 4 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,9 +90,23 @@ class MockStorage

NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id);

std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> prepareForRead(Context & context, Int64 table_id);
BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions = nullptr);
SourceOps getSourceOpsFromDeltaMerge(PipelineExecutorStatus & exec_status_, Context & context, Int64 table_id, size_t concurrency = 1);
std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> prepareForRead(
Context & context,
Int64 table_id,
bool keep_order = false);

BlockInputStreamPtr getStreamFromDeltaMerge(
Context & context,
Int64 table_id,
const FilterConditions * filter_conditions = nullptr,
bool keep_order = false);

SourceOps getSourceOpsFromDeltaMerge(
PipelineExecutorStatus & exec_status_,
Context & context,
Int64 table_id,
size_t concurrency = 1,
bool keep_order = false);

bool tableExistsForDeltaMerge(Int64 table_id);

Expand Down
Loading

0 comments on commit 68412d1

Please sign in to comment.