Skip to content

Mark NYT::TFuture as [[nodiscard]] #919

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ydb/library/yql/providers/dq/actors/yt/lock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ struct TLockRequest: public TActor<TLockRequest> {
auto* actorSystem = ctx.ExecutorThread.ActorSystem;
auto selfId = SelfId();
try {
Transaction->LockNode("#" + ToString(result.ValueOrThrow()), NYT::NCypressClient::ELockMode::Exclusive, options).As<void>()
YT_UNUSED_FUTURE(Transaction->LockNode("#" + ToString(result.ValueOrThrow()), NYT::NCypressClient::ELockMode::Exclusive, options).As<void>()
.Apply(BIND([actorSystem, selfId](const NYT::TErrorOr<void>& result) {
actorSystem->Send(selfId, new TEvSetNodeResponse(0, result));
}));
})));
} catch (...) {
Finish(ctx);
}
Expand All @@ -108,7 +108,7 @@ struct TLockRequest: public TActor<TLockRequest> {
}

void PassAway() override {
Transaction->Abort();
YT_UNUSED_FUTURE(Transaction->Abort());
Transaction.Reset();
Send(LockActorId, new TEvTick());
IActor::PassAway();
Expand Down Expand Up @@ -248,13 +248,13 @@ struct TLockRequest: public TActor<TLockRequest> {
try {
auto* actorSystem = ctx.ExecutorThread.ActorSystem;
auto selfId = SelfId();
Transaction->CreateNode(
YT_UNUSED_FUTURE(Transaction->CreateNode(
lockNode,
NYT::NObjectClient::EObjectType::StringNode,
NYT::NApi::TCreateNodeOptions())
.Apply(BIND([actorSystem, selfId](const NYT::TErrorOr<NYT::NCypressClient::TNodeId>& result) {
actorSystem->Send(selfId, new TEvCreateNodeResponse(0, result));
}));
})));
} catch (...) {
Finish(ctx);
}
Expand Down
56 changes: 28 additions & 28 deletions ydb/library/yql/providers/dq/actors/yt/yt_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ namespace NYql {
req->Digest = digest;
}

Client->GetNode(nodePath + "/@md5")
YT_UNUSED_FUTURE(Client->GetNode(nodePath + "/@md5")
.Apply(BIND([request, attributes, digest](const TErrorOr<NYT::NYson::TYsonString>& err) mutable {
auto req = request.Lock();
if (!req) {
Expand All @@ -302,10 +302,10 @@ namespace NYql {
if (err.IsOK() && digest == NYTree::ConvertTo<TString>(err.Value())) {
YQL_CLOG(INFO, ProviderDq) << "File already uploaded";
try {
req->Client->SetNode(req->NodePath + "/@yql_last_update",
YT_UNUSED_FUTURE(req->Client->SetNode(req->NodePath + "/@yql_last_update",
NYT::NYson::TYsonString(
NYT::NodeToYsonString(NYT::TNode(ToString(TInstant::Now()))
)));
))));
} catch (...) { }
return VoidFuture;
} else if (err.IsOK() || err.FindMatching(NYT::NYTree::EErrorCode::ResolveError)) {
Expand Down Expand Up @@ -363,7 +363,7 @@ namespace NYql {
if (auto req = request.Lock()) {
req->Complete(new TEvWriteFileResponse(requestId, err));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvWriteFileResponse(requestId, ex));
Expand All @@ -385,7 +385,7 @@ namespace NYql {

auto nodePath = remotePath.GetPath();

Client->GetNode(nodePath + "/@md5")
YT_UNUSED_FUTURE(Client->GetNode(nodePath + "/@md5")
.Apply(BIND([request, nodePath, readerOptions](const TErrorOr<NYT::NYson::TYsonString>& err) mutable {
auto req = request.Lock();
if (!req) {
Expand All @@ -411,7 +411,7 @@ namespace NYql {
if (auto req = request.Lock()) {
req->Complete(new TEvReadFileResponse(requestId, err));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvReadFileResponse(requestId, ex));
Expand Down Expand Up @@ -448,14 +448,14 @@ namespace NYql {
auto operationId = std::get<0>(*ev->Get());
auto options = std::get<1>(*ev->Get());

Client->GetOperation(operationId, options).Apply(BIND([=](const TErrorOr<TOperation>& result) {
YT_UNUSED_FUTURE(Client->GetOperation(operationId, options).Apply(BIND([=](const TErrorOr<TOperation>& result) {
return NYT::NYson::ConvertToYsonString(result.ValueOrThrow()).ToString();
}))
.Apply(BIND([=](const TErrorOr<TString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetOperationResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetOperationResponse(requestId, ex));
Expand All @@ -470,11 +470,11 @@ namespace NYql {
try {
auto options = std::get<0>(*ev->Get());

Client->ListOperations(options).Apply(BIND([=](const TErrorOr<NYT::NApi::TListOperationsResult>& result) {
YT_UNUSED_FUTURE(Client->ListOperations(options).Apply(BIND([=](const TErrorOr<NYT::NApi::TListOperationsResult>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvListOperationsResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvListOperationsResponse(requestId, ex));
Expand All @@ -491,14 +491,14 @@ namespace NYql {
auto jobId = std::get<1>(*ev->Get());
auto options = std::get<2>(*ev->Get());

Client->GetJob(operationId, jobId, options).Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
YT_UNUSED_FUTURE(Client->GetJob(operationId, jobId, options).Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
return result.ValueOrThrow().ToString();
}))
.Apply(BIND([=](const TErrorOr<TString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetJobResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetJobResponse(requestId, ex));
Expand All @@ -513,15 +513,15 @@ namespace NYql {
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

try {
Client->ListNode(path, options)
YT_UNUSED_FUTURE(Client->ListNode(path, options)
.Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
return result.ValueOrThrow().ToString();
}))
.Apply(BIND([=](const TErrorOr<TString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvListNodeResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvListNodeResponse(requestId, ex));
Expand All @@ -536,12 +536,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->SetNode(path, value, options)
YT_UNUSED_FUTURE(Client->SetNode(path, value, options)
.Apply(BIND([=](const TErrorOr<void>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvSetNodeResponse(requestId, result));
}
}));
})));
}

void OnGetNode(TEvGetNode::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -550,12 +550,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->GetNode(path, options)
YT_UNUSED_FUTURE(Client->GetNode(path, options)
.Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetNodeResponse(requestId, result));
}
}));
})));
}

void OnRemoveNode(TEvRemoveNode::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -564,12 +564,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->RemoveNode(path, options)
YT_UNUSED_FUTURE(Client->RemoveNode(path, options)
.Apply(BIND([=](const TErrorOr<void>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvRemoveNodeResponse(requestId, result));
}
}));
})));
}

void OnCreateNode(TEvCreateNode::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -579,12 +579,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->CreateNode(path, type, options)
YT_UNUSED_FUTURE(Client->CreateNode(path, type, options)
.Apply(BIND([=](const TErrorOr<NYT::NCypressClient::TNodeId>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvCreateNodeResponse(requestId, result));
}
}));
})));
}

void OnStartTransaction(TEvStartTransaction::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -593,12 +593,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->StartTransaction(type, options)
YT_UNUSED_FUTURE(Client->StartTransaction(type, options)
.Apply(BIND([=](const TErrorOr<ITransactionPtr>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvStartTransactionResponse(requestId, result));
}
}));
})));
}

void OnPrintJobStderr(TEvPrintJobStderr::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -607,23 +607,23 @@ namespace NYql {

YQL_CLOG(DEBUG, ProviderDq) << "Printing stderr of operation " << ToString(operationId);

Client->ListJobs(operationId)
YT_UNUSED_FUTURE(Client->ListJobs(operationId)
.Apply(BIND([operationId, client = MakeWeak(Client)](const TListJobsResult& result) {
if (auto cli = client.Lock()) {
for (const auto& job : result.Jobs) {
YQL_CLOG(DEBUG, ProviderDq) << "Printing stderr (" << ToString(operationId) << "," << ToString(job.Id) << ")";

cli->GetJobStderr(operationId, job.Id)
YT_UNUSED_FUTURE(cli->GetJobStderr(operationId, job.Id)
.Apply(BIND([jobId = job.Id, operationId](const TSharedRef& data) {
YQL_CLOG(DEBUG, ProviderDq)
<< "Stderr ("
<< ToString(operationId) << ","
<< ToString(jobId) << ")"
<< TString(data.Begin(), data.Size());
}));
})));
}
}
}));
})));
}

IClientPtr Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ TVector<TMaybe<NYT::TNode>> InferSchemaFromTablesContents(const TString& cluster
inferers.reserve(requests.size());

std::function<void(size_t)> runRead = [&](size_t i) {
inputs[i]->Read().ApplyUnique(BIND([&inferers, &promises, &runRead, i = i](NYT::TErrorOr<NYT::TSharedRef>&& res){
YT_UNUSED_FUTURE(inputs[i]->Read().ApplyUnique(BIND([&inferers, &promises, &runRead, i = i](NYT::TErrorOr<NYT::TSharedRef>&& res){
if (res.IsOK() && !res.Value()) {
// EOS
promises[i].Set();
Expand Down Expand Up @@ -75,7 +75,7 @@ TVector<TMaybe<NYT::TNode>> InferSchemaFromTablesContents(const TString& cluster
promises[i].Set(e);
}
runRead(i);
}));
})));
};

futures.reserve(requests.size());
Expand Down Expand Up @@ -107,13 +107,13 @@ TVector<TMaybe<NYT::TNode>> InferSchemaFromTablesContents(const TString& cluster
request->set_format("<format=text>yson");
promises.push_back(NYT::NewPromise<void>());
futures.push_back(promises.back().ToFuture());
CreateRpcClientInputStream(std::move(request)).ApplyUnique(BIND([&runRead, &inputs, i](NYT::NConcurrency::IAsyncZeroCopyInputStreamPtr&& stream) {
YT_UNUSED_FUTURE(CreateRpcClientInputStream(std::move(request)).ApplyUnique(BIND([&runRead, &inputs, i](NYT::NConcurrency::IAsyncZeroCopyInputStreamPtr&& stream) {
// first packet contains meta, skip it
return stream->Read().ApplyUnique(BIND([&runRead, stream = std::move(stream), i, &inputs](NYT::TSharedRef&&) {
inputs[i] = std::move(stream);
runRead(i);
}));
}));
})));
++i;
}
YQL_ENSURE(NYT::NConcurrency::WaitFor(AllSucceeded(futures)).IsOK(), "Excepted all promises to be resolved in InferSchemaFromTablesContents");
Expand Down