#![allow(unused_assignments)]

static USAGE: &str = r#"
Smartly converts CSV to newline-delimited JSON (JSONL/NDJSON).

By scanning the CSV first, it "smartly" infers the appropriate JSON data type
for each column.

It will infer a column as boolean if it only has a domain of two values,
and the first characters of those values form one of the following case-insensitive
combinations: t/f; t/null; 1/0; 1/null; y/n; or y/null - all of which are treated
as true/false.

For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_tojsonl.rs.
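
For instance (illustrative only - actual output depends on the inferred types),
a CSV like:

  id,name,active
  1,Alice,y
  2,Bob,n

would be converted to:

  {"id":1,"name":"Alice","active":true}
  {"id":2,"name":"Bob","active":false}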

Usage:
    qsv tojsonl [options] [<input>]
    qsv tojsonl --help

Tojsonl options:
    -j, --jobs <arg>       The number of jobs to run in parallel.
                           When not set, the number of jobs is set to the
                           number of CPUs detected.

Common options:
    -h, --help             Display this message
    -d, --delimiter <arg>  The field delimiter for reading CSV data.
                           Must be a single character. (default: ,)
    -o, --output <file>    Write output to <file> instead of stdout.
"#;

use std::{env::temp_dir, fmt::Write, fs::File, path::Path, str::FromStr};

use serde::Deserialize;
use serde_json::{Map, Value};
use strum_macros::EnumString;
use uuid::Uuid;

use super::schema::infer_schema_from_stats;
use crate::{
config::{Config, Delimiter},
util, CliError, CliResult,
};

#[derive(Deserialize, Clone)]
struct Args {
arg_input: Option<String>,
flag_jobs: Option<usize>,
flag_delimiter: Option<Delimiter>,
flag_output: Option<String>,
}

impl From<std::fmt::Error> for CliError {
fn from(err: std::fmt::Error) -> CliError {
CliError::Other(err.to_string())
}
}

#[derive(PartialEq, EnumString)]
#[strum(ascii_case_insensitive)]
enum JsonlType {
Boolean,
String,
Number,
Integer,
Null,
}

pub fn run(argv: &[&str]) -> CliResult<()> {
let preargs: Args = util::get_args(USAGE, argv)?;
let mut args = preargs.clone();
let conf = Config::new(&args.arg_input).delimiter(args.flag_delimiter);
let mut is_stdin = false;
let stdin_fpath = format!("{}/{}.csv", temp_dir().to_string_lossy(), Uuid::new_v4());
let stdin_temp = stdin_fpath.clone();
// if using stdin, we create a stdin.csv file as stdin is not seekable and we need to
// open the file multiple times to compile stats/unique values, etc.
let input_filename = if preargs.arg_input.is_none() {
let mut stdin_file = File::create(stdin_fpath.clone())?;
let stdin = std::io::stdin();
let mut stdin_handle = stdin.lock();
std::io::copy(&mut stdin_handle, &mut stdin_file)?;
args.arg_input = Some(stdin_fpath.clone());
is_stdin = true;
stdin_fpath
} else {
let filename = Path::new(args.arg_input.as_ref().unwrap())
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
filename
};
// we're calling the schema command to infer data types and enums
let schema_args = crate::cmd::schema::Args {
// we only need a threshold of three, as we only infer boolean based on the
// enum constraint, i.e. we only check if a field is boolean when its domain
// is just two values. If it has more than two, that's all we need to know
// for boolean inferencing
flag_enum_threshold: 3,
flag_strict_dates: false,
flag_pattern_columns: crate::select::SelectColumns::parse("")?,
// json doesn't have a date type, so don't infer dates
flag_dates_whitelist: "none".to_string(),
flag_prefer_dmy: false,
flag_stdout: false,
flag_jobs: Some(util::njobs(args.flag_jobs)),
flag_no_headers: false,
flag_delimiter: args.flag_delimiter,
arg_input: args.arg_input.clone(),
};
// build a schema for each field with its inferred type, min/max value/length, and unique values
let properties_map: Map<String, Value> =
match infer_schema_from_stats(&schema_args, &input_filename) {
Ok(map) => map,
Err(e) => {
return fail_clierror!("Failed to infer field types: {e}");
}
};
let mut rdr = if is_stdin {
Config::new(&Some(stdin_temp))
.delimiter(args.flag_delimiter)
.reader()?
} else {
conf.reader()?
};
// TODO: instead of abusing csv writer to write jsonl file
// just use a normal buffered writer
let mut wtr = Config::new(&args.flag_output)
.flexible(true)
.no_headers(true)
.quote_style(csv::QuoteStyle::Never)
.writer()?;
let headers = rdr.headers()?.clone();
// create a vec lookup of the inferred field data types
let mut field_type_vec: Vec<JsonlType> = Vec::with_capacity(headers.len());
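// note: this relies on properties_map preserving the original column order,
// so that field_type_vec[idx] lines up with column idx of each CSV record below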
for (_field_name, field_def) in properties_map.iter() {
let Some(field_map) = field_def.as_object() else { return fail!("Cannot create field map") };
let prelim_type = field_map.get("type").unwrap();
let field_values_enum = field_map.get("enum");
// log::debug!("prelim_type: {prelim_type} field_values_enum: {field_values_enum:?}");
// check if a field has a boolean data type
// by checking its enum constraint
if let Some(domain) = field_values_enum {
if let Some(vals) = domain.as_array() {
// if this field only has a domain of two values
if vals.len() == 2 {
let val1 = if vals[0].is_null() {
'_'
} else {
// check the first domain value; if it's a string,
// get its first character, lowercased
if let Some(str_val) = vals[0].as_str() {
first_lower_char(str_val)
} else if let Some(int_val) = vals[0].as_u64() {
// otherwise, it's an integer (as we only do enum constraints
// for strings and integers), so check if it's 1 or 0
match int_val {
1 => '1',
0 => '0',
_ => '*', // it's something else
}
} else {
'*'
}
};
// same as above, but for the 2nd domain value
let val2 = if vals[1].is_null() {
'_'
} else if let Some(str_val) = vals[1].as_str() {
first_lower_char(str_val)
} else if let Some(int_val) = vals[1].as_u64() {
match int_val {
1 => '1',
0 => '0',
_ => '*',
}
} else {
'*'
};
// log::debug!("val1: {val1} val2: {val2}");
// check if the domain of two values is truthy or falsy,
// i.e. if the first character, case-insensitive, is "t", "1" or "y" - truthy;
// "f", "0", "n" or null - falsy
// if it is, infer a boolean field
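// e.g. a column whose domain is {"Yes", "No"} reduces to ('y', 'n') and is
// inferred as boolean, while a domain of {"High", "Low"} reduces to ('h', 'l') and is not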
if let ('t', 'f' | '_')
| ('f' | '_', 't')
| ('1', '0' | '_')
| ('0' | '_', '1')
| ('y', 'n' | '_')
| ('n' | '_', 'y') = (val1, val2)
{
field_type_vec.push(JsonlType::Boolean);
continue;
}
}
}
}
// ok to use index access and unwrap here as we know
// we have at least one element in the prelim_type as_array
field_type_vec.push(
JsonlType::from_str(
prelim_type.as_array().unwrap()[0]
.as_str()
.unwrap_or("null"),
)
.unwrap_or(JsonlType::String),
);
}
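// at this point field_type_vec holds one JsonlType per column, in header order
// (e.g. [Integer, String, Boolean] for the id,name,active sample in the usage text)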
// amortize allocs
let mut record = csv::StringRecord::new();
let mut temp_string = String::with_capacity(100);
let mut temp_string2 = String::with_capacity(50);
let mut header_key = Value::String(String::with_capacity(50));
let mut temp_val = Value::String(String::with_capacity(50));
// TODO: see if it's worth it to use rayon here after benchmarking
// with large files. We have a --jobs option, but we only pass it
// thru to stats/frequency to infer data types & enum constraints.
// now that we have type mappings, iterate thru input csv
// and write jsonl file
while rdr.read_record(&mut record)? {
temp_string.clear();
record.trim();
write!(temp_string, "{{")?;
for (idx, field) in record.iter().enumerate() {
let field_val = if let Some(field_type) = field_type_vec.get(idx) {
match field_type {
JsonlType::String => {
if field.is_empty() {
"null"
} else {
// we round-trip thru serde_json to escape the str
// per json spec (https://www.json.org/json-en.html)
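// e.g. a raw field value of  say "hi"  is written as the JSON string "say \"hi\""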
temp_val = field.into();
temp_string2 = temp_val.to_string();
&temp_string2
}
}
JsonlType::Null => "null",
JsonlType::Integer | JsonlType::Number => field,
JsonlType::Boolean => {
if let 't' | 'y' | '1' = first_lower_char(field) {
"true"
} else {
"false"
}
}
}
} else {
"null"
};
header_key = headers[idx].into();
if field_val.is_empty() {
write!(temp_string, r#"{header_key}:null,"#)?;
} else {
write!(temp_string, r#"{header_key}:{field_val},"#)?;
}
}
temp_string.pop(); // remove last comma
temp_string.push('}');
record.clear();
record.push_field(&temp_string);
wtr.write_record(&record)?;
}
Ok(wtr.flush()?)
}

#[inline]
fn first_lower_char(field_str: &str) -> char {
field_str.chars().next().unwrap_or('_').to_ascii_lowercase()
}
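
// A minimal, illustrative sanity check for first_lower_char; the upstream
// integration tests for this command live in tests/test_tojsonl.rs.
#[cfg(test)]
mod first_lower_char_tests {
    use super::first_lower_char;

    #[test]
    fn maps_first_char_to_lowercase_or_placeholder() {
        // "True"/"false" style values reduce to 't'/'f'
        assert_eq!(first_lower_char("True"), 't');
        assert_eq!(first_lower_char("false"), 'f');
        // "Yes" reduces to 'y', which the boolean inference treats as truthy
        assert_eq!(first_lower_char("Yes"), 'y');
        // empty strings fall back to the '_' placeholder also used for nulls
        assert_eq!(first_lower_char(""), '_');
    }
}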