Skip to content

Scheme correction for indexes usage #1180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/hash/calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace NKikimr::NArrow::NHash {

void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) const {
void TXX64::AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NArrow::NHash::NXX64::TStreamStringHashCalcer& hashCalcer) {
NArrow::SwitchType(array->type_id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using T = typename TWrap::T;
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/formats/arrow/hash/calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ class TXX64 {
ui64 Seed = 0;
const std::vector<TString> ColumnNames;
const ENoColumnPolicy NoColumnPolicy;
void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer) const;

std::vector<std::shared_ptr<arrow::Array>> GetColumns(const std::shared_ptr<arrow::RecordBatch>& batch) const;

public:
TXX64(const std::vector<TString>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);
TXX64(const std::vector<std::string>& columnNames, const ENoColumnPolicy noColumnPolicy, const ui64 seed = 0);

static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const;


};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "upsert_index.h"
#include <util/string/type.h>
#include <library/cpp/json/json_reader.h>

namespace NKikimr::NKqp {

// Fills the operation from the request settings.
//
// Three settings are consumed from `features`, in this order: NAME, TYPE, FEATURES.
//  - NAME is stored into IndexName.
//  - TYPE selects which index meta constructor the container is initialized with.
//  - FEATURES must be a JSON document; it is handed to the initialized constructor.
//
// Returns a failed status with a descriptive message on the first problem found
// (missing setting, unknown type, malformed JSON, or a failure reported by the
// constructor itself); otherwise returns success. State assigned before a failure
// (e.g. IndexName) is not rolled back.
TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) {
    const auto nameSetting = features.Extract("NAME");
    if (!nameSetting) {
        return TConclusionStatus::Fail("can't find alter parameter NAME");
    }
    IndexName = *nameSetting;

    const auto typeSetting = features.Extract("TYPE");
    if (!typeSetting) {
        return TConclusionStatus::Fail("can't find alter parameter TYPE");
    }
    const TString indexType = *typeSetting;

    const auto featuresSetting = features.Extract("FEATURES");
    if (!featuresSetting) {
        return TConclusionStatus::Fail("can't find alter parameter FEATURES");
    }

    // The constructor object has to exist before the JSON payload can be applied to it.
    if (!IndexMetaConstructor.Initialize(indexType)) {
        return TConclusionStatus::Fail("can't initialize index meta object for type \"" + indexType + "\"");
    }

    NJson::TJsonValue parsedFeatures;
    if (!NJson::ReadJsonFastTree(*featuresSetting, &parsedFeatures)) {
        return TConclusionStatus::Fail("incorrect json in request FEATURES parameter");
    }

    // Propagate the constructor's own failure status unchanged.
    if (auto status = IndexMetaConstructor->DeserializeFromJson(parsedFeatures); status.IsFail()) {
        return status;
    }
    return TConclusionStatus::Success();
}

// Appends one entry to `schemaData`'s UpsertIndexes list, stores IndexName in it
// and lets the index meta constructor container serialize itself into the same entry.
void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
    auto& upsertEntry = *schemaData.AddUpsertIndexes();
    upsertEntry.SetName(IndexName);
    IndexMetaConstructor.SerializeToProto(upsertEntry);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h>

namespace NKikimr::NKqp {

// Table-store operation describing an index upsert: it holds the index name and a
// container with the index meta constructor. The two Do* overrides are only
// declared here; their definitions live in the corresponding .cpp file.
class TUpsertIndexOperation : public ITableStoreOperation {
private:
// Key string handed to the factory registrator below.
static TString GetTypeName() {
return "UPSERT_INDEX";
}

// Registrator object for this class, constructed with the "UPSERT_INDEX" key.
// NOTE(review): presumably this is what makes the operation creatable through
// TFactory by that key — confirm against TFactory::TRegistrator.
static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
private:
// Name of the index this operation refers to.
TString IndexName;
// Holder for the IIndexMetaConstructor implementation that describes the index.
NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
public:
// Populates the operation from the extracted request features and returns a status.
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;

// Writes this operation into the alter-schema proto message.
void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const override;
};

}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SRCS(
GLOBAL add_column.cpp
GLOBAL alter_column.cpp
GLOBAL drop_column.cpp
GLOBAL upsert_index.cpp
)

PEERDIR(
Expand Down
37 changes: 37 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,40 @@ message TOlapColumnDescription {
optional TDictionaryEncodingSettings DictionaryEncoding = 9;
}

// Bloom filter parameters as they appear in a request: columns are referenced by name.
// Used as the BloomFilter variant of TOlapIndexRequested.Implementation.
message TRequestedBloomFilter {
// Defaults to 0.1 when not set.
optional double FalsePositiveProbability = 1 [default = 0.1];
// Field number 2 is not used in this message.
repeated string ColumnNames = 3;
}

// Requested index definition; element type of TAlterColumnTableSchema.UpsertIndexes.
// Unlike TOlapIndexDescription it carries no numeric Id.
message TOlapIndexRequested {
optional string Name = 1;
optional TCompressionOptions Compression = 3;

// NOTE(review): presumably the name of the index implementation class that
// interprets the Implementation payload — confirm against the deserializer.
optional string ClassName = 2;
// Implementation-specific settings; BloomFilter is the only variant defined here.
oneof Implementation {
TRequestedBloomFilter BloomFilter = 40;
}
}

// Bloom filter index settings with columns referenced by numeric id.
// Used as the BloomFilter variant of TOlapIndexDescription.Implementation.
message TBloomFilter {
// Defaults to 0.1 when not set.
optional double FalsePositiveProbability = 1 [default = 0.1];
// NOTE(review): the default is 8196, not the power of two 8192 — confirm this value is intended.
optional uint64 MaxBytesCount = 2 [default = 8196];
repeated uint32 ColumnIds = 3;
}

// Index description with an assigned Id; element type of TColumnTableSchema.Indexes.
message TOlapIndexDescription {
// This id is auto-generated by schemeshard
optional uint32 Id = 1;

optional string Name = 2;
optional TCompressionOptions Compression = 3;

// NOTE(review): presumably the name of the index implementation class that
// interprets the Implementation payload — confirm against the deserializer.
optional string ClassName = 4;
// Implementation-specific settings; BloomFilter is the only variant defined here.
oneof Implementation {
TBloomFilter BloomFilter = 40;
}
}

enum EColumnTableEngine {
COLUMN_ENGINE_NONE = 0;
COLUMN_ENGINE_REPLACING_TIMESERIES = 1;
Expand Down Expand Up @@ -456,13 +490,16 @@ message TColumnTableSchema {
optional TCompressionOptions DefaultCompression = 8;

optional bool CompositeMarks = 9 [ default = false ];
repeated TOlapIndexDescription Indexes = 10;
}

// Set of changes to apply to a column table schema: column additions, removals
// and alterations, plus index upserts and index removals.
message TAlterColumnTableSchema {
repeated TOlapColumnDescription AddColumns = 1;
//optional TCompressionOptions DefaultCompression = 5;
repeated TOlapColumnDescription DropColumns = 6;
repeated TOlapColumnDiff AlterColumns = 7;
// Index definitions to upsert, given in the "requested" form (no numeric id).
repeated TOlapIndexRequested UpsertIndexes = 8;
// NOTE(review): presumably the names of the indexes to remove — confirm against the consumer.
repeated string DropIndexes = 9;
}

// Schema presets are used to manage multiple tables with the same schema
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/indexes/abstract.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/formats/arrow/hash/calcer.h>

namespace NKikimr::NOlap::NIndexes {

} // namespace NKikimr::NOlap::NIndexes
147 changes: 147 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/indexes/abstract.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#pragma once

#include <ydb/core/tx/columnshard/splitter/chunks.h>
#include <ydb/core/tx/program/program.h>

#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <library/cpp/object_factory/object_factory.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <util/generic/string.h>

#include <memory>
#include <vector>

namespace NKikimr::NOlap {
struct TIndexInfo;
}

namespace NKikimr::NSchemeShard {
class TOlapSchema;
class IErrorCollector;
}

namespace NKikimr::NOlap::NIndexes {

// Interface of an index checker that is evaluated against a set of blobs.
// Implementations provide DoCheck; callers use the non-virtual Check wrapper.
class IIndexChecker {
protected:
// Implementation hook; receives the blobs by rvalue reference.
// NOTE(review): the meaning of the returned bool is not defined in this header — confirm with implementations.
virtual bool DoCheck(std::vector<TString>&& blobs) const = 0;
public:
virtual ~IIndexChecker() = default;
// Forwards the blobs to DoCheck (moved, so the caller's vector must not be relied on afterwards).
bool Check(std::vector<TString>&& blobs) const {
return DoCheck(std::move(blobs));
}
};

// Pairs an index id with a shared IIndexChecker instance.
class TIndexCheckerContainer {
private:
// NOTE(review): the YDB_READONLY* macros presumably declare the IndexId / Object
// fields together with read-only accessors — confirm against the macro definitions.
YDB_READONLY(ui32, IndexId, 0);
YDB_READONLY_DEF(std::shared_ptr<IIndexChecker>, Object);
public:
// Stores both arguments; AFL_VERIFY is applied to a non-zero id and a non-null checker.
TIndexCheckerContainer(const ui32 indexId, const std::shared_ptr<IIndexChecker>& object)
: IndexId(indexId)
, Object(object) {
AFL_VERIFY(IndexId);
AFL_VERIFY(Object);
}

// Gives access to the held checker's interface; the container keeps ownership.
const IIndexChecker* operator->() const {
return Object.get();
}
};

// Interface of an index description ("meta"). The public methods are non-virtual
// wrappers that forward to the protected Do* hooks implemented by subclasses.
// Serialized form: NKikimrSchemeOp::TOlapIndexDescription.
class IIndexMeta {
protected:
// Hook for BuildIndex.
// NOTE(review): `data` is keyed by ui32 — presumably a column id mapped to that
// column's chunks; confirm with the callers.
virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const = 0;
// Hook for BuildIndexChecker.
virtual std::shared_ptr<IIndexChecker> DoBuildIndexChecker(const TProgramContainer& program) const = 0;
// Hook for DeserializeFromProto; returns false on failure.
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0;
// Hook for SerializeToProto.
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0;

public:
// Object factory of IIndexMeta implementations keyed by a TString.
using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
// Proto message type this interface is (de)serialized with.
using TProto = NKikimrSchemeOp::TOlapIndexDescription;

virtual ~IIndexMeta() = default;

// Produces an index data chunk for `indexId` from `data`; forwards to DoBuildIndex.
std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const {
return DoBuildIndex(indexId, data, indexInfo);
}

// Produces a checker for `program`; forwards to DoBuildIndexChecker.
// The result may be null (TIndexMetaContainer::BuildIndexChecker handles that case).
std::shared_ptr<IIndexChecker> BuildIndexChecker(const TProgramContainer& program) const {
return DoBuildIndexChecker(program);
}

// Reads this object from `proto`; forwards to DoDeserializeFromProto.
bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) {
return DoDeserializeFromProto(proto);
}

// Writes this object into `proto`; forwards to DoSerializeToProto.
void SerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const {
return DoSerializeToProto(proto);
}

// Name identifying the concrete implementation class.
virtual TString GetClassName() const = 0;
};

// Interface of an object that holds requested index settings and can create an
// IIndexMeta from them. The public methods are non-virtual wrappers that forward
// to the protected Do* hooks. Serialized form: NKikimrSchemeOp::TOlapIndexRequested.
class IIndexMetaConstructor {
protected:
// Hook for DeserializeFromJson.
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
// Hook for CreateIndexMeta.
virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const = 0;
// Hook for DeserializeFromProto.
virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) = 0;
// Hook for SerializeToProto.
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const = 0;
public:
// Object factory of IIndexMetaConstructor implementations keyed by a TString.
using TFactory = NObjectFactory::TObjectFactory<IIndexMetaConstructor, TString>;
// Proto message type this interface is (de)serialized with.
using TProto = NKikimrSchemeOp::TOlapIndexRequested;

virtual ~IIndexMetaConstructor() = default;

// Reads the settings from a JSON value; forwards to DoDeserializeFromJson.
TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
return DoDeserializeFromJson(jsonInfo);
}

// Creates an index meta for the given schema; forwards to DoCreateIndexMeta.
// NOTE(review): presumably problems are reported through `errors` and a null
// pointer is returned on failure — confirm with implementations.
std::shared_ptr<IIndexMeta> CreateIndexMeta(const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
return DoCreateIndexMeta(currentSchema, errors);
}

// Reads the settings from `proto`; forwards to DoDeserializeFromProto.
TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
return DoDeserializeFromProto(proto);
}

// Writes the settings into `proto`; forwards to DoSerializeToProto.
void SerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
return DoSerializeToProto(proto);
}

// Name identifying the concrete implementation class.
virtual TString GetClassName() const = 0;
};

// Proto-serializable holder of an IIndexMeta that additionally remembers the
// numeric id of the index it describes.
class TIndexMetaContainer: public NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta> {
private:
    using TBase = NBackgroundTasks::TInterfaceProtoContainer<IIndexMeta>;
    YDB_READONLY(ui32, IndexId, 0);
public:
    TIndexMetaContainer() = default;

    // Wraps an existing meta object; both the id and the object are verified to be set.
    TIndexMetaContainer(const ui32 indexId, const std::shared_ptr<IIndexMeta>& object)
        : TBase(object)
        , IndexId(indexId)
    {
        AFL_VERIFY(IndexId);
        AFL_VERIFY(Object);
    }

    // Restores the held object through the base container and, only when that
    // succeeds, takes the index id from the proto. Returns whether it succeeded.
    bool DeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) {
        if (TBase::DeserializeFromProto(proto)) {
            IndexId = proto.GetId();
            return true;
        }
        return false;
    }

    // Asks the held meta for a checker matching `program`. Yields an empty optional
    // when the meta produces no checker, otherwise the checker tagged with this index id.
    std::optional<TIndexCheckerContainer> BuildIndexChecker(const TProgramContainer& program) const {
        if (auto builtChecker = GetObjectPtr()->BuildIndexChecker(program)) {
            return TIndexCheckerContainer(IndexId, builtChecker);
        }
        return std::nullopt;
    }
};

} // namespace NKikimr::NOlap::NIndexes
Loading