Skip to content

Commit d7e38e4

Browse files
RUST-995 Use the tokio runtime for the sync API
1 parent 3b3e6f9 commit d7e38e4

File tree

15 files changed

+138
-166
lines changed

15 files changed

+138
-166
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ exclude = [
2323
default = ["tokio-runtime"]
2424
tokio-runtime = ["tokio/macros", "tokio/net", "tokio/rt", "tokio/time", "serde_bytes"]
2525
async-std-runtime = ["async-std", "async-std/attributes", "async-std-resolver", "tokio-util/compat"]
26-
sync = ["async-std-runtime"]
26+
sync = ["tokio-runtime"]
2727

2828
# Enable support for v0.4 of the chrono crate in the public API of the BSON library.
2929
bson-chrono-0_4 = ["bson/chrono-0_4"]
@@ -47,6 +47,7 @@ bson = { git = "https://github.com/mongodb/bson-rust", branch = "master" }
4747
chrono = "0.4.7"
4848
derivative = "2.1.1"
4949
flate2 = { version = "1.0", optional = true }
50+
futures = "0.3"
5051
futures-core = "0.3.14"
5152
futures-io = "0.3.14"
5253
futures-util = { version = "0.3.14", features = ["io"] }
@@ -126,7 +127,6 @@ features = ["v4"]
126127
approx = "0.4.0"
127128
derive_more = "0.99.13"
128129
function_name = "0.2.0"
129-
futures = "0.3"
130130
home = "0.5"
131131
pretty_assertions = "0.7.1"
132132
serde_json = "1.0.64"

src/client/options/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ impl ClientOptions {
10481048
#[cfg(any(feature = "sync", docsrs))]
10491049
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
10501050
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
1051-
crate::RUNTIME.block_on(Self::parse_uri(s.as_ref(), None))
1051+
crate::sync::TOKIO_RUNTIME.block_on(Self::parse_uri(s.as_ref(), None))
10521052
}
10531053

10541054
/// Parses a MongoDB connection string into a `ClientOptions` struct.
@@ -1078,7 +1078,7 @@ impl ClientOptions {
10781078
#[cfg(any(feature = "sync", docsrs))]
10791079
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
10801080
pub fn parse_with_resolver_config(uri: &str, resolver_config: ResolverConfig) -> Result<Self> {
1081-
crate::RUNTIME.block_on(Self::parse_uri(uri, Some(resolver_config)))
1081+
crate::sync::TOKIO_RUNTIME.block_on(Self::parse_uri(uri, Some(resolver_config)))
10821082
}
10831083

10841084
/// Populate this `ClientOptions` from the given URI, optionally using the resolver config for

src/cmap/establish/handshake/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ use crate::{
1818
sdam::Topology,
1919
};
2020

21-
#[cfg(feature = "tokio-runtime")]
21+
#[cfg(all(feature = "tokio-runtime", not(feature = "sync")))]
2222
const RUNTIME_NAME: &str = "tokio";
2323

24-
#[cfg(all(feature = "async-std-runtime", not(feature = "sync")))]
24+
#[cfg(feature = "async-std-runtime")]
2525
const RUNTIME_NAME: &str = "async-std";
2626

2727
#[cfg(feature = "sync")]
28-
const RUNTIME_NAME: &str = "sync (with async-std)";
28+
const RUNTIME_NAME: &str = "sync (with tokio)";
2929

3030
#[derive(Clone, Debug)]
3131
struct ClientMetadata {

src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -367,10 +367,9 @@ compile_error!(
367367
`async-std-runtime` or set `default-features = false` in your Cargo.toml"
368368
);
369369

370-
#[cfg(all(feature = "tokio-runtime", feature = "sync"))]
370+
#[cfg(all(feature = "async-std-runtime", feature = "sync"))]
371371
compile_error!(
372-
"`tokio-runtime` and `sync` can't both be enabled; either disable `sync` or set \
373-
`default-features = false` in your Cargo.toml"
372+
"`async-std-runtime` and `sync` can't both be enabled"
374373
);
375374

376375
#[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))]

src/runtime/mod.rs

Lines changed: 9 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,15 @@ impl AsyncRuntime {
5151
{
5252
match self {
5353
#[cfg(feature = "tokio-runtime")]
54-
Self::Tokio => match TokioCallingContext::current() {
55-
TokioCallingContext::Async(handle) => {
56-
Some(AsyncJoinHandle::Tokio(handle.spawn(fut)))
57-
}
58-
TokioCallingContext::Sync => None,
59-
},
54+
Self::Tokio => {
55+
#[cfg(feature = "sync")]
56+
let handle = crate::sync::TOKIO_RUNTIME.handle();
57+
58+
#[cfg(not(feature = "sync"))]
59+
let handle = tokio::runtime::Handle::current();
60+
61+
Some(AsyncJoinHandle::Tokio(handle.spawn(fut)))
62+
}
6063

6164
#[cfg(feature = "async-std-runtime")]
6265
Self::AsyncStd => Some(AsyncJoinHandle::AsyncStd(async_std::task::spawn(fut))),
@@ -74,32 +77,6 @@ impl AsyncRuntime {
7477
self.spawn(fut);
7578
}
7679

77-
/// Run a future in the foreground, blocking on it completing.
78-
///
79-
/// This will panic if called from a sychronous context when tokio is being used.
80-
#[cfg(any(feature = "sync", test))]
81-
pub(crate) fn block_on<F, T>(self, fut: F) -> T
82-
where
83-
F: Future<Output = T>,
84-
{
85-
#[cfg(all(feature = "tokio-runtime", not(feature = "async-std-runtime")))]
86-
{
87-
match TokioCallingContext::current() {
88-
TokioCallingContext::Async(_handle) => {
89-
tokio::task::block_in_place(|| futures::executor::block_on(fut))
90-
}
91-
TokioCallingContext::Sync => {
92-
panic!("block_on called from tokio outside of async context")
93-
}
94-
}
95-
}
96-
97-
#[cfg(feature = "async-std-runtime")]
98-
{
99-
async_std::task::block_on(fut)
100-
}
101-
}
102-
10380
/// Run a future in the foreground, blocking on it completing.
10481
/// This does not notify the runtime that it will be blocking and should only be used for
10582
/// operations that will immediately (or quickly) succeed.
@@ -177,25 +154,3 @@ impl AsyncRuntime {
177154
}
178155
}
179156
}
180-
181-
/// Represents the context in which a given runtime method is being called from.
182-
#[cfg(feature = "tokio-runtime")]
183-
enum TokioCallingContext {
184-
/// From a syncronous setting (i.e. not from a runtime thread).
185-
Sync,
186-
187-
/// From an asyncronous setting (i.e. from an async block or function being run on a runtime).
188-
/// Includes a handle to the current runtime.
189-
Async(tokio::runtime::Handle),
190-
}
191-
192-
#[cfg(feature = "tokio-runtime")]
193-
impl TokioCallingContext {
194-
/// Get the current calling context.
195-
fn current() -> Self {
196-
match tokio::runtime::Handle::try_current() {
197-
Ok(handle) => TokioCallingContext::Async(handle),
198-
Err(_) => TokioCallingContext::Sync,
199-
}
200-
}
201-
}

src/sync/change_stream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
ChangeStream as AsyncChangeStream,
99
},
1010
error::Result,
11-
RUNTIME,
11+
sync::TOKIO_RUNTIME,
1212
};
1313

1414
use super::ClientSession;
@@ -112,7 +112,7 @@ where
112112
/// # }
113113
/// ```
114114
pub fn next_if_any(&mut self) -> Result<Option<T>> {
115-
RUNTIME.block_on(self.async_stream.next_if_any())
115+
TOKIO_RUNTIME.block_on(self.async_stream.next_if_any())
116116
}
117117
}
118118

@@ -123,7 +123,7 @@ where
123123
type Item = Result<T>;
124124

125125
fn next(&mut self) -> Option<Self::Item> {
126-
RUNTIME.block_on(self.async_stream.next())
126+
TOKIO_RUNTIME.block_on(self.async_stream.next())
127127
}
128128
}
129129

@@ -200,7 +200,7 @@ where
200200
/// # }
201201
/// ```
202202
pub fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
203-
RUNTIME.block_on(self.async_stream.next(&mut session.async_client_session))
203+
TOKIO_RUNTIME.block_on(self.async_stream.next(&mut session.async_client_session))
204204
}
205205

206206
/// Returns whether the change stream will continue to receive events.
@@ -234,7 +234,7 @@ where
234234
/// # }
235235
/// ```
236236
pub fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
237-
RUNTIME.block_on(
237+
TOKIO_RUNTIME.block_on(
238238
self.async_stream
239239
.next_if_any(&mut session.async_client_session),
240240
)

src/sync/client/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use crate::{
1414
SessionOptions,
1515
},
1616
results::DatabaseSpecification,
17+
sync::TOKIO_RUNTIME,
1718
Client as AsyncClient,
18-
RUNTIME,
1919
};
2020

2121
/// This is the main entry point for the synchronous API. A `Client` is used to connect to a MongoDB
@@ -75,7 +75,7 @@ impl Client {
7575
/// [`ClientOptions::parse`](../options/struct.ClientOptions.html#method.parse) for more
7676
/// details.
7777
pub fn with_uri_str(uri: impl AsRef<str>) -> Result<Self> {
78-
let async_client = RUNTIME.block_on(AsyncClient::with_uri_str(uri.as_ref()))?;
78+
let async_client = TOKIO_RUNTIME.block_on(AsyncClient::with_uri_str(uri.as_ref()))?;
7979
Ok(Self { async_client })
8080
}
8181

@@ -134,7 +134,7 @@ impl Client {
134134
filter: impl Into<Option<Document>>,
135135
options: impl Into<Option<ListDatabasesOptions>>,
136136
) -> Result<Vec<DatabaseSpecification>> {
137-
RUNTIME.block_on(
137+
TOKIO_RUNTIME.block_on(
138138
self.async_client
139139
.list_databases(filter.into(), options.into()),
140140
)
@@ -146,15 +146,15 @@ impl Client {
146146
filter: impl Into<Option<Document>>,
147147
options: impl Into<Option<ListDatabasesOptions>>,
148148
) -> Result<Vec<String>> {
149-
RUNTIME.block_on(
149+
TOKIO_RUNTIME.block_on(
150150
self.async_client
151151
.list_database_names(filter.into(), options.into()),
152152
)
153153
}
154154

155155
/// Starts a new `ClientSession`.
156156
pub fn start_session(&self, options: Option<SessionOptions>) -> Result<ClientSession> {
157-
RUNTIME
157+
TOKIO_RUNTIME
158158
.block_on(self.async_client.start_session(options))
159159
.map(Into::into)
160160
}
@@ -182,7 +182,7 @@ impl Client {
182182
pipeline: impl IntoIterator<Item = Document>,
183183
options: impl Into<Option<ChangeStreamOptions>>,
184184
) -> Result<ChangeStream<ChangeStreamEvent<Document>>> {
185-
RUNTIME
185+
TOKIO_RUNTIME
186186
.block_on(self.async_client.watch(pipeline, options))
187187
.map(ChangeStream::new)
188188
}
@@ -195,7 +195,7 @@ impl Client {
195195
options: impl Into<Option<ChangeStreamOptions>>,
196196
session: &mut ClientSession,
197197
) -> Result<SessionChangeStream<ChangeStreamEvent<Document>>> {
198-
RUNTIME
198+
TOKIO_RUNTIME
199199
.block_on(self.async_client.watch_with_session(
200200
pipeline,
201201
options,

src/sync/client/session.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use crate::{
44
client::session::ClusterTime,
55
error::Result,
66
options::{SessionOptions, TransactionOptions},
7+
sync::TOKIO_RUNTIME,
78
ClientSession as AsyncClientSession,
8-
RUNTIME,
99
};
1010

1111
/// A MongoDB client session. This struct represents a logical session used for ordering sequential
@@ -76,7 +76,7 @@ impl ClientSession {
7676
&mut self,
7777
options: impl Into<Option<TransactionOptions>>,
7878
) -> Result<()> {
79-
RUNTIME.block_on(self.async_client_session.start_transaction(options))
79+
TOKIO_RUNTIME.block_on(self.async_client_session.start_transaction(options))
8080
}
8181

8282
/// Commits the transaction that is currently active on this session.
@@ -100,7 +100,7 @@ impl ClientSession {
100100
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
101101
/// retryable writes.
102102
pub fn commit_transaction(&mut self) -> Result<()> {
103-
RUNTIME.block_on(self.async_client_session.commit_transaction())
103+
TOKIO_RUNTIME.block_on(self.async_client_session.commit_transaction())
104104
}
105105

106106
/// Aborts the transaction that is currently active on this session. Any open transaction will
@@ -133,6 +133,6 @@ impl ClientSession {
133133
/// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on
134134
/// retryable writes.
135135
pub fn abort_transaction(&mut self) -> Result<()> {
136-
RUNTIME.block_on(self.async_client_session.abort_transaction())
136+
TOKIO_RUNTIME.block_on(self.async_client_session.abort_transaction())
137137
}
138138
}

0 commit comments

Comments
 (0)