Skip to content

Commit 02fdbe3

Browse files
authored
Try to preserve in-memory state (locks in particular) across restarts (#12567)
1 parent 576ef7f commit 02fdbe3

19 files changed

+1900
-79
lines changed

ydb/core/protos/counters_datashard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,4 +527,5 @@ enum ETxTypes {
527527
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
528528
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
529529
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
530+
TXTYPE_INIT_RESTORED = 84 [(TxTypeOpts) = {Name: "TxInitRestored"}];
530531
}

ydb/core/protos/datashard_config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ message TDataShardConfig {
2525
optional bool DisabledOnSchemeShard = 19 [default = false];
2626
optional uint64 IncrementalRestoreReadAheadLo = 20 [default = 524288];
2727
optional uint64 IncrementalRestoreReadAheadHi = 21 [default = 1048576];
28+
optional uint64 InMemoryStateMigrationTimeoutMs = 24 [default = 1000];
2829
}

ydb/core/protos/feature_flags.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,6 @@ message TFeatureFlags {
181181
optional bool ForceDistconfDisable = 156 [default = false];
182182
optional bool EnableScaleRecommender = 157 [default = false];
183183
optional bool EnableVDiskThrottling = 158 [default = false];
184+
optional bool EnableDataShardInMemoryStateMigration = 159 [default = true];
185+
optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false];
184186
}

ydb/core/protos/tx_datashard.proto

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2144,3 +2144,118 @@ message TSerializedEvent {
21442144
// TEventSerializationInfo::IsExtendedFormat flag
21452145
optional bool IsExtendedFormat = 2;
21462146
}
2147+
2148+
// In-memory variables that are important to restore between shard generations
2149+
message TInMemoryVars {
2150+
optional NKikimrProto.TRowVersion ImmediateWriteEdge = 1;
2151+
optional NKikimrProto.TRowVersion ImmediateWriteEdgeReplied = 2;
2152+
optional NKikimrProto.TRowVersion UnprotectedReadEdge = 3;
2153+
}
2154+
2155+
// In-memory locks
2156+
message TInMemoryLockInfo {
2157+
optional uint64 LockId = 1;
2158+
optional uint32 LockNodeId = 2;
2159+
optional uint32 Generation = 3;
2160+
optional uint32 Counter = 4;
2161+
optional uint64 CreateTs = 5;
2162+
optional uint64 Flags = 6;
2163+
optional NKikimrProto.TRowVersion BreakVersion = 7;
2164+
repeated NKikimrProto.TPathID ReadTables = 8;
2165+
repeated NKikimrProto.TPathID WriteTables = 9;
2166+
}
2167+
2168+
// In-memory ranges associated with the lock
2169+
// Note: whole shard locks will have whole table read/write ranges
2170+
message TInMemoryLockRange {
2171+
optional uint64 LockId = 1;
2172+
optional NKikimrProto.TPathID TableId = 2;
2173+
optional uint64 Flags = 3;
2174+
optional bytes Data = 4;
2175+
}
2176+
2177+
// In-memory conflicts between locks
2178+
message TInMemoryLockConflict {
2179+
// When this lock is committed
2180+
optional uint64 LockId = 1;
2181+
// It must break another lock with the specified id
2182+
optional uint64 ConflictId = 2;
2183+
}
2184+
2185+
// In-memory volatile dependencies
2186+
// This specifies that to commit this lock we must use a volatile transaction
2187+
// that must have a dependency on the specified TxId.
2188+
message TInMemoryLockVolatileDependency {
2189+
optional uint64 LockId = 1;
2190+
optional uint64 TxId = 2;
2191+
}
2192+
2193+
// In-memory prepared volatile transactions, i.e. those we have accepted for
2194+
// execution and may be planned and executed later. This includes planned and
2195+
// acknowledged transactions that haven't executed yet (including those that
2196+
// have not been committed yet?)
2197+
message TInMemoryPreparedVolatileTx {
2198+
optional uint64 TxId = 1;
2199+
optional NActorsProto.TActorId Source = 2;
2200+
optional uint64 Cookie = 3;
2201+
optional uint64 Type = 4;
2202+
optional bytes Body = 5;
2203+
optional uint64 Flags = 6;
2204+
// This is Min/Max step of a prepared transactions, mostly used for deadline cleanup
2205+
optional uint64 MinStep = 7;
2206+
optional uint64 MaxStep = 8;
2207+
// Specified when the transaction has been planned already (including predictions)
2208+
optional uint64 PredictedStep = 9;
2209+
optional uint64 Step = 10;
2210+
}
2211+
2212+
// In-memory waiting volatile transactions, i.e. those that have not been
2213+
// decided yet (at least persistently), and for which we need to send a
2214+
// result when they eventually commit or abort. This is matched against
2215+
// transactions that are restored from disk.
2216+
message TInMemoryWaitingVolatileTx {
2217+
optional uint64 TxId = 1;
2218+
optional NActorsProto.TActorId Source = 2;
2219+
optional uint64 Cookie = 3;
2220+
optional uint64 Type = 4;
2221+
}
2222+
2223+
// Partial in-memory state of the datashard
2224+
message TInMemoryState {
2225+
optional TInMemoryVars Vars = 1;
2226+
repeated TInMemoryLockInfo Locks = 2;
2227+
repeated TInMemoryLockRange LockRanges = 3;
2228+
repeated TInMemoryLockConflict LockConflicts = 4;
2229+
repeated TInMemoryLockVolatileDependency LockVolatileDependencies = 5;
2230+
repeated TInMemoryPreparedVolatileTx PreparedVolatileTxs = 6;
2231+
repeated TInMemoryWaitingVolatileTx WaitingVolatileTxs = 7;
2232+
}
2233+
2234+
// Internal continuation token, may change between versions
2235+
message TInMemoryStateContinuationToken {
2236+
// Index to continue from
2237+
optional uint64 NextIndex = 1;
2238+
}
2239+
2240+
// Sent to the state actor by new generations
2241+
message TEvInMemoryStateRequest {
2242+
// Generation of the new tablet
2243+
optional uint32 Generation = 1;
2244+
2245+
// Continuation token to get the next response
2246+
optional bytes ContinuationToken = 2;
2247+
}
2248+
2249+
// Sent by the state actor to new generations
2250+
message TEvInMemoryStateResponse {
2251+
// Partial serialized TInMemoryState chunk
2252+
optional uint32 SerializedStatePayloadIndex = 1;
2253+
2254+
// Optional checkpoint offsets within the chunk. The protobuf message may
2255+
// be parsed cleanly up to these offsets without the need to concatenate
2256+
// subsequent chunks.
2257+
repeated uint32 SerializedStateCheckpoints = 2 [packed = true];
2258+
2259+
// Will be set for incomplete replies
2260+
optional bytes ContinuationToken = 3;
2261+
}

ydb/core/tx/datashard/datashard.cpp

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "datashard_impl.h"
22
#include "datashard_txs.h"
33
#include "datashard_locks_db.h"
4+
#include "memory_state_migration.h"
45
#include "probes.h"
56

67
#include <ydb/core/base/interconnect_channels.h>
@@ -174,6 +175,15 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
174175
RegisterDataShardProbes();
175176
}
176177

178+
TDataShard::~TDataShard() {
179+
if (InMemoryRestoreActor) {
180+
InMemoryRestoreActor->OnTabletDestroyed();
181+
}
182+
if (InMemoryStateActor) {
183+
InMemoryStateActor->OnTabletDestroyed();
184+
}
185+
}
186+
177187
void TDataShard::OnDetach(const TActorContext &ctx) {
178188
Cleanup(ctx);
179189
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "OnDetach: " << TabletID());
@@ -265,6 +275,13 @@ void TDataShard::Cleanup(const TActorContext& ctx) {
265275
}
266276

267277
void TDataShard::Die(const TActorContext& ctx) {
278+
if (InMemoryRestoreActor) {
279+
InMemoryRestoreActor->OnTabletDead();
280+
}
281+
if (InMemoryStateActor) {
282+
InMemoryStateActor->OnTabletDead();
283+
}
284+
268285
NTabletPipe::CloseAndForgetClient(SelfId(), SchemeShardPipe);
269286
NTabletPipe::CloseAndForgetClient(SelfId(), StateReportPipe);
270287
NTabletPipe::CloseAndForgetClient(SelfId(), DbStatsReportPipe);
@@ -1620,6 +1637,29 @@ void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, bool value) const {
16201637
db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Uint64>(value ? 1 : 0));
16211638
}
16221639

1640+
void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, const TActorId& value) const {
1641+
char buf[sizeof(ui64) * 2];
1642+
WriteUnaligned<ui64>(buf, value.RawX1());
1643+
WriteUnaligned<ui64>(buf + sizeof(ui64), value.RawX2());
1644+
db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Bytes>(TString(buf, sizeof(ui64) * 2)));
1645+
}
1646+
1647+
bool TDataShard::SysGetActorId(NIceDb::TNiceDb& db, ui64 key, TActorId& value) {
1648+
auto rowset = db.Table<Schema::Sys>().Key(key).Select<Schema::Sys::Bytes>();
1649+
if (!rowset.IsReady()) {
1650+
return false;
1651+
}
1652+
if (rowset.IsValid()) {
1653+
TString buf = rowset.GetValue<Schema::Sys::Bytes>();
1654+
Y_ABORT_UNLESS(buf.size() == sizeof(ui64) * 2, "Unexpected TActorId value size %" PRISZT, buf.size());
1655+
const char* data = buf.data();
1656+
ui64 x1 = ReadUnaligned<ui64>(data);
1657+
ui64 x2 = ReadUnaligned<ui64>(data + sizeof(ui64));
1658+
value = TActorId(x1, x2);
1659+
}
1660+
return true;
1661+
}
1662+
16231663
void TDataShard::PersistUserTable(NIceDb::TNiceDb& db, ui64 tableId, const TUserTable& tableInfo) {
16241664
db.Table<Schema::UserTables>().Key(tableId).Update(
16251665
NIceDb::TUpdate<Schema::UserTables::LocalTid>(tableInfo.LocalTid),
@@ -2484,11 +2524,18 @@ void TDataShard::SendImmediateWriteResult(
24842524
const ui64 step = version.Step;
24852525
const ui64 observedStep = GetMaxObservedStep();
24862526
if (step <= observedStep) {
2487-
SnapshotManager.PromoteImmediateWriteEdgeReplied(version);
2488-
if (!sessionId) {
2489-
Send(target, event, 0, cookie, span.GetTraceId());
2527+
// We avoid sending replies that would have promoted the replied edge
2528+
// when it's frozen. This prevents us replying and causing the next
2529+
// generation to potentially keep reading stale data.
2530+
if (Y_LIKELY(!InMemoryVarsFrozen) || version <= SnapshotManager.GetImmediateWriteEdgeReplied()) {
2531+
SnapshotManager.PromoteImmediateWriteEdgeReplied(version);
2532+
if (!sessionId) {
2533+
Send(target, event, 0, cookie, span.GetTraceId());
2534+
} else {
2535+
SendViaSession(sessionId, target, SelfId(), event, 0, cookie, span.GetTraceId());
2536+
}
24902537
} else {
2491-
SendViaSession(sessionId, target, SelfId(), event, 0, cookie, span.GetTraceId());
2538+
span.EndError("Dropped");
24922539
}
24932540
return;
24942541
}
@@ -2627,13 +2674,20 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
26272674
}
26282675

26292676
if (step <= mediatorStep) {
2630-
if (it->second.Span) {
2631-
it->second.Span.Attribute("ActivateStep", std::to_string(mediatorStep));
2677+
// We avoid sending replies that would have promoted the replied edge
2678+
// when it's frozen. This prevents us replying and causing the next
2679+
// generation to potentially keep reading stale data.
2680+
if (Y_LIKELY(!InMemoryVarsFrozen) || it->first <= SnapshotManager.GetImmediateWriteEdgeReplied()) {
2681+
if (it->second.Span) {
2682+
it->second.Span.Attribute("ActivateStep", std::to_string(mediatorStep));
2683+
}
2684+
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
2685+
SendViaSession(it->second.SessionId,
2686+
it->second.Target, SelfId(), it->second.Event.Release(),
2687+
0, it->second.Cookie, it->second.Span.GetTraceId());
2688+
} else {
2689+
it->second.Span.EndError("Dropped");
26322690
}
2633-
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
2634-
SendViaSession(it->second.SessionId,
2635-
it->second.Target, SelfId(), it->second.Event.Release(),
2636-
0, it->second.Cookie, it->second.Span.GetTraceId());
26372691
it = MediatorDelayedReplies.erase(it);
26382692
continue;
26392693
}
@@ -2693,6 +2747,12 @@ bool TDataShard::NeedMediatorStateRestored() const {
26932747
return false;
26942748
}
26952749

2750+
if (InMemoryVarsRestored) {
2751+
// We have migrated in-memory vars from previous generations
2752+
// We don't need mediator state for correctness
2753+
return false;
2754+
}
2755+
26962756
switch (State) {
26972757
case TShardState::Ready:
26982758
case TShardState::Readonly:

ydb/core/tx/datashard/datashard.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,10 @@ namespace TEvDataShard {
342342
EvReadScanStarted,
343343
EvReadScanFinished,
344344

345+
// Used to transfer in-memory state between generations
346+
EvInMemoryStateRequest,
347+
EvInMemoryStateResponse,
348+
345349
EvEnd
346350
};
347351

@@ -1769,6 +1773,29 @@ namespace TEvDataShard {
17691773
Record.SetErrorDescription(error);
17701774
}
17711775
};
1776+
1777+
struct TEvInMemoryStateRequest
1778+
: public TEventPB<TEvInMemoryStateRequest,
1779+
NKikimrTxDataShard::TEvInMemoryStateRequest,
1780+
EvInMemoryStateRequest>
1781+
{
1782+
TEvInMemoryStateRequest() = default;
1783+
1784+
explicit TEvInMemoryStateRequest(ui32 generation, const TString& continuationToken = {}) {
1785+
Record.SetGeneration(generation);
1786+
if (!continuationToken.empty()) {
1787+
Record.SetContinuationToken(continuationToken);
1788+
}
1789+
}
1790+
};
1791+
1792+
struct TEvInMemoryStateResponse
1793+
: public TEventPB<TEvInMemoryStateResponse,
1794+
NKikimrTxDataShard::TEvInMemoryStateResponse,
1795+
EvInMemoryStateResponse>
1796+
{
1797+
TEvInMemoryStateResponse() = default;
1798+
};
17721799
};
17731800

17741801
IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info);

0 commit comments

Comments
 (0)