diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 7a0af3ff0809..9cae6675e825 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -423,9 +423,8 @@ impl CsvSerializer { } } -#[async_trait] impl BatchSerializer for CsvSerializer { - async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result { + fn serialize(&self, batch: RecordBatch, initial: bool) -> Result { let mut buffer = Vec::with_capacity(4096); let builder = self.builder.clone(); let header = self.header && initial; @@ -829,7 +828,7 @@ mod tests { .await?; let batch = concat_batches(&batches[0].schema(), &batches)?; let serializer = CsvSerializer::new(); - let bytes = serializer.serialize(batch, true).await?; + let bytes = serializer.serialize(batch, true)?; assert_eq!( "c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n", String::from_utf8(bytes.into()).unwrap() @@ -853,7 +852,7 @@ mod tests { .await?; let batch = concat_batches(&batches[0].schema(), &batches)?; let serializer = CsvSerializer::new().with_header(false); - let bytes = serializer.serialize(batch, true).await?; + let bytes = serializer.serialize(batch, true)?; assert_eq!( "2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n", String::from_utf8(bytes.into()).unwrap() diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 8c02955ad363..0f6d3648d120 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -204,9 +204,8 @@ impl JsonSerializer { } } -#[async_trait] impl BatchSerializer for JsonSerializer { - async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result { + fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result { let mut buffer = Vec::with_capacity(4096); let mut writer = json::LineDelimitedWriter::new(&mut buffer); writer.write(&batch)?; diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs index c481f2accf19..410a32a19cc1 100644 --- a/datafusion/core/src/datasource/file_format/write/mod.rs +++ b/datafusion/core/src/datasource/file_format/write/mod.rs @@ -29,7 +29,6 @@ use crate::error::Result; use arrow_array::RecordBatch; use datafusion_common::DataFusionError; -use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; use object_store::path::Path; @@ -144,12 +143,11 @@ impl AsyncWrite for AbortableWrite { } /// A trait that defines the methods required for a RecordBatch serializer. -#[async_trait] pub trait BatchSerializer: Sync + Send { /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. /// Parameter `initial` signals whether the given batch is the first batch. /// This distinction is important for certain serializers (like CSV). - async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result; + fn serialize(&self, batch: RecordBatch, initial: bool) -> Result; } /// Returns an [`AbortableWrite`] which writes to the given object store location diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 9b820a15b280..106b4e0d50e5 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -60,7 +60,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store( let serializer_clone = serializer.clone(); let handle = tokio::spawn(async move { let num_rows = batch.num_rows(); - let bytes = serializer_clone.serialize(batch, initial).await?; + let bytes = serializer_clone.serialize(batch, initial)?; Ok((num_rows, bytes)) }); if initial { diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index bb4c8313642c..353662397648 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -535,7 +535,6 @@ mod tests { use arrow_schema::Schema; use datafusion_common::{internal_err, DataFusionError, Statistics}; - use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; @@ -989,9 +988,8 @@ mod tests { bytes: Bytes, } - #[async_trait] impl BatchSerializer for TestSerializer { - async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result { + fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result { Ok(self.bytes.clone()) } }