Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added feature flag for asynchronous export #1295

Merged
merged 2 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ if(WITH_LOGS_PREVIEW)
add_definitions(-DENABLE_LOGS_PREVIEW)
endif()

option(WITH_ASYNC_EXPORT_PREVIEW "Whether enable async export" OFF)

if(WITH_ASYNC_EXPORT_PREVIEW)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add INTERFACE definitions into api/CMakeLists.txt.

if(WITH_LOGS_PREVIEW)
  target_compile_definitions(opentelemetry_api INTERFACE ENABLE_ASYNC_EXPORT)
endif()

So that when we use optentelemetry-cpp as a cmake module(find_package(opentelemetry-cpp CONFIG)) , this definition can be auto added into all targets that direct or indirectly depend on opentelemetry-cpp.

I find there are severval missing definitions in api/CMakeLists.txt and be add by add_definitions(). All definitions should be added by target_compile_definitions(opentelemetry_api INTERFACE ...) to be expoted by cmake.
We should avoid to use add_definitions(...) unless the definitions is only used by unit tests, should I create another issue to fix it? @lalitb @ThomsonTan

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

add_definitions(-DENABLE_ASYNC_EXPORT)
endif()

find_package(Threads)

function(install_windows_deps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records) noexcept override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records to the Elasticsearch instance asynchronously.
* @param records A list of log records to send to Elasticsearch.
Expand All @@ -99,6 +100,7 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor
&records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
4 changes: 4 additions & 0 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ResponseHandler : public http_client::EventHandler
bool console_debug_ = false;
};

# ifdef ENABLE_ASYNC_EXPORT
/**
* This class handles the async response message from the Elasticsearch request
*/
Expand Down Expand Up @@ -192,6 +193,7 @@ class AsyncResponseHandler : public http_client::EventHandler
// Whether to print the results from the callback
bool console_debug_ = false;
};
# endif

ElasticsearchLogExporter::ElasticsearchLogExporter()
: options_{ElasticsearchExporterOptions()},
Expand Down Expand Up @@ -281,6 +283,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void ElasticsearchLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>>
&records,
Expand Down Expand Up @@ -325,6 +328,7 @@ void ElasticsearchLogExporter::Export(
options_.console_debug_);
session->SendRequest(handler);
}
# endif

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -69,6 +70,7 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
void Export(const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shutdown the exporter.
Expand Down
2 changes: 2 additions & 0 deletions exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ sdk_common::ExportResult JaegerExporter::Export(
return sdk_common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -78,6 +79,7 @@ void JaegerExporter::Export(
auto status = Export(spans);
result_callback(status);
}
#endif

void JaegerExporter::InitializeEndpoint()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -77,6 +78,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
auto status = Export(spans);
result_callback(status);
}
#endif

/**
* @param timeout an optional value containing the timeout of the exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a span of logs sent from the processor asynchronously.
*/
void Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept;
# endif

/**
* Marks the OStream Log Exporter as shut down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
void Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>>
&spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
Expand Down
2 changes: 2 additions & 0 deletions exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OStreamLogExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -188,6 +189,7 @@ void OStreamLogExporter::Export(
auto result = Export(records);
result_callback(result);
}
# endif

bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ sdk::common::ExportResult OStreamSpanExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OStreamSpanExporter::Export(
const opentelemetry::nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
{
auto result = Export(spans);
result_callback(result);
}
#endif

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept override;

#ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -60,6 +61,7 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
virtual void Export(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)>
&&result_callback) noexcept override;
#endif

/**
* Shut down the exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
Expand All @@ -64,6 +65,7 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
override;

#ifdef ENABLE_ASYNC_EXPORT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also put max_concurrent_requests in OtlpHttpExporterOptions into ENABLE_ASYNC_EXPORT ? It's useless when async exporting is disabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Exports a batch of span recordables asynchronously.
* @param spans a span of unique pointers to span recordables
Expand All @@ -96,6 +97,7 @@ class OtlpHttpExporter final : public opentelemetry::sdk::trace::SpanExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
#endif

/**
* Shut down the exporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records) noexcept
override;

# ifdef ENABLE_ASYNC_EXPORT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also remove max_concurrent_requests in OtlpHttpExporterOptions and OtlpHttpLogExporterOptions when we do not decalre ENABLE_ASYNC_EXPORT ? It's useless when async exporting is disabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Exports a vector of log records asynchronously.
* @param records A list of log records.
Expand All @@ -97,6 +98,7 @@ class OtlpHttpLogExporter final : public opentelemetry::sdk::logs::LogExporter
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &records,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
override;
# endif

/**
* Shutdown this exporter.
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

#ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -152,6 +153,7 @@ void OtlpGrpcExporter::Export(
auto status = Export(spans);
result_callback(status);
}
#endif

bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_grpc_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

# ifdef ENABLE_ASYNC_EXPORT
void OtlpGrpcLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -170,6 +171,7 @@ void OtlpGrpcLogExporter::Export(
auto status = Export(logs);
result_callback(status);
}
# endif

bool OtlpGrpcLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_http_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export(
return http_client_->Export(service_request);
}

#ifdef ENABLE_ASYNC_EXPORT
void OtlpHttpExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -72,6 +73,7 @@ void OtlpHttpExporter::Export(
OtlpRecordableUtils::PopulateRequest(spans, &service_request);
http_client_->Export(service_request, std::move(result_callback));
}
#endif

bool OtlpHttpExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/src/otlp_http_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export(
return http_client_->Export(service_request);
}

# ifdef ENABLE_ASYNC_EXPORT
void OtlpHttpLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs,
std::function<bool(opentelemetry::sdk::common::ExportResult)> &&result_callback) noexcept
Expand All @@ -72,6 +73,7 @@ void OtlpHttpLogExporter::Export(
OtlpRecordableUtils::PopulateRequest(logs, &service_request);
http_client_->Export(service_request, std::move(result_callback));
}
# endif

bool OtlpHttpLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
Expand Down
15 changes: 11 additions & 4 deletions exporters/otlp/test/otlp_http_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
processor_opts.max_export_batch_size = 5;
processor_opts.max_queue_size = 5;
processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);
processor_opts.is_export_async = is_async;
auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
# ifdef ENABLE_ASYNC_EXPORT
processor_opts.is_export_async = is_async;
# endif
auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts));
auto provider = nostd::shared_ptr<trace::TracerProvider>(
new sdk::trace::TracerProvider(std::move(processor), resource));
Expand Down Expand Up @@ -210,8 +212,9 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
processor_opts.max_export_batch_size = 5;
processor_opts.max_queue_size = 5;
processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);
processor_opts.is_export_async = is_async;

# ifdef ENABLE_ASYNC_EXPORT
processor_opts.is_export_async = is_async;
# endif
auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts));
auto provider = nostd::shared_ptr<trace::TracerProvider>(
Expand Down Expand Up @@ -282,21 +285,25 @@ TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestSync)
ExportJsonIntegrationTest(false);
}

# ifdef ENABLE_ASYNC_EXPORT
TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestAsync)
{
ExportJsonIntegrationTest(true);
}
# endif

// Create spans, let processor call Export()
TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestSync)
{
ExportBinaryIntegrationTest(false);
}

# ifdef ENABLE_ASYNC_EXPORT
TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestAsync)
{
ExportBinaryIntegrationTest(true);
}
# endif

// Test exporter configuration options
TEST_F(OtlpHttpExporterTestPeer, ConfigTest)
Expand Down
8 changes: 7 additions & 1 deletion exporters/otlp/test/otlp_http_log_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test
processor_options.max_export_batch_size = 5;
processor_options.max_queue_size = 5;
processor_options.schedule_delay_millis = std::chrono::milliseconds(256);
processor_options.is_export_async = is_async;
# ifdef ENABLE_ASYNC_EXPORT
processor_options.is_export_async = is_async;
# endif
auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), processor_options)));
Expand Down Expand Up @@ -306,21 +308,25 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestSync)
ExportJsonIntegrationTest(false);
}

# ifdef ENABLE_ASYNC_EXPORT
TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTestAsync)
{
ExportJsonIntegrationTest(true);
}
# endif

// Create log records, let processor call Export()
TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestSync)
{
ExportBinaryIntegrationTest(false);
}

# ifdef ENABLE_ASYNC_EXPORT
TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTestAsync)
{
ExportBinaryIntegrationTest(true);
}
# endif

// Test exporter configuration options
TEST_F(OtlpHttpLogExporterTestPeer, ConfigTest)
Expand Down
Loading