arrow/src/json/reader.rs (178 changes: 142 additions & 36 deletions)
@@ -38,7 +38,13 @@
//!
//! let file = File::open("test/data/basic.json").unwrap();
//!
//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
//! let mut json = json::Reader::new(
//! BufReader::new(file),
//! Arc::new(schema),
//! 1024,
//! Default::default()
//! );
//!
//! let batch = json.next().unwrap().unwrap();
//! ```

@@ -55,6 +61,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::util::bit_util;
use crate::util::reader_parser::Parser;
use crate::{array::*, buffer::Buffer};

#[derive(Debug, Clone)]
@@ -563,7 +570,7 @@
/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
/// let batch_size = 1024;
/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
///
/// // seek back to start so that the original file is usable again
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -576,31 +583,35 @@
pub struct Decoder {
/// Explicit schema for the JSON file
schema: SchemaRef,
/// Optional projection for which columns to load (case-sensitive names)
projection: Option<Vec<String>>,
/// Batch size (number of records to load each time)
batch_size: usize,
/// Options for the JSON decoder
doptions: DecoderOptions,
}

#[derive(Default, Debug)]
pub struct DecoderOptions {
/// Optional projection for which columns to load (case-sensitive names)
projection: Option<Vec<String>>,
/// Optional HashMap mapping column names to their format strings
format_strings: Option<HashMap<String, String>>,
}
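
For orientation, here is a minimal sketch of how these options are assembled, mirroring the struct-literal construction the tests below use (the fields are crate-private, so code outside the crate would configure them through `ReaderBuilder` instead; the column names are illustrative):

    let mut format_strings = HashMap::new();
    format_strings.insert("e".to_string(), "%Y-%m-%d".to_string());

    let options = DecoderOptions {
        // load only these columns, parsing "e" with the custom date format
        projection: Some(vec!["a".to_string(), "e".to_string()]),
        format_strings: Some(format_strings),
    };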

impl Decoder {
/// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
/// trait.
pub fn new(
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
Self {
schema,
projection,
batch_size,
doptions,
}
}

/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
match &self.projection {
match &self.doptions.projection {
Some(projection) => {
let fields = self.schema.fields();
let projected_fields: Vec<Field> = fields
@@ -645,7 +656,7 @@ impl Decoder {
}

let rows = &rows[..];
let projection = self.projection.clone().unwrap_or_default();
let projection = self.doptions.projection.clone().unwrap_or_default();
let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);

let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -913,7 +924,7 @@ impl Decoder {
}

#[allow(clippy::unnecessary_wraps)]
fn build_primitive_array<T: ArrowPrimitiveType>(
fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
&self,
rows: &[Value],
col_name: &str,
@@ -922,20 +933,30 @@
T: ArrowNumericType,
T::Native: num::NumCast,
{
let format_string = self
.doptions
.format_strings
.as_ref()
.and_then(|fmts| fmts.get(col_name));
Ok(Arc::new(
rows.iter()
.map(|row| {
row.get(&col_name)
    .and_then(|value| {
        if value.is_i64() {
            value.as_i64().map(num::cast::cast)
        } else if value.is_u64() {
            value.as_u64().map(num::cast::cast)
        } else {
            value.as_f64().map(num::cast::cast)
        }
    })
    .flatten()
row.get(&col_name).and_then(|value| {
    if value.is_i64() {
        value.as_i64().and_then(num::cast::cast)
    } else if value.is_u64() {
        value.as_u64().and_then(num::cast::cast)
    } else if value.is_string() {
        match format_string {
            Some(fmt) => {
                T::parse_formatted(value.as_str().unwrap(), fmt)
            }
            None => T::parse(value.as_str().unwrap()),
        }
    } else {
        value.as_f64().and_then(num::cast::cast)
    }
})
})
.collect::<PrimitiveArray<T>>(),
))
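
The new `is_string` branch above is what the `Parser` import enables: a JSON string value is routed through the column type's parser, preferring the registered format string when one exists. A hedged illustration of that dispatch (it assumes, consistent with the date test below, that `Date32` counts days since the Unix epoch and that its default parser accepts ISO-8601 dates):

    // With an explicit format string, as when `format_strings` contains the column:
    assert_eq!(Date32Type::parse_formatted("1970-01-02", "%Y-%m-%d"), Some(1));
    // Without one, the type's default parser is used:
    assert_eq!(Date32Type::parse("1970-01-02"), Some(1));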
@@ -1539,9 +1560,9 @@ impl<R: Read> Reader<R> {
reader: R,
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<String>>,
doptions: DecoderOptions,
) -> Self {
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
}

/// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1551,11 +1572,11 @@ impl<R: Read> Reader<R> {
reader: BufReader<R>,
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<String>>,
doptions: DecoderOptions,
) -> Self {
Self {
reader,
decoder: Decoder::new(schema, batch_size, projection),
decoder: Decoder::new(schema, batch_size, doptions),
}
}

@@ -1591,6 +1612,8 @@ pub struct ReaderBuilder {
batch_size: usize,
/// Optional projection for which columns to load (case-sensitive names)
projection: Option<Vec<String>>,
/// Optional HashMap mapping column names to their format strings
format_strings: Option<HashMap<String, String>>,
}

impl Default for ReaderBuilder {
@@ -1600,6 +1623,7 @@ impl Default for ReaderBuilder {
max_records: None,
batch_size: 1024,
projection: None,
format_strings: None,
}
}
}
@@ -1658,6 +1682,15 @@ impl ReaderBuilder {
self
}

/// Set the decoder's format strings (a map from column name to format string)
pub fn with_format_strings(
mut self,
format_strings: HashMap<String, String>,
) -> Self {
self.format_strings = Some(format_strings);
self
}
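
Combined with `with_schema` and `with_batch_size` (both exercised by the tests below), the builder path might look like the following hedged sketch, which reuses the `test/data/basic.json` fixture and the Date32 column "e" from the date test:

    let schema =
        Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
    let mut format_strings = HashMap::new();
    format_strings.insert("e".to_string(), "%Y-%m-%d".to_string());

    let mut reader = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(64)
        .with_format_strings(format_strings)
        .build(File::open("test/data/basic.json").unwrap())
        .unwrap();
    let batch = reader.next().unwrap().unwrap();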

/// Create a new `Reader` from the `ReaderBuilder`
pub fn build<R>(self, source: R) -> Result<Reader<R>>
where
@@ -1678,7 +1711,10 @@ impl ReaderBuilder {
buf_reader,
schema,
self.batch_size,
self.projection,
DecoderOptions {
projection: self.projection,
format_strings: self.format_strings,
},
))
}
}
@@ -1711,7 +1747,7 @@ mod tests {
.unwrap();
let batch = reader.next().unwrap().unwrap();

assert_eq!(4, batch.num_columns());
assert_eq!(5, batch.num_columns());
assert_eq!(12, batch.num_rows());

let schema = reader.schema();
@@ -1833,7 +1869,7 @@ mod tests {
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
1024,
None,
Default::default(),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, Arc::new(schema));
@@ -1870,6 +1906,41 @@ mod tests {
assert_eq!(-3.5, bb.value(1));
}

#[test]
fn test_json_format_strings_for_date() {
let schema =
Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
let e = schema.column_with_name("e").unwrap();
assert_eq!(&DataType::Date32, e.1.data_type());
let mut fmts = HashMap::new();
let date_format = "%Y-%m-%d".to_string();
fmts.insert("e".to_string(), date_format.clone());

let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
schema.clone(),
1024,
DecoderOptions {
format_strings: Some(fmts),
..Default::default()
},
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, schema);
let batch = reader.next().unwrap().unwrap();

let ee = batch
.column(e.0)
.as_any()
.downcast_ref::<Date32Array>()
.unwrap();
let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
assert_eq!(dt, ee.value(0));
let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
assert_eq!(dt, ee.value(1));
assert!(!ee.is_valid(2));
}

#[test]
fn test_json_basic_schema_projection() {
// We test implicit and explicit projection:
@@ -1885,7 +1956,10 @@ mod tests {
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
1024,
Some(vec!["a".to_string(), "c".to_string()]),
DecoderOptions {
projection: Some(vec!["a".to_string(), "c".to_string()]),
..Default::default()
},
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
@@ -2052,7 +2126,8 @@ mod tests {
file.seek(SeekFrom::Start(0)).unwrap();

let reader = BufReader::new(GzDecoder::new(&file));
let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
let mut reader =
Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
let batch_gz = reader.next().unwrap().unwrap();

for batch in vec![batch, batch_gz] {
@@ -3081,6 +3156,37 @@ mod tests {
assert_eq!(5, aa.value(7));
}

#[test]
fn test_time_from_string() {
parse_string_column::<Time64NanosecondType>(4);
parse_string_column::<Time64MicrosecondType>(4);
parse_string_column::<Time32MillisecondType>(4);
parse_string_column::<Time32SecondType>(4);
}

fn parse_string_column<T>(value: T::Native)
where
T: ArrowPrimitiveType,
{
let schema = Schema::new(vec![Field::new("d", T::DATA_TYPE, true)]);

let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();

let batch = reader.next().unwrap().unwrap();
let dd = batch
.column(0)
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.unwrap();
assert_eq!(value, dd.value(1));
assert!(!dd.is_valid(2));
}

#[test]
fn test_json_read_nested_list() {
let schema = Schema::new(vec![Field::new(
@@ -3093,7 +3199,7 @@
true,
)]);

let decoder = Decoder::new(Arc::new(schema), 1024, None);
let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
&mut vec![
@@ -3128,7 +3234,7 @@ mod tests {
true,
)]);

let decoder = Decoder::new(Arc::new(schema), 1024, None);
let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
// NOTE: total struct element count needs to be greater than
@@ -3157,7 +3263,7 @@ mod tests {
#[test]
fn test_json_read_binary_structs() {
let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
let decoder = Decoder::new(Arc::new(schema), 1024, None);
let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
&mut vec![
@@ -3200,7 +3306,7 @@ mod tests {
let mut sum_a = 0;
for batch in reader {
let batch = batch.unwrap();
assert_eq!(4, batch.num_columns());
assert_eq!(5, batch.num_columns());
sum_num_rows += batch.num_rows();
num_batches += 1;
let batch_schema = batch.schema();