Skip to content

Commit 321fda4

Browse files
authored
NdJson support (#404)
1 parent c9ed34c commit 321fda4

File tree

9 files changed

+803
-85
lines changed

9 files changed

+803
-85
lines changed

datafusion/src/datasource/csv.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,13 @@ use std::string::String;
4040
use std::sync::{Arc, Mutex};
4141

4242
use crate::datasource::datasource::Statistics;
43-
use crate::datasource::TableProvider;
43+
use crate::datasource::{Source, TableProvider};
4444
use crate::error::{DataFusionError, Result};
4545
use crate::logical_plan::Expr;
4646
use crate::physical_plan::csv::CsvExec;
4747
pub use crate::physical_plan::csv::CsvReadOptions;
4848
use crate::physical_plan::{common, ExecutionPlan};
4949

50-
enum Source {
51-
/// Path to a single CSV file or a directory containing one of more CSV files
52-
Path(String),
53-
54-
/// Read CSV data from a reader
55-
Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
56-
}
57-
5850
/// Represents a CSV file with a provided schema
5951
pub struct CsvFile {
6052
source: Source,

datafusion/src/datasource/json.rs

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Line-delimited JSON data source
19+
//!
20+
//! This data source allows line-delimited JSON strings or files to be used as input for queries.
21+
//!
22+
23+
use std::{
24+
any::Any,
25+
io::{BufReader, Read, Seek},
26+
sync::{Arc, Mutex},
27+
};
28+
29+
use crate::{
30+
datasource::{Source, TableProvider},
31+
error::{DataFusionError, Result},
32+
physical_plan::{
33+
common,
34+
json::{NdJsonExec, NdJsonReadOptions},
35+
ExecutionPlan,
36+
},
37+
};
38+
use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};
39+
40+
use super::datasource::Statistics;
41+
42+
trait SeekRead: Read + Seek {}
43+
44+
impl<T: Seek + Read> SeekRead for T {}
45+
46+
/// Represents a line-delimited JSON file with a provided schema
47+
pub struct NdJsonFile {
48+
source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
49+
schema: SchemaRef,
50+
file_extension: String,
51+
statistics: Statistics,
52+
}
53+
54+
impl NdJsonFile {
55+
/// Attempt to initialize a `NdJsonFile` from a path. The schema can be inferred automatically.
56+
pub fn try_new(path: &str, options: NdJsonReadOptions) -> Result<Self> {
57+
let schema = if let Some(schema) = options.schema {
58+
schema
59+
} else {
60+
let filenames = common::build_file_list(path, options.file_extension)?;
61+
if filenames.is_empty() {
62+
return Err(DataFusionError::Plan(format!(
63+
"No files found at {path} with file extension {file_extension}",
64+
path = path,
65+
file_extension = options.file_extension
66+
)));
67+
}
68+
69+
NdJsonExec::try_infer_schema(
70+
filenames,
71+
Some(options.schema_infer_max_records),
72+
)?
73+
.into()
74+
};
75+
76+
Ok(Self {
77+
source: Source::Path(path.to_string()),
78+
schema,
79+
file_extension: options.file_extension.to_string(),
80+
statistics: Statistics::default(),
81+
})
82+
}
83+
84+
/// Attempt to initialize a `NdJsonFile` from a reader that implements `Seek`. The schema can be inferred automatically.
85+
pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
86+
mut reader: R,
87+
options: NdJsonReadOptions,
88+
) -> Result<Self> {
89+
let schema = if let Some(schema) = options.schema {
90+
schema
91+
} else {
92+
let mut bufr = BufReader::new(reader);
93+
let schema = infer_json_schema_from_seekable(
94+
&mut bufr,
95+
Some(options.schema_infer_max_records),
96+
)?
97+
.into();
98+
reader = bufr.into_inner();
99+
schema
100+
};
101+
Ok(Self {
102+
source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
103+
schema,
104+
statistics: Statistics::default(),
105+
file_extension: String::new(),
106+
})
107+
}
108+
}
109+
impl TableProvider for NdJsonFile {
110+
fn as_any(&self) -> &dyn Any {
111+
self
112+
}
113+
114+
fn schema(&self) -> SchemaRef {
115+
self.schema.clone()
116+
}
117+
118+
fn scan(
119+
&self,
120+
projection: &Option<Vec<usize>>,
121+
batch_size: usize,
122+
_filters: &[crate::logical_plan::Expr],
123+
limit: Option<usize>,
124+
) -> Result<Arc<dyn ExecutionPlan>> {
125+
let opts = NdJsonReadOptions {
126+
schema: Some(self.schema.clone()),
127+
schema_infer_max_records: 0, // schema will always be provided, so it's unnecessary to infer schema
128+
file_extension: self.file_extension.as_str(),
129+
};
130+
let batch_size = limit
131+
.map(|l| std::cmp::min(l, batch_size))
132+
.unwrap_or(batch_size);
133+
134+
let exec = match &self.source {
135+
Source::Reader(maybe_reader) => {
136+
if let Some(rdr) = maybe_reader.lock().unwrap().take() {
137+
NdJsonExec::try_new_from_reader(
138+
rdr,
139+
opts,
140+
projection.clone(),
141+
batch_size,
142+
limit,
143+
)?
144+
} else {
145+
return Err(DataFusionError::Execution(
146+
"You can only read once if the data comes from a reader"
147+
.to_string(),
148+
));
149+
}
150+
}
151+
Source::Path(p) => {
152+
NdJsonExec::try_new(&p, opts, projection.clone(), batch_size, limit)?
153+
}
154+
};
155+
Ok(Arc::new(exec))
156+
}
157+
158+
fn statistics(&self) -> Statistics {
159+
self.statistics.clone()
160+
}
161+
}
162+
163+
#[cfg(test)]
164+
mod tests {
165+
use super::*;
166+
use crate::prelude::*;
167+
const TEST_DATA_BASE: &str = "tests/jsons";
168+
169+
#[tokio::test]
170+
async fn csv_file_from_reader() -> Result<()> {
171+
let mut ctx = ExecutionContext::new();
172+
let path = format!("{}/2.json", TEST_DATA_BASE);
173+
ctx.register_table(
174+
"ndjson",
175+
Arc::new(NdJsonFile::try_new(&path, Default::default())?),
176+
)?;
177+
let df = ctx.sql("select sum(a) from ndjson")?;
178+
let batches = df.collect().await?;
179+
assert_eq!(
180+
batches[0]
181+
.column(0)
182+
.as_any()
183+
.downcast_ref::<arrow::array::Int64Array>()
184+
.unwrap()
185+
.value(0),
186+
100000000000011
187+
);
188+
Ok(())
189+
}
190+
}

datafusion/src/datasource/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,18 @@
2020
pub mod csv;
2121
pub mod datasource;
2222
pub mod empty;
23+
pub mod json;
2324
pub mod memory;
2425
pub mod parquet;
2526

2627
pub use self::csv::{CsvFile, CsvReadOptions};
2728
pub use self::datasource::{TableProvider, TableType};
2829
pub use self::memory::MemTable;
30+
31+
pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
32+
/// Path to a single file or a directory containing one or more files
33+
Path(String),
34+
35+
/// Read data from a reader
36+
Reader(std::sync::Mutex<Option<R>>),
37+
}

datafusion/src/physical_plan/csv.rs

Lines changed: 3 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
//! Execution plan for reading CSV files
1919
2020
use crate::error::{DataFusionError, Result};
21-
use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning};
21+
use crate::physical_plan::ExecutionPlan;
22+
use crate::physical_plan::{common, source::Source, Partitioning};
2223
use arrow::csv;
2324
use arrow::datatypes::{Schema, SchemaRef};
2425
use arrow::error::Result as ArrowResult;
@@ -32,7 +33,7 @@ use std::sync::Arc;
3233
use std::sync::Mutex;
3334
use std::task::{Context, Poll};
3435

35-
use super::{RecordBatchStream, SendableRecordBatchStream};
36+
use super::{DisplayFormatType, RecordBatchStream, SendableRecordBatchStream};
3637
use async_trait::async_trait;
3738

3839
/// CSV file read option
@@ -106,77 +107,6 @@ impl<'a> CsvReadOptions<'a> {
106107
}
107108
}
108109

109-
/// Source represents where the data comes from.
110-
enum Source {
111-
/// The data comes from partitioned files
112-
PartitionedFiles {
113-
/// Path to directory containing partitioned files with the same schema
114-
path: String,
115-
/// The individual files under path
116-
filenames: Vec<String>,
117-
},
118-
119-
/// The data comes from anything impl Read trait
120-
Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
121-
}
122-
123-
impl std::fmt::Debug for Source {
124-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125-
match self {
126-
Source::PartitionedFiles { path, filenames } => f
127-
.debug_struct("PartitionedFiles")
128-
.field("path", path)
129-
.field("filenames", filenames)
130-
.finish()?,
131-
Source::Reader(_) => f.write_str("Reader")?,
132-
};
133-
Ok(())
134-
}
135-
}
136-
137-
impl std::fmt::Display for Source {
138-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139-
match self {
140-
Source::PartitionedFiles { path, filenames } => {
141-
write!(f, "Path({}: [{}])", path, filenames.join(","))
142-
}
143-
Source::Reader(_) => {
144-
write!(f, "Reader(...)")
145-
}
146-
}
147-
}
148-
}
149-
150-
impl Clone for Source {
151-
fn clone(&self) -> Self {
152-
match self {
153-
Source::PartitionedFiles { path, filenames } => Self::PartitionedFiles {
154-
path: path.clone(),
155-
filenames: filenames.clone(),
156-
},
157-
Source::Reader(_) => Self::Reader(Mutex::new(None)),
158-
}
159-
}
160-
}
161-
162-
impl Source {
163-
/// Path to directory containing partitioned files with the same schema
164-
pub fn path(&self) -> &str {
165-
match self {
166-
Source::PartitionedFiles { path, .. } => path.as_str(),
167-
Source::Reader(_) => "",
168-
}
169-
}
170-
171-
/// The individual files under path
172-
pub fn filenames(&self) -> &[String] {
173-
match self {
174-
Source::PartitionedFiles { filenames, .. } => filenames,
175-
Source::Reader(_) => &[],
176-
}
177-
}
178-
}
179-
180110
/// Execution plan for scanning a CSV file
181111
#[derive(Debug, Clone)]
182112
pub struct CsvExec {

0 commit comments

Comments
 (0)