Skip to content

remove obsolete invalidation from kqp #11965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,6 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet

for (const auto& x : request->ResultSet) {
if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
// invalidate table
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {}));

switch (x.Status) {
case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
statusCode = NDqProto::StatusIds::SCHEME_ERROR;
Expand Down Expand Up @@ -628,7 +625,6 @@ void TKqpScanFetcherActor::ResolveShard(TShardState& state) {

auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->ResultSet.emplace_back(std::move(keyDesc));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}

Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kqp/runtime/kqp_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,6 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}

Expand All @@ -544,9 +543,6 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq

for (const auto& x : request->ResultSet) {
if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
// invalidate table
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));

switch (x.Status) {
case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
statusCode = NDqProto::StatusIds::SCHEME_ERROR;
Expand Down
9 changes: 4 additions & 5 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
};

struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
: ReadId(readId)
, LastSeqNo(lastSeqNo)
, InstantStart(instantStart) {
Expand Down Expand Up @@ -259,7 +259,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath();
LookupActorStateSpan.EndError(errorMsg);

Expand Down Expand Up @@ -419,7 +419,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
auto readIt = Reads.find(ev->Get()->ReadId);
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
auto& read = readIt->second;

if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
if (ev->Get()->InstantStart) {
read.SetFinished();
Expand Down Expand Up @@ -566,10 +566,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));

Counters->IteratorsShardResolve->Inc();
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));

SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
Expand Down
23 changes: 11 additions & 12 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace {
evWrite->Record.SetTxId(txId);
auto* protoLocks = evWrite->Record.MutableLocks();
protoLocks->SetOp(NKikimrDataEvents::TKqpLocks::Commit);

const auto prepareSettings = txManager->GetPrepareTransactionInfo();
if (!prepareSettings.ArbiterColumnShard) {
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
Expand Down Expand Up @@ -360,7 +360,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(MessageSettings, ResolveAttempts));
TlsActivationContext->Schedule(
CalculateNextAttemptDelay(MessageSettings, ResolveAttempts),
new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
}

void Handle(TEvPrivate::TEvResolveRequestPlanned::TPtr&) {
Expand Down Expand Up @@ -395,7 +395,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
TableWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::TableWriteActorTableNavigate, TableWriteActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}), 0, 0, TableWriteActorStateSpan.GetTraceId());
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, TableWriteActorStateSpan.GetTraceId());
}

Expand Down Expand Up @@ -496,7 +495,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
}()
<< ", Cookie=" << ev->Cookie);



switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
Expand Down Expand Up @@ -568,7 +567,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
}
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
CA_LOG_W("Got OVERLOADED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`."
Expand Down Expand Up @@ -795,7 +794,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE
: NKikimrDataEvents::TEvWrite::MODE_PREPARE)
: NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);

if (isImmediateCommit) {
const auto locks = TxManager->GetLocks(shardId);
if (!locks.empty()) {
Expand Down Expand Up @@ -931,7 +930,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
if (TableWriteActorSpan) {
TableWriteActorSpan.EndError(message);
}

Callbacks->OnError(message, statusCode, subIssues);
}

Expand Down Expand Up @@ -1346,7 +1345,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
} else {
token = *ev->Get()->Token;
}

auto& queue = DataQueues[token.TableId];
queue.emplace();
auto& message = queue.back();
Expand All @@ -1359,7 +1358,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub

ev->Get()->Data = nullptr;
ev->Get()->Alloc = nullptr;

Process();
}

Expand Down Expand Up @@ -1659,7 +1658,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
queue.pop();
}
}

for (auto& [_, info] : WriteInfos) {
if (info.WriteTableActor) {
info.WriteTableActor->Terminate();
Expand Down Expand Up @@ -1821,7 +1820,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
}
}
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
CA_LOG_W("Got OVERLOADED for table ."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
Expand Down Expand Up @@ -1998,7 +1997,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
BufferWriteActorState.EndError(message);
BufferWriteActor.EndError(message);
CA_LOG_E(message << ". statusCode=" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << ". subIssues=" << subIssues.ToString() << ". sessionActorId=" << SessionActorId << ". isRollback=" << (State == EState::ROLLINGBACK));

Y_ABORT_UNLESS(!HasError);
HasError = true;
if (State != EState::ROLLINGBACK) {
Expand Down
Loading