Commit a0c44ac

fix stats for splitting (#1079)
* fix stats for splitting
* fix

1 parent: 4fbaac4
commit: a0c44ac

File tree: 5 files changed (+48, -34 lines)

ydb/core/tx/columnshard/splitter/batch_slice.cpp

Lines changed: 3 additions & 1 deletion
@@ -142,7 +142,9 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch>
         auto columnSaver = schema->GetColumnSaver(c.GetColumnId());
         auto stats = schema->GetColumnSerializationStats(c.GetColumnId());
         TSimpleSplitter splitter(columnSaver, Counters);
-        splitter.SetStats(stats);
+        if (stats) {
+            splitter.SetStats(*stats);
+        }
         std::vector<IPortionColumnChunk::TPtr> chunks;
         for (auto&& i : splitter.Split(i, c.GetField(), Settings.GetMaxBlobSize())) {
             chunks.emplace_back(std::make_shared<TSplittedColumnChunk>(c.GetColumnId(), i, Schema));
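The change above only hands statistics to the splitter when they actually exist for the column; previously the (possibly empty) value was passed through unconditionally. A minimal sketch of the guarded pattern, using simplified stand-in types rather than the real YDB headers (TStats, TSplitter and LookupStats are hypothetical):

    #include <iostream>
    #include <optional>

    struct TStats {                              // stand-in for TBatchSerializationStat
        double SerializedBytesPerRecord = 0;
    };

    struct TSplitter {                           // stand-in for TSimpleSplitter
        std::optional<TStats> Stats;
        void SetStats(const TStats& stats) {
            Stats = stats;
        }
    };

    // Returns accumulated statistics for a column, or nothing if none were collected yet.
    std::optional<TStats> LookupStats(bool haveHistory) {
        if (!haveHistory) {
            return {};
        }
        return TStats{128.0};
    }

    int main() {
        TSplitter splitter;
        if (auto stats = LookupStats(/*haveHistory=*/false)) {
            splitter.SetStats(*stats);           // only dereference when the optional is set
        }
        std::cout << (splitter.Stats ? "stats set\n" : "no stats, split by measuring the batch\n");
    }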

ydb/core/tx/columnshard/splitter/simple.cpp

Lines changed: 3 additions & 3 deletions
@@ -74,7 +74,7 @@ class TSplitChunk {
             result.emplace_back(*this);
         } else {
             Counters->SimpleSplitter.OnTrashSerialized(blob.size());
-            TSimpleSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data));
+            TBatchSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data));
             SplitFactor = stats.PredictOptimalSplitFactor(Data->num_rows(), MaxBlobSize).value_or(1);
             if (SplitFactor == 1) {
                 SplitFactor = 2;
@@ -105,7 +105,7 @@ class TSplitChunk {
             if (badStartPosition) {
                 AFL_VERIFY(badBatchRecordsCount && badBatchCount)("count", badBatchCount)("records", badBatchRecordsCount);
                 auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount);
-                TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
+                TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
                 result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters);
                 badStartPosition = {};
                 badBatchRecordsCount = 0;
@@ -118,7 +118,7 @@ class TSplitChunk {
         }
         if (badStartPosition) {
             auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount);
-            TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
+            TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
             result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters);
         }
         ++SplitFactor;
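All three call sites measure an oversized ("trash") serialization and then ask the stat for a split factor; with TBatchSerializationStat the stored quantity is bytes per record, so the prediction scales with whatever record count is being split. A rough sketch of that arithmetic with made-up numbers; the "fits in one blob" branch follows the diff in stats.h below, while the oversize branch (ceiling of fullSize / blobSize) is an assumption for illustration, since that part of the real function is outside the hunk:

    #include <cmath>
    #include <cstdint>
    #include <iostream>
    #include <optional>

    // Simplified per-record stat; the constructor mirrors the one added in stats.h.
    struct TBatchStat {
        double SerializedBytesPerRecord = 0;
        TBatchStat(const uint64_t bytes, const uint64_t recordsCount) {
            SerializedBytesPerRecord = 1.0 * bytes / recordsCount;
        }
        std::optional<uint64_t> PredictOptimalSplitFactor(const uint64_t recordsCount, const uint64_t blobSize) const {
            if (!SerializedBytesPerRecord) {
                return {};
            }
            const uint64_t fullSize = 1.0 * recordsCount * SerializedBytesPerRecord;
            if (fullSize < blobSize) {
                return 1;
            }
            return static_cast<uint64_t>(std::ceil(1.0 * fullSize / blobSize));   // assumed oversize branch
        }
    };

    int main() {
        // A serialized chunk of 12 MiB holding 100000 rows, against an 8 MiB blob limit.
        TBatchStat stats(12u << 20, 100000);
        const auto factor = stats.PredictOptimalSplitFactor(100000, 8u << 20).value_or(1);
        std::cout << "split factor: " << factor << "\n";   // 2, i.e. two pieces of ~6 MiB
    }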

ydb/core/tx/columnshard/splitter/simple.h

Lines changed: 1 addition & 1 deletion
@@ -110,7 +110,7 @@ class TLinearSplitInfo {
 class TSimpleSplitter {
 private:
     TColumnSaver ColumnSaver;
-    YDB_ACCESSOR_DEF(std::optional<TColumnSerializationStat>, Stats);
+    YDB_ACCESSOR_DEF(std::optional<TBatchSerializationStat>, Stats);
     std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
 public:
     explicit TSimpleSplitter(const TColumnSaver& columnSaver, std::shared_ptr<NColumnShard::TSplitterCounters> counters)
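Since TColumnSerializationStat still derives from TSimpleSerializationStat and the new TBatchSerializationStat gains a converting constructor from that base (see the stats.h diff below), per-column statistics can still be fed to the splitter; they are just collapsed into per-record averages on the way in. A stand-in sketch of that conversion, with simplified names and made-up sizes rather than the real headers:

    #include <iostream>

    struct TSimpleStat {                        // stand-in for TSimpleSerializationStat
        unsigned long long SerializedBytes;
        unsigned long long RecordsCount;
        double GetSerializedBytesPerRecord() const {
            return 1.0 * SerializedBytes / RecordsCount;
        }
    };

    struct TColumnStat : TSimpleStat {};        // stand-in for TColumnSerializationStat

    struct TBatchStat {                         // stand-in for TBatchSerializationStat
        double SerializedBytesPerRecord = 0;
        TBatchStat(const TSimpleStat& simple)
            : SerializedBytesPerRecord(simple.GetSerializedBytesPerRecord()) {
        }
    };

    int main() {
        const TColumnStat column{{4096, 32}};   // 4096 serialized bytes over 32 records
        const TBatchStat asBatch(column);       // converting constructor accepts the base class
        std::cout << asBatch.SerializedBytesPerRecord << " bytes/record\n";   // 128
    }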

ydb/core/tx/columnshard/splitter/stats.cpp

Lines changed: 2 additions & 3 deletions
@@ -10,10 +10,9 @@ std::optional<TBatchSerializationStat> TSerializationStats::GetStatsForRecordBat
         if (!columnInfo) {
             return {};
         } else if (!result) {
-            result = TBatchSerializationStat(*columnInfo);
-        } else {
-            result->Merge(*columnInfo);
+            result = TBatchSerializationStat();
         }
+        result->Merge(*columnInfo);
     }
     return result;
 }
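After this change every column takes the same path: the result is default-constructed once and each column's stat is folded in through Merge, which (per the new TBatchSerializationStat) simply adds per-record sizes, so the batch estimate becomes the sum of its columns' per-record costs. A small sketch of that accumulation with made-up per-column numbers, using simplified stand-in types:

    #include <iostream>

    struct TColumnStat {                        // stand-in for a per-column TSimpleSerializationStat
        double SerializedBytesPerRecord;
        double RawBytesPerRecord;
    };

    struct TBatchStat {                         // stand-in for TBatchSerializationStat
        double SerializedBytesPerRecord = 0;
        double RawBytesPerRecord = 0;
        void Merge(const TColumnStat& item) {   // mirrors the new Merge: per-record sizes are summed
            SerializedBytesPerRecord += item.SerializedBytesPerRecord;
            RawBytesPerRecord += item.RawBytesPerRecord;
        }
    };

    int main() {
        const TColumnStat columns[] = {{12.0, 40.0}, {3.5, 8.0}, {110.0, 256.0}};
        TBatchStat batch;                       // default-constructed, as in the patched loop
        for (const auto& c : columns) {
            batch.Merge(c);                     // every column goes through the same Merge call
        }
        // One record of the whole batch costs the sum of its columns' per-record costs.
        std::cout << batch.SerializedBytesPerRecord << " serialized and "
                  << batch.RawBytesPerRecord << " raw bytes per record\n";   // 125.5 and 304
    }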

ydb/core/tx/columnshard/splitter/stats.h

Lines changed: 39 additions & 26 deletions
@@ -30,6 +30,15 @@ class TSimpleSerializationStat {
         Y_ABORT_UNLESS(RawBytes);
     }
 
+    double GetSerializedBytesPerRecord() const {
+        AFL_VERIFY(RecordsCount);
+        return 1.0 * SerializedBytes / RecordsCount;
+    }
+    double GetRawBytesPerRecord() const {
+        AFL_VERIFY(RecordsCount);
+        return 1.0 * RawBytes / RecordsCount;
+    }
+
     ui64 GetSerializedBytes() const{
         return SerializedBytes;
     }
@@ -54,28 +63,47 @@ class TSimpleSerializationStat {
         Y_ABORT_UNLESS(RawBytes >= stat.RawBytes);
         RawBytes -= stat.RawBytes;
     }
+};
 
-    double GetPackedRecordSize() const {
-        return (double)SerializedBytes / RecordsCount;
+class TBatchSerializationStat {
+protected:
+    double SerializedBytesPerRecord = 0;
+    double RawBytesPerRecord = 0;
+public:
+    TBatchSerializationStat() = default;
+    TBatchSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes) {
+        Y_ABORT_UNLESS(recordsCount);
+        SerializedBytesPerRecord = 1.0 * bytes / recordsCount;
+        RawBytesPerRecord = 1.0 * rawBytes / recordsCount;
+    }
+
+    TBatchSerializationStat(const TSimpleSerializationStat& simple) {
+        SerializedBytesPerRecord = simple.GetSerializedBytesPerRecord();
+        RawBytesPerRecord = simple.GetRawBytesPerRecord();
+    }
+
+    void Merge(const TSimpleSerializationStat& item) {
+        SerializedBytesPerRecord += item.GetSerializedBytesPerRecord();
+        RawBytesPerRecord += item.GetRawBytesPerRecord();
     }
 
     std::optional<ui64> PredictOptimalPackRecordsCount(const ui64 recordsCount, const ui64 blobSize) const {
-        if (!RecordsCount) {
+        if (!SerializedBytesPerRecord) {
             return {};
         }
-        const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes;
+        const ui64 fullSize = 1.0 * recordsCount * SerializedBytesPerRecord;
         if (fullSize < blobSize) {
             return recordsCount;
         } else {
-            return std::floor(1.0 * blobSize / SerializedBytes * RecordsCount);
+            return std::floor(1.0 * blobSize / SerializedBytesPerRecord);
         }
     }
 
     std::optional<ui64> PredictOptimalSplitFactor(const ui64 recordsCount, const ui64 blobSize) const {
-        if (!RecordsCount) {
+        if (!SerializedBytesPerRecord) {
             return {};
         }
-        const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes;
+        const ui64 fullSize = 1.0 * recordsCount * SerializedBytesPerRecord;
         if (fullSize < blobSize) {
             return 1;
         } else {
@@ -84,25 +112,6 @@ class TSimpleSerializationStat {
     }
 };
 
-class TBatchSerializationStat: public TSimpleSerializationStat {
-private:
-    using TBase = TSimpleSerializationStat;
-public:
-    using TBase::TBase;
-    TBatchSerializationStat(const TSimpleSerializationStat& item)
-        : TBase(item)
-    {
-
-    }
-
-    void Merge(const TSimpleSerializationStat& item) {
-        SerializedBytes += item.GetSerializedBytes();
-        RawBytes += item.GetRawBytes();
-        AFL_VERIFY(RecordsCount == item.GetRecordsCount())("self_count", RecordsCount)("new_count", item.GetRecordsCount());
-    }
-
-};
-
 class TColumnSerializationStat: public TSimpleSerializationStat {
 private:
     YDB_READONLY(ui32, ColumnId, 0);
@@ -114,6 +123,10 @@ class TColumnSerializationStat: public TSimpleSerializationStat {
 
     }
 
+    double GetPackedRecordSize() const {
+        return (double)SerializedBytes / RecordsCount;
+    }
+
     TColumnSerializationStat RecalcForRecordsCount(const ui64 recordsCount) const {
         TColumnSerializationStat result(ColumnId, ColumnName);
         result.Merge(TSimpleSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount, RawBytes / RecordsCount * recordsCount));
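Both predictors now work purely off SerializedBytesPerRecord, so an estimate for any recordsCount is a straight rate calculation. A worked example of PredictOptimalPackRecordsCount in free-function form, following the new formula from the diff (the numbers are made up):

    #include <cmath>
    #include <cstdint>
    #include <iostream>
    #include <optional>

    // Same logic as the patched PredictOptimalPackRecordsCount, lifted out of the class.
    std::optional<uint64_t> PredictOptimalPackRecordsCount(const double serializedBytesPerRecord,
                                                           const uint64_t recordsCount,
                                                           const uint64_t blobSize) {
        if (!serializedBytesPerRecord) {
            return {};
        }
        const uint64_t fullSize = 1.0 * recordsCount * serializedBytesPerRecord;
        if (fullSize < blobSize) {
            return recordsCount;                // everything fits into a single blob
        }
        return std::floor(1.0 * blobSize / serializedBytesPerRecord);
    }

    int main() {
        // 200 bytes/record against an 8 MiB blob: only ~41943 of the million records fit per blob.
        std::cout << PredictOptimalPackRecordsCount(200.0, 1000000, 8u << 20).value_or(0) << "\n";
        // 50000 records at 100 bytes/record is ~4.8 MiB, so they all fit.
        std::cout << PredictOptimalPackRecordsCount(100.0, 50000, 8u << 20).value_or(0) << "\n";
    }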
