Skip to content

Commit

Permalink
Added feature flag for asynchronous export (#1295)
Browse files Browse the repository at this point in the history
  • Loading branch information
DebajitDas authored Mar 31, 2022
1 parent 15e7725 commit ad3bdfe
Show file tree
Hide file tree
Showing 36 changed files with 182 additions and 26 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ if(WITH_LOGS_PREVIEW)
add_definitions(-DENABLE_LOGS_PREVIEW)
endif()

option(WITH_ASYNC_EXPORT_PREVIEW "Whether enable async export" OFF)

if(WITH_ASYNC_EXPORT_PREVIEW)
add_definitions(-DENABLE_ASYNC_EXPORT)
endif()

find_package(Threads)

function(install_windows_deps)
Expand Down
4 changes: 4 additions & 0 deletions api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ endif()
if(CORE_RUNTIME_LIBS)
target_link_libraries(opentelemetry_api INTERFACE ${CORE_RUNTIME_LIBS})
endif()

if(WITH_ASYNC_EXPORT_PREVIEW)
target_compile_definitions(opentelemetry_api INTERFACE ENABLE_ASYNC_EXPORT)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records to the Elasticsearch instance asynchronously.
* @param records A list of log records to send to Elasticsearch.
Expand All @@ -99,6 +100,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
&records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
4 changes: 4 additions & 0 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ResponseHandler : public http_client::EventHandler
bool console_debug_ = false;
};

# ifdef ENABLE_ASYNC_EXPORT
/**
* This class handles the async response message from the Elasticsearch request
*/
Expand Down Expand Up @@ -192,6 +193,7 @@ class AsyncResponseHandler : public http_client::EventHandler
// Whether to print the results from the callback
bool console_debug_ = false;
};
# endif

ElasticsearchLogExporter::ElasticsearchLogExporter()
: options_{ElasticsearchExporterOptions()},
Expand Down Expand Up @@ -281,6 +283,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void ElasticsearchLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records,
Expand Down Expand Up @@ -325,6 +328,7 @@ void ElasticsearchLogExporter::Export(
options_.console_debug_);
session->SendRequest(handler);
}
# endif

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -69,6 +70,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
void Export(const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shutdown the exporter.
Expand Down
2 changes: 2 additions & 0 deletions exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ sdk_common::ExportResult JaegerExporter::Export(
return sdk_common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -78,6 +79,7 @@ void JaegerExporter::Export(
auto status = Export(spans);
result_callback(status);
}
#endif

void JaegerExporter::InitializeEndpoint()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -77,6 +78,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
auto status = Export(spans);
result_callback(status);
}
#endif

/**
* @param timeout an optional value containing the timeout of the exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a span of logs sent from the processor asynchronously.
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept;
# endif

/**
* Marks the OStream Log Exporter as shut down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
Expand Down
2 changes: 2 additions & 0 deletions exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OStreamLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -188,6 +189,7 @@ void OStreamLogExporter::Export(
auto result = Export(records);
result_callback(result);
}
# endif

bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OStreamSpanExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
auto result = Export(spans);
result_callback(result);
}
#endif

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -60,6 +61,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
virtual void Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shut down the exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
Expand All @@ -64,6 +65,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ struct OtlpHttpExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

#ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
std::size_t max_concurrent_requests = 8;
#endif
};

/**
Expand Down Expand Up @@ -87,6 +89,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -96,6 +99,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

/**
* Shut down the exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ struct OtlpHttpLogExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultLogHeaders();

# ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
std::size_t max_concurrent_requests = 8;
# endif
};

/**
Expand Down Expand Up @@ -88,6 +90,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
Expand All @@ -97,6 +100,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -152,6 +153,7 @@ void OtlpGrpcExporter::Export(
auto status = Export(spans);
result_callback(status);
}
#endif

bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_grpc_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -170,6 +171,7 @@ void OtlpGrpcLogExporter::Export(
auto status = Export(logs);
result_callback(status);
}
# endif

bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
10 changes: 8 additions & 2 deletions exporters/otlp/src/otlp_http_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options)
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers,
options.max_concurrent_requests)))
options.http_headers
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests
#endif
)))
{}

OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr<OtlpHttpClient> http_client)
Expand All @@ -59,6 +63,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export(
return http_client_->Export(service_request);
}

#ifdef ENABLE_ASYNC_EXPORT
void OtlpHttpExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -72,6 +77,7 @@ void OtlpHttpExporter::Export(
OtlpRecordableUtils::PopulateRequest(spans, &service_request);
http_client_->Export(service_request, std::move(result_callback));
}
#endif

bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
10 changes: 8 additions & 2 deletions exporters/otlp/src/otlp_http_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &optio
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers,
options.max_concurrent_requests)))
options.http_headers
# ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests
# endif
)))
{}

OtlpHttpLogExporter::OtlpHttpLogExporter(std::unique_ptr<OtlpHttpClient> http_client)
Expand All @@ -60,6 +64,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export(
return http_client_->Export(service_request);
}

# ifdef ENABLE_ASYNC_EXPORT
void OtlpHttpLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -72,6 +77,7 @@ void OtlpHttpLogExporter::Export(
OtlpRecordableUtils::PopulateRequest(logs, &service_request);
http_client_->Export(service_request, std::move(result_callback));
}
# endif

bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Loading

0 comments on commit ad3bdfe

Please sign in to comment.