Skip to content

Commit 36a4d60

Browse files
Merge d9555c0 into a837a20
2 parents a837a20 + d9555c0 commit 36a4d60

File tree

17 files changed

+91
-59
lines changed

17 files changed

+91
-59
lines changed

ydb/core/formats/arrow/serializer/abstract.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ NKikimr::TConclusionStatus TSerializerContainer::DeserializeFromRequest(NYql::TF
2121
return TBase::GetObjectPtr()->DeserializeFromRequest(features);
2222
}
2323

24-
std::shared_ptr<NKikimr::NArrow::NSerialization::ISerializer> TSerializerContainer::GetDefaultSerializer() {
24+
std::shared_ptr<ISerializer> TSerializerContainer::GetDefaultSerializer() {
2525
return std::make_shared<TNativeSerializer>();
2626
}
27+
std::shared_ptr<ISerializer> TSerializerContainer::GetFastestSerializer() {
28+
return std::make_shared<TNativeSerializer>(arrow::Compression::UNCOMPRESSED);
29+
}
30+
2731

2832
}

ydb/core/formats/arrow/serializer/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class TSerializerContainer: public NBackgroundTasks::TInterfaceProtoContainer<IS
146146
using TBase::DeserializeFromProto;
147147

148148
static std::shared_ptr<ISerializer> GetDefaultSerializer();
149+
static std::shared_ptr<ISerializer> GetFastestSerializer();
149150

150151
TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto);
151152

ydb/core/formats/arrow/size_calcer.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,15 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
242242
}
243243

244244
NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
245-
std::optional<TString> specialKeys;
245+
std::optional<TString> specialKeysPayload;
246+
std::optional<TString> specialKeysFull;
246247
if (context.GetFieldsForSpecialKeys().size()) {
247-
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
248+
TFirstLastSpecialKeys specialKeys(batch, context.GetFieldsForSpecialKeys());
249+
specialKeysPayload = specialKeys.SerializePayloadToString();
250+
specialKeysFull = specialKeys.SerializeFullToString();
248251
}
249-
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
250-
NArrow::GetBatchDataSize(batch), specialKeys);
252+
return TSerializedBatch(NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
253+
NArrow::GetBatchDataSize(batch), specialKeysPayload, specialKeysFull);
251254
}
252255

253256
TConclusionStatus TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR) {
@@ -291,7 +294,7 @@ TConclusion<std::vector<TSerializedBatch>> TSerializedBatch::BuildWithLimit(std:
291294
}
292295

293296
TString TSerializedBatch::DebugString() const {
294-
return TStringBuilder() << "(data_size=" << Data.size() << ";schema_data_size=" << SchemaData.size() << ";rows_count=" << RowsCount << ";raw_bytes=" << RawBytes << ";)";
297+
return TStringBuilder() << "(data_size=" << Data.size() << ";rows_count=" << RowsCount << ";raw_bytes=" << RawBytes << ";)";
295298
}
296299

297300
}

ydb/core/formats/arrow/size_calcer.h

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,29 @@ class TBatchSplitttingContext {
7070

7171
class TSerializedBatch {
7272
private:
73-
YDB_READONLY_DEF(TString, SchemaData);
7473
YDB_READONLY_DEF(TString, Data);
7574
YDB_READONLY(ui32, RowsCount, 0);
7675
YDB_READONLY(ui32, RawBytes, 0);
77-
std::optional<TString> SpecialKeys;
76+
std::optional<TString> SpecialKeysFull;
77+
std::optional<TString> SpecialKeysPayload;
78+
7879
public:
7980
size_t GetSize() const {
8081
return Data.size();
8182
}
8283

83-
const TString& GetSpecialKeysSafe() const {
84-
AFL_VERIFY(SpecialKeys);
85-
return *SpecialKeys;
84+
const TString& GetSpecialKeysPayloadSafe() const {
85+
AFL_VERIFY(SpecialKeysPayload);
86+
return *SpecialKeysPayload;
87+
}
88+
89+
const TString& GetSpecialKeysFullSafe() const {
90+
AFL_VERIFY(SpecialKeysFull);
91+
return *SpecialKeysFull;
8692
}
8793

8894
bool HasSpecialKeys() const {
89-
return !!SpecialKeys;
95+
return !!SpecialKeysFull;
9096
}
9197

9298
TString DebugString() const;
@@ -95,14 +101,14 @@ class TSerializedBatch {
95101
static TConclusionStatus BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR);
96102
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);
97103

98-
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TString>& specialKeys)
99-
: SchemaData(schemaData)
100-
, Data(data)
104+
TSerializedBatch(TString&& data, const ui32 rowsCount, const ui32 rawBytes,
105+
const std::optional<TString>& specialKeysPayload, const std::optional<TString>& specialKeysFull)
106+
: Data(data)
101107
, RowsCount(rowsCount)
102108
, RawBytes(rawBytes)
103-
, SpecialKeys(specialKeys)
104-
{
105-
109+
, SpecialKeysFull(specialKeysFull)
110+
, SpecialKeysPayload(specialKeysPayload) {
111+
AFL_VERIFY(!!SpecialKeysPayload == !!SpecialKeysFull);
106112
}
107113
};
108114

ydb/core/formats/arrow/special_keys.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ NKikimr::NArrow::TReplaceKey TSpecialKeys::GetKeyByIndex(const ui32 position, co
2727
}
2828
}
2929

30-
TString TSpecialKeys::SerializeToString() const {
31-
return NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->SerializeFull(Data);
30+
TString TSpecialKeys::SerializePayloadToString() const {
31+
return NArrow::NSerialization::TSerializerContainer::GetFastestSerializer()->SerializePayload(Data);
3232
}
3333

34-
TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const {
35-
return NArrow::SerializeBatchNoCompression(Data);
34+
TString TSpecialKeys::SerializeFullToString() const {
35+
return NArrow::NSerialization::TSerializerContainer::GetFastestSerializer()->SerializeFull(Data);
3636
}
3737

3838
ui64 TSpecialKeys::GetMemoryBytes() const {
@@ -50,13 +50,17 @@ TFirstLastSpecialKeys::TFirstLastSpecialKeys(const std::shared_ptr<arrow::Record
5050
if (columnNames.size()) {
5151
keyBatch = NArrow::TColumnOperator().VerifyIfAbsent().Extract(batch, columnNames);
5252
}
53-
std::vector<ui64> indexes = {0};
54-
if (batch->num_rows() > 1) {
55-
indexes.emplace_back(batch->num_rows() - 1);
56-
}
53+
if (keyBatch->num_rows() <= 2) {
54+
Data = keyBatch;
55+
} else {
56+
std::vector<ui64> indexes = { 0 };
57+
if (batch->num_rows() > 1) {
58+
indexes.emplace_back(batch->num_rows() - 1);
59+
}
5760

58-
Data = NArrow::CopyRecords(keyBatch, indexes);
59-
Y_ABORT_UNLESS(Data->num_rows() == 1 || Data->num_rows() == 2);
61+
Data = NArrow::CopyRecords(keyBatch, indexes);
62+
Y_ABORT_UNLESS(Data->num_rows() == 1 || Data->num_rows() == 2);
63+
}
6064
}
6165

6266
TMinMaxSpecialKeys::TMinMaxSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::shared_ptr<arrow::Schema>& schema) {

ydb/core/formats/arrow/special_keys.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ class TSpecialKeys {
2222
public:
2323
ui64 GetMemoryBytes() const;
2424

25-
TString SerializeToStringDataOnlyNoCompression() const;
26-
2725
TSpecialKeys(const TString& data, const std::shared_ptr<arrow::Schema>& schema) {
2826
Data = NArrow::DeserializeBatch(data, schema);
2927
Y_ABORT_UNLESS(Data);
@@ -34,7 +32,8 @@ class TSpecialKeys {
3432
Y_ABORT_UNLESS(DeserializeFromString(data));
3533
}
3634

37-
TString SerializeToString() const;
35+
TString SerializePayloadToString() const;
36+
TString SerializeFullToString() const;
3837
ui64 GetMemorySize() const;
3938
};
4039

ydb/core/protos/tx_columnshard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ message TLogicalMetadata {
8989
optional string SpecialKeysRawData = 6;
9090
optional TEvWrite.EModificationType ModificationType = 7;
9191
optional NKikimrArrowSchema.TSchemaSubset SchemaSubset = 8;
92+
optional string SpecialKeysPayloadData = 9;
9293
}
9394

9495
message TEvWriteResult {

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
1010
meta.SetNumRows(batch->GetRowsCount());
1111
meta.SetRawBytes(batch->GetRawBytes());
1212
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
13-
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe());
13+
meta.SetSpecialKeysRawData(batch->GetSpecialKeysFullSafe());
14+
meta.SetSpecialKeysPayloadData(batch->GetSpecialKeysPayloadSafe());
1415

1516
const auto& blobRange = batch.GetRange();
1617
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());

ydb/core/tx/columnshard/engines/insert_table/meta.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,23 @@ NKikimrTxColumnShard::TLogicalMetadata TInsertedDataMeta::SerializeToProto() con
66
return OriginalProto;
77
}
88

9-
const std::optional<NKikimr::NArrow::TFirstLastSpecialKeys>& TInsertedDataMeta::GetSpecialKeys() const {
10-
if (!KeysParsed) {
11-
if (OriginalProto.HasSpecialKeysRawData()) {
12-
SpecialKeysParsed = NArrow::TFirstLastSpecialKeys(OriginalProto.GetSpecialKeysRawData());
13-
}
14-
KeysParsed = true;
9+
std::shared_ptr<NArrow::TFirstLastSpecialKeys> TInsertedDataMeta::GetSpecialKeys(const std::shared_ptr<arrow::Schema>& schema) const {
10+
if (KeyInitialized.Val()) {
11+
return SpecialKeysParsed;
1512
}
16-
return SpecialKeysParsed;
13+
std::shared_ptr<NArrow::TFirstLastSpecialKeys> result;
14+
if (OriginalProto.HasSpecialKeysPayloadData()) {
15+
result = std::make_shared<NArrow::TFirstLastSpecialKeys>(OriginalProto.GetSpecialKeysPayloadData(), schema);
16+
} else if (OriginalProto.HasSpecialKeysRawData()) {
17+
result = std::make_shared<NArrow::TFirstLastSpecialKeys>(OriginalProto.GetSpecialKeysRawData());
18+
} else {
19+
AFL_VERIFY(false);
20+
}
21+
if (AtomicCas(&KeyInitialization, 1, 0)) {
22+
SpecialKeysParsed = result;
23+
KeyInitialized = 1;
24+
}
25+
return result;
1726
}
1827

1928
}

ydb/core/tx/columnshard/engines/insert_table/meta.h

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ class TInsertedDataMeta {
1717
YDB_READONLY(NEvWrite::EModificationType, ModificationType, NEvWrite::EModificationType::Upsert);
1818
YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset);
1919

20-
mutable bool KeysParsed = false;
21-
mutable std::optional<NArrow::TFirstLastSpecialKeys> SpecialKeysParsed;
22-
20+
mutable TAtomicCounter KeyInitialized = 0;
21+
mutable TAtomic KeyInitialization = 0;
22+
mutable std::shared_ptr<NArrow::TFirstLastSpecialKeys> SpecialKeysParsed;
2323
NKikimrTxColumnShard::TLogicalMetadata OriginalProto;
24+
std::shared_ptr<NArrow::TFirstLastSpecialKeys> GetSpecialKeys(const std::shared_ptr<arrow::Schema>& schema) const;
2425

25-
const std::optional<NArrow::TFirstLastSpecialKeys>& GetSpecialKeys() const;
2626
public:
2727
ui64 GetTxVolume() const {
2828
return 2 * sizeof(ui64) + sizeof(ui32) + sizeof(OriginalProto) + (SpecialKeysParsed ? SpecialKeysParsed->GetMemoryBytes() : 0);
@@ -43,19 +43,11 @@ class TInsertedDataMeta {
4343
}
4444
}
4545

46-
std::optional<NArrow::TReplaceKey> GetFirstPK(const std::shared_ptr<arrow::Schema>& schema) const {
47-
if (GetSpecialKeys()) {
48-
return GetSpecialKeys()->GetFirst(schema);
49-
} else {
50-
return {};
51-
}
46+
NArrow::TReplaceKey GetFirstPK(const std::shared_ptr<arrow::Schema>& schema) const {
47+
return GetSpecialKeys(schema)->GetFirst();
5248
}
53-
std::optional<NArrow::TReplaceKey> GetLastPK(const std::shared_ptr<arrow::Schema>& schema) const {
54-
if (GetSpecialKeys()) {
55-
return GetSpecialKeys()->GetLast(schema);
56-
} else {
57-
return {};
58-
}
49+
NArrow::TReplaceKey GetLastPK(const std::shared_ptr<arrow::Schema>& schema) const {
50+
return GetSpecialKeys(schema)->GetLast();
5951
}
6052

6153
NKikimrTxColumnShard::TLogicalMetadata SerializeToProto() const;

0 commit comments

Comments
 (0)