Skip to content

Commit

Permalink
Serialize function signature simplifications (#8802)
Browse files Browse the repository at this point in the history
* Remove async from batch serializer

* Clippy

* Update orchestration.rs
  • Loading branch information
metesynnada authored Jan 12, 2024
1 parent d9a1d42 commit 1c49152
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 13 deletions.
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,8 @@ impl CsvSerializer {
}
}

#[async_trait]
impl BatchSerializer for CsvSerializer {
async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let builder = self.builder.clone();
let header = self.header && initial;
Expand Down Expand Up @@ -829,7 +828,7 @@ mod tests {
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let serializer = CsvSerializer::new();
let bytes = serializer.serialize(batch, true).await?;
let bytes = serializer.serialize(batch, true)?;
assert_eq!(
"c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
Expand All @@ -853,7 +852,7 @@ mod tests {
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let serializer = CsvSerializer::new().with_header(false);
let bytes = serializer.serialize(batch, true).await?;
let bytes = serializer.serialize(batch, true)?;
assert_eq!(
"2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,8 @@ impl JsonSerializer {
}
}

#[async_trait]
impl BatchSerializer for JsonSerializer {
async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let mut writer = json::LineDelimitedWriter::new(&mut buffer);
writer.write(&batch)?;
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/file_format/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::error::Result;
use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::path::Path;
Expand Down Expand Up @@ -144,12 +143,11 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
}

/// A trait that defines the methods required for a RecordBatch serializer.
///
/// Implementors must be `Sync + Send`.
// NOTE(review): this is a diff listing. The `#[async_trait]` attribute and the
// `async fn` line below are the removed side of the change; the plain `fn`
// line is the signature that remains.
#[async_trait]
pub trait BatchSerializer: Sync + Send {
/// Serializes a `RecordBatch` and returns the serialized bytes.
/// Parameter `initial` signals whether the given batch is the first batch.
/// This distinction is important for certain serializers (like CSV, which
/// combines it with its own header setting).
async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AbortableWrite`] which writes to the given object store location
Expand Down
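2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/write/orchestration.rs
Original file line number Diff line number Diff line change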
Expand Up @@ -60,7 +60,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
let serializer_clone = serializer.clone();
let handle = tokio::spawn(async move {
let num_rows = batch.num_rows();
let bytes = serializer_clone.serialize(batch, initial).await?;
let bytes = serializer_clone.serialize(batch, initial)?;
Ok((num_rows, bytes))
});
if initial {
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ mod tests {
use arrow_schema::Schema;
use datafusion_common::{internal_err, DataFusionError, Statistics};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;

Expand Down Expand Up @@ -989,9 +988,8 @@ mod tests {
bytes: Bytes,
}

/// Test-only `BatchSerializer` that ignores both the batch and the `initial`
/// flag and always returns a clone of its stored `bytes`.
// NOTE(review): this is a diff listing. The `#[async_trait]` attribute and the
// `async fn` line below are the removed side of the change; the plain `fn`
// line is the signature that remains.
#[async_trait]
impl BatchSerializer for TestSerializer {
async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
Ok(self.bytes.clone())
}
}
Expand Down

0 comments on commit 1c49152

Please sign in to comment.