Skip to content

Commit

Permalink
Port remainder of window.rs to sqllogictest
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 4, 2023
1 parent 5788a6a commit 8b102b8
Show file tree
Hide file tree
Showing 10 changed files with 967 additions and 831 deletions.
180 changes: 1 addition & 179 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

pub mod parquet;

use arrow::array::Int32Array;
use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -42,13 +40,10 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Statistics, TableReference};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::{col, CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
use tempfile::TempDir;

/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
Expand Down Expand Up @@ -293,179 +288,6 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
Arc::new(schema)
}

// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by ts
// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by ts.
//
// Schema (all non-nullable Int32):
//   - `ts`:       non-decreasing timestamps (contains duplicates, e.g. 1, 1 and 26, 26)
//   - `inc_col`:  strictly increasing values
//   - `desc_col`: strictly decreasing values
//
// Returns the 100-row batch together with the sort order (`ts` ASC, nulls last)
// that callers register as the file sort order of the backing source.
fn get_test_data() -> Result<(RecordBatch, Vec<Expr>)> {
    let ts_field = Field::new("ts", DataType::Int32, false);
    let inc_field = Field::new("inc_col", DataType::Int32, false);
    let desc_field = Field::new("desc_col", DataType::Int32, false);

    let schema = Arc::new(Schema::new(vec![ts_field, inc_field, desc_field]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            // `ts` column: sorted ascending with duplicates.
            Arc::new(Int32Array::from_slice([
                1, 1, 5, 9, 10, 11, 16, 21, 22, 26, 26, 28, 31, 33, 38, 42, 47, 51, 53,
                53, 58, 63, 67, 68, 70, 72, 72, 76, 81, 85, 86, 88, 91, 96, 97, 98, 100,
                101, 102, 104, 104, 108, 112, 113, 113, 114, 114, 117, 122, 126, 131,
                131, 136, 136, 136, 139, 141, 146, 147, 147, 152, 154, 159, 161, 163,
                164, 167, 172, 173, 177, 180, 185, 186, 191, 195, 195, 199, 203, 207,
                210, 213, 218, 221, 224, 226, 230, 232, 235, 238, 238, 239, 244, 245,
                247, 250, 254, 258, 262, 264, 264,
            ])),
            // `inc_col` column: strictly increasing.
            Arc::new(Int32Array::from_slice([
                1, 5, 10, 15, 20, 21, 26, 29, 30, 33, 37, 40, 43, 44, 45, 49, 51, 53, 58,
                61, 65, 70, 75, 78, 83, 88, 90, 91, 95, 97, 100, 105, 109, 111, 115, 119,
                120, 124, 126, 129, 131, 135, 140, 143, 144, 147, 148, 149, 151, 155,
                156, 159, 160, 163, 165, 170, 172, 177, 181, 182, 186, 187, 192, 196,
                197, 199, 203, 207, 209, 213, 214, 216, 219, 221, 222, 225, 226, 231,
                236, 237, 242, 245, 247, 248, 253, 254, 259, 261, 266, 269, 272, 275,
                278, 283, 286, 289, 291, 296, 301, 305,
            ])),
            // `desc_col` column: strictly decreasing (goes negative partway through).
            Arc::new(Int32Array::from_slice([
                100, 98, 93, 91, 86, 84, 81, 77, 75, 71, 70, 69, 64, 62, 59, 55, 50, 45,
                41, 40, 39, 36, 31, 28, 23, 22, 17, 13, 10, 6, 5, 2, 1, -1, -4, -5, -6,
                -8, -12, -16, -17, -19, -24, -25, -29, -34, -37, -42, -47, -48, -49, -53,
                -57, -58, -61, -65, -67, -68, -71, -73, -75, -76, -78, -83, -87, -91,
                -95, -98, -101, -105, -106, -111, -114, -116, -120, -125, -128, -129,
                -134, -139, -142, -143, -146, -150, -154, -158, -163, -168, -172, -176,
                -181, -184, -189, -193, -196, -201, -203, -208, -210, -213,
            ])),
        ],
    )?;
    // `sort(true, false)` => ascending, nulls last.
    let file_sort_order = vec![col("ts").sort(true, false)];
    Ok((batch, file_sort_order))
}

// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by a, b, c
// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by a, b, c.
//
// Schema (all non-nullable Int32), 100 rows:
//   - `a0`: 1 for the first 50 rows, 0 for the last 50 (inverse of `a`)
//   - `a`:  cardinality 2 (50 zeros then 50 ones) — primary sort key
//   - `b`:  cardinality 4 (runs of 0..=3) — secondary sort key
//   - `c`:  0..=99, unique per row — tertiary sort key
//   - `d`:  values in 0..=4, unsorted
//
// Returns the batch together with the lexicographic sort order (a, b, c — all
// ascending, nulls last) that callers register as the source's file sort order.
fn get_test_data2() -> Result<(RecordBatch, Vec<Expr>)> {
    let a0 = Field::new("a0", DataType::Int32, false);
    let a = Field::new("a", DataType::Int32, false);
    let b = Field::new("b", DataType::Int32, false);
    let c = Field::new("c", DataType::Int32, false);
    let d = Field::new("d", DataType::Int32, false);

    let schema = Arc::new(Schema::new(vec![a0, a, b, c, d]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            // `a0`: 50 ones then 50 zeros.
            Arc::new(Int32Array::from_slice([
                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0,
            ])),
            // `a`: 50 zeros then 50 ones (sorted ascending).
            Arc::new(Int32Array::from_slice([
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1,
            ])),
            // `b`: 25-row runs of 0, 1, 2, 3 (sorted within each `a` group).
            Arc::new(Int32Array::from_slice([
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
                2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
                3, 3, 3, 3,
            ])),
            // `c`: 0..=99, unique (sorted within each (a, b) group).
            Arc::new(Int32Array::from_slice([
                0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
                21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38,
                39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
                57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74,
                75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92,
                93, 94, 95, 96, 97, 98, 99,
            ])),
            // `d`: values in 0..=4, no ordering.
            Arc::new(Int32Array::from_slice([
                0, 2, 0, 0, 1, 1, 0, 2, 1, 4, 4, 2, 2, 1, 2, 3, 3, 2, 1, 4, 0, 3, 0, 0,
                4, 0, 2, 0, 1, 1, 3, 4, 2, 2, 4, 0, 1, 4, 0, 1, 1, 3, 3, 2, 3, 0, 0, 1,
                1, 3, 0, 3, 1, 1, 4, 2, 1, 1, 1, 2, 4, 3, 1, 4, 4, 0, 2, 4, 1, 1, 0, 2,
                1, 1, 4, 2, 0, 2, 1, 4, 2, 0, 4, 2, 1, 1, 1, 4, 3, 4, 1, 2, 0, 0, 2, 0,
                4, 2, 4, 3,
            ])),
        ],
    )?;
    // Lexicographic order (a, b, c); `sort(true, false)` => ascending, nulls last.
    let file_sort_order = vec![
        col("a").sort(true, false),
        col("b").sort(true, false),
        col("c").sort(true, false),
    ];
    Ok((batch, file_sort_order))
}

/// Creates a test_context with table name `annotated_data` which has 100 rows.
// Columns in the table are ts, inc_col, desc_col. Source is CsvExec which is ordered by
// ts column.
pub async fn get_test_context(
tmpdir: &TempDir,
infinite_source: bool,
session_config: SessionConfig,
) -> Result<SessionContext> {
get_test_context_helper(tmpdir, infinite_source, session_config, get_test_data).await
}

/// Creates a test_context with table name `annotated_data`, which has 100 rows.
// Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
// a,b,c column. Column a has cardinality 2, column b has cardinality 4.
// Column c has cardinality 100 (unique entries). Column d has cardinality 5.
pub async fn get_test_context2(
tmpdir: &TempDir,
infinite_source: bool,
session_config: SessionConfig,
) -> Result<SessionContext> {
get_test_context_helper(tmpdir, infinite_source, session_config, get_test_data2).await
}

/// Shared implementation behind `get_test_context` / `get_test_context2`.
///
/// Materializes the batch produced by `data_receiver` as CSV in `tmpdir` and
/// registers it as a listing table named `annotated_data` whose declared file
/// sort order matches the batch's ordering.
///
/// Fix: registration errors are now propagated with `?` instead of panicking
/// via `unwrap()` — this function already returns `Result`. Also dropped a
/// redundant `clone()` of the listing options (the value was not used again).
async fn get_test_context_helper(
    tmpdir: &TempDir,
    infinite_source: bool,
    session_config: SessionConfig,
    data_receiver: fn() -> Result<(RecordBatch, Vec<Expr>)>,
) -> Result<SessionContext> {
    let ctx = SessionContext::with_config(session_config);

    let csv_read_options = CsvReadOptions::default();
    let (batch, file_sort_order) = data_receiver()?;

    // Declare the on-disk ordering so the planner can exploit it; optionally
    // mark the source as unbounded (streaming) for tests that need it.
    let options_sort = csv_read_options
        .to_listing_options(&ctx.copied_config())
        .with_file_sort_order(Some(file_sort_order))
        .with_infinite_source(infinite_source);

    // Write the batch as a single CSV file under `tmpdir`.
    write_test_data_to_csv(tmpdir, 1, &batch)?;
    let sql_definition = None;
    ctx.register_listing_table(
        "annotated_data",
        tmpdir.path().to_string_lossy(),
        options_sort,
        Some(batch.schema()),
        sql_definition,
    )
    .await?;
    Ok(ctx)
}

/// Splits `batch` into `n_file` row chunks and writes chunk `i` to
/// `<tmpdir>/<i>.csv` using the Arrow CSV writer (header per file).
///
/// Fixes:
/// - Previously the trailing `num_rows % n_file` rows were silently dropped
///   when the row count was not divisible by `n_file`; the last file now
///   receives the remainder so every row is written.
/// - `n_file == 0` now returns `Ok(())` instead of panicking on a
///   divide-by-zero.
fn write_test_data_to_csv(
    tmpdir: &TempDir,
    n_file: usize,
    batch: &RecordBatch,
) -> Result<()> {
    if n_file == 0 {
        // Nothing to write; also guards the division below.
        return Ok(());
    }
    let n_chunk = batch.num_rows() / n_file;
    for i in 0..n_file {
        let target_file = tmpdir.path().join(format!("{i}.csv"));
        let file = File::create(target_file)?;
        let chunks_start = i * n_chunk;
        // The last file absorbs any remainder left by the integer division.
        let chunk_len = if i + 1 == n_file {
            batch.num_rows() - chunks_start
        } else {
            n_chunk
        };
        let cur_batch = batch.slice(chunks_start, chunk_len);
        let mut writer = arrow::csv::Writer::new(file);
        writer.write(&cur_batch)?;
    }
    Ok(())
}

/// TableFactory for tests.
///
/// Unit struct with no state; its behavior presumably lives in trait impls
/// defined elsewhere in this file — verify against the surrounding code.
pub struct TestTableFactory {}

Expand Down
101 changes: 101 additions & 0 deletions datafusion/core/tests/data/window_1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
ts,inc_col,desc_col
1,1,100
1,5,98
5,10,93
9,15,91
10,20,86
11,21,84
16,26,81
21,29,77
22,30,75
26,33,71
26,37,70
28,40,69
31,43,64
33,44,62
38,45,59
42,49,55
47,51,50
51,53,45
53,58,41
53,61,40
58,65,39
63,70,36
67,75,31
68,78,28
70,83,23
72,88,22
72,90,17
76,91,13
81,95,10
85,97,6
86,100,5
88,105,2
91,109,1
96,111,-1
97,115,-4
98,119,-5
100,120,-6
101,124,-8
102,126,-12
104,129,-16
104,131,-17
108,135,-19
112,140,-24
113,143,-25
113,144,-29
114,147,-34
114,148,-37
117,149,-42
122,151,-47
126,155,-48
131,156,-49
131,159,-53
136,160,-57
136,163,-58
136,165,-61
139,170,-65
141,172,-67
146,177,-68
147,181,-71
147,182,-73
152,186,-75
154,187,-76
159,192,-78
161,196,-83
163,197,-87
164,199,-91
167,203,-95
172,207,-98
173,209,-101
177,213,-105
180,214,-106
185,216,-111
186,219,-114
191,221,-116
195,222,-120
195,225,-125
199,226,-128
203,231,-129
207,236,-134
210,237,-139
213,242,-142
218,245,-143
221,247,-146
224,248,-150
226,253,-154
230,254,-158
232,259,-163
235,261,-168
238,266,-172
238,269,-176
239,272,-181
244,275,-184
245,278,-189
247,283,-193
250,286,-196
254,289,-201
258,291,-203
262,296,-208
264,301,-210
264,305,-213
Loading

0 comments on commit 8b102b8

Please sign in to comment.