Skip to content

Commit e8a4121

Browse files
accessors actualization (#9857)
1 parent 06c6c66 commit e8a4121

File tree

10 files changed

+144
-21
lines changed

10 files changed

+144
-21
lines changed

ydb/core/formats/arrow/accessor/abstract/constructor.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,23 @@ class IConstructor {
2525
// Hook for DebugString(): subclass-specific details to append after the class name.
// The default implementation contributes nothing (empty string).
virtual TString DoDebugString() const {
2626
return "";
2727
}
28+
// Pure virtual hook behind IsEqualWithSameTypeTo(): compares this constructor with `item`.
// NOTE(review): the name implies `item` has the same concrete type as *this — the check is
// expected to be done by the caller; confirm before downcasting in an override.
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& item) const = 0;
29+
// Pure virtual hook behind Construct(): converts already-built accessor data (`columnData`)
// into an arrow::RecordBatch, using `externalInfo` as construction context.
virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
30+
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0;
2831

2932
public:
3033
virtual ~IConstructor() = default;
3134

35+
// Public entry point for converting accessor data into a record batch.
// Verifies that `columnData` is non-null and then forwards both arguments
// unchanged to the DoConstruct() hook implemented by the concrete constructor.
std::shared_ptr<arrow::RecordBatch> Construct(
36+
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
37+
AFL_VERIFY(columnData);
38+
return DoConstruct(columnData, externalInfo);
39+
}
40+
41+
// Forwards the comparison to the DoIsEqualWithSameTypeTo() hook.
// No type check is performed here; this method only delegates.
bool IsEqualWithSameTypeTo(const IConstructor& item) const {
42+
return DoIsEqualWithSameTypeTo(item);
43+
}
44+
3245
// Returns a human-readable description in the form "<GetClassName()>:<DoDebugString()>".
TString DebugString() const {
3346
return TStringBuilder() << GetClassName() << ":" << DoDebugString();
3447
}
@@ -65,10 +78,27 @@ class IConstructor {
6578
// Wrapper over an IConstructor object, built on NBackgroundTasks::TInterfaceProtoContainer.
// Inherits the base constructors and adds equality comparison and a Construct() shortcut
// that operate on the held object (accessed through GetObjectPtr()).
class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> {
6679
private:
6780
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>;
68-
6981
public:
7082
using TBase::TBase;
7183

84+
// Compares two containers:
//  - both empty            -> equal;
//  - exactly one is empty  -> not equal;
//  - both hold an object   -> class names must match, then the decision is delegated
//                             to IConstructor::IsEqualWithSameTypeTo().
bool IsEqualTo(const TConstructorContainer& item) const {
85+
if (!GetObjectPtr() && !item.GetObjectPtr()) {
86+
return true;
87+
} else if (!!GetObjectPtr() && !!item.GetObjectPtr()) {
88+
// Different class names mean different constructor kinds; no need to compare further.
if (GetObjectPtr()->GetClassName() != item.GetObjectPtr()->GetClassName()) {
89+
return false;
90+
}
91+
return GetObjectPtr()->IsEqualWithSameTypeTo(*item.GetObjectPtr());
92+
} else {
93+
return false;
94+
}
95+
}
96+
97+
// Converts accessor data into a record batch using the held constructor.
// Verifies that the container is not empty before forwarding to IConstructor::Construct().
std::shared_ptr<arrow::RecordBatch> Construct(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const {
98+
AFL_VERIFY(!!GetObjectPtr());
99+
return GetObjectPtr()->Construct(batch, externalInfo);
100+
}
101+
72102
static TConstructorContainer GetDefaultConstructor();
73103
};
74104

ydb/core/formats/arrow/accessor/plain/constructor.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
#include "constructor.h"
33

44
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
5+
#include <ydb/library/formats/arrow/arrow_helpers.h>
56
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
7+
68
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
9+
#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h>
710

811
namespace NKikimr::NArrow::NAccessor::NPlain {
912

@@ -30,4 +33,12 @@ std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shar
3033
return std::make_shared<arrow::Schema>(arrow::FieldVector({ resultColumn }));
3134
}
3235

36+
// Plain-accessor implementation of IConstructor::DoConstruct.
// Takes the chunked array out of `columnData`, wraps it into a one-column arrow::Table
// whose single field is named "val" and typed with externalInfo.GetColumnType(),
// and converts that table into a record batch.
std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
37+
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
38+
auto chunked = columnData->GetChunkedArray();
39+
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) }));
40+
auto table = arrow::Table::Make(schema, { chunked }, columnData->GetRecordsCount());
41+
// NOTE(review): the `true` flag presumably asks ToBatch to combine chunks into one batch — confirm in arrow_helpers.h.
return NArrow::ToBatch(table, true);
42+
}
43+
3344
} // namespace NKikimr::NArrow::NAccessor::NPlain

ydb/core/formats/arrow/accessor/plain/constructor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ class TConstructor: public IConstructor {
1212

1313
private:
1414
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());
15+
16+
// Always reports equality: the argument is ignored, so any other constructor passed
// here is treated as equal to this one.
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
17+
return true;
18+
}
19+
20+
virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
21+
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override;
1522
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
1623
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
1724
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;

ydb/core/formats/arrow/accessor/sparsed/accessor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,11 @@ class TSparsedArray: public IChunkedArray {
153153
return chunk.GetScalar(index - chunk.GetStartPosition());
154154
}
155155

156+
// Returns the record batch of the only chunk stored in Records.
// Verifies that Records contains exactly one element (the actual size is attached
// to the verification under the "size" key) and returns that element's GetRecords().
std::shared_ptr<arrow::RecordBatch> GetRecordBatchVerified() const {
157+
AFL_VERIFY(Records.size() == 1)("size", Records.size());
158+
return Records.front().GetRecords();
159+
}
160+
156161
const TSparsedArrayChunk& GetSparsedChunk(const ui64 position) const {
157162
const auto pred = [](const ui64 position, const TSparsedArrayChunk& item) {
158163
return position < item.GetStartPosition();

ydb/core/formats/arrow/accessor/sparsed/constructor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,10 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons
3131
return true;
3232
}
3333

34+
// Sparsed-accessor implementation of IConstructor::DoConstruct.
// Builds a TSparsedArray from `*columnData` and the default value taken from
// `externalInfo`, then returns its record batch via GetRecordBatchVerified().
std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
35+
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
36+
NArrow::NAccessor::TSparsedArray sparsed(*columnData, externalInfo.GetDefaultValue());
37+
return sparsed.GetRecordBatchVerified();
38+
}
39+
3440
} // namespace NKikimr::NArrow::NAccessor::NSparsed

ydb/core/formats/arrow/accessor/sparsed/constructor.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@ class TConstructor: public IConstructor {
1212

1313
private:
1414
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());
15+
16+
// Always reports equality: the argument is ignored, so any other constructor passed
// here is treated as equal to this one.
virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
17+
return true;
18+
}
19+
virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
20+
const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override;
21+
1522
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
1623
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
1724
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;

ydb/core/formats/arrow/save_load/loader.cpp

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,13 @@ std::shared_ptr<arrow::RecordBatch> TColumnLoader::ApplyRawVerified(const TStrin
5151
return TStatusValidator::GetValid(Apply(data));
5252
}
5353

54+
// Builds the construction context for an accessor covering `recordsCount` rows:
// the given row count plus this loader's DefaultValue and the type of ResultField.
TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount) const {
55+
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type());
56+
}
57+
5458
// Applies the loader to the serialized `dataStr` (the result of Apply() is passed through
// TStatusValidator::GetValid) and wraps the resulting data into an accessor via BuildAccessor().
// The two `return` lines below are the removed (`56-`) and added (`60+`) variants shown by
// this diff hunk; both pass the same context values (recordsCount, DefaultValue,
// ResultField->type()), the added one obtains them through BuildAccessorContext().
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const {
5559
auto data = TStatusValidator::GetValid(Apply(dataStr));
56-
return BuildAccessor(data, TChunkConstructionData(recordsCount, DefaultValue, ResultField->type()));
60+
return BuildAccessor(data, BuildAccessorContext(recordsCount));
5761
}
5862

5963
std::shared_ptr<IChunkedArray> TColumnLoader::BuildAccessor(
@@ -65,4 +69,19 @@ std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TColumnLoader::BuildD
6569
return AccessorConstructor->ConstructDefault(TChunkConstructionData(recordsCount, DefaultValue, ResultField->type())).DetachResult();
6670
}
6771

72+
// Compares two loaders component by component. They are equal when:
//  - both have a Transformer or both lack one, and present transformers compare equal;
//  - their Serializer objects compare equal;
//  - their AccessorConstructor containers compare equal.
// NOTE(review): DefaultValue and ResultField (both used by BuildAccessorContext) are not
// part of this comparison — confirm that this is intended.
bool TColumnLoader::IsEqualTo(const TColumnLoader& item) const {
73+
// Presence mismatch: one side has a transformer and the other does not.
if (!!Transformer != !!item.Transformer) {
74+
return false;
75+
} else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
76+
return false;
77+
}
78+
if (!Serializer.IsEqualTo(item.Serializer)) {
79+
return false;
80+
}
81+
if (!AccessorConstructor.IsEqualTo(item.AccessorConstructor)) {
82+
return false;
83+
}
84+
return true;
85+
}
86+
6887
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/save_load/loader.h

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,7 @@ class TColumnLoader {
2525
public:
2626
std::shared_ptr<IChunkedArray> BuildDefaultAccessor(const ui32 recordsCount) const;
2727

28-
bool IsEqualTo(const TColumnLoader& item) const {
29-
if (!!Transformer != !!item.Transformer) {
30-
return false;
31-
} else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
32-
return false;
33-
}
34-
if (!Serializer.IsEqualTo(item.Serializer)) {
35-
return false;
36-
}
37-
return true;
38-
}
28+
bool IsEqualTo(const TColumnLoader& item) const;
3929

4030
TString DebugString() const;
4131

@@ -49,6 +39,7 @@ class TColumnLoader {
4939

5040
const std::shared_ptr<arrow::Field>& GetField() const;
5141

42+
TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const;
5243
std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const;
5344
std::shared_ptr<arrow::RecordBatch> ApplyRawVerified(const TString& data) const;
5445
};

ydb/core/kqp/ut/olap/sparsed_ut.cpp

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
99
#include <ydb/core/base/tablet_pipecache.h>
1010
#include <ydb/core/wrappers/fake_storage.h>
11+
#include <ydb/core/tx/columnshard/blobs_action/common/const.h>
1112

1213
namespace NKikimr::NKqp {
1314

@@ -21,13 +22,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
2122
const TString StoreName;
2223
ui32 MultiColumnRepCount = 100;
2324
static const ui32 SKIP_GROUPS = 7;
24-
const TVector<TString> FIELD_NAMES{"utf", "int", "uint", "float", "double"};
25+
const TVector<TString> FIELD_NAMES{ "utf", "int", "uint", "float", "double" };
2526
public:
2627
TSparsedDataTest(const TString& storeName)
2728
: Kikimr(Settings)
2829
, CSController(NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NYDBTest::NColumnShard::TController>())
29-
, StoreName(storeName)
30-
{
30+
, StoreName(storeName) {
3131

3232
}
3333

@@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
7979

8080
Fill(&counts[0], &counts[FIELD_NAMES.size() * groupsCount], 0);
8181

82-
for (auto& row: rows) {
82+
for (auto& row : rows) {
8383
auto incCounts = [&](ui32 i, const TString& column) {
8484
if (*NYdb::TValueParser(row.at(column)).GetOptionalBool()) {
8585
counts[i]++;
@@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
9494
incCounts(ind++, "def_float" + grStr);
9595
incCounts(ind++, "def_double" + grStr);
9696
}
97-
}
97+
}
9898
}
9999

100100
void CheckAllFieldsTable(bool firstCall, ui32 countExpectation, ui32* defCountStart) {
@@ -169,7 +169,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
169169
NArrow::NConstruction::TStringPoolFiller sPool(1000, 52, "abcde", frq);
170170
helper.FillTable(sPool, shiftKff, 10000);
171171
},
172-
[&](bool firstCall) {
172+
[&](bool firstCall) {
173173
CheckTable("field", "'abcde'", firstCall, countExpectation, defCountStart);
174174
});
175175
}
@@ -181,7 +181,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
181181
TTypedLocalHelper helper("Utf8", Kikimr);
182182
helper.FillMultiColumnTable(MultiColumnRepCount, shiftKff, 10000);
183183
},
184-
[&](bool firstCall) {
184+
[&](bool firstCall) {
185185
CheckAllFieldsTable(firstCall, countExpectation, defCountStart);
186186
});
187187
}
@@ -302,6 +302,47 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
302302
TSparsedDataTest test("");
303303
test.Execute();
304304
}
305+
306+
// Scenario: write data into an OLAP table, switch the `uid` column's data accessor
// constructor class to SPARSED, request scheme actualization, and check that a scan
// over `uid` still succeeds afterwards.
// NOTE(review): the scan result is only printed to Cerr; the test asserts that the query
// succeeds but does not check the returned count.
Y_UNIT_TEST(AccessorActualization) {
307+
auto settings = TKikimrSettings().SetWithSampleTables(false);
308+
TKikimrRunner kikimr(settings);
309+
310+
// Register a column shard test controller and override its timing/memory settings.
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
311+
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
312+
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
313+
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
314+
315+
// Create the test OLAP table using the default storage id.
TLocalHelper helper(kikimr);
316+
helper.SetOptionalStorageId(NOlap::NBlobOperations::TGlobal::DefaultStorageId);
317+
helper.CreateTestOlapTable();
318+
auto tableClient = kikimr.GetTableClient();
319+
320+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
321+
// Enable debug-priority logging for the TX_COLUMNSHARD component.
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
322+
323+
// Fill the table and wait for indexation (3 s argument — presumably a wait duration; confirm in the controller).
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
324+
csController->WaitIndexation(TDuration::Seconds(3));
325+
326+
// Step 1: change the accessor constructor class of column `uid` to SPARSED.
{
327+
auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=uid, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SPARSED`)").GetValueSync();
328+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
329+
}
330+
331+
// Step 2: set the SCHEME_NEED_ACTUALIZATION option on the store.
{
332+
auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`)").GetValueSync();
333+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
334+
}
335+
// Step 3: wait for actualization, then scan the altered column.
csController->WaitActualization(TDuration::Seconds(5));
336+
{
337+
auto it = tableClient.StreamExecuteScanQuery(R"(
338+
--!syntax_v1
339+
SELECT count(uid) FROM `/Root/olapStore/olapTable`
340+
)").GetValueSync();
341+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
342+
Cerr << StreamResultToYson(it) << Endl;
343+
}
344+
}
345+
305346
}
306347

307348
} // namespace

ydb/core/tx/columnshard/engines/scheme/column/info.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,13 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf
9191
}
9292
std::vector<std::shared_ptr<IPortionDataChunk>> result;
9393
for (auto&& s : source) {
94-
auto data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData());
94+
std::shared_ptr<arrow::RecordBatch> data;
95+
if (!DataAccessorConstructor.IsEqualTo(sourceColumnFeatures.DataAccessorConstructor)) {
96+
auto chunkedArray = sourceColumnFeatures.Loader->ApplyVerified(s->GetData(), s->GetRecordsCountVerified());
97+
data = DataAccessorConstructor.Construct(chunkedArray, Loader->BuildAccessorContext(s->GetRecordsCountVerified()));
98+
} else {
99+
data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData());
100+
}
95101
result.emplace_back(s->CopyWithAnotherBlob(GetColumnSaver().Apply(data), *this));
96102
}
97103
return result;

0 commit comments

Comments
 (0)