Skip to content

accessors actualization #9857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion ydb/core/formats/arrow/accessor/abstract/constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,23 @@ class IConstructor {
// Hook that lets a concrete constructor append its own details to DebugString().
// The base version contributes an empty string.
virtual TString DoDebugString() const {
return "";
}
// Type-specific equality hook; reached through the public IsEqualWithSameTypeTo() wrapper.
// NOTE(review): the name implies `item` has the same concrete type as `this`; the container's IsEqualTo
// compares class names before calling — confirm that other callers keep the same precondition.
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& item) const = 0;
// Implementation hook behind Construct(): converts accessor column data into an arrow::RecordBatch.
// Construct() verifies that `columnData` is non-null before delegating here.
virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0;

public:
virtual ~IConstructor() = default;

// Builds a record batch from already materialized column data.
// `columnData` must be non-null (enforced with AFL_VERIFY); the conversion itself is done by DoConstruct().
std::shared_ptr<arrow::RecordBatch> Construct(
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
AFL_VERIFY(columnData);
return DoConstruct(columnData, externalInfo);
}

// Public wrapper over DoIsEqualWithSameTypeTo(); performs no type check of its own.
bool IsEqualWithSameTypeTo(const IConstructor& item) const {
return DoIsEqualWithSameTypeTo(item);
}

// Returns "<class name>:<DoDebugString()>" for logging and diagnostics.
TString DebugString() const {
return TStringBuilder() << GetClassName() << ":" << DoDebugString();
}
Expand Down Expand Up @@ -65,10 +78,27 @@ class IConstructor {
// Holder for a (possibly absent) IConstructor, built on NBackgroundTasks::TInterfaceProtoContainer.
class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> {
private:
    using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>;

public:
    using TBase::TBase;

    // Containers are equal when both are empty, or when both hold objects with the same class name
    // that also report equality through IsEqualWithSameTypeTo().
    bool IsEqualTo(const TConstructorContainer& item) const {
        const bool selfHasObject = !!GetObjectPtr();
        const bool itemHasObject = !!item.GetObjectPtr();
        if (selfHasObject != itemHasObject) {
            // Exactly one side is empty.
            return false;
        }
        if (!selfHasObject) {
            // Both sides are empty.
            return true;
        }
        if (GetObjectPtr()->GetClassName() != item.GetObjectPtr()->GetClassName()) {
            return false;
        }
        return GetObjectPtr()->IsEqualWithSameTypeTo(*item.GetObjectPtr());
    }

    // Forwards to the held constructor; the container must not be empty (enforced with AFL_VERIFY).
    std::shared_ptr<arrow::RecordBatch> Construct(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const {
        AFL_VERIFY(!!GetObjectPtr());
        return GetObjectPtr()->Construct(batch, externalInfo);
    }

    static TConstructorContainer GetDefaultConstructor();
};

Expand Down
11 changes: 11 additions & 0 deletions ydb/core/formats/arrow/accessor/plain/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
#include "constructor.h"

#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
#include <ydb/library/formats/arrow/arrow_helpers.h>
#include <ydb/library/formats/arrow/simple_arrays_cache.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h>

namespace NKikimr::NArrow::NAccessor::NPlain {

Expand All @@ -30,4 +33,12 @@ std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shar
return std::make_shared<arrow::Schema>(arrow::FieldVector({ resultColumn }));
}

// Wraps the chunked array of `columnData` into a single-column table (field "val", typed by
// `externalInfo`) and converts that table into a record batch with NArrow::ToBatch.
std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
    const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
    const auto columnChunks = columnData->GetChunkedArray();
    const auto valueField = std::make_shared<arrow::Field>("val", externalInfo.GetColumnType());
    const auto singleColumnSchema = std::make_shared<arrow::Schema>(arrow::FieldVector({ valueField }));
    auto table = arrow::Table::Make(singleColumnSchema, { columnChunks }, columnData->GetRecordsCount());
    return NArrow::ToBatch(table, true);
}

} // namespace NKikimr::NArrow::NAccessor::NPlain
7 changes: 7 additions & 0 deletions ydb/core/formats/arrow/accessor/plain/constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ class TConstructor: public IConstructor {

private:
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());

// Equality hook for this constructor type: the argument is ignored and the result is always true.
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
return true;
}

// Accessor data -> record batch conversion hook (declared here, defined out of line).
virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override;
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/formats/arrow/accessor/sparsed/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class TSparsedArray: public IChunkedArray {
return chunk.GetScalar(index - chunk.GetStartPosition());
}

// Returns the record batch of the only chunk in Records.
// Verifies (AFL_VERIFY) that Records holds exactly one element and passes the actual count as the "size" detail.
std::shared_ptr<arrow::RecordBatch> GetRecordBatchVerified() const {
AFL_VERIFY(Records.size() == 1)("size", Records.size());
return Records.front().GetRecords();
}

const TSparsedArrayChunk& GetSparsedChunk(const ui64 position) const {
const auto pred = [](const ui64 position, const TSparsedArrayChunk& item) {
return position < item.GetStartPosition();
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/formats/arrow/accessor/sparsed/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons
return true;
}

std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
NArrow::NAccessor::TSparsedArray sparsed(*columnData, externalInfo.GetDefaultValue());
return sparsed.GetRecordBatchVerified();
}

} // namespace NKikimr::NArrow::NAccessor::NSparsed
7 changes: 7 additions & 0 deletions ydb/core/formats/arrow/accessor/sparsed/constructor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ class TConstructor: public IConstructor {

private:
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());

// Equality hook for this constructor type: the argument is ignored and the result is always true.
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
return true;
}
// Accessor data -> record batch conversion hook (declared here, defined out of line).
virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override;

virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;
Expand Down
21 changes: 20 additions & 1 deletion ydb/core/formats/arrow/save_load/loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ std::shared_ptr<arrow::RecordBatch> TColumnLoader::ApplyRawVerified(const TStrin
return TStatusValidator::GetValid(Apply(data));
}

// Bundles `recordsCount` with this loader's DefaultValue and the type of ResultField into a
// TChunkConstructionData, the context object handed to accessor construction.
TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount) const {
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type());
}

// Deserializes `dataStr` with Apply() (validated through TStatusValidator::GetValid) and wraps the
// resulting batch into an accessor via BuildAccessor().
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const {
auto data = TStatusValidator::GetValid(Apply(dataStr));
// NOTE(review): two consecutive returns — presumably the diff's removed line followed by its replacement.
// As written only the first is reachable; both pass the same (recordsCount, DefaultValue, field type) context.
return BuildAccessor(data, TChunkConstructionData(recordsCount, DefaultValue, ResultField->type()));
return BuildAccessor(data, BuildAccessorContext(recordsCount));
}

std::shared_ptr<IChunkedArray> TColumnLoader::BuildAccessor(
Expand All @@ -65,4 +69,19 @@ std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TColumnLoader::BuildD
return AccessorConstructor->ConstructDefault(TChunkConstructionData(recordsCount, DefaultValue, ResultField->type())).DetachResult();
}

bool TColumnLoader::IsEqualTo(const TColumnLoader& item) const {
if (!!Transformer != !!item.Transformer) {
return false;
} else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
return false;
}
if (!Serializer.IsEqualTo(item.Serializer)) {
return false;
}
if (!AccessorConstructor.IsEqualTo(item.AccessorConstructor)) {
return false;
}
return true;
}

} // namespace NKikimr::NArrow::NAccessor
13 changes: 2 additions & 11 deletions ydb/core/formats/arrow/save_load/loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,7 @@ class TColumnLoader {
public:
std::shared_ptr<IChunkedArray> BuildDefaultAccessor(const ui32 recordsCount) const;

// Inline equality check: transformers must be both absent or both present and equal,
// and the serializers must be equal.
bool IsEqualTo(const TColumnLoader& item) const {
if (!!Transformer != !!item.Transformer) {
return false;
} else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
return false;
}
if (!Serializer.IsEqualTo(item.Serializer)) {
return false;
}
return true;
}
// NOTE(review): a declaration of the same member directly after an inline definition — presumably the
// diff's removed body followed by the new out-of-line declaration; confirm against the actual header.
bool IsEqualTo(const TColumnLoader& item) const;

TString DebugString() const;

Expand All @@ -49,6 +39,7 @@ class TColumnLoader {

const std::shared_ptr<arrow::Field>& GetField() const;

// Builds the construction context (records count, default value, field type) for accessor construction.
TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const;
// Deserializes `data` and wraps the result into an accessor.
std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const;
// Deserializes `data` and returns the validated record batch without building an accessor.
std::shared_ptr<arrow::RecordBatch> ApplyRawVerified(const TString& data) const;
};
Expand Down
55 changes: 48 additions & 7 deletions ydb/core/kqp/ut/olap/sparsed_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/wrappers/fake_storage.h>
#include <ydb/core/tx/columnshard/blobs_action/common/const.h>

namespace NKikimr::NKqp {

Expand All @@ -21,13 +22,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
const TString StoreName;
ui32 MultiColumnRepCount = 100;
static const ui32 SKIP_GROUPS = 7;
const TVector<TString> FIELD_NAMES{"utf", "int", "uint", "float", "double"};
const TVector<TString> FIELD_NAMES{ "utf", "int", "uint", "float", "double" };
public:
TSparsedDataTest(const TString& storeName)
: Kikimr(Settings)
, CSController(NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NYDBTest::NColumnShard::TController>())
, StoreName(storeName)
{
, StoreName(storeName) {

}

Expand Down Expand Up @@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {

Fill(&counts[0], &counts[FIELD_NAMES.size() * groupsCount], 0);

for (auto& row: rows) {
for (auto& row : rows) {
auto incCounts = [&](ui32 i, const TString& column) {
if (*NYdb::TValueParser(row.at(column)).GetOptionalBool()) {
counts[i]++;
Expand All @@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
incCounts(ind++, "def_float" + grStr);
incCounts(ind++, "def_double" + grStr);
}
}
}
}

void CheckAllFieldsTable(bool firstCall, ui32 countExpectation, ui32* defCountStart) {
Expand Down Expand Up @@ -169,7 +169,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
NArrow::NConstruction::TStringPoolFiller sPool(1000, 52, "abcde", frq);
helper.FillTable(sPool, shiftKff, 10000);
},
[&](bool firstCall) {
[&](bool firstCall) {
CheckTable("field", "'abcde'", firstCall, countExpectation, defCountStart);
});
}
Expand All @@ -181,7 +181,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
TTypedLocalHelper helper("Utf8", Kikimr);
helper.FillMultiColumnTable(MultiColumnRepCount, shiftKff, 10000);
},
[&](bool firstCall) {
[&](bool firstCall) {
CheckAllFieldsTable(firstCall, countExpectation, defCountStart);
});
}
Expand Down Expand Up @@ -302,6 +302,47 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
TSparsedDataTest test("");
test.Execute();
}

// Scenario: fill an OLAP table, switch column `uid` to the SPARSED accessor constructor, request scheme
// actualization, wait for it, and then scan the table.
Y_UNIT_TEST(AccessorActualization) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);

// Controller overrides for the test run.
// NOTE(review): exact semantics of each override are defined by the controller class — confirm there.
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

TLocalHelper helper(kikimr);
helper.SetOptionalStorageId(NOlap::NBlobOperations::TGlobal::DefaultStorageId);
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();

auto session = tableClient.CreateSession().GetValueSync().GetSession();
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

// Write the data before the accessor change, then wait for indexation.
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
csController->WaitIndexation(TDuration::Seconds(3));

// Step 1: change the accessor constructor of column `uid` to SPARSED.
{
auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=uid, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SPARSED`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

// Step 2: set the SCHEME_NEED_ACTUALIZATION option.
{
auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
csController->WaitActualization(TDuration::Seconds(5));
// Step 3: scan the column after actualization.
// NOTE(review): only query success is asserted; the count itself is printed to Cerr and not checked.
{
auto it = tableClient.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT count(uid) FROM `/Root/olapStore/olapTable`
)").GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
Cerr << StreamResultToYson(it) << Endl;
}
}

}

} // namespace
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/column/info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,13 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf
}
std::vector<std::shared_ptr<IPortionDataChunk>> result;
for (auto&& s : source) {
auto data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData());
std::shared_ptr<arrow::RecordBatch> data;
if (!DataAccessorConstructor.IsEqualTo(sourceColumnFeatures.DataAccessorConstructor)) {
auto chunkedArray = sourceColumnFeatures.Loader->ApplyVerified(s->GetData(), s->GetRecordsCountVerified());
data = DataAccessorConstructor.Construct(chunkedArray, Loader->BuildAccessorContext(s->GetRecordsCountVerified()));
} else {
data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData());
}
result.emplace_back(s->CopyWithAnotherBlob(GetColumnSaver().Apply(data), *this));
}
return result;
Expand Down
Loading