Skip to content

Commit c1ea3ff

Browse files
committed
feat: Add async-no-send feature flag to parquet
1 parent 026356b commit c1ea3ff

File tree

8 files changed

+74
-51
lines changed

8 files changed

+74
-51
lines changed

parquet/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ test_common = ["arrow/test_utils"]
109109
experimental = []
110110
# Enable async APIs
111111
async = ["futures", "tokio"]
112+
# Enable `!Send` with async APIs
113+
async-no-send = ["async"]
112114
# Enable object_store integration
113115
object_store = ["dep:object_store", "async"]
114116
# Group Zstd dependencies

parquet/src/arrow/async_reader/metadata.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
2121
use crate::file::page_index::index::Index;
2222
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
2323
use crate::file::FOOTER_SIZE;
24+
use crate::util::async_util::{BoxFuture, Send};
2425
use bytes::Bytes;
25-
use futures::future::BoxFuture;
26-
use futures::FutureExt;
2726
use std::future::Future;
2827
use std::ops::Range;
2928

@@ -66,7 +65,7 @@ pub trait MetadataFetch {
6665
/// Return a future that fetches the specified range of bytes asynchronously
6766
///
6867
/// Note the returned type is a boxed future, often created by
69-
/// [FutureExt::boxed]. See the trait documentation for an example
68+
/// [futures::future::FutureExt::boxed]. See the trait documentation for an example
7069
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
7170
}
7271

@@ -82,7 +81,7 @@ pub trait MetadataSuffixFetch: MetadataFetch {
8281
/// Return a future that fetches the last `n` bytes asynchronously
8382
///
8483
/// Note the returned type is a boxed future, often created by
85-
/// [FutureExt::boxed]. See the trait documentation for an example
84+
/// [futures::future::FutureExt::boxed]. See the trait documentation for an example
8685
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>>;
8786
}
8887

@@ -267,7 +266,7 @@ where
267266
Fut: Future<Output = Result<Bytes>> + Send,
268267
{
269268
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
270-
async move { self.0(range.start.try_into()?..range.end.try_into()?).await }.boxed()
269+
Box::pin(async move { self.0(range.start.try_into()?..range.end.try_into()?).await })
271270
}
272271
}
273272

parquet/src/arrow/async_reader/mod.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::sync::Arc;
3030
use std::task::{Context, Poll};
3131

3232
use bytes::{Buf, Bytes};
33-
use futures::future::{BoxFuture, FutureExt};
33+
use futures::future::FutureExt;
3434
use futures::ready;
3535
use futures::stream::Stream;
3636
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -54,6 +54,7 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
5454
use crate::file::page_index::offset_index::OffsetIndexMetaData;
5555
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
5656
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
57+
use crate::util::async_util::{BoxFuture, Send};
5758

5859
mod metadata;
5960
pub use metadata::*;
@@ -85,7 +86,7 @@ pub trait AsyncFileReader: Send {
8586

8687
/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
8788
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
88-
async move {
89+
Box::pin(async move {
8990
let mut result = Vec::with_capacity(ranges.len());
9091

9192
for range in ranges.into_iter() {
@@ -94,8 +95,7 @@ pub trait AsyncFileReader: Send {
9495
}
9596

9697
Ok(result)
97-
}
98-
.boxed()
98+
})
9999
}
100100

101101
/// Return a future which results in the [`ParquetMetaData`] for this Parquet file.
@@ -140,19 +140,18 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
140140

141141
impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> MetadataSuffixFetch for T {
142142
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
143-
async move {
143+
Box::pin(async move {
144144
self.seek(SeekFrom::End(-(suffix as i64))).await?;
145145
let mut buf = Vec::with_capacity(suffix);
146146
self.take(suffix as _).read_to_end(&mut buf).await?;
147147
Ok(buf.into())
148-
}
149-
.boxed()
148+
})
150149
}
151150
}
152151

153152
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
154153
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
155-
async move {
154+
Box::pin(async move {
156155
self.seek(SeekFrom::Start(range.start)).await?;
157156

158157
let to_read = range.end - range.start;
@@ -163,15 +162,14 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
163162
}
164163

165164
Ok(buffer.into())
166-
}
167-
.boxed()
165+
})
168166
}
169167

170168
fn get_metadata<'a>(
171169
&'a mut self,
172170
options: Option<&'a ArrowReaderOptions>,
173171
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
174-
async move {
172+
Box::pin(async move {
175173
let metadata_reader = ParquetMetaDataReader::new()
176174
.with_page_indexes(options.is_some_and(|o| o.page_index));
177175

@@ -182,8 +180,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
182180

183181
let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?;
184182
Ok(Arc::new(parquet_metadata))
185-
}
186-
.boxed()
183+
})
187184
}
188185
}
189186

@@ -844,14 +841,12 @@ where
844841

845842
let selection = self.selection.as_mut().map(|s| s.split_off(row_count));
846843

847-
let fut = reader
848-
.read_row_group(
849-
row_group_idx,
850-
selection,
851-
self.projection.clone(),
852-
self.batch_size,
853-
)
854-
.boxed();
844+
let fut = Box::pin(reader.read_row_group(
845+
row_group_idx,
846+
selection,
847+
self.projection.clone(),
848+
self.batch_size,
849+
));
855850

856851
self.state = StreamState::Reading(fut)
857852
}

parquet/src/arrow/async_reader/store.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ use crate::arrow::arrow_reader::ArrowReaderOptions;
2121
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
2222
use crate::errors::{ParquetError, Result};
2323
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
24+
use crate::util::async_util::BoxFuture;
2425
use bytes::Bytes;
25-
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
26+
use futures::{FutureExt, TryFutureExt};
2627
use object_store::{path::Path, ObjectStore};
2728
use object_store::{GetOptions, GetRange};
2829
use tokio::runtime::Handle;
@@ -59,6 +60,7 @@ pub struct ParquetObjectReader {
5960
metadata_size_hint: Option<usize>,
6061
preload_column_index: bool,
6162
preload_offset_index: bool,
63+
#[cfg_attr(feature = "async-no-send", allow(dead_code))]
6264
runtime: Option<Handle>,
6365
}
6466

@@ -132,6 +134,18 @@ impl ParquetObjectReader {
132134
}
133135
}
134136

137+
#[cfg(feature = "async-no-send")]
138+
fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
139+
where
140+
F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
141+
+ 'static,
142+
O: 'static,
143+
E: Into<ParquetError> + 'static,
144+
{
145+
Box::pin(f(&self.store, &self.path).map_err(|e| e.into()))
146+
}
147+
148+
#[cfg(not(feature = "async-no-send"))]
135149
fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
136150
where
137151
F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
@@ -144,18 +158,19 @@ impl ParquetObjectReader {
144158
Some(handle) => {
145159
let path = self.path.clone();
146160
let store = Arc::clone(&self.store);
147-
handle
148-
.spawn(async move { f(&store, &path).await })
149-
.map_ok_or_else(
150-
|e| match e.try_into_panic() {
151-
Err(e) => Err(ParquetError::External(Box::new(e))),
152-
Ok(p) => std::panic::resume_unwind(p),
153-
},
154-
|res| res.map_err(|e| e.into()),
155-
)
156-
.boxed()
161+
Box::pin(
162+
handle
163+
.spawn(async move { f(&store, &path).await })
164+
.map_ok_or_else(
165+
|e| match e.try_into_panic() {
166+
Err(e) => Err(ParquetError::External(Box::new(e))),
167+
Ok(p) => std::panic::resume_unwind(p),
168+
},
169+
|res| res.map_err(|e| e.into()),
170+
),
171+
)
157172
}
158-
None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
173+
None => Box::pin(f(&self.store, &self.path).map_err(|e| e.into())),
159174
}
160175
}
161176
}
@@ -181,10 +196,7 @@ impl AsyncFileReader for ParquetObjectReader {
181196
self.spawn(|store, path| store.get_range(path, range))
182197
}
183198

184-
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
185-
where
186-
Self: Send,
187-
{
199+
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
188200
self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
189201
}
190202

parquet/src/arrow/async_writer/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,11 @@ use crate::{
6666
errors::{ParquetError, Result},
6767
file::{metadata::RowGroupMetaData, properties::WriterProperties},
6868
format::{FileMetaData, KeyValue},
69+
util::async_util::{BoxFuture, Send},
6970
};
7071
use arrow_array::RecordBatch;
7172
use arrow_schema::SchemaRef;
7273
use bytes::Bytes;
73-
use futures::future::BoxFuture;
74-
use futures::FutureExt;
7574
use std::mem;
7675
use tokio::io::{AsyncWrite, AsyncWriteExt};
7776

@@ -103,20 +102,18 @@ impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
103102

104103
impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
105104
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
106-
async move {
105+
Box::pin(async move {
107106
self.write_all(&bs).await?;
108107
Ok(())
109-
}
110-
.boxed()
108+
})
111109
}
112110

113111
fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
114-
async move {
112+
Box::pin(async move {
115113
self.flush().await?;
116114
self.shutdown().await?;
117115
Ok(())
118-
}
119-
.boxed()
116+
})
120117
}
121118
}
122119

parquet/src/arrow/async_writer/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
// under the License.
1717

1818
use bytes::Bytes;
19-
use futures::future::BoxFuture;
2019
use std::sync::Arc;
2120

2221
use crate::arrow::async_writer::AsyncFileWriter;
2322
use crate::errors::{ParquetError, Result};
23+
use crate::util::async_util::BoxFuture;
2424
use object_store::buffered::BufWriter;
2525
use object_store::path::Path;
2626
use object_store::ObjectStore;

parquet/src/util/async_util.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#[cfg(feature = "async-no-send")]
2+
mod send_impl {
3+
pub trait Send {}
4+
impl<T> Send for T {}
5+
pub type BoxFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;
6+
}
7+
8+
#[cfg(not(feature = "async-no-send"))]
9+
mod send_impl {
10+
pub trait Send: std::marker::Send {}
11+
impl<T> Send for T where T: std::marker::Send {}
12+
pub type BoxFuture<'a, T> = futures::future::BoxFuture<'a, T>;
13+
}
14+
15+
pub use send_impl::*;

parquet/src/util/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ pub(crate) mod interner;
2424
pub(crate) mod test_common;
2525
pub mod utf8;
2626

27+
#[cfg(feature = "async")]
28+
pub(crate) mod async_util;
29+
2730
#[cfg(any(test, feature = "test_common"))]
2831
pub use self::test_common::page_util::{
2932
DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,

0 commit comments

Comments
 (0)