Skip to content

Commit 640be37

Browse files
correct generations usage for ignore deprecated sessions (#4563)
1 parent 8782a39 commit 640be37

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class TShardScannerInfo {
2424
private:
2525
std::optional<TActorId> ActorId;
2626
const ui64 TabletId;
27+
const ui64 Generation;
2728
i64 DataChunksInFlightCount = 0;
2829
bool TracingStarted = false;
2930
const ui64 FreeSpace = (ui64)8 << 20;
@@ -46,18 +47,19 @@ class TShardScannerInfo {
4647
TracingStarted = true;
4748
}
4849
if (NActors::TlsActivationContext) {
49-
NActors::TActivationContext::AsActorContext().Send(*ActorId, new TEvKqpCompute::TEvScanDataAck(FreeSpace, TabletId, 1), flags, TabletId);
50+
NActors::TActivationContext::AsActorContext().Send(*ActorId, new TEvKqpCompute::TEvScanDataAck(FreeSpace, Generation, 1), flags, TabletId);
5051
}
5152
}
5253
public:
5354
TShardScannerInfo(TShardState& state, const IExternalObjectsProvider& externalObjectsProvider)
5455
: TabletId(state.TabletId)
56+
, Generation(++state.Generation)
5557
{
5658
const bool subscribed = std::exchange(state.SubscribedOnTablet, true);
5759

5860
const auto& keyColumnTypes = externalObjectsProvider.GetKeyColumnTypes();
5961
auto ranges = state.GetScanRanges(keyColumnTypes);
60-
auto ev = externalObjectsProvider.BuildEvKqpScan(0, TabletId, ranges);
62+
auto ev = externalObjectsProvider.BuildEvKqpScan(0, Generation, ranges);
6163

6264
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("info", state.ToString(keyColumnTypes))
6365
("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry))("subscribed", subscribed);
@@ -278,9 +280,9 @@ class TInFlightShards: public NComputeActor::TScanShardsStatistics {
278280
}
279281
}
280282

281-
void RegisterScannerActor(const ui64 tabletId, const TActorId& scanActorId) {
283+
void RegisterScannerActor(const ui64 tabletId, const ui64 generation, const TActorId& scanActorId) {
282284
auto state = GetShardState(tabletId);
283-
if (!state) {
285+
if (!state || generation != state->Generation) {
284286
return;
285287
}
286288
AFL_ENSURE(state->State == NComputeActor::EShardState::Starting)("state", state->State);

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,15 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr&
106106
}
107107
auto& msg = ev->Get()->Record;
108108
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
109-
InFlightShards.RegisterScannerActor(msg.GetTabletId(), scanActorId);
109+
InFlightShards.RegisterScannerActor(msg.GetTabletId(), msg.GetGeneration(), scanActorId);
110110
}
111111

112112
void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
113113
if (!InFlightShards.IsActive()) {
114114
return;
115115
}
116116
auto state = InFlightShards.GetShardStateByActorId(ev->Sender);
117-
if (!state) {
117+
if (!state || state->Generation != ev->Get()->Generation) {
118118
return;
119119
}
120120
AFL_ENSURE(state->State == EShardState::Running)("state", state->State)("actor_id", state->ActorId)("ev_sender", ev->Sender);
@@ -152,6 +152,9 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev)
152152
return;
153153
}
154154
}
155+
if (state->Generation != ev->Get()->Record.GetGeneration()) {
156+
return;
157+
}
155158

156159
if (state->State == EShardState::Starting) {
157160
++TotalRetries;

0 commit comments

Comments
 (0)