Skip to content

Commit 100fc57

Browse files
authored
[refactoring] Do not use "IFacilityProvider" iface for local rpc wrappers. (#9978)
The idea of IFacilityProvider is a provide a way to pass some side channel data and methods from grpc server related code (proxy, server, etc...) to the rpc calls. Using this iface in the local rpc wrappers makes difficult to add some new methods needed for real remote grpc calls.
1 parent 2355ac3 commit 100fc57

File tree

6 files changed

+28
-70
lines changed

6 files changed

+28
-70
lines changed

ydb/core/grpc_services/base/base.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,6 @@ class TGRpcRequestWrapperImpl
13761376
template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived>
13771377
class TGRpcRequestValidationWrapperImpl : public TGRpcRequestWrapperImpl<TRpcId, TReq, TResp, IsOperation, TDerived> {
13781378
public:
1379-
static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg);
13801379

13811380
TGRpcRequestValidationWrapperImpl(NYdbGrpc::IRequestContextBase* ctx)
13821381
: TGRpcRequestWrapperImpl<TRpcId, TReq, TResp, IsOperation, TDerived>(ctx)
@@ -1415,7 +1414,11 @@ class TGrpcRequestCall
14151414
using TRequestIface = typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type;
14161415

14171416
public:
1417+
template<typename TOptionalArg>
1418+
static IActor* CreateRpcActor(TRequestIface* msg, TOptionalArg arg);
1419+
14181420
static IActor* CreateRpcActor(TRequestIface* msg);
1421+
14191422
static constexpr bool IsOp = IsOperation;
14201423

14211424
using TBase = std::conditional_t<TProtoHasValidate<TReq>::Value,
@@ -1485,7 +1488,6 @@ class TGRpcRequestWrapper
14851488
TGRpcRequestWrapper<TRpcId, TReq, TResp, IsOperation, RlMode>>
14861489
{
14871490
public:
1488-
static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg);
14891491
static constexpr bool IsOp = IsOperation;
14901492
static constexpr TRateLimiterMode RateLimitMode = RlMode;
14911493

@@ -1538,7 +1540,6 @@ class TGRpcRequestValidationWrapper
15381540
TGRpcRequestValidationWrapper<TRpcId, TReq, TResp, IsOperation, RlMode>>
15391541
{
15401542
public:
1541-
static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg);
15421543
static constexpr bool IsOp = IsOperation;
15431544
static constexpr TRateLimiterMode RateLimitMode = RlMode;
15441545

ydb/core/grpc_services/local_rpc/local_rpc.cpp

Lines changed: 0 additions & 48 deletions
This file was deleted.

ydb/core/grpc_services/local_rpc/local_rpc.h

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -503,27 +503,29 @@ class TStreamReadProcessor : public NGRpcService::NLocalGrpc::TContextBase {
503503
template <typename TResponsePart>
504504
using TStreamReadProcessorPtr = TIntrusivePtr<TStreamReadProcessor<TResponsePart>>;
505505

506-
using TFacilityProviderPtr = std::shared_ptr<NGRpcService::IFacilityProvider>;
507-
TFacilityProviderPtr CreateFacilityProviderSameMailbox(TActorContext actorContext, ui64 channelBufferSize);
508-
509-
using TRpcActorCreator = std::function<void((std::unique_ptr<NGRpcService::IRequestNoOpCtx> p, const NGRpcService::IFacilityProvider& f))>;
510-
511-
template <typename TRpc>
512-
TStreamReadProcessorPtr<typename TRpc::TResponse> DoLocalRpcStreamSameMailbox(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& requestType, TFacilityProviderPtr facilityProvider, TRpcActorCreator actorCreator, bool internalCall = false) {
506+
template <typename TRpc, typename... TRpcActorArgs>
507+
TStreamReadProcessorPtr<typename TRpc::TResponse> DoLocalRpcStreamSameMailbox(typename TRpc::TRequest&& proto,
508+
const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& requestType,
509+
const TActorContext& ctx, bool internalCall, TRpcActorArgs... args)
510+
{
513511
using TCbWrapper = std::function<void(const typename TRpc::TResponse&)>;
514512
using TLocalRpcStreamCtx = TStreamReadProcessor<typename TRpc::TResponse>;
515513

516514
auto localRpcCtx = std::make_shared<TLocalRpcCtx<TRpc, TCbWrapper>>(std::move(proto), [](const typename TRpc::TResponse&) {}, database, token, requestType, internalCall);
517515
auto localRpcStreamCtx = MakeIntrusive<TLocalRpcStreamCtx>(std::move(localRpcCtx));
518516
auto localRpcRequest = std::make_unique<TRpc>(localRpcStreamCtx.Get(), [](std::unique_ptr<NGRpcService::IRequestNoOpCtx>, const NGRpcService::IFacilityProvider&) {});
519-
actorCreator(std::move(localRpcRequest), *facilityProvider);
517+
auto actor = TRpc::CreateRpcActor(localRpcRequest.release(), args...);
518+
ctx.RegisterWithSameMailbox(actor);
520519

521520
return localRpcStreamCtx;
522521
}
523522

524-
template <typename TRpc>
525-
TStreamReadProcessorPtr<typename TRpc::TResponse> DoLocalRpcStreamSameMailbox(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TFacilityProviderPtr facilityProvider, TRpcActorCreator actorCreator, bool internalCall = false) {
526-
return DoLocalRpcStreamSameMailbox<TRpc>(std::move(proto), database, token, Nothing(), std::move(facilityProvider), std::move(actorCreator), internalCall);
523+
template <typename TRpc, typename... TRpcActorArgs>
524+
TStreamReadProcessorPtr<typename TRpc::TResponse> DoLocalRpcStreamSameMailbox(typename TRpc::TRequest&& proto,
525+
const TString& database, const TMaybe<TString>& token, const TActorContext& ctx, bool internalCall,
526+
TRpcActorArgs... args)
527+
{
528+
return DoLocalRpcStreamSameMailbox<TRpc>(std::move(proto), database, token, Nothing(), ctx, internalCall, args...);
527529
}
528530

529531
} // namespace NRpcService

ydb/core/grpc_services/local_rpc/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
LIBRARY()
22

33
SRCS(
4-
local_rpc.cpp
54
local_rpc.h
65
)
76

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include "service_table.h"
22
#include <ydb/core/grpc_services/base/base.h>
33

4-
#include "rpc_common/rpc_common.h"
54
#include "rpc_kqp_base.h"
65
#include "service_table.h"
76

@@ -493,11 +492,17 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
493492

494493
} // namespace
495494

496-
void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
497-
ui64 rpcBufferSize = f.GetChannelBufferSize();
498-
auto* req = dynamic_cast<TEvStreamExecuteScanQueryRequest*>(p.release());
495+
template<>
496+
template<>
497+
IActor* TEvStreamExecuteScanQueryRequest::CreateRpcActor(IRequestNoOpCtx* msg, ui64 rpcBufferSize) {
498+
auto* req = dynamic_cast<TEvStreamExecuteScanQueryRequest*>(msg);
499499
Y_ABORT_UNLESS(req != nullptr, "Wrong using of TGRpcRequestWrapper");
500-
f.RegisterActor(new TStreamExecuteScanQueryRPC(req, rpcBufferSize));
500+
return new TStreamExecuteScanQueryRPC(req, rpcBufferSize);
501+
}
502+
503+
void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
504+
auto actor = TEvStreamExecuteScanQueryRequest::CreateRpcActor(p.release(), f.GetChannelBufferSize());
505+
f.RegisterActor(actor);
501506
}
502507

503508
} // namespace NGRpcService

ydb/library/query_actor/query_actor.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,7 @@ void TQueryBase::RunStreamQuery(const TString& sql, NYdb::TParamsBuilder* params
276276
*request.mutable_parameters() = NYdb::TProtoAccessor::GetProtoMap(params->Build());
277277
}
278278

279-
auto facilityProvider = CreateFacilityProviderSameMailbox(ActorContext(), channelBufferSize);
280-
StreamQueryProcessor = DoLocalRpcStreamSameMailbox<TExecuteStreamQueryRequest>(std::move(request), Database, Nothing(), facilityProvider, &DoExecuteScanQueryRequest, true);
279+
StreamQueryProcessor = DoLocalRpcStreamSameMailbox<TExecuteStreamQueryRequest>(std::move(request), Database, Nothing(), ActorContext(), true, channelBufferSize);
281280
ReadNextStreamPart();
282281
}
283282

0 commit comments

Comments
 (0)