Skip to content

Commit 29e13aa

Browse files
authored
YQ-2704 Use ActorSystem() instead of AsActorContext() (#1892)
1 parent 20d160e commit 29e13aa

File tree

2 files changed

+48
-48
lines changed

2 files changed

+48
-48
lines changed

ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::
141141
.Apply([coordinatorId = event->CoordinatorId,
142142
cookie = ev->Cookie,
143143
sender = ev->Sender,
144-
context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
144+
actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
145145
auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>();
146146
response->Issues = issuesFuture.GetValue();
147147
if (response->Issues) {
148-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
148+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
149149
} else {
150-
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] Graph registered")
150+
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] Graph registered")
151151
}
152-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
153-
context.Send(sender, response.release(), 0, cookie);
152+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
153+
actorSystem->Send(sender, response.release(), 0, cookie);
154154
});
155155
}
156156

@@ -164,14 +164,14 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
164164
cookie = ev->Cookie,
165165
sender = ev->Sender,
166166
totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits().GetMaxGraphCheckpointsSizeBytes(),
167-
context = TActivationContext::AsActorContext()]
167+
actorSystem = TActivationContext::ActorSystem()]
168168
(const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) {
169169
auto result = resultFuture.GetValue();
170170
auto issues = result.second;
171171

172172
if (issues) {
173-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
174-
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
173+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
174+
actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
175175
return false;
176176
}
177177

@@ -181,10 +181,10 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
181181
TStringStream ss;
182182
ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
183183
auto message = ss.Str();
184-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, message)
184+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, message)
185185
issues.AddIssue(message);
186-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
187-
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
186+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
187+
actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
188188
return false;
189189
}
190190
return true;
@@ -209,7 +209,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
209209
coordinatorId = event->CoordinatorId,
210210
cookie = ev->Cookie,
211211
sender = ev->Sender,
212-
context = TActivationContext::AsActorContext()]
212+
actorSystem = TActivationContext::ActorSystem()]
213213
(const NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>& resultFuture) {
214214
if (!resultFuture.Initialized()) { // didn't pass the size limit check
215215
return;
@@ -218,12 +218,12 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
218218
auto issues = result.second;
219219
auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), result.first);
220220
if (response->Issues) {
221-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
221+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
222222
} else {
223-
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
223+
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
224224
}
225-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
226-
context.Send(sender, response.release(), 0, cookie);
225+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
226+
actorSystem->Send(sender, response.release(), 0, cookie);
227227
});
228228
}
229229

@@ -235,17 +235,17 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitSt
235235
coordinatorId = event->CoordinatorId,
236236
cookie = ev->Cookie,
237237
sender = ev->Sender,
238-
context = TActivationContext::AsActorContext()]
238+
actorSystem = TActivationContext::ActorSystem()]
239239
(const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
240240
auto issues = issuesFuture.GetValue();
241241
auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues));
242242
if (response->Issues) {
243-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
243+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
244244
} else {
245-
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
245+
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
246246
}
247-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
248-
context.Send(sender, response.release(), 0, cookie);
247+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
248+
actorSystem->Send(sender, response.release(), 0, cookie);
249249
});
250250
}
251251

@@ -259,22 +259,22 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T
259259
sender = ev->Sender,
260260
gcEnabled = Config.GetCheckpointGarbageConfig().GetEnabled(),
261261
actorGC = ActorGC,
262-
context = TActivationContext::AsActorContext()]
262+
actorSystem = TActivationContext::ActorSystem()]
263263
(const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
264264
auto issues = issuesFuture.GetValue();
265265
auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues));
266266
if (response->Issues) {
267-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
267+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
268268
} else {
269-
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
269+
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
270270
if (gcEnabled) {
271271
auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId);
272-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
273-
context.Send(actorGC, request.release(), 0);
272+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
273+
actorSystem->Send(actorGC, request.release(), 0);
274274
}
275275
}
276-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
277-
context.Send(sender, response.release(), 0, cookie);
276+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
277+
actorSystem->Send(sender, response.release(), 0, cookie);
278278
});
279279
}
280280

@@ -286,16 +286,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr
286286
coordinatorId = event->CoordinatorId,
287287
cookie = ev->Cookie,
288288
sender = ev->Sender,
289-
context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
289+
actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
290290
auto issues = issuesFuture.GetValue();
291291
auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues));
292292
if (response->Issues) {
293-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
293+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
294294
} else {
295-
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
295+
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
296296
}
297-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
298-
context.Send(sender, response.release(), 0, cookie);
297+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
298+
actorSystem->Send(sender, response.release(), 0, cookie);
299299
});
300300
}
301301

@@ -306,14 +306,14 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataReques
306306
.Apply([graphId = event->GraphId,
307307
cookie = ev->Cookie,
308308
sender = ev->Sender,
309-
context = TActivationContext::AsActorContext()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
309+
actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
310310
auto result = futureResult.GetValue();
311311
auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second);
312312
if (response->Issues) {
313-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
313+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
314314
}
315-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
316-
context.Send(sender, response.release(), 0, cookie);
315+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
316+
actorSystem->Send(sender, response.release(), 0, cookie);
317317
});
318318
}
319319

@@ -343,7 +343,7 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
343343
cookie = ev->Cookie,
344344
sender = ev->Sender,
345345
stateSize = stateSize,
346-
context = TActivationContext::AsActorContext()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
346+
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
347347
const auto& issues = futureResult.GetValue();
348348
auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
349349
response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
@@ -352,13 +352,13 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
352352
response->Record.SetTaskId(taskId);
353353

354354
if (issues) {
355-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
355+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
356356
response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
357357
} else {
358358
response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
359359
}
360-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
361-
context.Send(sender, response.release(), 0, cookie);
360+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
361+
actorSystem->Send(sender, response.release(), 0, cookie);
362362
});
363363
}
364364

@@ -373,16 +373,16 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
373373
taskIds = event->TaskIds,
374374
cookie = ev->Cookie,
375375
sender = ev->Sender,
376-
context = TActivationContext::AsActorContext()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
376+
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
377377
auto result = resultFuture.GetValue();
378378

379379
auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvGetTaskStateResult>(checkpointId, result.second, generation);
380380
std::swap(response->States, result.first);
381381
if (response->Issues) {
382-
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
382+
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
383383
}
384-
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult");
385-
context.Send(sender, response.release(), 0, cookie);
384+
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << checkpointId << "] Send TEvGetTaskStateResult");
385+
actorSystem->Send(sender, response.release(), 0, cookie);
386386
});
387387
}
388388

ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
10561056
auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
10571057
auto future = YdbConnection->TableClient.RetryOperation(
10581058
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result,
1059-
context = NActors::TActivationContext::AsActorContext()](TSession session) {
1059+
actorSystem = NActors::TActivationContext::ActorSystem()](TSession session) {
10601060
NYdb::TParamsBuilder paramsBuilder;
10611061
paramsBuilder.AddParam("$graph_id").String(graphId).Build();
10621062
auto params = paramsBuilder.Build();
@@ -1078,12 +1078,12 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
10781078
params,
10791079
thisPtr->DefaultExecDataQuerySettings())
10801080
.Apply(
1081-
[graphId, result, context](const TFuture<TDataQueryResult>& future) {
1081+
[graphId, result, actorSystem](const TFuture<TDataQueryResult>& future) {
10821082
const auto& queryResult = future.GetValue();
10831083
auto status = TStatus(queryResult);
10841084

10851085
if (!queryResult.IsSuccess()) {
1086-
LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(context, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status;
1086+
LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(*actorSystem, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status;
10871087
}
10881088

10891089
TResultSetParser parser = queryResult.GetResultSetParser(0);

0 commit comments

Comments
 (0)