[test] add fuzz test for topk (#7772)
Signed-off-by: reilly <tang.ruilin@foxmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Tangruilin and alamb authored Oct 21, 2023
1 parent d6d3244 commit 5bdc9af
Showing 1 changed file with 244 additions and 64 deletions.
308 changes: 244 additions & 64 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -22,89 +22,100 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
use arrow_array::{Float64Array, StringArray};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::{
datasource::MemTable,
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
};
use datafusion_common::{
cast::{as_float64_array, as_string_array},
TableReference,
};
use datafusion_execution::memory_pool::GreedyMemoryPool;
use rand::Rng;
use datafusion_physical_expr::expressions::col;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::sync::Arc;
use test_utils::{batches_to_vec, partitions_to_sorted_vec};
use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};

const KB: usize = 1 << 10;
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_1k_mem() {
SortTest::new()
.with_int32_batches(5)
.with_pool_size(10240)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(20000)
.with_pool_size(10240)
.with_should_spill(true)
.run()
.await;

SortTest::new()
.with_int32_batches(1000000)
.with_pool_size(10240)
.with_should_spill(true)
.run()
.await;
for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, true)] {
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(10 * KB)
.with_should_spill(should_spill)
.run()
.await;
}
}

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
SortTest::new()
.with_int32_batches(5)
.with_pool_size(102400)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(20000)
.with_pool_size(102400)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(1000000)
.with_pool_size(102400)
.with_should_spill(true)
.run()
.await;
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.run()
.await;
}
}

#[tokio::test]
async fn test_sort_unlimited_mem() {
SortTest::new()
.with_int32_batches(5)
.with_pool_size(usize::MAX)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(20000)
.with_pool_size(usize::MAX)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(1000000)
.with_pool_size(usize::MAX)
.with_should_spill(false)
.run()
.await;
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] {
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(usize::MAX)
.with_should_spill(should_spill)
.run()
.await;
}
}

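// Fuzz test for the TopK path: for several input sizes, sort randomly generated
// i32 / f64 / string batches with a LIMIT and compare the result against the
// first `limit` values of a fully sorted copy of the input.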
#[tokio::test]
async fn test_sort_topk() {
for size in [10, 100, 1000, 10000, 1000000] {
let mut topk_scenario = TopKScenario::new()
.with_limit(10)
.with_table_name("t")
.with_col_name("x");

// test topk with i32
let collected_i32 = SortTest::new()
.with_input(topk_scenario.batches(size, ColType::I32))
.run_with_limit(&topk_scenario)
.await;
let actual = batches_to_vec(&collected_i32);
        let expected_i32 = topk_scenario.expected_i32();
        assert_eq!(actual, expected_i32);

// test topk with f64
let collected_f64 = SortTest::new()
.with_input(topk_scenario.batches(size, ColType::F64))
.run_with_limit(&topk_scenario)
.await;
let actual: Vec<Option<f64>> = batches_to_f64_vec(&collected_f64);
        let expected_f64 = topk_scenario.expected_f64();
        assert_eq!(actual, expected_f64);

// test topk with str
let collected_str = SortTest::new()
.with_input(topk_scenario.batches(size, ColType::Str))
.run_with_limit(&topk_scenario)
.await;
let actual: Vec<Option<&str>> = batches_to_str_vec(&collected_str);
        let expected_str = topk_scenario.expected_str();
        assert_eq!(actual, expected_str);
}
}

#[derive(Debug, Default)]
@@ -121,6 +132,11 @@ impl SortTest {
Default::default()
}

fn with_input(mut self, batches: Vec<Vec<RecordBatch>>) -> Self {
self.input = batches.clone();
self
}

    /// Create input as batches of random int32 values, `rows` rows in total
fn with_int32_batches(mut self, rows: usize) -> Self {
self.input = vec![make_staggered_i32_batches(rows)];
Expand All @@ -138,6 +154,44 @@ impl SortTest {
self
}

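    /// Register `self.input` as a `MemTable` under the scenario's table name,
    /// then run an ascending sort on the scenario's column with a LIMIT through
    /// the DataFrame API and return the collected result batches.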
async fn run_with_limit<'a>(
&self,
topk_scenario: &TopKScenario<'a>,
) -> Vec<RecordBatch> {
let input = self.input.clone();
let schema = input
.iter()
.flat_map(|p| p.iter())
.next()
.expect("at least one batch")
.schema();

let table = MemTable::try_new(schema, input.clone()).unwrap();

let ctx = SessionContext::new();

ctx.register_table(
TableReference::Bare {
table: topk_scenario.table_name.into(),
},
Arc::new(table),
)
.unwrap();

let df = ctx
.table(topk_scenario.table_name)
.await
.unwrap()
.sort(vec![
datafusion_expr::col(topk_scenario.col_name).sort(true, true)
])
.unwrap()
.limit(0, Some(topk_scenario.limit))
.unwrap();

df.collect().await.unwrap()
}

/// Sort the input using SortExec and ensure the results are
/// correct according to `Vec::sort` both with and without spilling
async fn run(&self) {
@@ -208,6 +262,109 @@ impl SortTest {
}
}

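/// Column types exercised by the TopK fuzz test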
enum ColType {
I32,
F64,
Str,
}

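/// Parameters for one TopK fuzz scenario: the LIMIT to apply, the generated
/// input batches, and the table / column names used to register and query them.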
struct TopKScenario<'a> {
limit: usize,
batches: Vec<Vec<RecordBatch>>,
table_name: &'a str,
col_name: &'a str,
}

impl<'a> TopKScenario<'a> {
fn new() -> Self {
TopKScenario {
limit: 0,
batches: vec![],
table_name: "",
col_name: "",
}
}

fn with_limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}

fn with_table_name(mut self, table_name: &'a str) -> Self {
self.table_name = table_name;
self
}

fn with_col_name(mut self, col_name: &'a str) -> Self {
self.col_name = col_name;
self
}

fn batches(&mut self, len: usize, t: ColType) -> Vec<Vec<RecordBatch>> {
let batches = match t {
ColType::I32 => make_staggered_i32_batches(len),
ColType::F64 => make_staggered_f64_batches(len),
ColType::Str => make_staggered_str_batches(len),
};
self.batches = vec![batches];
self.batches.clone()
}

    fn expected_i32(&self) -> Vec<Option<i32>> {
        let expected = partitions_to_sorted_vec(&self.batches);
        expected[0..self.limit].into()
}

    fn expected_f64(&self) -> Vec<Option<f64>> {
        let mut expected: Vec<Option<f64>> = self
.batches
.iter()
.flat_map(|batches| batches_to_f64_vec(batches).into_iter())
.collect();
        expected.sort_by(|a, b| a.partial_cmp(b).unwrap());
        expected[0..self.limit].into()
}

    fn expected_str(&self) -> Vec<Option<&str>> {
        let mut expected: Vec<Option<&str>> = self
.batches
.iter()
.flat_map(|batches| batches_to_str_vec(batches).into_iter())
.collect();
        expected.sort_unstable();
        expected[0..self.limit].into()
}
}

impl Default for TopKScenario<'_> {
fn default() -> Self {
Self::new()
}
}

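/// Return randomly sized record batches in a field named 'x' of type `Float64`
/// with randomized f64 content (values drawn from a seeded RNG)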
fn make_staggered_f64_batches(len: usize) -> Vec<RecordBatch> {
let mut rng = StdRng::seed_from_u64(100);
let remainder = RecordBatch::try_from_iter(vec![(
"x",
Arc::new(Float64Array::from_iter_values(
(0..len).map(|_| rng.gen_range(0.0..1000.7)),
)) as ArrayRef,
)])
.unwrap();
stagger_batch(remainder)
}

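/// Return randomly sized record batches in a field named 'x' of type `Utf8`
/// with random alphanumeric string content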
fn make_staggered_str_batches(len: usize) -> Vec<RecordBatch> {
let remainder = RecordBatch::try_from_iter(vec![(
"x",
Arc::new(StringArray::from_iter_values(
(0..len).map(|_| get_random_string(6)),
)) as ArrayRef,
)])
.unwrap();
stagger_batch(remainder)
}

/// Return randomly sized record batches in a field named 'x' of type `Int32`
/// with randomized i32 content
fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
@@ -232,3 +389,26 @@ fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
}
batches
}

/// Return a random alphanumeric ASCII `String` of length `len`
fn get_random_string(len: usize) -> String {
rand::thread_rng()
.sample_iter(rand::distributions::Alphanumeric)
.take(len)
.map(char::from)
.collect()
}

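/// Extract the values of the first (`Float64`) column of each batch as `Option<f64>`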
fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec<Option<f64>> {
batches
.iter()
.flat_map(|batch| as_float64_array(batch.column(0)).unwrap().iter())
.collect()
}

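/// Extract the values of the first (`Utf8`) column of each batch as `Option<&str>`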
fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec<Option<&str>> {
batches
.iter()
.flat_map(|batch| as_string_array(batch.column(0)).unwrap().iter())
.collect()
}
