Skip to content

Port remainder of window.rs to sqllogictest #6234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 1 addition & 179 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

pub mod parquet;

use arrow::array::Int32Array;
use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand All @@ -42,13 +40,10 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Statistics, TableReference};
use datafusion_execution::config::SessionConfig;
use datafusion_expr::{col, CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
use tempfile::TempDir;

/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
Expand Down Expand Up @@ -293,179 +288,6 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
Arc::new(schema)
}

// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by ts
fn get_test_data() -> Result<(RecordBatch, Vec<Expr>)> {
let ts_field = Field::new("ts", DataType::Int32, false);
let inc_field = Field::new("inc_col", DataType::Int32, false);
let desc_field = Field::new("desc_col", DataType::Int32, false);

let schema = Arc::new(Schema::new(vec![ts_field, inc_field, desc_field]));

let batch = RecordBatch::try_new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This data is now checked in as a CSV file

schema,
vec![
Arc::new(Int32Array::from_slice([
1, 1, 5, 9, 10, 11, 16, 21, 22, 26, 26, 28, 31, 33, 38, 42, 47, 51, 53,
53, 58, 63, 67, 68, 70, 72, 72, 76, 81, 85, 86, 88, 91, 96, 97, 98, 100,
101, 102, 104, 104, 108, 112, 113, 113, 114, 114, 117, 122, 126, 131,
131, 136, 136, 136, 139, 141, 146, 147, 147, 152, 154, 159, 161, 163,
164, 167, 172, 173, 177, 180, 185, 186, 191, 195, 195, 199, 203, 207,
210, 213, 218, 221, 224, 226, 230, 232, 235, 238, 238, 239, 244, 245,
247, 250, 254, 258, 262, 264, 264,
])),
Arc::new(Int32Array::from_slice([
1, 5, 10, 15, 20, 21, 26, 29, 30, 33, 37, 40, 43, 44, 45, 49, 51, 53, 58,
61, 65, 70, 75, 78, 83, 88, 90, 91, 95, 97, 100, 105, 109, 111, 115, 119,
120, 124, 126, 129, 131, 135, 140, 143, 144, 147, 148, 149, 151, 155,
156, 159, 160, 163, 165, 170, 172, 177, 181, 182, 186, 187, 192, 196,
197, 199, 203, 207, 209, 213, 214, 216, 219, 221, 222, 225, 226, 231,
236, 237, 242, 245, 247, 248, 253, 254, 259, 261, 266, 269, 272, 275,
278, 283, 286, 289, 291, 296, 301, 305,
])),
Arc::new(Int32Array::from_slice([
100, 98, 93, 91, 86, 84, 81, 77, 75, 71, 70, 69, 64, 62, 59, 55, 50, 45,
41, 40, 39, 36, 31, 28, 23, 22, 17, 13, 10, 6, 5, 2, 1, -1, -4, -5, -6,
-8, -12, -16, -17, -19, -24, -25, -29, -34, -37, -42, -47, -48, -49, -53,
-57, -58, -61, -65, -67, -68, -71, -73, -75, -76, -78, -83, -87, -91,
-95, -98, -101, -105, -106, -111, -114, -116, -120, -125, -128, -129,
-134, -139, -142, -143, -146, -150, -154, -158, -163, -168, -172, -176,
-181, -184, -189, -193, -196, -201, -203, -208, -210, -213,
])),
],
)?;
let file_sort_order = vec![col("ts").sort(true, false)];
Ok((batch, file_sort_order))
}

// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by a, b, c
//
// Columns (all non-null Int32, 100 rows):
//   a0 - 1 for the first 50 rows, 0 for the last 50
//   a  - 0 for the first 50 rows, 1 for the last 50 (cardinality 2)
//   b  - 25-row runs of 0, 1, 2, 3 (cardinality 4)
//   c  - 0..=99, strictly increasing (unique per row)
//   d  - values in 0..=4 with no apparent order
fn get_test_data2() -> Result<(RecordBatch, Vec<Expr>)> {
    let a0 = Field::new("a0", DataType::Int32, false);
    let a = Field::new("a", DataType::Int32, false);
    let b = Field::new("b", DataType::Int32, false);
    let c = Field::new("c", DataType::Int32, false);
    let d = Field::new("d", DataType::Int32, false);

    let schema = Arc::new(Schema::new(vec![a0, a, b, c, d]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            // a0: inverse of column a
            Arc::new(Int32Array::from_slice([
                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0,
            ])),
            // a: primary sort key, cardinality 2
            Arc::new(Int32Array::from_slice([
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 1, 1,
            ])),
            // b: secondary sort key, cardinality 4
            Arc::new(Int32Array::from_slice([
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
                1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
                2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
                3, 3, 3, 3,
            ])),
            // c: tertiary sort key, unique (0..=99)
            Arc::new(Int32Array::from_slice([
                0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
                21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38,
                39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
                57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74,
                75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92,
                93, 94, 95, 96, 97, 98, 99,
            ])),
            // d: unordered, cardinality 5
            Arc::new(Int32Array::from_slice([
                0, 2, 0, 0, 1, 1, 0, 2, 1, 4, 4, 2, 2, 1, 2, 3, 3, 2, 1, 4, 0, 3, 0, 0,
                4, 0, 2, 0, 1, 1, 3, 4, 2, 2, 4, 0, 1, 4, 0, 1, 1, 3, 3, 2, 3, 0, 0, 1,
                1, 3, 0, 3, 1, 1, 4, 2, 1, 1, 1, 2, 4, 3, 1, 4, 4, 0, 2, 4, 1, 1, 0, 2,
                1, 1, 4, 2, 0, 2, 1, 4, 2, 0, 4, 2, 1, 1, 1, 4, 3, 4, 1, 2, 0, 0, 2, 0,
                4, 2, 4, 3,
            ])),
        ],
    )?;
    // Sort order matching the data layout above: a, b, c ascending, nulls last
    let file_sort_order = vec![
        col("a").sort(true, false),
        col("b").sort(true, false),
        col("c").sort(true, false),
    ];
    Ok((batch, file_sort_order))
}

/// Creates a test_context with table name `annotated_data` which has 100 rows.
///
/// Table columns are `ts`, `inc_col`, `desc_col`; the source is a `CsvExec`
/// ordered by the `ts` column. See [`get_test_data`] for the data itself.
///
/// * `tmpdir` - directory the backing CSV file is written into
/// * `infinite_source` - whether to register the source as unbounded
/// * `session_config` - configuration for the returned `SessionContext`
pub async fn get_test_context(
    tmpdir: &TempDir,
    infinite_source: bool,
    session_config: SessionConfig,
) -> Result<SessionContext> {
    // Delegate to the shared helper, supplying the ts/inc_col/desc_col data set.
    let data_receiver = get_test_data;
    get_test_context_helper(tmpdir, infinite_source, session_config, data_receiver).await
}

/// Creates a test_context with table name `annotated_data`, which has 100 rows.
///
/// Table columns are `a`, `b`, `c`, `d` (plus `a0`); the source is a `CsvExec`
/// ordered by `a`, `b`, `c`. Column `a` has cardinality 2, column `b` has
/// cardinality 4, column `c` has cardinality 100 (unique entries), and
/// column `d` has cardinality 5. See [`get_test_data2`] for the data itself.
///
/// * `tmpdir` - directory the backing CSV file is written into
/// * `infinite_source` - whether to register the source as unbounded
/// * `session_config` - configuration for the returned `SessionContext`
pub async fn get_test_context2(
    tmpdir: &TempDir,
    infinite_source: bool,
    session_config: SessionConfig,
) -> Result<SessionContext> {
    // Delegate to the shared helper, supplying the a/b/c/d data set.
    let data_receiver = get_test_data2;
    get_test_context_helper(tmpdir, infinite_source, session_config, data_receiver).await
}

/// Shared implementation behind [`get_test_context`] / [`get_test_context2`].
///
/// Materializes the batch produced by `data_receiver` as a single CSV file in
/// `tmpdir`, then registers it on a fresh `SessionContext` as a listing table
/// named `annotated_data` with the batch's declared sort order.
///
/// # Errors
/// Returns any error from producing the data, writing the CSV file, or
/// registering the table.
async fn get_test_context_helper(
    tmpdir: &TempDir,
    infinite_source: bool,
    session_config: SessionConfig,
    data_receiver: fn() -> Result<(RecordBatch, Vec<Expr>)>,
) -> Result<SessionContext> {
    let ctx = SessionContext::with_config(session_config);

    let csv_read_options = CsvReadOptions::default();
    let (batch, file_sort_order) = data_receiver()?;

    // Declare the on-disk ordering so downstream plans can rely on it, and
    // optionally mark the source as unbounded (streaming) for infinite tests.
    let options_sort = csv_read_options
        .to_listing_options(&ctx.copied_config())
        .with_file_sort_order(Some(file_sort_order))
        .with_infinite_source(infinite_source);

    // Write all rows into one file so the declared sort order holds globally.
    write_test_data_to_csv(tmpdir, 1, &batch)?;
    let sql_definition = None;
    // Propagate registration failures with `?` instead of panicking via
    // `unwrap()` — this function already returns `Result`. `options_sort`
    // is moved here (last use), so no clone is needed.
    ctx.register_listing_table(
        "annotated_data",
        tmpdir.path().to_string_lossy(),
        options_sort,
        Some(batch.schema()),
        sql_definition,
    )
    .await?;
    Ok(ctx)
}

/// Splits `batch` into `n_file` row chunks and writes chunk `i` to
/// `<tmpdir>/<i>.csv` (header row included, per the writer's behavior —
/// see the checked-in `window_1.csv`).
///
/// The final file receives any remainder rows, so no data is lost when
/// `batch.num_rows()` is not evenly divisible by `n_file`. Behavior is
/// unchanged for the existing `n_file == 1` call site.
///
/// # Errors
/// Returns any error from file creation or CSV serialization.
/// Panics if `n_file` is zero (division by zero), as before.
fn write_test_data_to_csv(
    tmpdir: &TempDir,
    n_file: usize,
    batch: &RecordBatch,
) -> Result<()> {
    let n_chunk = batch.num_rows() / n_file;
    for i in 0..n_file {
        let target_file = tmpdir.path().join(format!("{i}.csv"));
        let file = File::create(target_file)?;
        let chunks_start = i * n_chunk;
        // Fix: the plain `n_chunk` slice silently dropped the trailing
        // `num_rows % n_file` rows; assign them to the last chunk instead.
        let chunk_len = if i == n_file - 1 {
            batch.num_rows() - chunks_start
        } else {
            n_chunk
        };
        let cur_batch = batch.slice(chunks_start, chunk_len);
        let mut writer = arrow::csv::Writer::new(file);
        writer.write(&cur_batch)?;
    }
    Ok(())
}

/// TableFactory for tests.
///
/// Stateless unit struct; its trait implementation(s) presumably follow
/// below this point — NOTE(review): outside the visible diff, confirm.
pub struct TestTableFactory {}

Expand Down
101 changes: 101 additions & 0 deletions datafusion/core/tests/data/window_1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
ts,inc_col,desc_col
1,1,100
1,5,98
5,10,93
9,15,91
10,20,86
11,21,84
16,26,81
21,29,77
22,30,75
26,33,71
26,37,70
28,40,69
31,43,64
33,44,62
38,45,59
42,49,55
47,51,50
51,53,45
53,58,41
53,61,40
58,65,39
63,70,36
67,75,31
68,78,28
70,83,23
72,88,22
72,90,17
76,91,13
81,95,10
85,97,6
86,100,5
88,105,2
91,109,1
96,111,-1
97,115,-4
98,119,-5
100,120,-6
101,124,-8
102,126,-12
104,129,-16
104,131,-17
108,135,-19
112,140,-24
113,143,-25
113,144,-29
114,147,-34
114,148,-37
117,149,-42
122,151,-47
126,155,-48
131,156,-49
131,159,-53
136,160,-57
136,163,-58
136,165,-61
139,170,-65
141,172,-67
146,177,-68
147,181,-71
147,182,-73
152,186,-75
154,187,-76
159,192,-78
161,196,-83
163,197,-87
164,199,-91
167,203,-95
172,207,-98
173,209,-101
177,213,-105
180,214,-106
185,216,-111
186,219,-114
191,221,-116
195,222,-120
195,225,-125
199,226,-128
203,231,-129
207,236,-134
210,237,-139
213,242,-142
218,245,-143
221,247,-146
224,248,-150
226,253,-154
230,254,-158
232,259,-163
235,261,-168
238,266,-172
238,269,-176
239,272,-181
244,275,-184
245,278,-189
247,283,-193
250,286,-196
254,289,-201
258,291,-203
262,296,-208
264,301,-210
264,305,-213
Loading