Skip to content

KIKIMR-20877: internal indexes processing #1276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions ydb/core/formats/arrow/hash/calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,25 @@

namespace NKikimr::NArrow::NHash {

void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer) {
AFL_VERIFY(scalar);
NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using T = typename TWrap::T;
using TScalar = typename arrow::TypeTraits<T>::ScalarType;

auto& typedScalar = static_cast<const TScalar&>(*scalar);
if constexpr (arrow::has_string_view<T>()) {
hashCalcer.Update((const ui8*)typedScalar.value->data(), typedScalar.value->size());
} else if constexpr (arrow::has_c_type<T>()) {
hashCalcer.Update((const ui8*)(typedScalar.data()), sizeof(T));
} else {
static_assert(arrow::is_decimal_type<T>());
}
return true;
});
}

void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
NArrow::SwitchType(array->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
Expand All @@ -21,12 +40,7 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int ro
if constexpr (arrow::has_string_view<T>()) {
hashCalcer.Update((const ui8*)value.data(), value.size());
} else if constexpr (arrow::has_c_type<T>()) {
if constexpr (arrow::is_physical_integer_type<T>()) {
hashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
} else {
// Do not use bool or floats for sharding
static_assert(arrow::is_boolean_type<T>() || arrow::is_floating_type<T>());
}
hashCalcer.Update(reinterpret_cast<const ui8*>(&value), sizeof(value));
} else {
static_assert(arrow::is_decimal_type<T>());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/hash/calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class TXX64 {
TXX64(const std::vector<std::string>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);

static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const;
};
Expand Down
29 changes: 28 additions & 1 deletion ydb/core/formats/arrow/program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ enum class AggFunctionId {
AGG_MAX = 4,
AGG_SUM = 5,
};
struct GroupByOptions : public arrow::compute::ScalarAggregateOptions {
struct GroupByOptions: public arrow::compute::ScalarAggregateOptions {
struct Assign {
AggFunctionId function = AggFunctionId::AGG_UNSPECIFIED;
std::string result_column;
Expand All @@ -43,6 +43,7 @@ struct GroupByOptions : public arrow::compute::ScalarAggregateOptions {
#include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
#include <ydb/library/yql/core/arrow_kernels/request/request.h>

namespace NKikimr::NSsa {

Expand Down Expand Up @@ -605,6 +606,32 @@ IStepFunction<TAssign>::TPtr TAssign::GetFunction(arrow::compute::ExecContext* c
return std::make_shared<TSimpleFunction>(ctx);
}

TString TAssign::DebugString() const {
TStringBuilder sb;
sb << "{";
if (Operation != EOperation::Unspecified) {
sb << "op=" << Operation << ";";
}
if (YqlOperationId) {
sb << "yql_op=" << (NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId << ";";
}
if (Arguments.size()) {
sb << "arguments=[";
for (auto&& i : Arguments) {
sb << i.DebugString() << ";";
}
sb << "];";
}
if (Constant) {
sb << "const=" << Constant->ToString() << ";";
}
if (KernelFunction) {
sb << "kernel=" << KernelFunction->name() << ";";
}
sb << "column=" << Column.DebugString() << ";";
sb << "}";
return sb;
}

IStepFunction<TAggregateAssign>::TPtr TAggregateAssign::GetFunction(arrow::compute::ExecContext* ctx) const {
if (KernelFunction) {
Expand Down
35 changes: 23 additions & 12 deletions ydb/core/formats/arrow/program.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class IStepFunction {
};

class TAssign {
private:
YDB_ACCESSOR_DEF(std::optional<ui32>, YqlOperationId);
public:
using TOperationType = EOperation;

Expand Down Expand Up @@ -237,12 +239,7 @@ class TAssign {
const arrow::compute::FunctionOptions* GetOptions() const { return FuncOpts.get(); }

IStepFunction<TAssign>::TPtr GetFunction(arrow::compute::ExecContext* ctx) const;
TString DebugString() const {
return TStringBuilder() <<
"{op=" << Operation << ";column=" << Column.DebugString() << ";" << (Constant ? "const=" + Constant->ToString() + ";" : "NO;")
<< (KernelFunction ? ("kernel=" + KernelFunction->name() + ";") : "NO;")
<< "}";
}
TString DebugString() const;
private:
const TColumnInfo Column;
EOperation Operation{EOperation::Unspecified};
Expand Down Expand Up @@ -325,13 +322,26 @@ class TProgramStep {
TString DebugString() const {
TStringBuilder sb;
sb << "{";
sb << "assignes=[";
for (auto&& i : Assignes) {
sb << i.DebugString() << ";";
if (Assignes.size()) {
sb << "assignes=[";
for (auto&& i : Assignes) {
sb << i.DebugString() << ";";
}
sb << "];";
}
if (Filters.size()) {
sb << "filters=[";
for (auto&& i : Filters) {
sb << i.DebugString() << ";";
}
sb << "];";
}
if (GroupBy.size()) {
sb << "group_by_count=" << GroupBy.size() << "; ";
}
if (GroupByKeys.size()) {
sb << "group_by_keys_count=" << GroupByKeys.size() << ";";
}
sb << "];";
sb << "group_by_count = " << GroupBy.size() << "; ";
sb << "group_by_keys_count=" << GroupByKeys.size() << ";";

sb << "projections=[";
for (auto&& i : Projection) {
Expand Down Expand Up @@ -396,6 +406,7 @@ class TProgramStep {
};

struct TProgram {
public:
std::vector<std::shared_ptr<TProgramStep>> Steps;
THashMap<ui32, TColumnInfo> SourceColumns;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PEERDIR(
ydb/library/binary_json
ydb/library/dynumber
ydb/library/services
ydb/library/yql/core/arrow_kernels/request
)

IF (OS_WINDOWS)
Expand Down
31 changes: 28 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/tx/datashard/range_ops.h>
#include <ydb/core/tx/program/program.h>
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h>
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>

#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
Expand Down Expand Up @@ -45,8 +48,7 @@ TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpRead
}


std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task)
{
std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task) {
const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
std::vector<std::shared_ptr<arrow::Field>> columns;
std::vector<std::shared_ptr<arrow::Array>> data;
Expand Down Expand Up @@ -917,11 +919,34 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto

if (tableInfo->TableKind == ETableKind::Olap) {
auto* olapProgram = protoTaskMeta.MutableOlapProgram();
auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task);

olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program);

auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task);
olapProgram->SetParametersSchema(schema);
olapProgram->SetParameters(parameters);

if (!!stageInfo.Meta.ColumnTableInfoPtr) {
std::shared_ptr<NSchemeShard::TOlapSchema> olapSchema = std::make_shared<NSchemeShard::TOlapSchema>();
olapSchema->ParseFromLocalDB(stageInfo.Meta.ColumnTableInfoPtr->Description.GetSchema());
if (olapSchema->GetIndexes().GetIndexes().size()) {
NOlap::TProgramContainer container;
NOlap::TSchemaResolverColumnsOnly resolver(olapSchema);
TString error;
YQL_ENSURE(container.Init(resolver, *olapProgram, error), "" << error);
auto data = NOlap::NIndexes::NRequest::TDataForIndexesCheckers::Build(container);
if (data) {
for (auto&& [indexId, i] : olapSchema->GetIndexes().GetIndexes()) {
AFL_VERIFY(!!i.GetIndexMeta());
i.GetIndexMeta()->FillIndexCheckers(data, *olapSchema);
}
auto checker = data->GetCoverChecker();
if (!!checker) {
checker.SerializeToProto(*olapProgram->MutableIndexChecker());
}
}
}
}
} else {
YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h>
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>

namespace NKikimr::NKqp {

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ TTypedColumn CompileYqlKernelBinaryOperation(const TKqpOlapFilterBinaryOp& opera
}

const auto kernel = ctx.AddYqlKernelBinaryFunc(op, *leftColumn.Type, *rightColumn.Type, type);
cmpFunc->SetYqlOperationId((ui32)op);
cmpFunc->SetFunctionType(TProgram::YQL_KERNEL);
cmpFunc->SetKernelIdx(kernel.first);
cmpFunc->AddArguments()->SetId(leftColumn.Id);
Expand Down Expand Up @@ -706,8 +707,10 @@ const TTypedColumn BuildLogicalProgram(const TExprNode::TChildrenType& args, con
const auto idx = ctx.GetKernelRequestBuilder().AddBinaryOp(function, block, block, block);
logicalFunc->SetKernelIdx(idx);
logicalFunc->SetFunctionType(TProgram::YQL_KERNEL);
logicalFunc->SetYqlOperationId((ui32)function);
} else {
logicalFunc->SetFunctionType(function);
logicalFunc->SetId((ui32)function);
}

return {logicalOp->GetColumn().GetId(), block};
Expand Down
Loading