Skip to content

Commit 8d748e2

Browse files
Merge be651b5 into 1dcd7d2
2 parents 1dcd7d2 + be651b5 commit 8d748e2

File tree

37 files changed

+846
-687
lines changed

37 files changed

+846
-687
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,13 @@ class IChunkedArray {
9696
return Addresses.size();
9797
}
9898

99-
ui32 GetLocalIndex(const ui32 position) const {
100-
AFL_VERIFY(Contains(position))("pos", position)("start", GlobalStartPosition);
101-
return position - GlobalStartPosition;
99+
ui32 GetLocalIndex(const ui32 global) const {
100+
AFL_VERIFY(Contains(global))("pos", global)("start", GlobalStartPosition);
101+
return global - GlobalStartPosition;
102+
}
103+
104+
ui32 GetGlobalIndex(const ui32 local) const {
105+
return local + GlobalStartPosition;
102106
}
103107

104108
bool Contains(const ui32 position) const {

ydb/core/formats/arrow/accessor/abstract/constructor.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,12 @@ TConstructorContainer TConstructorContainer::GetDefaultConstructor() {
88
return result;
99
}
1010

11+
TString IConstructor::SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
12+
AFL_VERIFY(columnData);
13+
AFL_VERIFY(columnData->GetType() == Type)("column", columnData->GetType())("current", Type);
14+
AFL_VERIFY(columnData->GetDataType()->Equals(externalInfo.GetColumnType()))("column", columnData->GetDataType()->ToString())(
15+
"external", externalInfo.GetColumnType()->ToString());
16+
return DoSerializeToString(columnData, externalInfo);
17+
}
18+
1119
}

ydb/core/formats/arrow/accessor/abstract/constructor.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,7 @@ class IConstructor {
4343

4444
virtual ~IConstructor() = default;
4545

46-
TString SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
47-
AFL_VERIFY(columnData);
48-
AFL_VERIFY(columnData->GetType() == Type)("column", columnData->GetType())("current", Type);
49-
return DoSerializeToString(columnData, externalInfo);
50-
}
46+
TString SerializeToString(const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const;
5147

5248
bool IsEqualWithSameTypeTo(const IConstructor& item) const {
5349
return DoIsEqualWithSameTypeTo(item);

ydb/core/formats/arrow/accessor/sparsed/accessor.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ class TSparsedArray: public IChunkedArray {
194194
const std::shared_ptr<arrow::Scalar>& defaultValue, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount);
195195

196196
public:
197+
static EType GetTypeStatic() {
198+
return EType::SparsedArray;
199+
}
200+
197201
virtual void Reallocate() override;
198202

199203
static std::shared_ptr<TSparsedArray> BuildFalseArrayUI8(const ui32 recordsCount) {
@@ -223,6 +227,10 @@ class TSparsedArray: public IChunkedArray {
223227
return Record;
224228
}
225229

230+
const TSparsedArrayChunk& GetSparsedChunk() const {
231+
return Record;
232+
}
233+
226234
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
227235
return Record.GetScalar(index);
228236
}

ydb/core/kqp/ut/olap/helpers/get_value.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ ui64 GetUint64(const NYdb::TValue& v) {
110110
}
111111
}
112112

113+
ui64 GetInt64(const NYdb::TValue& v) {
114+
NYdb::TValueParser value(v);
115+
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {
116+
return *value.GetOptionalInt64();
117+
} else {
118+
return value.GetInt64();
119+
}
120+
}
121+
113122
TString GetUtf8(const NYdb::TValue& v) {
114123
NYdb::TValueParser value(v);
115124
if (value.GetKind() == NYdb::TTypeParser::ETypeKind::Optional) {

ydb/core/kqp/ut/olap/helpers/get_value.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ void PrintRows(IOutputStream& out, const TVector<THashMap<TString, NYdb::TValue>
1010
ui64 GetUint32(const NYdb::TValue& v);
1111
i64 GetInt32(const NYdb::TValue& v);
1212
ui64 GetUint64(const NYdb::TValue& v);
13+
ui64 GetInt64(const NYdb::TValue& v);
1314
TString GetUtf8(const NYdb::TValue& v);
1415
TInstant GetTimestamp(const NYdb::TValue& v);
1516

ydb/core/kqp/ut/olap/sparsed_ut.cpp

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,25 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
4444
return GetUint64(rows[0].at("count"));
4545
}
4646

47-
ui32 GetDefaultsCount(const TString& fieldName, const TString& defValueStr) const {
47+
ui32 GetDefaultsCount(const TString& fieldName, const TString& defValueStr, std::set<ui32>* notDefaultIds = nullptr) const {
48+
{
49+
auto selectQueryTmpl = TString(R"(SELECT pk_int FROM `/Root/)") + (StoreName.empty() ? "" : StoreName + "/") +
50+
R"(olapTable`
51+
WHERE %s != %s
52+
ORDER BY pk_int
53+
)";
54+
55+
auto tableClient = Kikimr.GetTableClient();
56+
auto rows = ExecuteScanQuery(tableClient, Sprintf(selectQueryTmpl.c_str(), fieldName.c_str(), defValueStr.c_str()));
57+
if (notDefaultIds) {
58+
std::set<ui32> result;
59+
for (auto&& i : rows) {
60+
AFL_VERIFY(result.emplace(GetInt64(i.at("pk_int"))).second);
61+
}
62+
*notDefaultIds = result;
63+
}
64+
65+
}
4866
auto selectQueryTmpl = TString(R"(
4967
SELECT
5068
count(*) as count,
@@ -116,12 +134,26 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
116134
}
117135
}
118136

119-
void CheckTable(const TString& fieldName, const TString& defValueStr, bool firstCall, ui32 countExpectation, ui32& defCountStart) {
120-
const ui32 defCount = GetDefaultsCount(fieldName, defValueStr);
137+
void CheckTable(const TString& fieldName, const TString& defValueStr, bool firstCall, ui32 countExpectation, ui32& defCountStart, std::set<ui32>* notDefaultValues = nullptr) {
138+
std::set<ui32> ndvLocal;
139+
const ui32 defCount = GetDefaultsCount(fieldName, defValueStr, &ndvLocal);
121140
if (firstCall) {
122141
defCountStart = defCount;
142+
if (notDefaultValues) {
143+
*notDefaultValues = ndvLocal;
144+
}
123145
} else {
124-
AFL_VERIFY(defCountStart == defCount);
146+
AFL_VERIFY(defCountStart == defCount)("start", defCountStart)("current", defCount);
147+
if (notDefaultValues) {
148+
auto it1 = ndvLocal.begin();
149+
auto it2 = notDefaultValues->begin();
150+
while (it1 != ndvLocal.end() && it2 != notDefaultValues->end()) {
151+
AFL_VERIFY(*it1 == *it2)("local", *it1)("check", *it2)("local_size", ndvLocal.size())("check_size", notDefaultValues->size());
152+
++it1;
153+
++it2;
154+
}
155+
AFL_VERIFY(ndvLocal.size() == notDefaultValues->size())("local", ndvLocal.size())("check", notDefaultValues->size());
156+
}
125157
}
126158
const ui32 count = GetCount();
127159
AFL_VERIFY(count == countExpectation)("expect", countExpectation)("count", count);
@@ -144,35 +176,28 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
144176
checkTable(true);
145177
printTime("checkTable");
146178

147-
CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
148-
CSController->WaitIndexation(TDuration::Seconds(5));
149-
printTime("wait");
150-
151-
checkTable(false);
152-
printTime("checkTable");
153-
154179
CSController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
155180
CSController->WaitCompactions(TDuration::Seconds(5));
156181
printTime("wait");
157182

158183
checkTable(false);
159184
printTime("checkTable");
160185

161-
CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
162186
CSController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
163187
printTime("wait");
164188
}
165189

166190
void FillCircle(const double shiftKff, const ui32 countExpectation) {
167191
ui32 defCountStart = (ui32)-1;
192+
std::set<ui32> notDefaultValues;
168193
FillCircleImpl([&]() {
169194
TTypedLocalHelper helper("Utf8", Kikimr, "olapTable", StoreName);
170195
const double frq = 0.9;
171196
NArrow::NConstruction::TStringPoolFiller sPool(1000, 52, "abcde", frq);
172197
helper.FillTable(sPool, shiftKff, 10000);
173198
},
174199
[&](bool firstCall) {
175-
CheckTable("field", "'abcde'", firstCall, countExpectation, defCountStart);
200+
CheckTable("field", "'abcde'", firstCall, countExpectation, defCountStart, &notDefaultValues);
176201
});
177202
}
178203

@@ -212,7 +237,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
212237
)
213238
PARTITION BY HASH(pk_int)
214239
WITH (
215-
STORE = COLUMN
240+
STORE = COLUMN,
241+
PARTITION_COUNT = 64
216242
))";
217243
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
218244
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

ydb/core/tx/columnshard/blobs_action/transaction/tx_draft.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ namespace NKikimr::NColumnShard {
44

55
bool TTxWriteDraft::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
66
TMemoryProfileGuard mpg("TTxWriteDraft::Execute");
7+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "draft_started");
78
NOlap::TBlobManagerDb blobManagerDb(txc.DB);
89
for (auto&& action : WriteController->GetBlobActions()) {
910
action.second->OnExecuteTxBeforeWrite(*Self, blobManagerDb);
1011
}
12+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "draft_finished");
1113
return true;
1214
}
1315

1416
void TTxWriteDraft::Complete(const TActorContext& ctx) {
1517
TMemoryProfileGuard mpg("TTxWriteDraft::Complete");
18+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "draft_completed");
1619
Completed = true;
1720
for (auto&& action : WriteController->GetBlobActions()) {
1821
action.second->OnCompleteTxBeforeWrite(*Self);

ydb/core/tx/columnshard/columnshard__write_index.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,13 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
5454
const TConclusion<bool> needDraftTransaction = writeController->GetBlobsAction().NeedDraftWritingTransaction();
5555
AFL_VERIFY(needDraftTransaction.IsSuccess())("error", needDraftTransaction.GetErrorMessage());
5656
if (*needDraftTransaction) {
57+
ACFL_DEBUG("event", "TTxWriteDraft");
5758
Execute(new TTxWriteDraft(this, writeController));
5859
} else if (needDiskLimiter) {
60+
ACFL_DEBUG("event", "Limiter");
5961
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID()));
6062
} else {
63+
ACFL_DEBUG("event", "WriteActor");
6164
Register(CreateWriteActor(TabletID(), writeController, TInstant::Max()));
6265
}
6366
}

0 commit comments

Comments
 (0)