Skip to content

Commit afef7d1

Browse files
committed
Revert "Disable coordinator if no ingress tasks (#472)"
This reverts commit 7c308ff.
1 parent bcf764f commit afef7d1

File tree

2 files changed

+10
-27
lines changed

2 files changed

+10
-27
lines changed

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,6 @@ void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) {
103103
AllActorsSet.insert(actorId);
104104
}
105105

106-
CC_LOG_D("ActorsToTrigger count: " << ActorsToTrigger.size() << ", ActorsToNotify count: " << ActorsToNotify.size() << ", ActorsToWaitFor count: " << ActorsToWaitFor.size());
107-
108-
if (ActorsToTrigger.empty()) {
109-
CC_LOG_D("No ingress tasks, coordinator was disabled");
110-
return;
111-
}
112106
PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size());
113107

114108
CC_LOG_D("Send TEvRegisterCoordinatorRequest");

ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ enum ETestGraphFlags : ui64 {
1919
SourceWithChannelInOneTask = 2,
2020
};
2121

22-
NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType) {
22+
NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) {
2323

2424
NYql::NDqProto::TReadyState result;
2525

@@ -29,7 +29,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType
2929
ingressOutput->AddChannels();
3030
if (flags & ETestGraphFlags::InputWithSource) {
3131
auto* source = ingress->AddInputs()->MutableSource();
32-
source->SetType(sourceType);
32+
source->SetType("PqSource");
3333
}
3434

3535
auto* map = result.AddTask();
@@ -40,7 +40,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags, const TString& sourceType
4040
mapOutput->AddChannels();
4141
if (flags & ETestGraphFlags::SourceWithChannelInOneTask) {
4242
auto* source = map->AddInputs()->MutableSource();
43-
source->SetType(sourceType);
43+
source->SetType("PqSource");
4444
}
4545

4646
auto* egress = result.AddTask();
@@ -70,9 +70,9 @@ struct TTestBootstrap : public TTestActorRuntime {
7070

7171
::NMonitoring::TDynamicCounterPtr Counters = new ::NMonitoring::TDynamicCounters();
7272

73-
explicit TTestBootstrap(ui64 graphFlags, const TString& sourceType)
73+
explicit TTestBootstrap(ui64 graphFlags = 0)
7474
: TTestActorRuntime(true)
75-
, GraphState(BuildTestGraph(graphFlags, sourceType))
75+
, GraphState(BuildTestGraph(graphFlags))
7676
, CoordinatorId("my-graph-id", 42)
7777
, CheckpointId(CoordinatorId.Generation, 1)
7878
{
@@ -281,8 +281,8 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
281281
class CheckpointsTestHelper : public TTestBootstrap
282282
{
283283
public:
284-
CheckpointsTestHelper(ui64 graphFlags, const TString& sourceType)
285-
: TTestBootstrap(graphFlags, sourceType) {
284+
CheckpointsTestHelper(ui64 graphFlags)
285+
: TTestBootstrap(graphFlags) {
286286
}
287287

288288
void InjectCheckpoint() {
@@ -372,33 +372,22 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) {
372372
};
373373

374374
Y_UNIT_TEST(ShouldTriggerCheckpointWithSource) {
375-
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource");
375+
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource);
376376
test.InjectCheckpoint();
377377
test.AllSavedAndCommited();
378378
}
379379

380380
Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) {
381-
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask, "PqSource");
381+
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource | ETestGraphFlags::SourceWithChannelInOneTask);
382382
test.InjectCheckpoint();
383383
test.AllSavedAndCommited();
384384
}
385385

386386
Y_UNIT_TEST(ShouldAbortPreviousCheckpointsIfNodeStateCantBeSaved) {
387-
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "PqSource");
387+
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource);
388388
test.InjectCheckpoint();
389389
test.SaveFailed();
390390
}
391-
392-
Y_UNIT_TEST(ShouldDoNothingIfNoIngressTasks) {
393-
CheckpointsTestHelper test(ETestGraphFlags::InputWithSource, "S3Source");
394-
bool empty = false;
395-
try {
396-
test.GrabEdgeEvent<TEvCheckpointStorage::TEvRegisterCoordinatorRequest>(test.StorageProxy, TDuration::Seconds(10));
397-
} catch (TEmptyEventQueueException&) {
398-
empty = true;
399-
}
400-
UNIT_ASSERT(empty);
401-
}
402391
}
403392

404393
} // namespace NFq

0 commit comments

Comments
 (0)