Skip to content

Commit 8fde16d

Browse files
authored
remove obsolete invalidation from kqp (#11965)
1 parent 8edc4ee commit 8fde16d

File tree

4 files changed

+11
-21
lines changed

4 files changed

+11
-21
lines changed

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,6 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
251251

252252
for (const auto& x : request->ResultSet) {
253253
if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
254-
// invalidate table
255-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {}));
256-
257254
switch (x.Status) {
258255
case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
259256
statusCode = NDqProto::StatusIds::SCHEME_ERROR;
@@ -633,7 +630,6 @@ void TKqpScanFetcherActor::ResolveShard(TShardState& state) {
633630

634631
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
635632
request->ResultSet.emplace_back(std::move(keyDesc));
636-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {}));
637633
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
638634
}
639635

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,6 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
520520
ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(),
521521
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
522522

523-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
524523
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
525524
}
526525

@@ -544,9 +543,6 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
544543

545544
for (const auto& x : request->ResultSet) {
546545
if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
547-
// invalidate table
548-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
549-
550546
switch (x.Status) {
551547
case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
552548
statusCode = NDqProto::StatusIds::SCHEME_ERROR;

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
586586
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
587587
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
588588

589-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
590589
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
591590

592591
SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ namespace {
6060
evWrite->Record.SetTxId(txId);
6161
auto* protoLocks = evWrite->Record.MutableLocks();
6262
protoLocks->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
63-
63+
6464
const auto prepareSettings = txManager->GetPrepareTransactionInfo();
6565
if (!prepareSettings.ArbiterColumnShard) {
6666
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
@@ -360,7 +360,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
360360
CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(MessageSettings, ResolveAttempts));
361361
TlsActivationContext->Schedule(
362362
CalculateNextAttemptDelay(MessageSettings, ResolveAttempts),
363-
new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
363+
new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
364364
}
365365

366366
void Handle(TEvPrivate::TEvResolveRequestPlanned::TPtr&) {
@@ -395,7 +395,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
395395
TableWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::TableWriteActorTableNavigate, TableWriteActorSpan.GetTraceId(),
396396
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
397397

398-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}), 0, 0, TableWriteActorStateSpan.GetTraceId());
399398
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, TableWriteActorStateSpan.GetTraceId());
400399
}
401400

@@ -496,7 +495,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
496495
}()
497496
<< ", Cookie=" << ev->Cookie);
498497

499-
498+
500499

501500
switch (ev->Get()->GetStatus()) {
502501
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
@@ -581,7 +580,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
581580
NYql::NDqProto::StatusIds::UNAVAILABLE,
582581
getIssues());
583582
return;
584-
}
583+
}
585584
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
586585
CA_LOG_W("Got OVERLOADED for table `"
587586
<< SchemeEntry->TableId.PathId.ToString() << "`."
@@ -808,7 +807,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
808807
? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE
809808
: NKikimrDataEvents::TEvWrite::MODE_PREPARE)
810809
: NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
811-
810+
812811
if (isImmediateCommit) {
813812
const auto locks = TxManager->GetLocks(shardId);
814813
if (!locks.empty()) {
@@ -944,7 +943,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
944943
if (TableWriteActorSpan) {
945944
TableWriteActorSpan.EndError(message);
946945
}
947-
946+
948947
Callbacks->OnError(message, statusCode, subIssues);
949948
}
950949

@@ -1359,7 +1358,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
13591358
} else {
13601359
token = *ev->Get()->Token;
13611360
}
1362-
1361+
13631362
auto& queue = DataQueues[token.TableId];
13641363
queue.emplace();
13651364
auto& message = queue.back();
@@ -1372,7 +1371,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
13721371

13731372
ev->Get()->Data = nullptr;
13741373
ev->Get()->Alloc = nullptr;
1375-
1374+
13761375
Process();
13771376
}
13781377

@@ -1672,7 +1671,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
16721671
queue.pop();
16731672
}
16741673
}
1675-
1674+
16761675
for (auto& [_, info] : WriteInfos) {
16771676
if (info.WriteTableActor) {
16781677
info.WriteTableActor->Terminate();
@@ -1846,7 +1845,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
18461845
NYql::NDqProto::StatusIds::UNAVAILABLE,
18471846
getIssues());
18481847
return;
1849-
}
1848+
}
18501849
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
18511850
CA_LOG_W("Got OVERLOADED for table ."
18521851
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
@@ -2023,7 +2022,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
20232022
BufferWriteActorState.EndError(message);
20242023
BufferWriteActor.EndError(message);
20252024
CA_LOG_E(message << ". statusCode=" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << ". subIssues=" << subIssues.ToString() << ". sessionActorId=" << SessionActorId << ". isRollback=" << (State == EState::ROLLINGBACK));
2026-
2025+
20272026
Y_ABORT_UNLESS(!HasError);
20282027
HasError = true;
20292028
if (State != EState::ROLLINGBACK) {

0 commit comments

Comments
 (0)