Another way of getting codec flag
zanmato1984 committed Aug 5, 2019
1 parent 08b7142 commit bc25942
Showing 8 changed files with 107 additions and 69 deletions.
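
In short: the output stream previously derived each column's codec flag from its ClickHouse DataType (unwrapping Nullable first, via getCodecFlagByDataType), while this commit carries the original TiDB field type (tp) and flag from the DAG request's column infos into DAGBlockOutputStream as a FieldTpAndFlags vector and derives the codec flag from those. A minimal, self-contained sketch of the idea follows; the enum values and the mapping below are illustrative stand-ins, not the real definitions from Storages/Transaction/TiDB.h:

// Sketch only: hypothetical stand-ins for TiDB::TP, TiDB::CodecFlag and the
// unsigned-flag bit; the real definitions live in Storages/Transaction/TiDB.h.
#include <cstdint>
#include <vector>

enum class TP { TypeLong, TypeVarChar };          // assumed subset
enum class CodecFlag { Int, UInt, CompactBytes }; // assumed subset
constexpr uint32_t UNSIGNED_FLAG = 1u << 5;       // assumed bit position

struct FieldTpAndFlag
{
    TP tp;
    uint32_t flag;

    // The codec flag comes from the original TiDB type information, so any
    // Nullable wrapping on the ClickHouse side no longer matters here.
    CodecFlag getCodecFlag() const
    {
        if (tp == TP::TypeVarChar)
            return CodecFlag::CompactBytes;
        return (flag & UNSIGNED_FLAG) ? CodecFlag::UInt : CodecFlag::Int;
    }
};
using FieldTpAndFlags = std::vector<FieldTpAndFlag>;
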
37 changes: 17 additions & 20 deletions dbms/src/DataStreams/DAGBlockOutputStream.cpp
@@ -1,23 +1,24 @@

#include <DataStreams/DAGBlockOutputStream.h>

#include <DataTypes/DataTypeNullable.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>


namespace DB
{

namespace ErrorCodes
{
extern const int UNSUPPORTED_PARAMETER;
}

struct TypeMapping;
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

DAGBlockOutputStream::DAGBlockOutputStream(
tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_, Block header_)
: dag_response(dag_response_), records_per_chunk(records_per_chunk_), encodeType(encodeType_), header(header_)
DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
FieldTpAndFlags && field_tp_and_flags_, Block header_)
: dag_response(dag_response_),
records_per_chunk(records_per_chunk_),
encodeType(encodeType_),
field_tp_and_flags(field_tp_and_flags_),
header(header_)
{
if (encodeType == tipb::EncodeType::TypeArrow)
{
@@ -43,9 +44,13 @@ void DAGBlockOutputStream::writeSuffix()
}
}


void DAGBlockOutputStream::write(const Block & block)
{
if (block.columns() != field_tp_and_flags.size())
throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR);

// TODO: Check compatibility between field_tp_and_flags and block column types.

// Encode data to chunk
size_t rows = block.rows();
for (size_t i = 0; i < rows; i++)
@@ -63,17 +68,9 @@ void DAGBlockOutputStream::write(const Block & block)
}
for (size_t j = 0; j < block.columns(); j++)
{
// TODO: No need to encode column id?
auto field = (*block.getByPosition(j).column.get())[i];
const DataTypePtr & data_type = block.getByPosition(j).type;
if (data_type->isNullable())
{
const DataTypePtr nested = dynamic_cast<const DataTypeNullable *>(data_type.get())->getNestedType();
EncodeDatum(field, getCodecFlagByDataType(nested), current_ss);
}
else
{
EncodeDatum(field, getCodecFlagByDataType(block.getByPosition(j).type), current_ss);
}
EncodeDatum(field, field_tp_and_flags[j].getCodecFlag(), current_ss);
}
// Encode current row
records_per_chunk++;
6 changes: 5 additions & 1 deletion dbms/src/DataStreams/DAGBlockOutputStream.h
@@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/DAGQuerySource.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
@@ -17,7 +18,8 @@ namespace DB
class DAGBlockOutputStream : public IBlockOutputStream
{
public:
DAGBlockOutputStream(tipb::SelectResponse & response, Int64 records_per_chunk, tipb::EncodeType encodeType, Block header);
DAGBlockOutputStream(tipb::SelectResponse & response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
FieldTpAndFlags && field_tp_and_flags_, Block header_);

Block getHeader() const override { return header; }
void write(const Block & block) override;
@@ -29,6 +31,8 @@ class DAGBlockOutputStream : public IBlockOutputStream

Int64 records_per_chunk;
tipb::EncodeType encodeType;
FieldTpAndFlags field_tp_and_flags;

Block header;

tipb::Chunk * current_chunk;
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -54,8 +54,8 @@ void DAGDriver::execute()
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);

BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(
dag_response, context.getSettings().dag_records_per_chunk, dag_request.encode_type(), streams.in->getHeader());
BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(dag_response, context.getSettings().dag_records_per_chunk,
dag_request.encode_type(), dag.getOutputFieldTpAndFlags(), streams.in->getHeader());
copyData(*streams.in, *outputStreamPtr);
}

23 changes: 21 additions & 2 deletions dbms/src/Interpreters/DAGQuerySource.cpp
@@ -1,9 +1,8 @@

#include <Interpreters/DAGQuerySource.h>

#include <Interpreters/InterpreterDAG.h>
#include <Parsers/ASTSelectQuery.h>


namespace DB
{

@@ -68,4 +67,24 @@ std::unique_ptr<IInterpreter> DAGQuerySource::interpreter(Context &, QueryProcessingStage::Enum stage)
{
return std::make_unique<InterpreterDAG>(context, *this);
}

FieldTpAndFlags DAGQuerySource::getOutputFieldTpAndFlags() const
{
FieldTpAndFlags output;

const auto & ts = getTS();
const auto & column_infos = ts.columns();
for (auto i : dag_request.output_offsets())
{
// TODO: Check bounds.
auto & column_info = column_infos[i];
output.emplace_back(FieldTpAndFlag{static_cast<TiDB::TP>(column_info.tp()), static_cast<UInt32>(column_info.flag())});
}

// TODO: Add aggregation columns.
// Either write our own code to infer types following the convention between TiDB and TiKV, or ask TiDB to push down aggregation field types.

return output;
}

} // namespace DB
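
The bound check flagged by the TODO above might look like the following; this is a hypothetical sketch, not part of the commit, assuming every output offset must index into the table scan's column infos:

// Hypothetical bound check for the "TODO: Check bounds." above (not in this
// commit): every output offset must fall inside [0, num_columns).
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

void checkOutputOffsets(const std::vector<uint32_t> & output_offsets, size_t num_columns)
{
    for (auto i : output_offsets)
        if (i >= num_columns)
            throw std::logic_error("Output offset " + std::to_string(i)
                + " out of bound, column info size is " + std::to_string(num_columns));
}
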
63 changes: 42 additions & 21 deletions dbms/src/Interpreters/DAGQuerySource.h
@@ -1,18 +1,34 @@
#pragma once

#include <Interpreters/IQuerySource.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <Interpreters/IQuerySource.h>
#include <Storages/Transaction/Types.h>

namespace DB
{

class Context;

/// A handy struct to get the codec flag based on a field's tp and flag.
struct FieldTpAndFlag
{
TiDB::TP tp;
UInt32 flag;

TiDB::CodecFlag getCodecFlag() const
{
TiDB::ColumnInfo ci;
ci.tp = tp;
ci.flag = flag;
return ci.getCodecFlag();
}
};
using FieldTpAndFlags = std::vector<FieldTpAndFlag>;

/// Query source of a DAG request via gRPC.
/// This is also an IR of a DAG.
class DAGQuerySource : public IQuerySource
@@ -31,49 +47,54 @@ class DAGQuerySource : public IQuerySource
String str(size_t max_query_size) override;
std::unique_ptr<IInterpreter> interpreter(Context & context, QueryProcessingStage::Enum stage) override;

void assertValid(Int32 index, const String & name)
{
if (index < 0 || index >= dag_request.executors_size())
{
throw Exception("Access invalid executor: " + name);
}
}

RegionID getRegionID() const { return region_id; }
UInt64 getRegionVersion() const { return region_version; }
UInt64 getRegionConfVersion() const { return region_conf_version; }

bool hasSelection() { return sel_index != -1; };
bool hasAggregation() { return agg_index != -1; };
bool hasTopN() { return order_index != -1; };
bool hasLimit() { return order_index == -1 && limit_index != -1; };
bool hasSelection() const { return sel_index != -1; };
bool hasAggregation() const { return agg_index != -1; };
bool hasTopN() const { return order_index != -1; };
bool hasLimit() const { return order_index == -1 && limit_index != -1; };

const tipb::TableScan & getTS()
const tipb::TableScan & getTS() const
{
assertValid(ts_index, TS_NAME);
return dag_request.executors(ts_index).tbl_scan();
};
const tipb::Selection & getSelection()
const tipb::Selection & getSelection() const
{
assertValid(sel_index, SEL_NAME);
return dag_request.executors(sel_index).selection();
};
const tipb::Aggregation & getAggregation()
const tipb::Aggregation & getAggregation() const
{
assertValid(agg_index, AGG_NAME);
return dag_request.executors(agg_index).aggregation();
};
const tipb::TopN & getTopN()
const tipb::TopN & getTopN() const
{
assertValid(order_index, TOPN_NAME);
return dag_request.executors(order_index).topn();
};
const tipb::Limit & getLimit()
const tipb::Limit & getLimit() const
{
assertValid(limit_index, LIMIT_NAME);
return dag_request.executors(limit_index).limit();
};
const tipb::DAGRequest & getDAGRequest() { return dag_request; };
const tipb::DAGRequest & getDAGRequest() const { return dag_request; };

/// Used to guide the output stream when encoding data, since the DAG field types are lost going through the input streams.
/// This somewhat duplicates the planning logic, but we don't have a decent way to keep this information.
FieldTpAndFlags getOutputFieldTpAndFlags() const;

protected:
void assertValid(Int32 index, const String & name) const
{
if (index < 0 || index >= dag_request.executors_size())
{
throw Exception("Access invalid executor: " + name);
}
}

protected:
Context & context;
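
To connect the pieces: DAGBlockOutputStream::write() now takes the codec flag per output column from this vector rather than from the block's data types. A simplified, self-contained sketch of that loop is below; Field, EncodeDatum and the row representation are toy stand-ins for the real codec in Storages/Transaction/Codec.h:

// Simplified sketch of the per-row encode loop after this change; the real
// loop is in DAGBlockOutputStream::write() above.
#include <cstddef>
#include <sstream>
#include <stdexcept>
#include <vector>

enum class CodecFlag { Int, UInt, CompactBytes };
struct Field { long long value = 0; };                         // toy datum
struct FieldTpAndFlag
{
    CodecFlag codec;
    CodecFlag getCodecFlag() const { return codec; }
};

void EncodeDatum(const Field & field, CodecFlag flag, std::stringstream & ss)
{
    ss << static_cast<int>(flag) << ':' << field.value << ';'; // toy encoding
}

void encodeRow(const std::vector<Field> & row, const std::vector<FieldTpAndFlag> & flags, std::stringstream & ss)
{
    if (row.size() != flags.size())
        throw std::logic_error("Output column size mismatch with field type size");
    for (size_t j = 0; j < row.size(); ++j)
        EncodeDatum(row[j], flags[j].getCodecFlag(), ss);
}
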
8 changes: 2 additions & 6 deletions dbms/src/Interpreters/DAGStringConverter.cpp
@@ -26,11 +26,6 @@ bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
auto & tmt_ctx = context.getTMTContext();
auto storage = tmt_ctx.getStorages().get(id);
if (storage == nullptr)
{
tmt_ctx.getSchemaSyncer()->syncSchema(id, context, false);
storage = tmt_ctx.getStorages().get(id);
}
if (storage == nullptr)
{
return false;
}
@@ -116,7 +111,8 @@ bool isProject(const tipb::Executor &)
// currently, project is not pushed down, so always return false
return false;
}
DAGStringConverter::DAGStringConverter(Context & context_, const tipb::DAGRequest & dag_request_) : context(context_), dag_request(dag_request_)
DAGStringConverter::DAGStringConverter(Context & context_, const tipb::DAGRequest & dag_request_)
: context(context_), dag_request(dag_request_)
{
afterAgg = false;
}
31 changes: 16 additions & 15 deletions dbms/src/Interpreters/InterpreterDAG.cpp
@@ -1,3 +1,5 @@
#include <Interpreters/InterpreterDAG.h>

#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/ConcatBlockInputStream.h>
@@ -11,14 +13,12 @@
#include <Interpreters/Aggregator.h>
#include <Interpreters/DAGExpressionAnalyzer.h>
#include <Interpreters/DAGUtils.h>
#include <Interpreters/InterpreterDAG.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>

namespace DB
@@ -32,7 +32,7 @@ extern const int SCHEMA_VERSION_ERROR;
extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

InterpreterDAG::InterpreterDAG(Context & context_, DAGQuerySource & dag_)
InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_), dag(dag_), log(&Logger::get("InterpreterDAG"))
{}

@@ -119,6 +119,9 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
query_info.mvcc_query_info->regions_query_info.push_back(info);
query_info.mvcc_query_info->concurrent = 0.0;
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);

pipeline.transform([&](auto & stream) { stream->addTableLock(table_lock); });

/// Set the limits and quota for reading data, the speed and time of the query.
{
IProfilingBlockInputStream::LocalLimits limits;
@@ -157,10 +160,10 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
AnalysisResult res;
ExpressionActionsChain chain;
res.need_aggregate = dag.hasAggregation();
DAGExpressionAnalyzer expressionAnalyzer(source_columns, context);
DAGExpressionAnalyzer analyzer(source_columns, context);
if (dag.hasSelection())
{
if (expressionAnalyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name))
if (analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name))
{
res.has_where = true;
res.before_where = chain.getLastActions();
@@ -170,24 +173,23 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
}
if (res.need_aggregate)
{
res.need_aggregate
= expressionAnalyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
res.need_aggregate = analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
res.before_aggregation = chain.getLastActions();

chain.finalize();
chain.clear();

// add cast if type is not match
expressionAnalyzer.appendAggSelect(chain, dag.getAggregation());
analyzer.appendAggSelect(chain, dag.getAggregation());
// TODO: use output_offsets to prune the final project columns
for (auto element : expressionAnalyzer.getCurrentInputColumns())
for (auto element : analyzer.getCurrentInputColumns())
{
final_project.emplace_back(element.name, "");
}
}
if (dag.hasTopN())
{
res.has_order_by = expressionAnalyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
res.has_order_by = analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
}
// append final project results
for (auto & name : final_project)
@@ -201,16 +203,15 @@
return res;
}

void InterpreterDAG::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column)
void InterpreterDAG::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expr, String & filter_column)
{
pipeline.transform(
[&](auto & stream) { stream = std::make_shared<FilterBlockInputStream>(stream, expressionActionsPtr, filter_column); });
pipeline.transform([&](auto & stream) { stream = std::make_shared<FilterBlockInputStream>(stream, expr, filter_column); });
}

void InterpreterDAG::executeAggregation(
Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, Names & key_names, AggregateDescriptions & aggregates)
Pipeline & pipeline, const ExpressionActionsPtr & expr, Names & key_names, AggregateDescriptions & aggregates)
{
pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, expressionActionsPtr); });
pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, expr); });

Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterDAG.h
@@ -25,7 +25,7 @@ class Context;
class InterpreterDAG : public IInterpreter
{
public:
InterpreterDAG(Context & context_, DAGQuerySource & dag_);
InterpreterDAG(Context & context_, const DAGQuerySource & dag_);

~InterpreterDAG() = default;

@@ -85,7 +85,7 @@ class InterpreterDAG : public IInterpreter
private:
Context & context;

DAGQuerySource & dag;
const DAGQuerySource & dag;

NamesWithAliases final_project;
NamesAndTypesList source_columns;
