Skip to content

Serialize function signature simplifications #8802

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,8 @@ impl CsvSerializer {
}
}

#[async_trait]
impl BatchSerializer for CsvSerializer {
async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let builder = self.builder.clone();
let header = self.header && initial;
Expand Down Expand Up @@ -829,7 +828,7 @@ mod tests {
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let serializer = CsvSerializer::new();
let bytes = serializer.serialize(batch, true).await?;
let bytes = serializer.serialize(batch, true)?;
assert_eq!(
"c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
Expand All @@ -853,7 +852,7 @@ mod tests {
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let serializer = CsvSerializer::new().with_header(false);
let bytes = serializer.serialize(batch, true).await?;
let bytes = serializer.serialize(batch, true)?;
assert_eq!(
"2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,8 @@ impl JsonSerializer {
}
}

#[async_trait]
impl BatchSerializer for JsonSerializer {
async fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
fn serialize(&self, batch: RecordBatch, _initial: bool) -> Result<Bytes> {
let mut buffer = Vec::with_capacity(4096);
let mut writer = json::LineDelimitedWriter::new(&mut buffer);
writer.write(&batch)?;
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/file_format/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::error::Result;
use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::path::Path;
Expand Down Expand Up @@ -144,12 +143,11 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
}

/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
pub trait BatchSerializer: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
/// Parameter `initial` signals whether the given batch is the first batch.
/// This distinction is important for certain serializers (like CSV).
async fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AbortableWrite`] which writes to the given object store location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
let serializer_clone = serializer.clone();
let handle = tokio::spawn(async move {
let num_rows = batch.num_rows();
let bytes = serializer_clone.serialize(batch, initial).await?;
let bytes = serializer_clone.serialize(batch, initial)?;
Ok((num_rows, bytes))
});
if initial {
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ mod tests {
use arrow_schema::Schema;
use datafusion_common::{internal_err, DataFusionError, Statistics};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;

Expand Down Expand Up @@ -989,9 +988,8 @@ mod tests {
bytes: Bytes,
}

#[async_trait]
impl BatchSerializer for TestSerializer {
async fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
fn serialize(&self, _batch: RecordBatch, _initial: bool) -> Result<Bytes> {
Ok(self.bytes.clone())
}
}
Expand Down