Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions src/entity.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::configuration::{Column, Configuration};
use crate::persistence::entity::EntityMappingPersistor;
use log::info;
use smallvec::{smallvec, SmallVec};
use std::hash::Hasher;
use std::sync::Arc;
Expand Down Expand Up @@ -80,7 +79,6 @@ where
field_hashes: SmallVec<[u64; SMALL_VECTOR_SIZE]>,
not_ignored_columns_count: u16,
columns_count: u16,
rows_count: u64,
entity_mapping_persistor: Arc<T>,
hashes_handler: F,
}
Expand Down Expand Up @@ -117,7 +115,6 @@ where
field_hashes,
not_ignored_columns_count,
columns_count,
rows_count: 0u64,
entity_mapping_persistor: persistor,
hashes_handler,
}
Expand Down Expand Up @@ -179,12 +176,6 @@ where
for hash_row in hash_rows {
(self.hashes_handler)(hash_row);
}

self.rows_count += 1;

if self.rows_count % self.config.log_every_n as u64 == 0 {
info!("Number of lines processed: {}", self.rows_count);
}
}

#[inline(always)]
Expand Down
113 changes: 73 additions & 40 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::persistence::entity::InMemoryEntityMappingPersistor;
use crate::persistence::sparse_matrix::InMemorySparseMatrixPersistor;
use crate::sparse_matrix::{create_sparse_matrices, SparseMatrix};
use bus::Bus;
use log::{error, info};
use simdjson_rust::dom;
use smallvec::{smallvec, SmallVec};
use std::sync::Arc;
Expand Down Expand Up @@ -47,30 +48,22 @@ pub fn build_graphs(
bus.broadcast(hashes);
});

let input_file = File::open(&config.input).expect("Can't open file"); // handle error
let mut buffered = BufReader::new(input_file);

let mut line = String::new();
let mut parser = dom::Parser::default();
match &config.file_type {
FileType::JSON => {
while buffered.read_line(&mut line).unwrap() > 0 {
let row = read_json_columns(&line, &mut parser, &config.columns);
let mut parser = dom::Parser::default();
read_file(config, |line| {
let row = parse_json_line(line, &mut parser, &config.columns);
entity_processor.process_row(&row);
line.clear(); // clear to reuse the buffer
}
});
}
FileType::TSV => {
while buffered.read_line(&mut line).unwrap() > 0 {
{
let values = line.trim().split('\t');
let row: Vec<_> = values.map(|c| c.split(' ').collect()).collect();
entity_processor.process_row(&row);
}
line.clear(); // clear to reuse the buffer
}
read_file(config, |line| {
let row = parse_tsv_line(line);
entity_processor.process_row(&row);
});
}
}

entity_processor.finish();

let mut sparse_matrices = vec![];
Expand All @@ -84,42 +77,82 @@ pub fn build_graphs(
sparse_matrices
}

/// Read file line by line. Pass every valid line to handler for parsing.
fn read_file<F>(config: &Configuration, mut line_handler: F)
where
F: FnMut(&str),
{
let input_file = File::open(&config.input).expect("Can't open file");
let mut buffered = BufReader::new(input_file);

let mut line_number = 1u64;
let mut line = String::new();
loop {
match buffered.read_line(&mut line) {
Ok(bytes_read) => {
// EOF
if bytes_read == 0 {
break;
}

line_handler(&line);
}
Err(err) => {
error!("Can't read line number: {}. Error: {}.", line_number, err);
}
};

// clear to reuse the buffer
line.clear();

if line_number % config.log_every_n as u64 == 0 {
info!("Number of lines processed: {}", line_number);
}

line_number += 1;
}
}

/// Parse a line of JSON and read its columns into a vector for processing.
fn read_json_columns(
fn parse_json_line(
line: &str,
parser: &mut dom::Parser,
columns: &[Column],
) -> Vec<SmallVec<[String; SMALL_VECTOR_SIZE]>> {
let parsed = parser.parse(&line).unwrap();
columns
.iter()
.map({
|c| {
if !c.complex {
let elem = parsed.at_key(&c.name).unwrap();
let value = match elem.get_type() {
dom::element::ElementType::String => elem.get_string().unwrap(),
_ => elem.minify(),
};
smallvec![value]
} else {
parsed
.at_key(&c.name)
.unwrap()
.get_array()
.expect("values for complex columns must be arrays")
.into_iter()
.map(|v| match v.get_type() {
dom::element::ElementType::String => v.get_string().unwrap(),
_ => v.minify(),
})
.collect()
}
.map(|c| {
if !c.complex {
let elem = parsed.at_key(&c.name).unwrap();
let value = match elem.get_type() {
dom::element::ElementType::String => elem.get_string().unwrap(),
_ => elem.minify(),
};
smallvec![value]
} else {
parsed
.at_key(&c.name)
.unwrap()
.get_array()
.expect("Values for complex columns must be arrays")
.into_iter()
.map(|v| match v.get_type() {
dom::element::ElementType::String => v.get_string().unwrap(),
_ => v.minify(),
})
.collect()
}
})
.collect()
}

/// Parse a line of TSV and read its columns into a vector for processing.
///
/// Columns are separated by tabs; a single column may hold several
/// space-separated values, each kept as a slice into `line`.
fn parse_tsv_line(line: &str) -> Vec<SmallVec<[&str; SMALL_VECTOR_SIZE]>> {
    line.trim()
        .split('\t')
        .map(|column| column.split(' ').collect())
        .collect()
}

/// Train SparseMatrix'es (graphs) in separated threads.
pub fn train(
config: Configuration,
Expand Down