Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-39675: [C++][FlightRPC] Expose additional RPC call info to middleware #39678

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/server_middleware.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class ARROW_FLIGHT_EXPORT ServerMiddleware {

/// \brief A callback after the call has completed.
virtual void CallCompleted(const Status& status) = 0;

/// \brief Callbacks before forwarding a request to its transport-specific server
virtual void HandlingRequest(FlightMethod method, const Criteria& criteria) {}
virtual void HandlingRequest(FlightMethod method, const FlightDescriptor& descriptor) {}
virtual void HandlingRequest(FlightMethod method, const Ticket& ticket) {}
virtual void HandlingRequest(FlightMethod method, const Action& action) {}
};

/// \brief A factory for new middleware instances.
Expand Down
31 changes: 20 additions & 11 deletions cpp/src/arrow/flight/server_tracing_middleware.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ namespace otel = opentelemetry;
// TODO: Remove once we drop support for opentelemetry-cpp < 1.8.0
// They switched from ALL_CAPS to kConstantFormat in 1.8.0. But we can't check
// the minor version until they expose that. So, for now, we vendor these constants.
//
// Attributes from trace semantic conventions spec
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md
namespace SemanticConventions {
static constexpr const char* kRpcGrpcStatusCode = "rpc.grpc.status_code";
static constexpr const char* kRpcSystem = "rpc.system";
static constexpr const char* kRpcService = "rpc.service";
static constexpr const char* kRpcMethod = "rpc.method";
static constexpr const char* kCodeFunction = "code.function";
namespace RpcSystemValues {
static constexpr const char* kGrpc = "grpc";
}
Expand Down Expand Up @@ -115,6 +119,10 @@ class TracingServerMiddleware::Impl {
return result;
}

void SetAction(const Action& action) {
span_->SetAttribute(SemanticConventions::kCodeFunction, action.type);
}

private:
otel::trace::Scope scope_;
otel::nostd::shared_ptr<otel::trace::Span> span_;
Expand All @@ -139,17 +147,14 @@ class TracingServerMiddlewareFactory : public ServerMiddlewareFactory {

auto* tracer = arrow::internal::tracing::GetTracer();
auto method_name = ToString(info.method);
auto span = tracer->StartSpan(
method_name,
{
// Attributes from trace semantic conventions spec
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/semantic_conventions/trace/rpc.yaml
{SemanticConventions::kRpcSystem,
SemanticConventions::RpcSystemValues::kGrpc},
{SemanticConventions::kRpcService, kServiceName},
{SemanticConventions::kRpcMethod, method_name},
},
options);
auto span = tracer->StartSpan(method_name,
{
{SemanticConventions::kRpcSystem,
SemanticConventions::RpcSystemValues::kGrpc},
{SemanticConventions::kRpcService, kServiceName},
{SemanticConventions::kRpcMethod, method_name},
},
options);
auto scope = tracer->WithActiveSpan(span);

std::unique_ptr<TracingServerMiddleware::Impl> impl(
Expand All @@ -164,6 +169,7 @@ class TracingServerMiddleware::Impl {
public:
void CallCompleted(const Status&) {}
std::vector<TraceKey> GetTraceContext() const { return {}; }
void SetAction(const Action&) {}
};
class TracingServerMiddlewareFactory : public ServerMiddlewareFactory {
public:
Expand All @@ -190,6 +196,9 @@ std::vector<TracingServerMiddleware::TraceKey> TracingServerMiddleware::GetTrace
const {
return impl_->GetTraceContext();
}
void TracingServerMiddleware::HandlingRequest(FlightMethod, const Action& action) {
impl_->SetAction(action);
}
constexpr char const TracingServerMiddleware::kMiddlewareName[];

std::shared_ptr<ServerMiddlewareFactory> MakeTracingServerMiddlewareFactory() {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/server_tracing_middleware.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class ARROW_FLIGHT_EXPORT TracingServerMiddleware : public ServerMiddleware {
void SendingHeaders(AddCallHeaders*) override;
void CallCompleted(const Status&) override;

void HandlingRequest(FlightMethod, const Action&) override;

struct TraceKey {
std::string key;
std::string value;
Expand Down
61 changes: 42 additions & 19 deletions cpp/src/arrow/flight/transport/grpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ class GrpcServerCallContext : public ServerCallContext {
context_->AddTrailingMetadata(key, value);
}

template <typename T>
void ReportRequest(FlightMethod method, const T& message) const {
for (const auto& instance : middleware_) {
instance->HandlingRequest(method, message);
}
}

ServerMiddleware* GetMiddleware(const std::string& key) const override {
const auto& instance = middleware_map_.find(key);
if (instance == middleware_map_.end()) {
Expand Down Expand Up @@ -345,7 +352,8 @@ class GrpcServiceHandler final : public FlightService::Service {

::grpc::Status Handshake(
ServerContext* context,
::grpc::ServerReaderWriter<pb::HandshakeResponse, pb::HandshakeRequest>* stream) {
::grpc::ServerReaderWriter<pb::HandshakeResponse, pb::HandshakeRequest>* stream)
override {
GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(
MakeCallContext(FlightMethod::Handshake, context, flight_context));
Expand All @@ -364,10 +372,11 @@ class GrpcServiceHandler final : public FlightService::Service {
}

::grpc::Status ListFlights(ServerContext* context, const pb::Criteria* request,
ServerWriter<pb::FlightInfo>* writer) {
ServerWriter<pb::FlightInfo>* writer) override {
constexpr auto method = FlightMethod::ListFlights;

GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(
CheckAuth(FlightMethod::ListFlights, context, flight_context));
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(method, context, flight_context));

// Retrieve the listing from the implementation
std::unique_ptr<FlightListing> listing;
Expand All @@ -376,6 +385,7 @@ class GrpcServiceHandler final : public FlightService::Service {
if (request) {
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &criteria));
}
flight_context.ReportRequest(method, criteria);
SERVICE_RETURN_NOT_OK(
flight_context, impl_->base()->ListFlights(flight_context, &criteria, &listing));
if (!listing) {
Expand All @@ -388,15 +398,17 @@ class GrpcServiceHandler final : public FlightService::Service {

::grpc::Status GetFlightInfo(ServerContext* context,
const pb::FlightDescriptor* request,
pb::FlightInfo* response) {
pb::FlightInfo* response) override {
constexpr auto method = FlightMethod::GetFlightInfo;

GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(
CheckAuth(FlightMethod::GetFlightInfo, context, flight_context));
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(method, context, flight_context));

CHECK_ARG_NOT_NULL(flight_context, request, "FlightDescriptor cannot be null");

FlightDescriptor descr;
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &descr));
flight_context.ReportRequest(method, descr);

std::unique_ptr<FlightInfo> info;
SERVICE_RETURN_NOT_OK(flight_context,
Expand All @@ -414,15 +426,17 @@ class GrpcServiceHandler final : public FlightService::Service {

::grpc::Status PollFlightInfo(ServerContext* context,
const pb::FlightDescriptor* request,
pb::PollInfo* response) {
pb::PollInfo* response) override {
constexpr auto method = FlightMethod::PollFlightInfo;

GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(
CheckAuth(FlightMethod::PollFlightInfo, context, flight_context));
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(method, context, flight_context));

CHECK_ARG_NOT_NULL(flight_context, request, "FlightDescriptor cannot be null");

FlightDescriptor descr;
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &descr));
flight_context.ReportRequest(method, descr);

std::unique_ptr<PollInfo> info;
SERVICE_RETURN_NOT_OK(flight_context,
Expand All @@ -439,14 +453,17 @@ class GrpcServiceHandler final : public FlightService::Service {
}

::grpc::Status GetSchema(ServerContext* context, const pb::FlightDescriptor* request,
pb::SchemaResult* response) {
pb::SchemaResult* response) override {
constexpr auto method = FlightMethod::GetSchema;

GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::GetSchema, context, flight_context));
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(method, context, flight_context));

CHECK_ARG_NOT_NULL(flight_context, request, "FlightDescriptor cannot be null");

FlightDescriptor descr;
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &descr));
flight_context.ReportRequest(method, descr);

std::unique_ptr<SchemaResult> result;
SERVICE_RETURN_NOT_OK(flight_context,
Expand All @@ -463,14 +480,17 @@ class GrpcServiceHandler final : public FlightService::Service {
}

::grpc::Status DoGet(ServerContext* context, const pb::Ticket* request,
ServerWriter<pb::FlightData>* writer) {
ServerWriter<pb::FlightData>* writer) override {
constexpr auto method = FlightMethod::DoGet;

GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::DoGet, context, flight_context));
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(method, context, flight_context));

CHECK_ARG_NOT_NULL(flight_context, request, "ticket cannot be null");

Ticket ticket;
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &ticket));
flight_context.ReportRequest(method, ticket);

GetDataStream stream(writer);
RETURN_WITH_MIDDLEWARE(flight_context,
Expand All @@ -479,7 +499,7 @@ class GrpcServiceHandler final : public FlightService::Service {

::grpc::Status DoPut(
ServerContext* context,
::grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader) {
::grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader) override {
GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::DoPut, context, flight_context));

Expand All @@ -489,7 +509,7 @@ class GrpcServiceHandler final : public FlightService::Service {

::grpc::Status DoExchange(
ServerContext* context,
::grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream) {
::grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream) override {
GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::DoExchange, context, flight_context));

Expand All @@ -499,7 +519,7 @@ class GrpcServiceHandler final : public FlightService::Service {
}

::grpc::Status ListActions(ServerContext* context, const pb::Empty* request,
ServerWriter<pb::ActionType>* writer) {
ServerWriter<pb::ActionType>* writer) override {
GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(
CheckAuth(FlightMethod::ListActions, context, flight_context));
Expand All @@ -511,12 +531,15 @@ class GrpcServiceHandler final : public FlightService::Service {
}

::grpc::Status DoAction(ServerContext* context, const pb::Action* request,
ServerWriter<pb::Result>* writer) {
ServerWriter<pb::Result>* writer) override {
constexpr auto method = FlightMethod::DoAction;

GrpcServerCallContext flight_context(context);
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::DoAction, context, flight_context));
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(method, context, flight_context));
CHECK_ARG_NOT_NULL(flight_context, request, "Action cannot be null");
Action action;
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &action));
flight_context.ReportRequest(method, action);

std::unique_ptr<ResultStream> results;
SERVICE_RETURN_NOT_OK(flight_context,
Expand Down
Loading