Skip to content

Commit 16418df

Browse files
commit processing fixes (#12519)
1 parent bff00ae commit 16418df

File tree

15 files changed

+111
-80
lines changed

15 files changed

+111
-80
lines changed

ydb/core/tx/columnshard/columnshard__progress_tx.cpp

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
2828
}
2929

3030
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
31-
NActors::TLogContextGuard logGuard =
32-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
33-
Y_ABORT_UNLESS(Self->ProgressTxInFlight);
31+
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tablet_id", Self->TabletID())(
32+
"tx_state", "TTxProgressTx::Execute")("tx_current", Self->ProgressTxInFlight);
33+
if (!Self->ProgressTxInFlight) {
34+
AbortedThroughRemoveExpired = true;
35+
return true;
36+
}
3437
Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds());
3538

3639
const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc);
@@ -45,15 +48,24 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
4548
const auto plannedItem = Self->ProgressTxController->GetFirstPlannedTx();
4649
if (!!plannedItem) {
4750
PlannedQueueItem.emplace(plannedItem->PlanStep, plannedItem->TxId);
48-
ui64 step = plannedItem->PlanStep;
49-
ui64 txId = plannedItem->TxId;
51+
const ui64 step = plannedItem->PlanStep;
52+
const ui64 txId = plannedItem->TxId;
53+
NActors::TLogContextGuard logGuardTx = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tx_id", txId);
54+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart");
5055
TxOperator = Self->ProgressTxController->GetTxOperatorVerified(txId);
5156
if (auto txPrepare = TxOperator->BuildTxPrepareForProgress(Self)) {
57+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "BuildTxPrepareForProgress");
5258
AbortedThroughRemoveExpired = true;
5359
Self->ProgressTxInFlight = txId;
5460
Self->Execute(txPrepare.release(), ctx);
5561
return true;
62+
} else if (TxOperator->IsInProgress()) {
63+
AbortedThroughRemoveExpired = true;
64+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemContinue");
65+
AFL_VERIFY(Self->ProgressTxInFlight == txId);
66+
return true;
5667
} else {
68+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "PopFirstPlannedTx");
5769
Self->ProgressTxController->PopFirstPlannedTx();
5870
}
5971
StartExecution = TMonotonic::Now();
@@ -80,8 +92,9 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
8092
if (AbortedThroughRemoveExpired) {
8193
return;
8294
}
83-
NActors::TLogContextGuard logGuard =
84-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
95+
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)(
96+
"tablet_id", Self->TabletID())(
97+
"tx_state", "TTxProgressTx::Complete");
8598
if (TxOperator) {
8699
TxOperator->ProgressOnComplete(*Self, ctx);
87100
Self->RescheduleWaitingReads();
@@ -104,11 +117,13 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
104117
};
105118

106119
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId) {
107-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID());
120+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTx")("tablet_id", TabletID())("tx_id", continueTxId);
108121
if (continueTxId) {
109122
AFL_VERIFY(!ProgressTxInFlight || ProgressTxInFlight == continueTxId)("current", ProgressTxInFlight)("expected", continueTxId);
110123
}
111124
if (!ProgressTxInFlight || ProgressTxInFlight == continueTxId) {
125+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTxStart")("tablet_id", TabletID())("tx_id", continueTxId)(
126+
"tx_current", ProgressTxInFlight);
112127
ProgressTxInFlight = continueTxId.value_or(0);
113128
Execute(new TTxProgressTx(this), ctx);
114129
}

ydb/core/tx/columnshard/engines/column_engine.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ void IColumnEngine::FetchDataAccessors(const std::shared_ptr<TDataAccessorsReque
3838

3939
TSelectInfo::TStats TSelectInfo::Stats() const {
4040
TStats out;
41-
out.Portions = PortionsOrderedPK.size();
41+
out.Portions = Portions.size();
4242

4343
THashSet<TUnifiedBlobId> uniqBlob;
44-
for (auto& portionInfo : PortionsOrderedPK) {
44+
for (auto& portionInfo : Portions) {
4545
out.Rows += portionInfo->GetRecordsCount();
4646
for (auto& blobId : portionInfo->GetBlobIds()) {
4747
out.Bytes += blobId.BlobSize();
@@ -53,10 +53,10 @@ TSelectInfo::TStats TSelectInfo::Stats() const {
5353

5454
TString TSelectInfo::DebugString() const {
5555
TStringBuilder result;
56-
result << "count:" << PortionsOrderedPK.size() << ";";
57-
if (PortionsOrderedPK.size()) {
56+
result << "count:" << Portions.size() << ";";
57+
if (Portions.size()) {
5858
result << "portions:";
59-
for (auto& portionInfo : PortionsOrderedPK) {
59+
for (auto& portionInfo : Portions) {
6060
result << portionInfo->DebugString();
6161
}
6262
}

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ struct TSelectInfo {
4646
}
4747
};
4848

49-
std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK;
49+
std::vector<std::shared_ptr<TPortionInfo>> Portions;
5050

5151
TStats Stats() const;
5252

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -502,18 +502,22 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
502502
return out;
503503
}
504504

505-
if (withUncommitted) {
506-
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
507-
AFL_VERIFY(portionInfo->HasInsertWriteId());
508-
AFL_VERIFY(!portionInfo->HasCommitSnapshot());
509-
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
510-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
511-
"portion", portionInfo->DebugString());
512-
if (skipPortion) {
505+
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
506+
AFL_VERIFY(portionInfo->HasInsertWriteId());
507+
if (withUncommitted) {
508+
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
513509
continue;
514510
}
515-
out->PortionsOrderedPK.emplace_back(portionInfo);
511+
} else if (!portionInfo->HasCommitSnapshot()) {
512+
continue;
513+
}
514+
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
515+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
516+
"portion", portionInfo->DebugString());
517+
if (skipPortion) {
518+
continue;
516519
}
520+
out->Portions.emplace_back(portionInfo);
517521
}
518522
for (const auto& [_, portionInfo] : spg->GetPortions()) {
519523
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
@@ -525,7 +529,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
525529
if (skipPortion) {
526530
continue;
527531
}
528-
out->PortionsOrderedPK.emplace_back(portionInfo);
532+
out->Portions.emplace_back(portionInfo);
529533
}
530534

531535
return out;

ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ TConclusionStatus TReadMetadata::Init(
2020

2121
SelectInfo = dataAccessor.Select(readDescription, !!LockId);
2222
if (LockId) {
23-
for (auto&& i : SelectInfo->PortionsOrderedPK) {
23+
for (auto&& i : SelectInfo->Portions) {
2424
if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) {
2525
if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) {
2626
} else {

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class TReadMetadata: public NCommon::TReadMetadata {
2121
std::vector<TCommittedBlob> CommittedBlobs;
2222
virtual bool Empty() const override {
2323
Y_ABORT_UNLESS(SelectInfo);
24-
return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty();
24+
return SelectInfo->Portions.empty() && CommittedBlobs.empty();
2525
}
2626

2727
virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
99
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
1010
ui32 sourceIdx = 0;
1111
std::deque<std::shared_ptr<IDataSource>> sources;
12-
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
12+
const auto& portions = GetReadMetadata()->SelectInfo->Portions;
1313
const auto& committed = GetReadMetadata()->CommittedBlobs;
1414
ui64 compactedPortionsBytes = 0;
1515
ui64 insertedPortionsBytes = 0;
@@ -49,7 +49,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
4949
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);
5050

5151
auto& stats = GetReadMetadata()->ReadStats;
52-
stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
52+
stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
5353
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
5454
stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size();
5555
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();

ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class TReadMetadata: public NCommon::TReadMetadata {
1212

1313
virtual bool Empty() const override {
1414
Y_ABORT_UNLESS(SelectInfo);
15-
return SelectInfo->PortionsOrderedPK.empty();
15+
return SelectInfo->Portions.empty();
1616
}
1717

1818
virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
99
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
1010
ui32 sourceIdx = 0;
1111
std::deque<std::shared_ptr<IDataSource>> sources;
12-
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
12+
const auto& portions = GetReadMetadata()->SelectInfo->Portions;
1313
ui64 compactedPortionsBytes = 0;
1414
ui64 insertedPortionsBytes = 0;
1515
for (auto&& i : portions) {
@@ -26,7 +26,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
2626
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);
2727

2828
auto& stats = GetReadMetadata()->ReadStats;
29-
stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
29+
stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
3030
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
3131
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
3232
stats->InsertedPortionsBytes = insertedPortionsBytes;

ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -566,28 +566,28 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
566566
ui64 planStep = 1;
567567
ui64 txId = 0;
568568
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
569-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
569+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
570570
}
571571

572572
{ // select from snap between insert (greater txId)
573573
ui64 planStep = 1;
574574
ui64 txId = 2;
575575
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
576-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
576+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
577577
}
578578

579579
{ // select from snap after insert (greater planStep)
580580
ui64 planStep = 2;
581581
ui64 txId = 1;
582582
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
583-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1);
583+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
584584
}
585585

586586
{ // select another pathId
587587
ui64 planStep = 2;
588588
ui64 txId = 1;
589589
auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
590-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
590+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
591591
}
592592
}
593593

@@ -657,7 +657,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
657657
{ // full scan
658658
ui64 txId = 1;
659659
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
660-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
660+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
661661
}
662662

663663
// predicates
@@ -671,7 +671,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
671671
NOlap::TPKRangesFilter pkFilter(false);
672672
Y_ABORT_UNLESS(pkFilter.Add(gt10k, nullptr, indexInfo.GetReplaceKey()));
673673
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
674-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
674+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
675675
}
676676

677677
{
@@ -683,7 +683,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
683683
NOlap::TPKRangesFilter pkFilter(false);
684684
Y_ABORT_UNLESS(pkFilter.Add(nullptr, lt10k, indexInfo.GetReplaceKey()));
685685
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
686-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9);
686+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 9);
687687
}
688688
}
689689

@@ -841,7 +841,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
841841
{ // full scan
842842
ui64 txId = 1;
843843
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
844-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
844+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
845845
}
846846

847847
// Cleanup
@@ -850,7 +850,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
850850
{ // full scan
851851
ui64 txId = 1;
852852
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
853-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
853+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
854854
}
855855

856856
// TTL
@@ -866,7 +866,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
866866
{ // full scan
867867
ui64 txId = 1;
868868
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
869-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
869+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
870870
}
871871
}
872872
{
@@ -882,7 +882,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
882882
{ // full scan
883883
ui64 txId = 1;
884884
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
885-
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
885+
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
886886
}
887887
}
888888
}

0 commit comments

Comments
 (0)