Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions source/loaders/rpc_loader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,35 @@ if(NOT ConcurrentQueue_SOURCE_DIR)
FetchContent_MakeAvailable(ConcurrentQueue)
endif()

#
# Optional OpenTelemetry tracing
#

option(OPTION_BUILD_LOADERS_RPC_TRACING "Enable OpenTelemetry tracing for RPC loader" OFF)

if(OPTION_BUILD_LOADERS_RPC_TRACING)
message(STATUS "RPC loader: OpenTelemetry tracing ENABLED")

set(OPENTELEMETRY_CPP_SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/opentelemetry-cpp")

FetchContent_Declare(
opentelemetry-cpp
GIT_REPOSITORY https://github.com/open-telemetry/opentelemetry-cpp.git
GIT_TAG v1.19.0
SOURCE_DIR ${OPENTELEMETRY_CPP_SOURCE_DIR}
)

set(WITH_OTLP_GRPC OFF CACHE BOOL "" FORCE)
set(WITH_OTLP_HTTP OFF CACHE BOOL "" FORCE)
set(WITH_EXAMPLES OFF CACHE BOOL "" FORCE)
set(BUILD_TESTING OFF CACHE BOOL "" FORCE)
set(WITH_BENCHMARK OFF CACHE BOOL "" FORCE)

FetchContent_MakeAvailable(opentelemetry-cpp)
else()
message(STATUS "RPC loader: OpenTelemetry tracing DISABLED (no-op)")
endif()


# Copy cURL DLL into project output directory
# TODO: https://cmake.org/cmake/help/latest/command/file.html#get-runtime-dependencies
Expand Down Expand Up @@ -73,11 +102,19 @@ set(source_path "${CMAKE_CURRENT_SOURCE_DIR}/source")
set(headers
${include_path}/rpc_loader.h
${include_path}/rpc_loader_impl.h
${include_path}/rpc_loader_tracing.h
)

if(OPTION_BUILD_LOADERS_RPC_TRACING)
set(tracing_source ${source_path}/rpc_loader_tracing_otel.cpp)
else()
set(tracing_source ${source_path}/rpc_loader_tracing_noop.cpp)
endif()

set(sources
${source_path}/rpc_loader.c
${source_path}/rpc_loader_impl.cpp
${tracing_source}
)

# Group source files
Expand Down Expand Up @@ -162,6 +199,16 @@ target_link_libraries(${target}
INTERFACE
)

if(OPTION_BUILD_LOADERS_RPC_TRACING)
target_link_libraries(${target}
PRIVATE
opentelemetry_trace
opentelemetry_exporter_ostream_span
opentelemetry_resources
opentelemetry_common
)
endif()

#
# Compile definitions
#
Expand Down
55 changes: 55 additions & 0 deletions source/loaders/rpc_loader/include/rpc_loader/rpc_loader_tracing.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Loader Library by Parra Studios
* A plugin for loading rpc endpoints at run-time into a process.
*
* Copyright (C) 2016 - 2026 Vicente Eduardo Ferrer Garcia <vic798@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

#ifndef RPC_LOADER_TRACING_H
#define RPC_LOADER_TRACING_H 1

#include <rpc_loader/rpc_loader_api.h>

/* RAII tracing scope for RPC loader calls.
*
* When OPTION_RPC_TRACING is enabled, this emits OpenTelemetry spans.
* When disabled, all methods are compiled as no-ops (zero overhead).
*/
class rpc_trace_scope
{
public:
rpc_trace_scope(const char *function_name, const char *target_url, bool is_async);

void set_error(const char *error_message);
void set_attribute(const char *key, const char *value);

~rpc_trace_scope();

rpc_trace_scope(const rpc_trace_scope &) = delete;
rpc_trace_scope &operator=(const rpc_trace_scope &) = delete;

private:
struct impl;
impl *pimpl;
};

/* One-time global initialization / shutdown for the tracing subsystem.
* Called from rpc_loader_impl_initialize / rpc_loader_impl_destroy.
* No-ops when tracing is disabled at compile time. */
void rpc_tracing_initialize(void);
void rpc_tracing_shutdown(void);

#endif /* RPC_LOADER_TRACING_H */
25 changes: 20 additions & 5 deletions source/loaders/rpc_loader/source/rpc_loader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

#include <rpc_loader/rpc_loader_impl.h>
#include <rpc_loader/rpc_loader_tracing.h>

#include <loader/loader.h>
#include <loader/loader_impl.h>
Expand Down Expand Up @@ -169,11 +170,10 @@ function_return function_rpc_interface_invoke(function func, function_impl impl,
{
loader_impl_rpc_function rpc_function = static_cast<loader_impl_rpc_function>(impl);
loader_impl_rpc rpc_impl = rpc_function->rpc_impl;
rpc_trace_scope trace(function_name(func), rpc_function->url.c_str(), false);
value v = metacall_value_create_array(NULL, size);
size_t body_request_size = 0;

(void)func;

if (size > 0)
{
void **v_array = metacall_value_to_array(v);
Expand All @@ -191,10 +191,13 @@ function_return function_rpc_interface_invoke(function func, function_impl impl,

if (body_request_size == 0)
{
trace.set_error("serialization failed");
log_write("metacall", LOG_LEVEL_ERROR, "Invalid serialization of the values to the endpoint %s", rpc_function->url.c_str());
return NULL;
}

trace.set_attribute("rpc.request.body", buffer);

/* Execute a POST to the endpoint */
loader_impl_rpc_write_data_type write_data;

Expand All @@ -210,6 +213,7 @@ function_return function_rpc_interface_invoke(function func, function_impl impl,

if (res != CURLE_OK)
{
trace.set_error(curl_easy_strerror(res));
log_write("metacall", LOG_LEVEL_ERROR, "Could not call to the API endpoint %s [%]", rpc_function->url.c_str(), curl_easy_strerror(res));
return NULL;
}
Expand All @@ -221,6 +225,7 @@ function_return function_rpc_interface_invoke(function func, function_impl impl,

if (result_value == NULL)
{
trace.set_error("deserialization failed");
log_write("metacall", LOG_LEVEL_ERROR, "Could not deserialize the call result from API endpoint %s", rpc_function->url.c_str());
}

Expand Down Expand Up @@ -343,8 +348,7 @@ function_return function_rpc_interface_await(function func, function_impl impl,
{
loader_impl_rpc_function rpc_function = static_cast<loader_impl_rpc_function>(impl);
loader_impl_rpc rpc_impl = rpc_function->rpc_impl;

(void)func;
rpc_trace_scope trace(function_name(func), rpc_function->url.c_str(), true);

/* Serialize arguments */
value v = metacall_value_create_array(NULL, size);
Expand All @@ -367,7 +371,7 @@ function_return function_rpc_interface_await(function func, function_impl impl,

if (body_request_size == 0)
{
#if (!defined(NDEBUG) || defined(DEBUG) || defined(_DEBUG) || defined(__DEBUG) || defined(__DEBUG__))
trace.set_error("serialization failed");
log_write("metacall", LOG_LEVEL_ERROR, "Invalid serialization of the values to the endpoint %s", rpc_function->url.c_str());
#endif

Expand All @@ -382,6 +386,8 @@ function_return function_rpc_interface_await(function func, function_impl impl,
return NULL; // TODO: Return here the thrown exception?
}

trace.set_attribute("rpc.request.body", buffer);

/* Create async context */
rpc_async_context *async_ctx = new rpc_async_context();
async_ctx->url = rpc_function->url;
Expand All @@ -394,6 +400,7 @@ function_return function_rpc_interface_await(function func, function_impl impl,

if (easy == NULL)
{
trace.set_error("failed to create CURL handle");
log_write("metacall", LOG_LEVEL_ERROR, "Could not create CURL handle for async call to %s", rpc_function->url.c_str());
metacall_allocator_free(rpc_impl->allocator, buffer);
delete async_ctx;
Expand Down Expand Up @@ -425,6 +432,8 @@ function_return function_rpc_interface_await(function func, function_impl impl,
/* Wake poll thread from curl_multi_poll (thread-safe) */
curl_multi_wakeup(rpc_impl->async_multi);

//Trace span ends here

/* TODO: Implement future return? */
return NULL;
}
Expand Down Expand Up @@ -605,6 +614,9 @@ loader_impl_data rpc_loader_impl_initialize(loader_impl impl, configuration conf
return NULL;
}

// Initialize tracing subsystem (no-op when OPTION_RPC_TRACING is OFF)
rpc_tracing_initialize();

/* Register initialization */
loader_initialization_register(impl);

Expand Down Expand Up @@ -917,6 +929,9 @@ int rpc_loader_impl_destroy(loader_impl impl)
{
loader_impl_rpc rpc_impl = static_cast<loader_impl_rpc>(loader_impl_get(impl));

// Shutdown tracing subsystem (no-op when OPTION_RPC_TRACING is OFF)
rpc_tracing_shutdown();

/* Destroy children loaders */
loader_unload_children(impl);

Expand Down
60 changes: 60 additions & 0 deletions source/loaders/rpc_loader/source/rpc_loader_tracing_noop.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Loader Library by Parra Studios
* A plugin for loading rpc endpoints at run-time into a process.
*
* Copyright (C) 2016 - 2026 Vicente Eduardo Ferrer Garcia <vic798@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/* No-op tracing implementation.
* Compiled when OPTION_RPC_TRACING is OFF (the default).
* All methods are empty so that would mean zero runtime overhead. */

#include <rpc_loader/rpc_loader_tracing.h>

struct rpc_trace_scope::impl
{
};

rpc_trace_scope::rpc_trace_scope(const char *function_name, const char *target_url, bool is_async)
{
(void)function_name;
(void)target_url;
(void)is_async;
pimpl = nullptr;
}

void rpc_trace_scope::set_error(const char *error_message)
{
(void)error_message;
}

void rpc_trace_scope::set_attribute(const char *key, const char *value)
{
(void)key;
(void)value;
}

rpc_trace_scope::~rpc_trace_scope()
{
}

void rpc_tracing_initialize(void)
{
}

void rpc_tracing_shutdown(void)
{
}
Loading
Loading