Skip to content

Avoid persistent and in-memory tx status getting out-of-sync #14781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_boot_stages.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ namespace NBoot {

const auto was = Back->DatabaseImpl->Rewind(Back->Serial);

// Notify database that all merges have completed
Back->DatabaseImpl->MergeDone();

result.Database = new NTable::TDatabase(Back->DatabaseImpl.Release());

if (auto logl = Env->Logger()->Log(ELnLev::Info)) {
Expand Down
27 changes: 18 additions & 9 deletions ydb/core/tablet_flat/flat_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,14 @@ TEpoch TDatabase::TxSnapTable(ui32 table)
return DatabaseImpl->FlushTable(table);
}

TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const
TAutoPtr<TSubset> TDatabase::CompactionSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle) const
{
return Require(table)->Subset(bundle, before);
return Require(table)->CompactionSubset(before, bundle);
}

// Returns the part-switch subset of `table`.
//
// The table is looked up through Require(), and `before`, `bundle` and
// `txStatus` are handed over unchanged to that table's PartSwitchSubset().
// NOTE(review): judging by the names, `bundle` and `txStatus` are the blob ids
// of the parts and tx status parts being switched out — confirm against the
// table-level implementation.
TAutoPtr<TSubset> TDatabase::PartSwitchSubset(
        ui32 table,
        TEpoch before,
        TArrayRef<const TLogoBlobID> bundle,
        TArrayRef<const TLogoBlobID> txStatus) const
{
    auto&& target = Require(table);
    return target->PartSwitchSubset(before, bundle, txStatus);
}

TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const
Expand Down Expand Up @@ -617,14 +622,13 @@ void TDatabase::ReplaceSlices(ui32 table, TBundleSlicesMap slices)
return DatabaseImpl->ReplaceSlices(table, std::move(slices));
}

void TDatabase::Replace(ui32 table, TArrayRef<const TPartView> partViews, const TSubset &subset)
void TDatabase::Replace(
ui32 table,
const TSubset& subset,
TArrayRef<const TPartView> newParts,
TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus)
{
return DatabaseImpl->Replace(table, partViews, subset);
}

void TDatabase::ReplaceTxStatus(ui32 table, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> txStatus, const TSubset &subset)
{
return DatabaseImpl->ReplaceTxStatus(table, txStatus, subset);
return DatabaseImpl->Replace(table, subset, newParts, newTxStatus);
}

void TDatabase::Merge(ui32 table, TPartView partView)
Expand All @@ -642,6 +646,11 @@ void TDatabase::Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart> txStatus)
return DatabaseImpl->Merge(table, std::move(txStatus));
}

// Forwards the "merge done" notification for `table` to the underlying
// database implementation (DatabaseImpl->MergeDone(table)).
// NOTE(review): callers appear to invoke this after a batch of Merge() calls
// for the same table — confirm the exact contract with the implementation.
void TDatabase::MergeDone(ui32 table)
{
    DatabaseImpl->MergeDone(table);
}

TAlter& TDatabase::Alter()
{
Y_ABORT_UNLESS(Redo, "Scheme change must be done within a transaction");
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/tablet_flat/flat_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ class TDatabase {
void UpdateApproximateFreeSharesByChannel(const THashMap<ui32, float>& approximateFreeSpaceShareByChannel);
TString SnapshotToLog(ui32 table, TTxStamp);

TAutoPtr<TSubset> Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const;
TAutoPtr<TSubset> CompactionSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle) const;
TAutoPtr<TSubset> PartSwitchSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus) const;
TAutoPtr<TSubset> Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const;
TAutoPtr<TSubset> ScanSnapshot(ui32 table, TRowVersion snapshot = TRowVersion::Max());

Expand All @@ -235,11 +236,15 @@ class TDatabase {
TBundleSlicesMap LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const;
void ReplaceSlices(ui32 table, TBundleSlicesMap slices);

void Replace(ui32 table, TArrayRef<const TPartView>, const TSubset&);
void ReplaceTxStatus(ui32 table, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>, const TSubset&);
void Replace(
ui32 table,
const TSubset&,
TArrayRef<const TPartView>,
TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>);
void Merge(ui32 table, TPartView);
void Merge(ui32 table, TIntrusiveConstPtr<TColdPart>);
void Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart>);
void MergeDone(ui32 table);

void DebugDumpTable(ui32 table, IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const;
void DebugDump(IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const;
Expand Down
31 changes: 21 additions & 10 deletions ydb/core/tablet_flat/flat_dbase_naked.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,51 +481,62 @@ namespace NTable {
wrap.Aggr(Stats, true /* enter */);
}

void Replace(ui32 tid, TArrayRef<const TPartView> partViews, const TSubset &subset) noexcept
void Replace(
ui32 tid,
const TSubset &subset,
TArrayRef<const TPartView> newParts,
TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus) noexcept
{
auto &wrap = Get(tid, true);

wrap.Aggr(Stats, false /* leave */);
wrap->Replace(partViews, subset);
wrap->Replace(subset, newParts, newTxStatus);
wrap.Aggr(Stats, true /* enter */);
}

void ReplaceTxStatus(ui32 tid, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> txStatus, const TSubset &subset) noexcept
void Merge(ui32 tid, TPartView partView) noexcept
{
auto &wrap = Get(tid, true);

wrap.Aggr(Stats, false /* leave */);
wrap->ReplaceTxStatus(txStatus, subset);
wrap->Merge(std::move(partView));
wrap.Aggr(Stats, true /* enter */);
}

void Merge(ui32 tid, TPartView partView) noexcept
void Merge(ui32 tid, TIntrusiveConstPtr<TColdPart> part) noexcept
{
auto &wrap = Get(tid, true);

wrap.Aggr(Stats, false /* leave */);
wrap->Merge(std::move(partView));
wrap->Merge(std::move(part));
wrap.Aggr(Stats, true /* enter */);
}

void Merge(ui32 tid, TIntrusiveConstPtr<TColdPart> part) noexcept
void Merge(ui32 tid, TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
{
auto &wrap = Get(tid, true);

wrap.Aggr(Stats, false /* leave */);
wrap->Merge(std::move(part));
wrap->Merge(std::move(txStatus));
wrap.Aggr(Stats, true /* enter */);
}

void Merge(ui32 tid, TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
void MergeDone(ui32 tid) noexcept
{
auto &wrap = Get(tid, true);

wrap.Aggr(Stats, false /* leave */);
wrap->Merge(std::move(txStatus));
wrap->MergeDone();
wrap.Aggr(Stats, true /* enter */);
}

// Invokes the per-table MergeDone(tid) overload once for every entry in
// Tables, using the entry's key as the table id.
void MergeDone() noexcept
{
    for (const auto &entry : Tables) {
        const auto &tid = entry.first;
        MergeDone(tid);
    }
}

bool ApplySchema(const TSchemeChanges &delta)
{
TModifier modifier(*Scheme);
Expand Down
35 changes: 24 additions & 11 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}

if (partSwitch.FollowerUpdateStep) {
auto subset = Database->Subset(partSwitch.TableId, partSwitch.Leaving, partSwitch.Head);
auto subset = Database->PartSwitchSubset(partSwitch.TableId, partSwitch.Head, partSwitch.Leaving, partSwitch.LeavingTxStatus);

if (partSwitch.Head != subset->Head) {
Y_ABORT("Follower table epoch head has diverged from leader");
Expand All @@ -1488,30 +1488,36 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}

Y_ABORT_UNLESS(newColdParts.empty(), "Unexpected cold part at a follower");
Database->Replace(partSwitch.TableId, std::move(newParts), *subset);
Database->ReplaceTxStatus(partSwitch.TableId, std::move(newTxStatus), *subset);
Database->Replace(partSwitch.TableId, *subset, std::move(newParts), std::move(newTxStatus));

for (auto &gone : subset->Flatten)
DropCachesOfBundle(*gone);

Send(Owner->Tablet(), new TEvTablet::TEvFGcAck(Owner->TabletID(), Generation(), partSwitch.FollowerUpdateStep));
} else {
bool merged = false;
for (auto &partView : newParts) {
Database->Merge(partSwitch.TableId, partView);
merged = true;

if (CompactionLogic) {
CompactionLogic->BorrowedPart(partSwitch.TableId, std::move(partView));
}
}
for (auto &part : newColdParts) {
Database->Merge(partSwitch.TableId, part);
merged = true;

if (CompactionLogic) {
CompactionLogic->BorrowedPart(partSwitch.TableId, std::move(part));
}
}
for (auto &txStatus : newTxStatus) {
Database->Merge(partSwitch.TableId, txStatus);
merged = true;
}
if (merged) {
Database->MergeDone(partSwitch.TableId);
}
}

Expand All @@ -1533,7 +1539,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
// N.B. there should be a single source table per part switch
for (auto& [sourceTable, state] : perTable) {
// Rebase source parts to their respective new epochs
auto srcSubset = Database->Subset(sourceTable, state.Bundles, NTable::TEpoch::Zero());
auto srcSubset = Database->PartSwitchSubset(sourceTable, NTable::TEpoch::Zero(), state.Bundles, { });
TVector<NTable::TPartView> rebased(Reserve(srcSubset->Flatten.size()));
for (const auto& partView : srcSubset->Flatten) {
Y_ABORT_UNLESS(!partView->TxIdStats, "Cannot move parts with uncommitted deltas");
Expand All @@ -1542,7 +1548,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}

// Remove source parts from the source table
Database->Replace(sourceTable, { }, *srcSubset);
Database->Replace(sourceTable, *srcSubset, { }, { });

if (CompactionLogic) {
CompactionLogic->RemovedParts(sourceTable, state.Bundles);
Expand All @@ -1557,6 +1563,8 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
}
}

Database->MergeDone(partSwitch.TableId);
}
}

Expand Down Expand Up @@ -2310,7 +2318,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
}

// Remove source parts from the source table
Database->Replace(src, { }, *srcSubset);
Database->Replace(src, *srcSubset, { }, { });

const auto logicResult = CompactionLogic->RemovedParts(src, labels);

Expand Down Expand Up @@ -2342,6 +2350,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
Database->Merge(dst, partView);
CompactionLogic->BorrowedPart(dst, partView);
}
Database->MergeDone(dst);

// Serialize rebased parts as moved from the source table
NKikimrExecutorFlat::TTablePartSwitch proto;
Expand Down Expand Up @@ -3050,7 +3059,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable::
TAutoPtr<NTable::TSubset> subset;

if (params) {
subset = Database->Subset(table, { }, params->Edge.Head);
subset = Database->CompactionSubset(table, params->Edge.Head, { });

if (params->Parts) {
subset->Flatten.insert(subset->Flatten.end(), params->Parts.begin(), params->Parts.end());
Expand Down Expand Up @@ -3397,8 +3406,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
newParts.emplace_back(result.Part);
}

Database->Replace(tableId, newParts, *ops->Subset);
Database->ReplaceTxStatus(tableId, newTxStatus, *ops->Subset);
Database->Replace(tableId, *ops->Subset, newParts, newTxStatus);

TVector<TLogoBlobID> bundles(Reserve(ops->Subset->Flatten.size() + ops->Subset->ColdParts.size()));
for (auto &part: ops->Subset->Flatten) {
Expand Down Expand Up @@ -4525,23 +4533,28 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
if (!memTableSnapshot->GetCommittedTransactions().empty() || !memTableSnapshot->GetRemovedTransactions().empty()) {
// We must compact tx status when mem table has changes
compactTxStatus = true;
break;
}
}
for (const auto& txStatus : snapshot->Subset->TxStatus) {
if (txStatus->Label.TabletID() != Owner->TabletID()) {
// We want to compact borrowed tx status
compactTxStatus = true;
break;
}
}
if (snapshot->Subset->TxStatus && snapshot->Subset->GarbageTransactions) {
// We want to remove garbage transactions
compactTxStatus = true;
}

if (compactTxStatus) {
comp->CommittedTransactions = snapshot->Subset->CommittedTransactions;
comp->RemovedTransactions = snapshot->Subset->RemovedTransactions;
comp->Frozen.reserve(snapshot->Subset->Frozen.size());
for (auto& memTableSnapshot : snapshot->Subset->Frozen) {
comp->Frozen.push_back(memTableSnapshot.MemTable);
}
comp->TxStatus = snapshot->Subset->TxStatus;
comp->GarbageTransactions = snapshot->Subset->GarbageTransactions;
} else {
// We are not compacting tx status, avoid deleting current blobs
snapshot->Subset->TxStatus.clear();
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/tablet_flat/flat_executor_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ namespace NTabletFlatExecutor {
THolder<NTable::TCompactionParams> Params;
NTable::TRowVersionRanges::TSnapshot RemovedRowVersions;

// Non-empty when compaction also needs to write a tx status table part
NTable::TTransactionMap CommittedTransactions;
NTable::TTransactionSet RemovedTransactions;
// The above may contain extra keys; these allow them to be narrowed
// Non-empty when compaction also needs to produce a tx status table part
TVector<TIntrusiveConstPtr<NTable::TMemTable>> Frozen;
TVector<TIntrusiveConstPtr<NTable::TTxStatusPart>> TxStatus;
// Non-empty for transactions that no longer need their status maintained
NTable::TTransactionSet GarbageTransactions;
};

}
Expand Down
Loading
Loading