Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/CMakeLists.darwin-arm64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/CMakeLists.darwin-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/CMakeLists.linux-aarch64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/CMakeLists.linux-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/CMakeLists.windows-x86_64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_misc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_observer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_sausagecache.cpp
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tablet_flat/flat_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,11 @@ const TDbStats& TDatabase::Counters() const noexcept
return DatabaseImpl->Stats;
}

void TDatabase::SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr) noexcept
{
Require(table)->SetTableObserver(std::move(ptr));
}

TDatabase::TChg TDatabase::Head(ui32 table) const noexcept
{
if (table == Max<ui32>()) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "flat_dbase_change.h"
#include "flat_dbase_misc.h"
#include "flat_iterator.h"
#include "flat_table_observer.h"
#include "util_basics.h"

namespace NKikimr {
Expand Down Expand Up @@ -57,6 +58,8 @@ class TDatabase {
TDatabase(TDatabaseImpl *databaseImpl = nullptr) noexcept;
~TDatabase();

void SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr) noexcept;

/* Returns durable monotonic change number for table or entire database
on default (table = Max<ui32>()). Serial is incremented for each
successful Commit(). AHTUNG: Serial may go to the past in case of
Expand Down
52 changes: 43 additions & 9 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,32 @@ TTableSnapshotContext::~TTableSnapshotContext() = default;

using namespace NResourceBroker;

class TExecutor::TActiveTransactionZone {
public:
explicit TActiveTransactionZone(TExecutor* self) noexcept
: Self(self)
{
Y_DEBUG_ABORT_UNLESS(!Self->ActiveTransaction);
Self->ActiveTransaction = true;
Active = true;
}

~TActiveTransactionZone() noexcept {
Done();
}

void Done() noexcept {
if (Active) {
Self->ActiveTransaction = false;
Active = false;
}
}

private:
TExecutor* Self;
bool Active = false;
};

TExecutor::TExecutor(
NFlatExecutorSetup::ITablet* owner,
const TActorId& ownerActorId)
Expand Down Expand Up @@ -876,6 +902,9 @@ void TExecutor::ApplyFollowerUpdate(THolder<TEvTablet::TFUpdateBody> update) {
if (update->IsSnapshot) // do nothing over snapshot after initial one
return;

// Protect against recursive transactions in callbacks
TActiveTransactionZone activeTransaction(this);

TString schemeUpdate;
TString dataUpdate;
TStackVec<TString> partSwitches;
Expand Down Expand Up @@ -940,6 +969,11 @@ void TExecutor::ApplyFollowerUpdate(THolder<TEvTablet::TFUpdateBody> update) {
if (schemeUpdate) {
ReadResourceProfile();
ReflectSchemeSettings();
Owner->OnFollowerSchemaUpdated();
}

if (dataUpdate) {
Owner->OnFollowerDataUpdated();
}
}

Expand Down Expand Up @@ -1006,8 +1040,10 @@ void TExecutor::ApplyFollowerAuxUpdate(const TString &auxBody) {
const TString aux = NPageCollection::TSlicer::Lz4()->Decode(auxBody);
TProtoBox<NKikimrExecutorFlat::TFollowerAux> proto(aux);

if (proto.HasUserAuxUpdate())
if (proto.HasUserAuxUpdate()) {
TActiveTransactionZone activeTransaction(this);
Owner->OnLeaderUserAuxUpdate(std::move(proto.GetUserAuxUpdate()));
}
}

void TExecutor::RequestFromSharedCache(TAutoPtr<NPageCollection::TFetch> fetch,
Expand Down Expand Up @@ -1644,9 +1680,7 @@ void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
}

void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ctx) {
Y_DEBUG_ABORT_UNLESS(!ActiveTransaction);

ActiveTransaction = true;
TActiveTransactionZone activeTransaction(this);
++seat->Retries;

THPTimer cpuTimer;
Expand Down Expand Up @@ -1736,7 +1770,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
}
PrivatePageCache->ResetTouchesAndToLoad(false);

ActiveTransaction = false;
activeTransaction.Done();
PlanTransactionActivation();
}

Expand Down Expand Up @@ -2813,7 +2847,7 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
Y_ABORT_UNLESS(msg->Generation == Generation());
const ui32 step = msg->Step;

ActiveTransaction = true;
TActiveTransactionZone activeTransaction(this);

GcLogic->OnCommitLog(step, msg->ConfirmedOnSend, ctx);
CommitManager->Confirm(step);
Expand Down Expand Up @@ -2907,7 +2941,7 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
std::move(msg->GroupWrittenOps),
ctx);

ActiveTransaction = false;
activeTransaction.Done();
PlanTransactionActivation();

MaybeRelaxRejectProbability();
Expand Down Expand Up @@ -3236,7 +3270,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
return Broken();
}

ActiveTransaction = true;
TActiveTransactionZone activeTransaction(this);

const ui64 snapStamp = msg->Params->Edge.TxStamp ? msg->Params->Edge.TxStamp
: MakeGenStepPair(Generation(), msg->Step);
Expand Down Expand Up @@ -3469,7 +3503,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
Owner->CompactionComplete(tableId, OwnerCtx());
MaybeRelaxRejectProbability();

ActiveTransaction = false;
activeTransaction.Done();

if (LogicSnap->MayFlush(false)) {
MakeLogSnapshot();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ class TExecutor
THashMap<ui64, THolder<TScanSnapshot>> ScanSnapshots;
ui64 ScanSnapshotId = 1;

class TActiveTransactionZone;

bool ActiveTransaction = false;
bool BrokenTransaction = false;
ui32 ActivateTransactionWaiting = 0;
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tablet_flat/flat_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,9 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemG
}

MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions);
if (TableObserver) {
TableObserver->OnUpdate(rop, key, ops, rowVersion);
}
}

void TTable::AddTxRef(ui64 txId)
Expand Down Expand Up @@ -863,6 +866,10 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMe
} else {
Y_DEBUG_ABORT_UNLESS(TxRefs[txId] > 0);
}

if (TableObserver) {
TableObserver->OnUpdateTx(rop, key, ops, txId);
}
}

void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
Expand Down Expand Up @@ -1338,6 +1345,11 @@ TCompactionStats TTable::GetCompactionStats() const
return stats;
}

void TTable::SetTableObserver(TIntrusivePtr<ITableObserver> ptr) noexcept
{
TableObserver = std::move(ptr);
}

void TPartStats::Add(const TPartView& partView)
{
PartsCount += 1;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tablet_flat/flat_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "flat_table_stats.h"
#include "flat_table_subset.h"
#include "flat_table_misc.h"
#include "flat_table_observer.h"
#include "flat_sausage_solid.h"
#include "util_basics.h"

Expand Down Expand Up @@ -322,7 +323,7 @@ class TTable: public TAtomicRefCount<TTable> {

TCompactionStats GetCompactionStats() const;

void FillTxStatusCache(THashMap<TLogoBlobID, TSharedData>& cache) const noexcept;
void SetTableObserver(TIntrusivePtr<ITableObserver> ptr) noexcept;

private:
TMemTable& MemTable();
Expand Down Expand Up @@ -358,6 +359,7 @@ class TTable: public TAtomicRefCount<TTable> {
absl::flat_hash_set<ui64> CheckTransactions;
TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
TIntrusivePtr<ITableObserver> TableObserver;

private:
struct TRollbackRemoveTxRef {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/flat_table_observer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "flat_table_observer.h"
31 changes: 31 additions & 0 deletions ydb/core/tablet_flat/flat_table_observer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
#include "defs.h"
#include "flat_row_eggs.h"
#include "flat_update_op.h"

#include <util/generic/ptr.h>

namespace NKikimr::NTable {

class ITableObserver : public TThrRefBase {
public:
/**
* Called when a new update is applied to the table
*/
virtual void OnUpdate(
ERowOp rop,
TArrayRef<const TRawTypeValue> key,
TArrayRef<const TUpdateOp> ops,
TRowVersion rowVersion) = 0;

/**
* Called when an uncommitted update is applied to the table
*/
virtual void OnUpdateTx(
ERowOp rop,
TArrayRef<const TRawTypeValue> key,
TArrayRef<const TUpdateOp> ops,
ui64 txId) = 0;
};

} // namespace NKikimr::NTable
8 changes: 8 additions & 0 deletions ydb/core/tablet_flat/tablet_flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ namespace NFlatExecutorSetup {
void ITablet::OnFollowersCountChanged() {
// nothing by default
}

void ITablet::OnFollowerSchemaUpdated() {
// nothing by default
}

void ITablet::OnFollowerDataUpdated() {
// nothing by default
}
}

}}
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ namespace NFlatExecutorSetup {

virtual void OnFollowersCountChanged();

virtual void OnFollowerSchemaUpdated();
virtual void OnFollowerDataUpdated();

// create transaction?
protected:
ITablet(TTabletStorageInfo *info, const TActorId &tablet)
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tablet_flat/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ SRCS(
flat_table_part.cpp
flat_table_part.h
flat_table_misc.cpp
flat_table_observer.cpp
flat_table_observer.h
flat_update_op.h
probes.cpp
shared_handle.cpp
Expand Down