Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove async std #2820

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ It's important to regularly review and remove the `otel_unstable` flag from the

The potential features include:

- Stable and non-experimental features that compliant to specification, and have a feature flag to minimize compilation size. Example: feature flags for signals (like `logs`, `traces`, `metrics`) and runtimes (`rt-tokio`, `rt-tokio-current-thread`, `rt-async-std`).
- Stable and non-experimental features that are compliant with the specification and have a feature flag to minimize compilation size. Example: feature flags for signals (like `logs`, `traces`, `metrics`) and runtimes (`rt-tokio`, `rt-tokio-current-thread`).
- Stable and non-experimental features, although not part of the specification, are crucial for enhancing the tracing/log crate's functionality or boosting performance. These features are also subject to discussion and approval by the OpenTelemetry Rust Maintainers.

All such features should adhere to naming convention `<signal>_<feature_name>`
Expand Down
1 change: 0 additions & 1 deletion opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@
//! ```
//!
//! [`tokio`]: https://tokio.rs
//! [`async-std`]: https://async.rs
//!
//! # Feature Flags
//! The following feature flags can enable exporters for different telemetry signals:
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- **Breaking**: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641)
for the changes.
- Removed `async-std` support for `Runtime`, as [`async-std` crate is deprecated](https://crates.io/crates/async-std).
- Calls to `MeterProviderBuilder::with_resource`, `TracerProviderBuilder::with_resource`,
`LoggerProviderBuilder::with_resource` are now additive ([#2677](https://github.com/open-telemetry/opentelemetry-rust/pull/2677)).
- Moved `ExportError` trait from `opentelemetry::trace::ExportError` to `opentelemetry_sdk::export::ExportError`
Expand Down
4 changes: 1 addition & 3 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ rust-version = "1.75.0"
[dependencies]
opentelemetry = { version = "0.28", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.28", path = "../opentelemetry-http", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
futures-channel = { workspace = true }
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
Expand Down Expand Up @@ -47,11 +46,10 @@ jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_
logs = ["opentelemetry/logs", "serde_json"]
spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
metrics = ["opentelemetry/metrics", "glob"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
experimental_async_runtime = []
rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"]
rt-async-std = ["async-std", "experimental_async_runtime"]
internal-logs = ["tracing"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
spec_unstable_metrics_views = ["metrics"]
Expand Down
2 changes: 0 additions & 2 deletions opentelemetry-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@
//! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality.
//! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime.
//! * `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked.
//! * `rt-async-std`: Spawn telemetry tasks using [async-std]'s runtime.
//!
//! [tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std
#![warn(
future_incompatible,
missing_debug_implementations,
Expand Down
65 changes: 3 additions & 62 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
//! Provides an abstraction of several async runtimes
//!
//! This allows OpenTelemetry to work with any current or future runtime. There are currently
//! builtin implementations for [Tokio] and [async-std].
//! This allows OpenTelemetry to work with any current or future runtime. There is currently
//! built-in implementation for [Tokio].
//!
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std

use futures_util::stream::{unfold, Stream};
use std::{fmt::Debug, future::Future, time::Duration};
use thiserror::Error;

/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
/// A runtime is an abstraction of an async runtime like [Tokio]. It allows
/// OpenTelemetry to work with any current and hopefully future runtime implementations.
///
/// [Tokio]: https://crates.io/crates/tokio
/// [async-std]: https://crates.io/crates/async-std
///
/// # Note
///
Expand Down Expand Up @@ -139,34 +137,6 @@ impl Runtime for TokioCurrentThread {
}
}

/// Runtime implementation, which works with async-std.
#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
#[derive(Debug, Clone)]
pub struct AsyncStd;

#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl Runtime for AsyncStd {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
#[allow(clippy::let_underscore_future)]
let _ = async_std::task::spawn(future);
}

fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
async_std::task::sleep(duration)
}
}

/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a
/// channel that is used by the [log] and [span] batch processors.
///
Expand Down Expand Up @@ -275,32 +245,3 @@ impl RuntimeChannel for TokioCurrentThread {
)
}
}

#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
impl<T: Send> TrySend for async_std::channel::Sender<T> {
type Message = T;

fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
self.try_send(item).map_err(|err| match err {
async_std::channel::TrySendError::Full(_) => TrySendError::ChannelFull,
async_std::channel::TrySendError::Closed(_) => TrySendError::ChannelClosed,
})
}
}

#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl RuntimeChannel for AsyncStd {
type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
type Sender<T: Debug + Send> = async_std::channel::Sender<T>;

fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
async_std::channel::bounded(capacity)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::sync::Arc;
///
/// This processor can be configured with an [`executor`] of your choice to
/// batch and upload spans asynchronously when they end. If you have added a
/// library like [`tokio`] or [`async-std`], you can pass in their respective
/// library like [`tokio`], you can pass in their respective
/// `spawn` and `interval` functions to have batching performed in those
/// contexts.
///
Expand Down Expand Up @@ -79,7 +79,6 @@ use std::sync::Arc;
///
/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html
/// [`tokio`]: https://tokio.rs
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,

Expand Down Expand Up @@ -577,41 +576,4 @@ mod tests {
.unwrap();
runtime.block_on(timeout_test_tokio(false));
}

#[test]
#[cfg(feature = "rt-async-std")]
fn test_timeout_async_std_timeout() {
async_std::task::block_on(timeout_test_std_async(true));
}

#[test]
#[cfg(feature = "rt-async-std")]
fn test_timeout_async_std_not_timeout() {
async_std::task::block_on(timeout_test_std_async(false));
}

// If the time_out is true, then the result suppose to ended with timeout.
// otherwise the exporter should be able to export within time out duration.
#[cfg(feature = "rt-async-std")]
async fn timeout_test_std_async(time_out: bool) {
let config = BatchConfig {
max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush
..Default::default()
};
let exporter = BlockingExporter {
delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
delay_fn: async_std::task::sleep,
};
let processor = BatchSpanProcessor::new(exporter, config, runtime::AsyncStd);
processor.on_end(new_test_export_span_data());
let flush_res = processor.force_flush();
if time_out {
assert!(flush_res.is_err());
} else {
assert!(flush_res.is_ok());
}
let shutdown_res = processor.shutdown();
assert!(shutdown_res.is_ok());
}
}
5 changes: 2 additions & 3 deletions opentelemetry-zipkin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
## Performance

For optimal performance, a batch exporter is recommended as the simple exporter
will export each span synchronously on drop. You can enable the [`rt-tokio`],
[`rt-tokio-current-thread`] or [`rt-async-std`] features and specify a runtime
will export each span synchronously on drop. You can enable the [`rt-tokio`] or
[`rt-tokio-current-thread`] features and specify a runtime
on the pipeline builder to have a batch exporter configured for you
automatically.

Expand All @@ -83,7 +83,6 @@ let tracer = opentelemetry_zipkin::new_pipeline()
```

[`rt-tokio`]: https://tokio.rs
[`async-std`]: https://async.rs

## Choosing an HTTP client

Expand Down
3 changes: 1 addition & 2 deletions opentelemetry/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@
//!
//! Exporting spans often involves sending data over a network or performing
//! other I/O tasks. OpenTelemetry allows you to schedule these tasks using
//! whichever runtime you are already using such as [Tokio] or [async-std].
//! whichever runtime you are already using such as [Tokio].
//! When using an async runtime it's best to use the batch span processor
//! where the spans will be sent in batches as opposed to being sent once ended,
//! which often ends up being more efficient.
//!
//! [Tokio]: https://tokio.rs
//! [async-std]: https://async.rs
//!
//! ## Managing Active Spans
//!
Expand Down
Loading