Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added feature flag for asynchronous export #1295

Merged
merged 2 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ if(WITH_LOGS_PREVIEW)
add_definitions(-DENABLE_LOGS_PREVIEW)
endif()

option(WITH_ASYNC_EXPORT_PREVIEW "Whether enable async export" OFF)

if(WITH_ASYNC_EXPORT_PREVIEW)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add INTERFACE definitions into api/CMakeLists.txt.

if(WITH_LOGS_PREVIEW)
  target_compile_definitions(opentelemetry_api INTERFACE ENABLE_ASYNC_EXPORT)
endif()

So that when we use optentelemetry-cpp as a cmake module(find_package(opentelemetry-cpp CONFIG)) , this definition can be auto added into all targets that direct or indirectly depend on opentelemetry-cpp.

I find there are severval missing definitions in api/CMakeLists.txt and be add by add_definitions(). All definitions should be added by target_compile_definitions(opentelemetry_api INTERFACE ...) to be expoted by cmake.
We should avoid to use add_definitions(...) unless the definitions is only used by unit tests, should I create another issue to fix it? @lalitb @ThomsonTan

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

add_definitions(-DENABLE_ASYNC_EXPORT)
endif()

find_package(Threads)

function(install_windows_deps)
Expand Down
4 changes: 4 additions & 0 deletions api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ endif()
if(CORE_RUNTIME_LIBS)
target_link_libraries(opentelemetry_api INTERFACE ${CORE_RUNTIME_LIBS})
endif()

if(WITH_ASYNC_EXPORT_PREVIEW)
target_compile_definitions(opentelemetry_api INTERFACE ENABLE_ASYNC_EXPORT)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records to the Elasticsearch instance asynchronously.
* @param records A list of log records to send to Elasticsearch.
Expand All @@ -99,6 +100,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
&records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
4 changes: 4 additions & 0 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ResponseHandler : public http_client::EventHandler
bool console_debug_ = false;
};

# ifdef ENABLE_ASYNC_EXPORT
/**
* This class handles the async response message from the Elasticsearch request
*/
Expand Down Expand Up @@ -192,6 +193,7 @@ class AsyncResponseHandler : public http_client::EventHandler
// Whether to print the results from the callback
bool console_debug_ = false;
};
# endif

ElasticsearchLogExporter::ElasticsearchLogExporter()
: options_{ElasticsearchExporterOptions()},
Expand Down Expand Up @@ -281,6 +283,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void ElasticsearchLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records,
Expand Down Expand Up @@ -325,6 +328,7 @@ void ElasticsearchLogExporter::Export(
options_.console_debug_);
session->SendRequest(handler);
}
# endif

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -69,6 +70,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
void Export(const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shutdown the exporter.
Expand Down
2 changes: 2 additions & 0 deletions exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ sdk_common::ExportResult JaegerExporter::Export(
return sdk_common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -78,6 +79,7 @@ void JaegerExporter::Export(
auto status = Export(spans);
result_callback(status);
}
#endif

void JaegerExporter::InitializeEndpoint()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -77,6 +78,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
auto status = Export(spans);
result_callback(status);
}
#endif

/**
* @param timeout an optional value containing the timeout of the exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a span of logs sent from the processor asynchronously.
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept;
# endif

/**
* Marks the OStream Log Exporter as shut down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
Expand Down
2 changes: 2 additions & 0 deletions exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OStreamLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -188,6 +189,7 @@ void OStreamLogExporter::Export(
auto result = Export(records);
result_callback(result);
}
# endif

bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OStreamSpanExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
auto result = Export(spans);
result_callback(result);
}
#endif

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -60,6 +61,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
virtual void Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shut down the exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
Expand All @@ -64,6 +65,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ struct OtlpHttpExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

#ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
std::size_t max_concurrent_requests = 8;
#endif
};

/**
Expand Down Expand Up @@ -87,6 +89,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also put max_concurrent_requests in OtlpHttpExporterOptions into ENABLE_ASYNC_EXPORT ? It's useless when async exporting is disabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -96,6 +99,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

/**
* Shut down the exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ struct OtlpHttpLogExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultLogHeaders();

# ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
std::size_t max_concurrent_requests = 8;
# endif
};

/**
Expand Down Expand Up @@ -88,6 +90,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also remove max_concurrent_requests in OtlpHttpExporterOptions and OtlpHttpLogExporterOptions when we do not decalre ENABLE_ASYNC_EXPORT ? It's useless when async exporting is disabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
Expand All @@ -97,6 +100,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -152,6 +153,7 @@ void OtlpGrpcExporter::Export(
auto status = Export(spans);
result_callback(status);
}
#endif

bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_grpc_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -170,6 +171,7 @@ void OtlpGrpcLogExporter::Export(
auto status = Export(logs);
result_callback(status);
}
# endif

bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
10 changes: 8 additions & 2 deletions exporters/otlp/src/otlp_http_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options)
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers,
options.max_concurrent_requests)))
options.http_headers
#ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests
#endif
)))
{}

OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr<OtlpHttpClient> http_client)
Expand All @@ -59,6 +63,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export(
return http_client_->Export(service_request);
}

#ifdef ENABLE_ASYNC_EXPORT
void OtlpHttpExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -72,6 +77,7 @@ void OtlpHttpExporter::Export(
OtlpRecordableUtils::PopulateRequest(spans, &service_request);
http_client_->Export(service_request, std::move(result_callback));
}
#endif

bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
10 changes: 8 additions & 2 deletions exporters/otlp/src/otlp_http_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ OtlpHttpLogExporter::OtlpHttpLogExporter(const OtlpHttpLogExporterOptions &optio
options.use_json_name,
options.console_debug,
options.timeout,
options.http_headers,
options.max_concurrent_requests)))
options.http_headers
# ifdef ENABLE_ASYNC_EXPORT
,
options.max_concurrent_requests
# endif
)))
{}

OtlpHttpLogExporter::OtlpHttpLogExporter(std::unique_ptr<OtlpHttpClient> http_client)
Expand All @@ -60,6 +64,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export(
return http_client_->Export(service_request);
}

# ifdef ENABLE_ASYNC_EXPORT
void OtlpHttpLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -72,6 +77,7 @@ void OtlpHttpLogExporter::Export(
OtlpRecordableUtils::PopulateRequest(logs, &service_request);
http_client_->Export(service_request, std::move(result_callback));
}
# endif

bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Loading