13 changes: 12 additions & 1 deletion arrow-avro/Cargo.toml
@@ -45,16 +45,26 @@ sha256 = ["dep:sha2"]
small_decimals = []
avro_custom_types = ["dep:arrow-select"]

# Enable async APIs
async = ["futures"]
# Enable object_store integration
object_store = ["dep:object_store", "async"]
Comment on lines +48 to +51
Contributor

I'd recommend updating the README.md and docs with details on these new features.


[dependencies]
arrow-schema = { workspace = true }
arrow-buffer = { workspace = true }
arrow-array = { workspace = true }
arrow-select = { workspace = true, optional = true }

object_store = { version = "0.12.0", default-features = false, optional = true }

bytes = { version = "1.11.0", default-features = false, features = ["std"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0.188", features = ["derive"] }
flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
], optional = true }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
snap = { version = "1.0", default-features = false, optional = true }
zstd = { version = "0.13", default-features = false, optional = true }
bzip2 = { version = "0.6.0", optional = true }
@@ -78,11 +88,12 @@ criterion = { workspace = true, default-features = false }
tempfile = "3.3"
arrow = { workspace = true }
futures = "0.3.31"
bytes = "1.10.1"
async-stream = "0.3.6"
apache-avro = "0.21.0"
num-bigint = "0.4"
object_store = { version = "0.12.0", default-features = false, features = ["fs"] }
once_cell = "1.21.3"
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }

[[bench]]
name = "avro_reader"
18 changes: 18 additions & 0 deletions arrow-avro/src/reader/async_reader/async_file_reader.rs
@@ -0,0 +1,18 @@
use crate::reader::async_reader::DataFetchFutureBoxed;
use std::ops::Range;

/// A broad, generic trait for fetching bytes asynchronously from any source.
/// It imposes few constraints, mostly around ownership and lifetimes,
/// but each method must return a boxed future yielding [`bytes::Bytes`] or an error.
Comment on lines +4 to +6
Contributor

You may want to provide examples on how to use this for the docs.

pub trait AsyncFileReader: Send + Unpin {
/// Fetch a range of bytes asynchronously using a custom reading method
fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed;

/// Fetch a range outside the originally provided file range,
/// such as the header before reading the file body, or the remainder
/// of a block when the provided range ends mid-block.
/// By default, this delegates to [`Self::fetch_range`].
fn fetch_extra_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed {
self.fetch_range(range)
}
}
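Echoing the reviewer's note about doc examples, a minimal implementation of this trait might look like the following std-only sketch. The `DataFetchFutureBoxed` alias's real definition is not shown in this diff, so it is approximated here (with `Vec<u8>` standing in for `bytes::Bytes`), and `InMemoryReader` is a hypothetical name:

```rust
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;

// Stand-in for the crate's `DataFetchFutureBoxed` alias, whose exact definition
// is not shown in this diff; assumed to be a boxed future yielding the fetched
// bytes or an error. The real alias would yield `bytes::Bytes`, not `Vec<u8>`.
type DataFetchFutureBoxed = Pin<Box<dyn Future<Output = Result<Vec<u8>, String>> + Send>>;

pub trait AsyncFileReader: Send + Unpin {
    /// Fetch a range of bytes asynchronously using a custom reading method.
    fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed;

    /// By default, out-of-range fetches delegate to `fetch_range`.
    fn fetch_extra_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed {
        self.fetch_range(range)
    }
}

/// Hypothetical reader backed by an in-memory buffer.
pub struct InMemoryReader {
    pub data: Vec<u8>,
}

impl AsyncFileReader for InMemoryReader {
    fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed {
        let bytes = self.data[range.start as usize..range.end as usize].to_vec();
        // The data is already resident, so the future resolves immediately.
        Box::pin(async move { Ok(bytes) })
    }
}
```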
Comment on lines +7 to +18
Contributor
@jecsand838 jecsand838 Dec 9, 2025

What's your take on aligning this a bit more with the trait used in parquet and arrow/async_reader?

Suggested change
pub trait AsyncFileReader: Send + Unpin {
/// Fetch a range of bytes asynchronously using a custom reading method
fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed;
/// Fetch a range that is beyond the originally provided file range,
/// such as reading the header before reading the file,
/// or fetching the remainder of the block in case the range ended before the block's end.
/// By default, this will simply point to the fetch_range function.
fn fetch_extra_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed {
self.fetch_range(range)
}
}
pub trait AsyncFileReader: Send {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
}

My thinking is this:

  1. The get_bytes trait method is just "fetch these bytes". It doesn't know or care whether the range is within some "expected" range. The out-of-band reads (header, partial block completion) could be a concern of the reader logic, not the I/O trait.
  2. Users already understand get_bytes / get_byte_ranges. Reusing that mental model reduces friction. Plus consistency across crates is generally a best practice.
  3. This would unlock a clean default impl for AsyncRead + AsyncSeek (like tokio::fs::File) the same way Parquet does. The current `'static` requirement forces all implementations to be fully owned or Arc-wrapped, which seems unnecessarily rigid for simple file readers.
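The alignment the reviewer suggests could be sketched as below — a hypothetical, std-only version that hand-rolls the `BoxFuture` alias and uses `Vec<u8>` in place of the `futures` and `bytes` crates, with a sequential `get_byte_ranges` default in the spirit of the parquet trait:

```rust
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;

// Std-only stand-in for `futures::future::BoxFuture<'a, T>`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Vec<u8>, String>>;

    /// Retrieve several ranges. The default fetches them sequentially;
    /// an implementation could override it to coalesce or parallelize.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, Result<Vec<Vec<u8>>, String>> {
        Box::pin(async move {
            let mut out = Vec::with_capacity(ranges.len());
            for range in ranges {
                out.push(self.get_bytes(range).await?);
            }
            Ok(out)
        })
    }
}
```

With this shape, the header read and partial-block completion become plain `get_bytes` calls in the reader logic, and no `fetch_extra_range` method is needed on the I/O trait.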

149 changes: 149 additions & 0 deletions arrow-avro/src/reader/async_reader/builder.rs
@@ -0,0 +1,149 @@
use crate::codec::AvroFieldBuilder;
use crate::reader::async_reader::ReaderState;
use crate::reader::header::{Header, HeaderDecoder};
use crate::reader::record::RecordDecoder;
use crate::reader::{AsyncAvroReader, AsyncFileReader, Decoder};
use crate::schema::{AvroSchema, FingerprintAlgorithm};
use arrow_schema::{ArrowError, SchemaRef};
use indexmap::IndexMap;
use std::ops::Range;

/// Builder for an asynchronous Avro file reader.
pub struct AsyncAvroReaderBuilder<R: AsyncFileReader> {
pub(super) reader: R,
pub(super) file_size: u64,
pub(super) schema: SchemaRef,
pub(super) batch_size: usize,
pub(super) range: Option<Range<u64>>,
pub(super) reader_schema: Option<AvroSchema>,
}

impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {
Contributor

Nit, but I'd consider naming this either AsyncFileReaderBuilder or AsyncOcfReaderBuilder

/// Specify a byte range to read from the Avro file.
/// If provided, the reader reads all blocks within the range;
/// if the range ends mid-block, it fetches the remaining bytes to complete
/// that block, but reads no further blocks.
/// If omitted, the full file is read.
pub fn with_range(self, range: Range<u64>) -> Self {
Self {
range: Some(range),
..self
}
}

/// Specify a reader schema to use when reading the Avro file.
/// This can be useful for projecting specific columns or handling schema evolution.
/// If not set, a reader schema is derived from the Arrow schema given to the builder.
pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
Self {
reader_schema: Some(reader_schema),
..self
}
}

async fn read_header(&mut self) -> Result<(Header, u64), ArrowError> {
let mut decoder = HeaderDecoder::default();
let mut position = 0;
loop {
let range_to_fetch = position..(position + 64 * 1024).min(self.file_size);
Contributor

Is there a reason for hardcoding position + 64 * 1024?

let current_data = self
.reader
.fetch_extra_range(range_to_fetch)
.await
.map_err(|err| {
ArrowError::AvroError(format!(
"Error fetching Avro header from object store: {err}"
))
})?;
if current_data.is_empty() {
break;
}
let read = current_data.len();
let decoded = decoder.decode(&current_data)?;
if decoded != read {
position += decoded as u64;
break;
}
position += read as u64;
}

decoder
.flush()
.map(|header| (header, position))
.ok_or_else(|| ArrowError::AvroError("Unexpected EOF while reading Avro header".into()))
}

/// Build the asynchronous Avro reader with the provided parameters.
/// This reads the header first to initialize the reader state.
pub async fn try_build(mut self) -> Result<AsyncAvroReader<R>, ArrowError> {
if self.file_size == 0 {
return Err(ArrowError::AvroError("File size cannot be 0".into()));
}

// Start by reading the header from the beginning of the avro file
let (header, header_len) = self.read_header().await?;
let writer_schema = header
.schema()
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?
.ok_or_else(|| {
ArrowError::ParseError("No Avro schema present in file header".into())
})?;

let root = {
let field_builder = AvroFieldBuilder::new(&writer_schema);
match self.reader_schema.as_ref() {
None => {
let devised_avro_schema = AvroSchema::try_from(self.schema.as_ref())?;
let devised_reader_schema = devised_avro_schema.schema()?;
field_builder
.with_reader_schema(&devised_reader_schema)
.build()
}
Comment on lines +95 to +101
Contributor

Shouldn't we just execute field_builder.build without a reader_schema in this case?

The Reader treats this scenario as one where the caller simply wants to decode an OCF file without schema resolution, purely using the writer_schema.

Some(provided_schema) => {
let reader_schema = provided_schema.schema()?;
field_builder.with_reader_schema(&reader_schema).build()
}
}
}?;

let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;

let decoder = Decoder::from_parts(
self.batch_size,
record_decoder,
None,
IndexMap::new(),
FingerprintAlgorithm::Rabin,
);
let range = match self.range {
Some(r) => {
// If this PartitionedFile's range starts at 0, skip past the header bytes,
// but then step back 16 bytes so the sync marker for the first block is included:
// this reader scans the data for the first sync marker (a block starts right after it),
// then reads each block's count, size, etc.
let start = r.start.max(header_len.checked_sub(16).unwrap());
let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start; worst case the range is empty
start..end
}
None => 0..self.file_size,
};

let reader_state = if range.start == range.end {
ReaderState::Finished
} else {
ReaderState::Idle
};
let codec = header.compression()?;
let sync_marker = header.sync();

Ok(AsyncAvroReader::new(
self.reader,
range,
self.file_size,
decoder,
codec,
sync_marker,
reader_state,
))
}
}
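The range clamping in `try_build` can be illustrated in isolation; the following is a simplified sketch (the free function `adjust_range` is hypothetical, not part of this PR):

```rust
use std::ops::Range;

/// Simplified version of the clamping done in `try_build`: never start before
/// `header_len - 16`, so the sync marker preceding the first block stays in
/// range, and never extend past the end of the file.
fn adjust_range(requested: Range<u64>, header_len: u64, file_size: u64) -> Range<u64> {
    let start = requested.start.max(header_len - 16);
    // Ensure end is not less than start; in the worst case the range is empty.
    let end = requested.end.max(start).min(file_size);
    start..end
}
```

For a 150-byte header, a range starting at 0 is bumped to 134 (header end minus the 16-byte sync marker), and a range that lies entirely inside the header collapses to an empty range, which maps to `ReaderState::Finished`.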