Skip to content

Commit 68038f5

Browse files
sum12 and alamb authored
Allow json reader/decoder to work with format_strings for each field (#1451)
* implement parser for remaining types used by json decoder * added format strings (hashmap) to json reader the format_string map's key is column name. The value will be used to parse the date64/date32 types from json if the read value is of string type add tests for formatted parser for date{32,64}type for json readers all-parsers start fixup! added format strings (hashmap) to json reader * add DecoderOptions struct for holding options for decoder that way later extensions to the decoder can be added to this struct without breaking API. * Fixup some comments * added test for string parsing json reader for time{32,64} types Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent b12f7cd commit 68038f5

File tree

3 files changed

+171
-53
lines changed

3 files changed

+171
-53
lines changed

arrow/src/json/reader.rs

Lines changed: 142 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@
3838
//!
3939
//! let file = File::open("test/data/basic.json").unwrap();
4040
//!
41-
//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
41+
//! let mut json = json::Reader::new(
42+
//! BufReader::new(file),
43+
//! Arc::new(schema),
44+
//! 1024,
45+
//! Default::default()
46+
//! );
47+
//!
4248
//! let batch = json.next().unwrap().unwrap();
4349
//! ```
4450
@@ -55,6 +61,7 @@ use crate::datatypes::*;
5561
use crate::error::{ArrowError, Result};
5662
use crate::record_batch::RecordBatch;
5763
use crate::util::bit_util;
64+
use crate::util::reader_parser::Parser;
5865
use crate::{array::*, buffer::Buffer};
5966

6067
#[derive(Debug, Clone)]
@@ -563,7 +570,7 @@ where
563570
/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
564571
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
565572
/// let batch_size = 1024;
566-
/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
573+
/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
567574
///
568575
/// // seek back to start so that the original file is usable again
569576
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -576,31 +583,35 @@ where
576583
pub struct Decoder {
577584
/// Explicit schema for the JSON file
578585
schema: SchemaRef,
579-
/// Optional projection for which columns to load (case-sensitive names)
580-
projection: Option<Vec<String>>,
581586
/// Batch size (number of records to load each time)
582587
batch_size: usize,
588+
/// This is a collection of options for json decoder
589+
doptions: DecoderOptions,
590+
}
591+
592+
#[derive(Default, Debug)]
593+
pub struct DecoderOptions {
594+
/// Optional projection for which columns to load (case-sensitive names)
595+
projection: Option<Vec<String>>,
596+
/// optional HashMap of column names to its format string
597+
format_strings: Option<HashMap<String, String>>,
583598
}
584599

585600
impl Decoder {
586601
/// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
587602
/// trait.
588-
pub fn new(
589-
schema: SchemaRef,
590-
batch_size: usize,
591-
projection: Option<Vec<String>>,
592-
) -> Self {
603+
pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
593604
Self {
594605
schema,
595-
projection,
596606
batch_size,
607+
doptions,
597608
}
598609
}
599610

600611
/// Returns the schema of the reader, useful for getting the schema without reading
601612
/// record batches
602613
pub fn schema(&self) -> SchemaRef {
603-
match &self.projection {
614+
match &self.doptions.projection {
604615
Some(projection) => {
605616
let fields = self.schema.fields();
606617
let projected_fields: Vec<Field> = fields
@@ -645,7 +656,7 @@ impl Decoder {
645656
}
646657

647658
let rows = &rows[..];
648-
let projection = self.projection.clone().unwrap_or_default();
659+
let projection = self.doptions.projection.clone().unwrap_or_default();
649660
let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
650661

651662
let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -913,7 +924,7 @@ impl Decoder {
913924
}
914925

915926
#[allow(clippy::unnecessary_wraps)]
916-
fn build_primitive_array<T: ArrowPrimitiveType>(
927+
fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
917928
&self,
918929
rows: &[Value],
919930
col_name: &str,
@@ -922,20 +933,30 @@ impl Decoder {
922933
T: ArrowNumericType,
923934
T::Native: num::NumCast,
924935
{
936+
let format_string = self
937+
.doptions
938+
.format_strings
939+
.as_ref()
940+
.and_then(|fmts| fmts.get(col_name));
925941
Ok(Arc::new(
926942
rows.iter()
927943
.map(|row| {
928-
row.get(&col_name)
929-
.and_then(|value| {
930-
if value.is_i64() {
931-
value.as_i64().map(num::cast::cast)
932-
} else if value.is_u64() {
933-
value.as_u64().map(num::cast::cast)
934-
} else {
935-
value.as_f64().map(num::cast::cast)
944+
row.get(&col_name).and_then(|value| {
945+
if value.is_i64() {
946+
value.as_i64().and_then(num::cast::cast)
947+
} else if value.is_u64() {
948+
value.as_u64().and_then(num::cast::cast)
949+
} else if value.is_string() {
950+
match format_string {
951+
Some(fmt) => {
952+
T::parse_formatted(value.as_str().unwrap(), fmt)
953+
}
954+
None => T::parse(value.as_str().unwrap()),
936955
}
937-
})
938-
.flatten()
956+
} else {
957+
value.as_f64().and_then(num::cast::cast)
958+
}
959+
})
939960
})
940961
.collect::<PrimitiveArray<T>>(),
941962
))
@@ -1539,9 +1560,9 @@ impl<R: Read> Reader<R> {
15391560
reader: R,
15401561
schema: SchemaRef,
15411562
batch_size: usize,
1542-
projection: Option<Vec<String>>,
1563+
doptions: DecoderOptions,
15431564
) -> Self {
1544-
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
1565+
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
15451566
}
15461567

15471568
/// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1551,11 +1572,11 @@ impl<R: Read> Reader<R> {
15511572
reader: BufReader<R>,
15521573
schema: SchemaRef,
15531574
batch_size: usize,
1554-
projection: Option<Vec<String>>,
1575+
doptions: DecoderOptions,
15551576
) -> Self {
15561577
Self {
15571578
reader,
1558-
decoder: Decoder::new(schema, batch_size, projection),
1579+
decoder: Decoder::new(schema, batch_size, doptions),
15591580
}
15601581
}
15611582

@@ -1591,6 +1612,8 @@ pub struct ReaderBuilder {
15911612
batch_size: usize,
15921613
/// Optional projection for which columns to load (zero-based column indices)
15931614
projection: Option<Vec<String>>,
1615+
/// optional HashMap of column names to format strings
1616+
format_strings: Option<HashMap<String, String>>,
15941617
}
15951618

15961619
impl Default for ReaderBuilder {
@@ -1600,6 +1623,7 @@ impl Default for ReaderBuilder {
16001623
max_records: None,
16011624
batch_size: 1024,
16021625
projection: None,
1626+
format_strings: None,
16031627
}
16041628
}
16051629
}
@@ -1658,6 +1682,15 @@ impl ReaderBuilder {
16581682
self
16591683
}
16601684

1685+
/// Set the decoder's format Strings param
1686+
pub fn with_format_strings(
1687+
mut self,
1688+
format_strings: HashMap<String, String>,
1689+
) -> Self {
1690+
self.format_strings = Some(format_strings);
1691+
self
1692+
}
1693+
16611694
/// Create a new `Reader` from the `ReaderBuilder`
16621695
pub fn build<R>(self, source: R) -> Result<Reader<R>>
16631696
where
@@ -1678,7 +1711,10 @@ impl ReaderBuilder {
16781711
buf_reader,
16791712
schema,
16801713
self.batch_size,
1681-
self.projection,
1714+
DecoderOptions {
1715+
projection: self.projection,
1716+
format_strings: self.format_strings,
1717+
},
16821718
))
16831719
}
16841720
}
@@ -1711,7 +1747,7 @@ mod tests {
17111747
.unwrap();
17121748
let batch = reader.next().unwrap().unwrap();
17131749

1714-
assert_eq!(4, batch.num_columns());
1750+
assert_eq!(5, batch.num_columns());
17151751
assert_eq!(12, batch.num_rows());
17161752

17171753
let schema = reader.schema();
@@ -1833,7 +1869,7 @@ mod tests {
18331869
File::open("test/data/basic.json").unwrap(),
18341870
Arc::new(schema.clone()),
18351871
1024,
1836-
None,
1872+
Default::default(),
18371873
);
18381874
let reader_schema = reader.schema();
18391875
assert_eq!(reader_schema, Arc::new(schema));
@@ -1870,6 +1906,41 @@ mod tests {
18701906
assert_eq!(-3.5, bb.value(1));
18711907
}
18721908

1909+
#[test]
1910+
fn test_json_format_strings_for_date() {
1911+
let schema =
1912+
Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
1913+
let e = schema.column_with_name("e").unwrap();
1914+
assert_eq!(&DataType::Date32, e.1.data_type());
1915+
let mut fmts = HashMap::new();
1916+
let date_format = "%Y-%m-%d".to_string();
1917+
fmts.insert("e".to_string(), date_format.clone());
1918+
1919+
let mut reader: Reader<File> = Reader::new(
1920+
File::open("test/data/basic.json").unwrap(),
1921+
schema.clone(),
1922+
1024,
1923+
DecoderOptions {
1924+
format_strings: Some(fmts),
1925+
..Default::default()
1926+
},
1927+
);
1928+
let reader_schema = reader.schema();
1929+
assert_eq!(reader_schema, schema);
1930+
let batch = reader.next().unwrap().unwrap();
1931+
1932+
let ee = batch
1933+
.column(e.0)
1934+
.as_any()
1935+
.downcast_ref::<Date32Array>()
1936+
.unwrap();
1937+
let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
1938+
assert_eq!(dt, ee.value(0));
1939+
let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
1940+
assert_eq!(dt, ee.value(1));
1941+
assert!(!ee.is_valid(2));
1942+
}
1943+
18731944
#[test]
18741945
fn test_json_basic_schema_projection() {
18751946
// We test implicit and explicit projection:
@@ -1885,7 +1956,10 @@ mod tests {
18851956
File::open("test/data/basic.json").unwrap(),
18861957
Arc::new(schema),
18871958
1024,
1888-
Some(vec!["a".to_string(), "c".to_string()]),
1959+
DecoderOptions {
1960+
projection: Some(vec!["a".to_string(), "c".to_string()]),
1961+
..Default::default()
1962+
},
18891963
);
18901964
let reader_schema = reader.schema();
18911965
let expected_schema = Arc::new(Schema::new(vec![
@@ -2052,7 +2126,8 @@ mod tests {
20522126
file.seek(SeekFrom::Start(0)).unwrap();
20532127

20542128
let reader = BufReader::new(GzDecoder::new(&file));
2055-
let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
2129+
let mut reader =
2130+
Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
20562131
let batch_gz = reader.next().unwrap().unwrap();
20572132

20582133
for batch in vec![batch, batch_gz] {
@@ -3081,6 +3156,37 @@ mod tests {
30813156
assert_eq!(5, aa.value(7));
30823157
}
30833158

3159+
#[test]
3160+
fn test_time_from_string() {
3161+
parse_string_column::<Time64NanosecondType>(4);
3162+
parse_string_column::<Time64MicrosecondType>(4);
3163+
parse_string_column::<Time32MillisecondType>(4);
3164+
parse_string_column::<Time32SecondType>(4);
3165+
}
3166+
3167+
fn parse_string_column<T>(value: T::Native)
3168+
where
3169+
T: ArrowPrimitiveType,
3170+
{
3171+
let schema = Schema::new(vec![Field::new("d", T::DATA_TYPE, true)]);
3172+
3173+
let builder = ReaderBuilder::new()
3174+
.with_schema(Arc::new(schema))
3175+
.with_batch_size(64);
3176+
let mut reader: Reader<File> = builder
3177+
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
3178+
.unwrap();
3179+
3180+
let batch = reader.next().unwrap().unwrap();
3181+
let dd = batch
3182+
.column(0)
3183+
.as_any()
3184+
.downcast_ref::<PrimitiveArray<T>>()
3185+
.unwrap();
3186+
assert_eq!(value, dd.value(1));
3187+
assert!(!dd.is_valid(2));
3188+
}
3189+
30843190
#[test]
30853191
fn test_json_read_nested_list() {
30863192
let schema = Schema::new(vec![Field::new(
@@ -3093,7 +3199,7 @@ mod tests {
30933199
true,
30943200
)]);
30953201

3096-
let decoder = Decoder::new(Arc::new(schema), 1024, None);
3202+
let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
30973203
let batch = decoder
30983204
.next_batch(
30993205
&mut vec![
@@ -3128,7 +3234,7 @@ mod tests {
31283234
true,
31293235
)]);
31303236

3131-
let decoder = Decoder::new(Arc::new(schema), 1024, None);
3237+
let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
31323238
let batch = decoder
31333239
.next_batch(
31343240
// NOTE: total struct element count needs to be greater than
@@ -3157,7 +3263,7 @@ mod tests {
31573263
#[test]
31583264
fn test_json_read_binary_structs() {
31593265
let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
3160-
let decoder = Decoder::new(Arc::new(schema), 1024, None);
3266+
let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
31613267
let batch = decoder
31623268
.next_batch(
31633269
&mut vec![
@@ -3200,7 +3306,7 @@ mod tests {
32003306
let mut sum_a = 0;
32013307
for batch in reader {
32023308
let batch = batch.unwrap();
3203-
assert_eq!(4, batch.num_columns());
3309+
assert_eq!(5, batch.num_columns());
32043310
sum_num_rows += batch.num_rows();
32053311
num_batches += 1;
32063312
let batch_schema = batch.schema();

0 commit comments

Comments (0)