Skip to content

Commit d39e76c

Browse files
committed
Added format strings (HashMap) to JSON reader
The format_strings map's key is the column name. The value will be used to parse the Date64/Date32 types from JSON if the read value is of string type. Also adds tests for the formatted parser for Date{32,64}Type in the JSON readers.
1 parent 717216f commit d39e76c

File tree

2 files changed

+121
-20
lines changed

2 files changed

+121
-20
lines changed

arrow/src/json/reader.rs

Lines changed: 117 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
//!
3939
//! let file = File::open("test/data/basic.json").unwrap();
4040
//!
41-
//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
41+
//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None, None);
4242
//! let batch = json.next().unwrap().unwrap();
4343
//! ```
4444
@@ -55,6 +55,7 @@ use crate::datatypes::*;
5555
use crate::error::{ArrowError, Result};
5656
use crate::record_batch::RecordBatch;
5757
use crate::util::bit_util;
58+
use crate::util::reader_parser::Parser;
5859
use crate::{array::*, buffer::Buffer};
5960

6061
#[derive(Debug, Clone)]
@@ -563,7 +564,7 @@ where
563564
/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
564565
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
565566
/// let batch_size = 1024;
566-
/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
567+
/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None, None);
567568
///
568569
/// // seek back to start so that the original file is usable again
569570
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -580,6 +581,8 @@ pub struct Decoder {
580581
projection: Option<Vec<String>>,
581582
/// Batch size (number of records to load each time)
582583
batch_size: usize,
584+
/// optional HashMap of column names to its format strings
585+
format_strings: Option<HashMap<String, String>>,
583586
}
584587

585588
impl Decoder {
@@ -589,11 +592,13 @@ impl Decoder {
589592
schema: SchemaRef,
590593
batch_size: usize,
591594
projection: Option<Vec<String>>,
595+
format_strings: Option<HashMap<String, String>>,
592596
) -> Self {
593597
Self {
594598
schema,
595599
projection,
596600
batch_size,
601+
format_strings,
597602
}
598603
}
599604

@@ -941,6 +946,29 @@ impl Decoder {
941946
))
942947
}
943948

949+
fn build_primitive_array_using_format_string<T>(
950+
&self,
951+
rows: &[Value],
952+
col_name: &str,
953+
format_string: &str,
954+
) -> Result<ArrayRef>
955+
where
956+
T: ArrowNumericType + Parser,
957+
T: Parser,
958+
{
959+
Ok(Arc::new(
960+
rows.iter()
961+
.map(|row| {
962+
row.get(&col_name).and_then(|value| {
963+
value
964+
.as_str()
965+
.and_then(|value| T::parse_formatted(value, format_string))
966+
})
967+
})
968+
.collect::<PrimitiveArray<T>>(),
969+
))
970+
}
971+
944972
/// Build a nested GenericListArray from a list of unnested `Value`s
945973
fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
946974
&self,
@@ -1122,6 +1150,10 @@ impl Decoder {
11221150
.iter()
11231151
.filter(|field| projection.is_empty() || projection.contains(field.name()))
11241152
.map(|field| {
1153+
let format_string = self
1154+
.format_strings
1155+
.as_ref()
1156+
.and_then(|fmts| fmts.get(field.name()));
11251157
match field.data_type() {
11261158
DataType::Null => {
11271159
Ok(Arc::new(NullArray::new(rows.len())) as ArrayRef)
@@ -1180,12 +1212,24 @@ impl Decoder {
11801212
field.name(),
11811213
),
11821214
},
1183-
DataType::Date64 => {
1184-
self.build_primitive_array::<Date64Type>(rows, field.name())
1185-
}
1186-
DataType::Date32 => {
1187-
self.build_primitive_array::<Date32Type>(rows, field.name())
1188-
}
1215+
DataType::Date64 => match format_string {
1216+
Some(format_string) => self
1217+
.build_primitive_array_using_format_string::<Date64Type>(
1218+
rows,
1219+
field.name(),
1220+
format_string.as_str(),
1221+
),
1222+
_ => self.build_primitive_array::<Date64Type>(rows, field.name()),
1223+
},
1224+
DataType::Date32 => match format_string {
1225+
Some(format_string) => self
1226+
.build_primitive_array_using_format_string::<Date32Type>(
1227+
rows,
1228+
field.name(),
1229+
format_string.as_str(),
1230+
),
1231+
_ => self.build_primitive_array::<Date32Type>(rows, field.name()),
1232+
},
11891233
DataType::Time64(unit) => match unit {
11901234
TimeUnit::Microsecond => self
11911235
.build_primitive_array::<Time64MicrosecondType>(
@@ -1540,8 +1584,15 @@ impl<R: Read> Reader<R> {
15401584
schema: SchemaRef,
15411585
batch_size: usize,
15421586
projection: Option<Vec<String>>,
1587+
format_strings: Option<HashMap<String, String>>,
15431588
) -> Self {
1544-
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
1589+
Self::from_buf_reader(
1590+
BufReader::new(reader),
1591+
schema,
1592+
batch_size,
1593+
projection,
1594+
format_strings,
1595+
)
15451596
}
15461597

15471598
/// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1552,10 +1603,11 @@ impl<R: Read> Reader<R> {
15521603
schema: SchemaRef,
15531604
batch_size: usize,
15541605
projection: Option<Vec<String>>,
1606+
format_strings: Option<HashMap<String, String>>,
15551607
) -> Self {
15561608
Self {
15571609
reader,
1558-
decoder: Decoder::new(schema, batch_size, projection),
1610+
decoder: Decoder::new(schema, batch_size, projection, format_strings),
15591611
}
15601612
}
15611613

@@ -1591,6 +1643,8 @@ pub struct ReaderBuilder {
15911643
batch_size: usize,
15921644
/// Optional projection for which columns to load (zero-based column indices)
15931645
projection: Option<Vec<String>>,
1646+
/// optional HashMap of column names to its format strings
1647+
format_strings: Option<HashMap<String, String>>,
15941648
}
15951649

15961650
impl Default for ReaderBuilder {
@@ -1600,6 +1654,7 @@ impl Default for ReaderBuilder {
16001654
max_records: None,
16011655
batch_size: 1024,
16021656
projection: None,
1657+
format_strings: None,
16031658
}
16041659
}
16051660
}
@@ -1658,6 +1713,15 @@ impl ReaderBuilder {
16581713
self
16591714
}
16601715

1716+
/// Set the decoder's format Strings param
1717+
pub fn with_format_strings(
1718+
mut self,
1719+
format_strings: HashMap<String, String>,
1720+
) -> Self {
1721+
self.format_strings = Some(format_strings);
1722+
self
1723+
}
1724+
16611725
/// Create a new `Reader` from the `ReaderBuilder`
16621726
pub fn build<R>(self, source: R) -> Result<Reader<R>>
16631727
where
@@ -1679,6 +1743,7 @@ impl ReaderBuilder {
16791743
schema,
16801744
self.batch_size,
16811745
self.projection,
1746+
self.format_strings,
16821747
))
16831748
}
16841749
}
@@ -1711,7 +1776,7 @@ mod tests {
17111776
.unwrap();
17121777
let batch = reader.next().unwrap().unwrap();
17131778

1714-
assert_eq!(4, batch.num_columns());
1779+
assert_eq!(5, batch.num_columns());
17151780
assert_eq!(12, batch.num_rows());
17161781

17171782
let schema = reader.schema();
@@ -1834,6 +1899,7 @@ mod tests {
18341899
Arc::new(schema.clone()),
18351900
1024,
18361901
None,
1902+
None,
18371903
);
18381904
let reader_schema = reader.schema();
18391905
assert_eq!(reader_schema, Arc::new(schema));
@@ -1870,6 +1936,39 @@ mod tests {
18701936
assert_eq!(-3.5, bb.value(1));
18711937
}
18721938

1939+
#[test]
1940+
fn test_json_format_strings_for_date() {
1941+
let schema =
1942+
Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
1943+
let e = schema.column_with_name("e").unwrap();
1944+
assert_eq!(&DataType::Date32, e.1.data_type());
1945+
let mut fmts = HashMap::new();
1946+
let date_format = "%Y-%m-%d".to_string();
1947+
fmts.insert("e".to_string(), date_format.clone());
1948+
1949+
let mut reader: Reader<File> = Reader::new(
1950+
File::open("test/data/basic.json").unwrap(),
1951+
schema.clone(),
1952+
1024,
1953+
None,
1954+
Some(fmts),
1955+
);
1956+
let reader_schema = reader.schema();
1957+
assert_eq!(reader_schema, schema);
1958+
let batch = reader.next().unwrap().unwrap();
1959+
1960+
let ee = batch
1961+
.column(e.0)
1962+
.as_any()
1963+
.downcast_ref::<Date32Array>()
1964+
.unwrap();
1965+
let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
1966+
assert_eq!(dt, ee.value(0));
1967+
let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
1968+
assert_eq!(dt, ee.value(1));
1969+
assert!(!ee.is_valid(2));
1970+
}
1971+
18731972
#[test]
18741973
fn test_json_basic_schema_projection() {
18751974
// We test implicit and explicit projection:
@@ -1886,6 +1985,7 @@ mod tests {
18861985
Arc::new(schema),
18871986
1024,
18881987
Some(vec!["a".to_string(), "c".to_string()]),
1988+
None,
18891989
);
18901990
let reader_schema = reader.schema();
18911991
let expected_schema = Arc::new(Schema::new(vec![
@@ -2052,7 +2152,8 @@ mod tests {
20522152
file.seek(SeekFrom::Start(0)).unwrap();
20532153

20542154
let reader = BufReader::new(GzDecoder::new(&file));
2055-
let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
2155+
let mut reader =
2156+
Reader::from_buf_reader(reader, Arc::new(schema), 64, None, None);
20562157
let batch_gz = reader.next().unwrap().unwrap();
20572158

20582159
for batch in vec![batch, batch_gz] {
@@ -3093,7 +3194,7 @@ mod tests {
30933194
true,
30943195
)]);
30953196

3096-
let decoder = Decoder::new(Arc::new(schema), 1024, None);
3197+
let decoder = Decoder::new(Arc::new(schema), 1024, None, None);
30973198
let batch = decoder
30983199
.next_batch(
30993200
&mut vec![
@@ -3128,7 +3229,7 @@ mod tests {
31283229
true,
31293230
)]);
31303231

3131-
let decoder = Decoder::new(Arc::new(schema), 1024, None);
3232+
let decoder = Decoder::new(Arc::new(schema), 1024, None, None);
31323233
let batch = decoder
31333234
.next_batch(
31343235
// NOTE: total struct element count needs to be greater than
@@ -3157,7 +3258,7 @@ mod tests {
31573258
#[test]
31583259
fn test_json_read_binary_structs() {
31593260
let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
3160-
let decoder = Decoder::new(Arc::new(schema), 1024, None);
3261+
let decoder = Decoder::new(Arc::new(schema), 1024, None, None);
31613262
let batch = decoder
31623263
.next_batch(
31633264
&mut vec![
@@ -3200,7 +3301,7 @@ mod tests {
32003301
let mut sum_a = 0;
32013302
for batch in reader {
32023303
let batch = batch.unwrap();
3203-
assert_eq!(4, batch.num_columns());
3304+
assert_eq!(5, batch.num_columns());
32043305
sum_num_rows += batch.num_rows();
32053306
num_batches += 1;
32063307
let batch_schema = batch.schema();

arrow/test/data/basic.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
{"a":1, "b":2.0, "c":false, "d":"4"}
2-
{"a":-10, "b":-3.5, "c":true, "d":"4"}
3-
{"a":2, "b":0.6, "c":false, "d":"text"}
1+
{"a":1, "b":2.0, "c":false, "d":"4", "e":"1970-1-2"}
2+
{"a":-10, "b":-3.5, "c":true, "d":"4", "e": "1969-12-31"}
3+
{"a":2, "b":0.6, "c":false, "d":"text", "e": "1970-01-02 11:11:11"}
44
{"a":1, "b":2.0, "c":false, "d":"4"}
55
{"a":7, "b":-3.5, "c":true, "d":"4"}
66
{"a":1, "b":0.6, "c":false, "d":"text"}
@@ -9,4 +9,4 @@
99
{"a":1, "b":0.6, "c":false, "d":"text"}
1010
{"a":1, "b":2.0, "c":false, "d":"4"}
1111
{"a":1, "b":-3.5, "c":true, "d":"4"}
12-
{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
12+
{"a":100000000000000, "b":0.6, "c":false, "d":"text"}

0 commit comments

Comments
 (0)