Skip to content

Commit f60b60f

Browse files
authored
YQ fixed hanging and TLI errors for distributed WM (#6263)
1 parent c32c9c8 commit f60b60f

File tree

7 files changed

+137
-69
lines changed

7 files changed

+137
-69
lines changed

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "actors.h"
22

3+
#include <ydb/core/base/path.h>
4+
35
#include <ydb/core/kqp/common/events/events.h>
46
#include <ydb/core/kqp/common/simple/services.h>
57
#include <ydb/core/kqp/workload_service/common/events.h>
@@ -46,7 +48,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
4648
TPoolHandlerActorBase(void (TDerived::* requestFunc)(TAutoPtr<IEventHandle>& ev), const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
4749
: TBase(requestFunc)
4850
, CountersRoot(counters)
49-
, CountersSubgroup(counters->GetSubgroup("pool", TStringBuilder() << database << "/" << poolId))
51+
, CountersSubgroup(counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId)))
5052
, Database(database)
5153
, PoolId(poolId)
5254
, QueueSizeLimit(GetMaxQueueSize(poolConfig))
@@ -475,6 +477,13 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
475477
}
476478
}
477479

480+
// Actor shutdown hook: resets the GlobalInFly and GlobalDelayedRequests
// counters to zero and then delegates to the base-class PassAway().
// NOTE(review): presumably the reset keeps stale pool-wide values from being
// reported once this handler is gone — confirm against the counter owners.
void PassAway() override {
481+
GlobalInFly->Set(0);
482+
GlobalDelayedRequests->Set(0);
483+
484+
TBase::PassAway();
485+
}
486+
478487
protected:
479488
bool ShouldResign() const override {
480489
return InFlightLimit == 0 || InFlightLimit == std::numeric_limits<ui64>::max();
@@ -519,6 +528,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
519528
RemoveFinishedRequests();
520529
RefreshRequired |= !PendingRequests.empty();
521530
RefreshRequired |= (GetLocalInFlight() || !DelayedRequests.empty()) && TInstant::Now() - LastRefreshTime > LEASE_DURATION / 4;
531+
RefreshRequired |= GlobalState.AmountRequests() && TInstant::Now() - LastRefreshTime > LEASE_DURATION;
522532
if (RefreshRequired) {
523533
RefreshRequired = false;
524534
RunningOperation = true;
@@ -532,7 +542,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
532542
LOG_T("Try to start scheduled refresh");
533543

534544
RefreshState();
535-
if (GetLocalInFlight() + DelayedRequests.size() > 0) {
545+
if (GetLocalSessionsCount() || GlobalState.AmountRequests()) {
536546
ScheduleRefresh();
537547
}
538548
}
@@ -564,6 +574,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
564574
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
565575
LOG_E("refresh pool state failed " << ev->Get()->Status << ", issues: " << ev->Get()->Issues.ToOneLineString());
566576
RefreshRequired = true;
577+
ScheduleRefresh();
567578
return;
568579
}
569580

@@ -573,6 +584,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
573584
LastRefreshTime = TInstant::Now();
574585

575586
GlobalState = ev->Get()->PoolState;
587+
if (GlobalState.AmountRequests()) {
588+
ScheduleRefresh();
589+
}
576590
GlobalInFly->Set(GlobalState.RunningRequests);
577591
GlobalDelayedRequests->Set(GlobalState.DelayedRequests);
578592
LOG_T("succefully refreshed pool state, in flight: " << GlobalState.RunningRequests << ", delayed: " << GlobalState.DelayedRequests);
@@ -598,10 +612,12 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
598612

599613
DoDelayRequest();
600614
DoStartDelayedRequest();
615+
RefreshState();
601616
};
602617

603618
void Handle(TEvPrivate::TEvDelayRequestResponse::TPtr& ev) {
604619
RunningOperation = false;
620+
ScheduleRefresh();
605621

606622
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
607623
LOG_E("failed to delay request " << ev->Get()->Status << ", session id: " << ev->Get()->SessionId << ", issues: " << ev->Get()->Issues.ToOneLineString());
@@ -617,7 +633,6 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
617633
GlobalDelayedRequests->Inc();
618634
LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId);
619635

620-
ScheduleRefresh();
621636
DoStartDelayedRequest();
622637
RefreshState();
623638
};
@@ -637,12 +652,13 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
637652
return;
638653
}
639654

640-
if (!sessionId) {
641-
LOG_D("first request in queue is remote, send notification to node " << ev->Get()->NodeId);
655+
const ui32 nodeId = ev->Get()->NodeId;
656+
if (SelfId().NodeId() != nodeId) {
657+
LOG_D("first request in queue is remote, send notification to node " << nodeId);
642658
auto event = std::make_unique<TEvPrivate::TEvRefreshPoolState>();
643659
event->Record.SetPoolId(PoolId);
644660
event->Record.SetDatabase(Database);
645-
this->Send(MakeKqpWorkloadServiceId(ev->Get()->NodeId), std::move(event));
661+
this->Send(MakeKqpWorkloadServiceId(nodeId), std::move(event));
646662
RefreshState();
647663
return;
648664
}
@@ -680,7 +696,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
680696
}
681697

682698
for (const TString& sessionId : ev->Get()->SesssionIds) {
683-
LOG_T("succefully cleanuped request, session id: " << sessionId);
699+
LOG_T("cleanuped request, session id: " << sessionId);
684700
if (TRequest* request = GetRequestSafe(sessionId)) {
685701
FinalReply(request, ev->Get()->Status, ev->Get()->Issues);
686702
}
@@ -696,7 +712,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
696712
return;
697713
}
698714

699-
if (!PendingRequests.empty() && DelayedRequests.empty() && GlobalState.DelayedRequests == 0 && GlobalState.RunningRequests < InFlightLimit) {
715+
if (!PendingRequests.empty() && QueueSizeLimit == 0 && GlobalState.RunningRequests < InFlightLimit) {
700716
RunningOperation = true;
701717
const TString& sessionId = PopPendingRequest();
702718
this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, CountersSubgroup));

ydb/core/kqp/workload_service/tables/table_queries.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class TTablesCreator : public NTableCreator::TMultiTableCreator {
9292
Col("wait_deadline", NScheme::NTypeIds::Timestamp),
9393
Col("lease_deadline", NScheme::NTypeIds::Timestamp),
9494
},
95-
{ "database", "pool_id", "start_time", "session_id" },
95+
{ "database", "pool_id", "node_id", "start_time", "session_id" },
9696
NKikimrServices::KQP_WORKLOAD_SERVICE,
9797
TtlCol("lease_deadline", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
9898
);
@@ -383,7 +383,7 @@ class TRefreshPoolStateQuery : public TQueryBase {
383383
.Utf8(PoolId)
384384
.Build();
385385

386-
RunDataQuery(sql, &params);
386+
RunDataQuery(sql, &params, TTxControl::BeginAndCommitTx(true));
387387
SetQueryResultHandler(&TRefreshPoolStateQuery::OnQueryResult, "Describe pool");
388388
}
389389

@@ -515,6 +515,8 @@ class TStartFirstDelayedRequestQuery : public TQueryBase {
515515
{}
516516

517517
void OnRunQuery() override {
518+
RequestNodeId = SelfId().NodeId();
519+
518520
TString sql = TStringBuilder() << R"(
519521
-- TStartFirstDelayedRequestQuery::OnRunQuery
520522
DECLARE $database AS Text;
@@ -539,7 +541,7 @@ class TStartFirstDelayedRequestQuery : public TQueryBase {
539541
.Utf8(PoolId)
540542
.Build();
541543

542-
RunDataQuery(sql, &params);
544+
RunDataQuery(sql, &params, TTxControl::BeginAndCommitTx(true));
543545
SetQueryResultHandler(&TStartFirstDelayedRequestQuery::OnGetPoolInfo, "Describe pool");
544546
}
545547

@@ -599,6 +601,7 @@ class TStartFirstDelayedRequestQuery : public TQueryBase {
599601
DELETE FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(`
600602
WHERE database = $database
601603
AND pool_id = $pool_id
604+
AND node_id = $node_id
602605
AND start_time = $start_time
603606
AND session_id = $session_id;
604607
@@ -780,6 +783,7 @@ class TCleanupRequestsQuery : public TQueryBase {
780783
DELETE FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(`
781784
WHERE database = $database
782785
AND pool_id = $pool_id
786+
AND node_id = $node_id
783787
AND session_id IN $session_ids;
784788
785789
DELETE FROM `)" << TTablesCreator::GetRunningRequestsPath() << R"(`

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -404,9 +404,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
404404
}
405405

406406
void WaitPoolHandlersCount(i64 finalCount, std::optional<i64> initialCount = std::nullopt, TDuration timeout = FUTURE_WAIT_TIMEOUT) const override {
407-
auto counter = GetServiceCounters(GetRuntime()->GetAppData().Counters, "kqp")
408-
->GetSubgroup("subsystem", "workload_manager")
409-
->GetCounter("ActivePoolHandlers");
407+
auto counter = GetWorkloadManagerCounters(0)->GetCounter("ActivePoolHandlers");
410408

411409
if (initialCount) {
412410
UNIT_ASSERT_VALUES_EQUAL_C(counter->Val(), *initialCount, "Unexpected pool handlers count");
@@ -429,6 +427,18 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
429427
Sleep(TDuration::Seconds(1));
430428
}
431429

430+
// Verifies the workload manager counters of one pool on every node of the test cluster.
//
// checkTableCounters - when true, CheckTableCounters() is run in addition to
//                      CheckCommonCounters() for each node.
// poolId             - pool to inspect; an empty string falls back to Settings_.PoolId_.
//
// Iterates node indexes [0, Settings_.NodeCount_) and checks the "pool" subgroup
// whose label is the canonized path "<Settings_.DomainName_>/<pool id>".
void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const override {
431+
for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount_; ++nodeIndex) {
432+
auto subgroup = GetWorkloadManagerCounters(nodeIndex)
433+
->GetSubgroup("pool", CanonizePath(TStringBuilder() << Settings_.DomainName_ << "/" << (poolId ? poolId : Settings_.PoolId_)));
434+
435+
CheckCommonCounters(subgroup);
436+
if (checkTableCounters) {
437+
CheckTableCounters(subgroup);
438+
}
439+
}
440+
}
441+
432442
TTestActorRuntime* GetRuntime() const override {
433443
return Server_->GetRuntime();
434444
}
@@ -460,6 +470,45 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
460470
return event;
461471
}
462472

473+
// Returns the counters group of the node with the given index, located at
// service "kqp" -> subgroup ("subsystem", "workload_manager").
NMonitoring::TDynamicCounterPtr GetWorkloadManagerCounters(ui32 nodeIndex) const {
474+
return GetServiceCounters(GetRuntime()->GetAppData(nodeIndex).Counters, "kqp")
475+
->GetSubgroup("subsystem", "workload_manager");
476+
}
477+
478+
static void CheckCommonCounters(NMonitoring::TDynamicCounterPtr subgroup) {
479+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalInFly", false)->Val(), 0);
480+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalDelayedRequests", false)->Val(), 0);
481+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOverloaded", true)->Val(), 0);
482+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueError", true)->Val(), 0);
483+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("CleanupError", true)->Val(), 0);
484+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("Cancelled", true)->Val(), 0);
485+
486+
UNIT_ASSERT_GE(subgroup->GetCounter("ContinueOk", true)->Val(), 1);
487+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOk", true)->Val(), subgroup->GetCounter("CleanupOk", true)->Val());
488+
}
489+
490+
static void CheckTableCounters(NMonitoring::TDynamicCounterPtr subgroup) {
491+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("PendingRequestsCount", false)->Val(), 0);
492+
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("FinishingRequestsCount", false)->Val(), 0);
493+
494+
const std::vector<std::pair<TString, bool>> tableQueries = {
495+
{"TCleanupTablesQuery", false},
496+
{"TRefreshPoolStateQuery", true},
497+
{"TDelayRequestQuery", true},
498+
{"TStartFirstDelayedRequestQuery", true},
499+
{"TStartRequestQuery", false},
500+
{"TCleanupRequestsQuery", true},
501+
};
502+
for (const auto& [operation, runExpected] : tableQueries) {
503+
auto operationSubgroup = subgroup->GetSubgroup("operation", operation);
504+
505+
UNIT_ASSERT_VALUES_EQUAL_C(operationSubgroup->GetCounter("FinishError", true)->Val(), 0, TStringBuilder() << "Unexpected vaule for operation " << operation);
506+
if (runExpected) {
507+
UNIT_ASSERT_GE_C(operationSubgroup->GetCounter("FinishOk", true)->Val(), 1, TStringBuilder() << "Unexpected vaule for operation " << operation);
508+
}
509+
}
510+
}
511+
463512
private:
464513
const TYdbSetupSettings Settings_;
465514

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class IYdbSetup : public TThrRefBase {
100100
virtual void WaitPoolState(const TPoolStateDescription& state, const TString& poolId = "") const = 0;
101101
virtual void WaitPoolHandlersCount(i64 finalCount, std::optional<i64> initialCount = std::nullopt, TDuration timeout = FUTURE_WAIT_TIMEOUT) const = 0;
102102
virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0;
103+
virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0;
103104

104105
virtual TTestActorRuntime* GetRuntime() const = 0;
105106
virtual const TYdbSetupSettings& GetSettings() const = 0;

ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,39 @@
55

66
namespace NKikimr::NKqp {
77

8+
namespace {
9+
810
using namespace NWorkload;
911
using namespace NYdb;
1012

1113

14+
void StartTestConcurrentQueryLimit(const ui64 activeCountLimit, const ui64 queueSize, const ui64 nodeCount = 1) {
15+
auto ydb = TYdbSetupSettings()
16+
.NodeCount(nodeCount)
17+
.ConcurrentQueryLimit(activeCountLimit)
18+
.QueueSize(queueSize)
19+
.QueryCancelAfter(FUTURE_WAIT_TIMEOUT * queueSize)
20+
.Create();
21+
22+
auto settings = TQueryRunnerSettings()
23+
.InFlightCoordinatorActorId(ydb->CreateInFlightCoordinator(queueSize, activeCountLimit))
24+
.HangUpDuringExecution(true);
25+
26+
// Initialize queue
27+
std::vector<TQueryRunnerResultAsync> asyncResults;
28+
for (size_t i = 0; i < queueSize; ++i) {
29+
asyncResults.emplace_back(ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings.NodeIndex(i % nodeCount)));
30+
}
31+
32+
for (const auto& asyncResult : asyncResults) {
33+
TSampleQueries::TSelect42::CheckResult(asyncResult.GetResult());
34+
}
35+
36+
ydb->ValidateWorkloadServiceCounters();
37+
}
38+
39+
} // anonymous namespace
40+
1241
Y_UNIT_TEST_SUITE(KqpWorkloadService) {
1342
Y_UNIT_TEST(WorkloadServiceDisabledByFeatureFlag) {
1443
auto ydb = TYdbSetupSettings()
@@ -123,10 +152,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
123152
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
124153

125154
// Check that queue is free
126-
auto firstRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query);
127-
auto secondRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query);
128-
TSampleQueries::TSelect42::CheckResult(firstRequest.GetResult());
129-
TSampleQueries::TSelect42::CheckResult(secondRequest.GetResult());
155+
UNIT_ASSERT_VALUES_EQUAL(ydb->GetPoolDescription().AmountRequests(), 0);
130156
}
131157

132158
Y_UNIT_TEST(TestStartQueryAfterCancel) {
@@ -145,28 +171,12 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
145171
TSampleQueries::CheckCancelled(hangingRequest.GetResult());
146172
}
147173

148-
Y_UNIT_TEST(TestConcurrentQueryLimit) {
149-
const ui64 activeCountLimit = 5;
150-
const ui64 queueSize = 50;
151-
auto ydb = TYdbSetupSettings()
152-
.ConcurrentQueryLimit(activeCountLimit)
153-
.QueueSize(queueSize)
154-
.QueryCancelAfter(FUTURE_WAIT_TIMEOUT * queueSize)
155-
.Create();
156-
157-
auto settings = TQueryRunnerSettings()
158-
.InFlightCoordinatorActorId(ydb->CreateInFlightCoordinator(queueSize, activeCountLimit))
159-
.HangUpDuringExecution(true);
160-
161-
// Initialize queue
162-
std::vector<TQueryRunnerResultAsync> asyncResults;
163-
for (size_t i = 0; i < queueSize; ++i) {
164-
asyncResults.emplace_back(ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings));
165-
}
174+
Y_UNIT_TEST(TestLargeConcurrentQueryLimit) {
175+
StartTestConcurrentQueryLimit(5, 100);
176+
}
166177

167-
for (const auto& asyncResult : asyncResults) {
168-
TSampleQueries::TSelect42::CheckResult(asyncResult.GetResult());
169-
}
178+
Y_UNIT_TEST(TestLessConcurrentQueryLimit) {
179+
StartTestConcurrentQueryLimit(1, 100);
170180
}
171181

172182
Y_UNIT_TEST(TestZeroConcurrentQueryLimit) {
@@ -255,30 +265,12 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceDistributed) {
255265
TSampleQueries::TSelect42::CheckResult(request.GetResult(TDuration::Seconds(50)));
256266
}
257267

258-
Y_UNIT_TEST(TestDistributedConcurrentQueryLimit) {
259-
const ui64 nodeCount = 3;
260-
const ui64 activeCountLimit = 5;
261-
const ui64 queueSize = 50;
262-
auto ydb = TYdbSetupSettings()
263-
.NodeCount(nodeCount)
264-
.ConcurrentQueryLimit(activeCountLimit)
265-
.QueueSize(queueSize)
266-
.QueryCancelAfter(FUTURE_WAIT_TIMEOUT * queueSize)
267-
.Create();
268-
269-
auto settings = TQueryRunnerSettings()
270-
.InFlightCoordinatorActorId(ydb->CreateInFlightCoordinator(queueSize, activeCountLimit))
271-
.HangUpDuringExecution(true);
272-
273-
// Initialize queue
274-
std::vector<TQueryRunnerResultAsync> asyncResults;
275-
for (size_t i = 0; i < queueSize; ++i) {
276-
asyncResults.emplace_back(ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings.NodeIndex(i % nodeCount)));
277-
}
268+
Y_UNIT_TEST(TestDistributedLargeConcurrentQueryLimit) {
269+
StartTestConcurrentQueryLimit(5, 100, 3);
270+
}
278271

279-
for (const auto& asyncResult : asyncResults) {
280-
TSampleQueries::TSelect42::CheckResult(asyncResult.GetResult());
281-
}
272+
Y_UNIT_TEST(TestDistributedLessConcurrentQueryLimit) {
273+
StartTestConcurrentQueryLimit(1, 100, 5);
282274
}
283275
}
284276

0 commit comments

Comments
 (0)