forked from dathere/qsv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathextsort.rs
119 lines (99 loc) · 3.86 KB
/
extsort.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Help/usage text for `qsv extsort`; `run` hands it to `util::get_args`, which
// parses `argv` against it to fill in `Args`.
// NOTE(review): as it appears here the text has no blank lines or indentation
// between sections. docopt-style parsers usually end the `Usage:` section at a
// blank line and expect indented option lines — confirm the original layout
// survived intact.
static USAGE: &str = r#"
Sort an arbitrarily large CSV/text file using a multithreaded external sort algorithm.
This command does not work with <stdin>/<stdout>. Valid input, and output
files are expected.
Also, this command is not specific to CSV data, it sorts any text file on a
line-by-line basis. If sorting a non-CSV file, be sure to set --no-headers,
otherwise, the first line will not be included in the external sort.
Usage:
qsv extsort [options] <input> <output>
qsv extsort --help
External sort option:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the
number of CPUs detected.
Common options:
-h, --help Display this message
-n, --no-headers When set, the first row will not be interpreted
as headers and will be sorted with the rest
of the rows. Otherwise, the first row will always
appear as the header row in the output.
"#;
use std::{
fs,
io::{self, prelude::*},
path,
};
use ext_sort::{buffer::mem::MemoryLimitedBufferBuilder, ExternalSorter, ExternalSorterBuilder};
use serde::Deserialize;
use sysinfo::{System, SystemExt};
use crate::{util, CliResult};
/// Command-line arguments for `qsv extsort`, filled in by `util::get_args`
/// from the `USAGE` text. Field names follow the `arg_*` / `flag_*` naming
/// used for positional arguments and flags, so they must not be renamed.
#[derive(Deserialize)]
struct Args {
// `<input>`: path of the text/CSV file to sort (opened read-only in `run`).
arg_input: String,
// `<output>`: path of the sorted result (created, or truncated if it exists).
arg_output: String,
// `--jobs`: passed through `util::njobs` to set the sorter's thread count.
flag_jobs: Option<usize>,
// `--no-headers`: when true, the first line is sorted with the rest instead
// of being written first, unsorted.
flag_no_headers: bool,
}
/// Fallback byte budget (100 MB) for the in-memory sort buffer, used when
/// `System::IS_SUPPORTED` is false and total memory cannot be queried.
const MEMORY_LIMITED_BUFFER: u64 = 100_000_000;
/// Byte size (1 MB) of the read/write buffers given to the external sorter.
const RW_BUFFER_CAPACITY: usize = 1_000_000;
/// Runs `qsv extsort`: sorts `<input>` line by line into `<output>` using the
/// multithreaded external sorter from the `ext_sort` crate.
///
/// Unless `--no-headers` is given, the first line is read before sorting and
/// written (with its line terminator normalised to `\n`) as the first output
/// line; all remaining lines are sorted.
///
/// # Errors
///
/// Returns an error when argument parsing fails, when `<input>` and
/// `<output>` resolve to the same existing file, when either file cannot be
/// opened/created, when the sorter cannot be built or the sort fails, when
/// reading a sorted item fails, or on any other read/write error.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;

    let mem_limited_buffer = in_memory_sort_buffer_size();
    log::info!("{mem_limited_buffer} bytes used for in memory mergesort buffer...");

    // `File::create` truncates the output before the input is read, so
    // sorting a file onto itself would silently destroy it. `canonicalize`
    // only succeeds for existing paths, which is exactly the dangerous case.
    if let (Ok(input_path), Ok(output_path)) = (
        fs::canonicalize(&args.arg_input),
        fs::canonicalize(&args.arg_output),
    ) {
        if input_path == output_path {
            return fail_clierror!("Input and output files must be different.");
        }
    }

    let mut input_reader = io::BufReader::new(match fs::File::open(&args.arg_input) {
        Ok(f) => f,
        Err(e) => return fail_clierror!("Cannot read input file: {e}"),
    });
    let mut output_writer = io::BufWriter::new(match fs::File::create(&args.arg_output) {
        Ok(f) => f,
        Err(e) => return fail_clierror!("Cannot create output file: {e}"),
    });

    let sorter: ExternalSorter<String, io::Error, MemoryLimitedBufferBuilder> =
        match ExternalSorterBuilder::new()
            // NOTE(review): temporary chunk files go to the current working
            // directory, which therefore has to be writable.
            .with_tmp_dir(path::Path::new("./"))
            .with_buffer(MemoryLimitedBufferBuilder::new(mem_limited_buffer))
            .with_rw_buf_size(RW_BUFFER_CAPACITY)
            .with_threads_number(util::njobs(args.flag_jobs))
            .build()
        {
            Ok(sorter) => sorter,
            Err(e) => return fail_clierror!("cannot create external sorter: {e}"),
        };

    // Hold the header back so it is not sorted together with the data rows.
    let mut header = String::new();
    if !args.flag_no_headers {
        input_reader.read_line(&mut header)?;
    }

    let sorted = match sorter.sort(input_reader.lines()) {
        Ok(sorted) => sorted,
        Err(e) => return fail_clierror!("cannot do external sort: {e}"),
    };

    if !header.is_empty() {
        // Strip only the line terminator, exactly as `BufRead::lines` does for
        // the data rows: other trailing whitespace can be significant (e.g. an
        // empty last column in a tab-separated header).
        output_writer.write_all(strip_line_terminator(&header).as_bytes())?;
        output_writer.write_all(b"\n")?;
    }
    for item in sorted {
        // Reading back spilled chunks can fail; report it instead of panicking.
        let line = match item {
            Ok(line) => line,
            Err(e) => return fail_clierror!("cannot read sorted data: {e}"),
        };
        output_writer.write_all(line.as_bytes())?;
        output_writer.write_all(b"\n")?;
    }
    output_writer.flush()?;
    Ok(())
}

/// Byte budget for the in-memory sort buffer: 10% of total memory when
/// `sysinfo` supports this platform, otherwise `MEMORY_LIMITED_BUFFER`.
fn in_memory_sort_buffer_size() -> u64 {
    if System::IS_SUPPORTED {
        // Only memory figures are needed, so skip everything else that
        // `System::new_all()` would load.
        let mut sys = System::new();
        sys.refresh_memory();
        // NOTE(review): `* 1000` treats `total_memory()` as kilobytes (sysinfo
        // < 0.26). Newer sysinfo reports bytes, which would make this 100x total
        // memory — confirm against the pinned sysinfo version.
        (sys.total_memory() * 1000) / 10 // 10 percent of total memory
    } else {
        MEMORY_LIMITED_BUFFER
    }
}

/// Removes one trailing `"\n"` or `"\r\n"` (the terminators `BufRead::lines`
/// strips) and leaves any other trailing whitespace untouched.
fn strip_line_terminator(line: &str) -> &str {
    match line.strip_suffix('\n') {
        Some(rest) => rest.strip_suffix('\r').unwrap_or(rest),
        None => line,
    }
}
/// Smoke test: querying total memory through `sysinfo` must not crash and,
/// on platforms `sysinfo` supports, must yield a non-zero 10% budget.
#[test]
fn test_mem_check() {
    // `run` falls back to MEMORY_LIMITED_BUFFER when the platform is not
    // supported, so there is nothing meaningful to assert there.
    if !System::IS_SUPPORTED {
        return;
    }
    // Only memory figures are needed; avoid loading everything
    // `System::new_all()` would.
    let mut sys = System::new();
    sys.refresh_memory();
    let mem10percent = (sys.total_memory() * 1000) / 10; // 10 percent of total memory
    assert!(mem10percent > 0);
}