Skip to content

24-2: Always show table name in locks broken error #8799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

void Finalize() {
if (LocksBroken) {
TString message = "Transaction locks invalidated.";

return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
return ReplyErrorAndDie(
Ydb::StatusIds::ABORTED,
YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated. Unknown table."));
}

auto& response = *ResponseEv->Record.MutableResponse();
Expand Down Expand Up @@ -1100,29 +1099,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
shardState->State = TShardState::EState::Finished;

Counters->TxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter?

LocksBroken = true;

TMaybe<TString> tableName;
if (!res->Record.GetTxLocks().empty()) {
auto& lock = res->Record.GetTxLocks(0);
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
if (it != TasksGraph.GetStagesInfo().end()) {
tableName = it->second.Meta.TableConstInfo->Path;
}
}

// Reply as soon as we know which table had locks invalidated
if (tableName) {
auto message = TStringBuilder()
<< "Transaction locks invalidated. Table: " << *tableName;

return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
ResponseEv->BrokenLockPathId = TKikimrPathId(
res->Record.GetTxLocks(0).GetSchemeShard(),
res->Record.GetTxLocks(0).GetPathId());
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}

// Receive more replies from other shards
CheckExecutionComplete();
return;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct TEvKqpExecuter {

NLWTrace::TOrbit Orbit;
IKqpGateway::TKqpSnapshot Snapshot;
std::optional<NYql::TKikimrPathId> BrokenLockPathId;
ui64 ResultRowsCount = 0;
ui64 ResultRowsBytes = 0;

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1722,7 +1722,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
auto& response = *ResponseEv->Record.MutableResponse();

response.SetStatus(status);
response.MutableIssues()->Swap(issues);
if (issues) {
response.MutableIssues()->Swap(issues);
}

LOG_T("ReplyErrorAndDie. Response: " << response.DebugString()
<< ", to ActorId: " << Target);
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {

for (const auto& info : tableInfos) {
tableInfoMap.emplace(info.GetTableName(), &info);

TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
TableByIdMap.emplace(pathId, info.GetTableName());
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

// Invalidate query cache on scheme/internal errors
switch (status) {
case Ydb::StatusIds::ABORTED: {
if (ev->BrokenLockPathId) {
issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId));
}
break;
}
case Ydb::StatusIds::SCHEME_ERROR:
case Ydb::StatusIds::INTERNAL_ERROR:
InvalidateQuery();
Expand Down
33 changes: 21 additions & 12 deletions ydb/core/kqp/session_actor/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,33 @@ namespace NKqp {

using namespace NYql;

TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKikimrPathId& pathId) {
TStringBuilder message;
message << "Transaction locks invalidated.";

TMaybe<TString> tableName;
if (invalidatedLock) {
TKikimrPathId id(invalidatedLock->GetSchemeShard(), invalidatedLock->GetPathId());
auto table = txCtx.TableByIdMap.FindPtr(id);
if (table) {
tableName = *table;
if (pathId.OwnerId() != 0) {
auto table = txCtx.TableByIdMap.FindPtr(pathId);
if (!table) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
} else {
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
if (pathId.TableId() == pathId.TableId()) {
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
}
}
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
}
}

if (tableName) {
message << " Table: " << *tableName;
}

return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpTxLock& invalidatedLock) {
return GetLocksInvalidatedIssue(
txCtx,
TKikimrPathId(
invalidatedLock.GetSchemeShard(),
invalidatedLock.GetPathId()));
}

std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ class TTransactionsCache {
}
};

NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);

Expand Down
36 changes: 36 additions & 0 deletions ydb/core/kqp/ut/tx/kqp_locks_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,42 @@ Y_UNIT_TEST_SUITE(KqpLocks) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[[2u];#;[11u];["Session2"]]])", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(TwoPhaseTx) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();

auto session1 = db.CreateSession().GetValueSync().GetSession();
auto session2 = db.CreateSession().GetValueSync().GetSession();

auto result = session1.ExecuteDataQuery(Q_(R"(
REPLACE INTO `/Root/Test` (Group, Name, Comment) VALUES (1U, "Paul", "Changed");
SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name;
)"), TTxControl::BeginTx(TTxSettings::SerializableRW())).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto tx1 = result.GetTransaction();
UNIT_ASSERT(tx1);

result = session2.ExecuteDataQuery(Q_(R"(
REPLACE INTO `/Root/Test` (Group, Name, Comment)
VALUES (1U, "Paul", "Changed");
)"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

result = session1.ExecuteDataQuery(Q_(R"(
SELECT * FROM `KeyValue`;
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto commitResult = tx1->Commit().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
commitResult.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED,
[] (const NYql::TIssue& issue) {
return issue.GetMessage().Contains("/Root/Test");
}), commitResult.GetIssues().ToString());
}
}

} // namespace NKqp
Expand Down
Loading