Skip to content

Commit 279a09a

Browse files
e-dardalamb
authored andcommitted
test: test more than two partitions
1 parent 05ae7c3 commit 279a09a

File tree

1 file changed

+69
-9
lines changed

1 file changed

+69
-9
lines changed

datafusion/src/physical_plan/sort_preserving_merge.rs

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -649,8 +649,7 @@ mod tests {
649649
let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
650650

651651
_test_merge(
652-
b1,
653-
b2,
652+
&[vec![b1], vec![b2]],
654653
&[
655654
"+----+---+-------------------------------+",
656655
"| a | b | c |",
@@ -696,8 +695,7 @@ mod tests {
696695
let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
697696

698697
_test_merge(
699-
b1,
700-
b2,
698+
&[vec![b1], vec![b2]],
701699
&[
702700
"+-----+---+-------------------------------+",
703701
"| a | b | c |",
@@ -743,8 +741,7 @@ mod tests {
743741
let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
744742

745743
_test_merge(
746-
b1,
747-
b2,
744+
&[vec![b1], vec![b2]],
748745
&[
749746
"+----+---+-------------------------------+",
750747
"| a | b | c |",
@@ -765,8 +762,71 @@ mod tests {
765762
.await;
766763
}
767764

768-
async fn _test_merge(b1: RecordBatch, b2: RecordBatch, exp: &[&str]) {
769-
let schema = b1.schema();
765+
#[tokio::test]
766+
async fn test_merge_three_partitions() {
767+
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
768+
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
769+
Some("a"),
770+
Some("b"),
771+
Some("c"),
772+
Some("d"),
773+
Some("f"),
774+
]));
775+
let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));
776+
let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
777+
778+
let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30]));
779+
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
780+
Some("e"),
781+
Some("g"),
782+
Some("h"),
783+
Some("i"),
784+
Some("j"),
785+
]));
786+
let c: ArrayRef =
787+
Arc::new(TimestampNanosecondArray::from(vec![40, 60, 20, 20, 60]));
788+
let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
789+
790+
let a: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 700, 900, 300]));
791+
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
792+
Some("f"),
793+
Some("g"),
794+
Some("h"),
795+
Some("i"),
796+
Some("j"),
797+
]));
798+
let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
799+
let b3 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
800+
801+
_test_merge(
802+
&[vec![b1], vec![b2], vec![b3]],
803+
&[
804+
"+-----+---+-------------------------------+",
805+
"| a | b | c |",
806+
"+-----+---+-------------------------------+",
807+
"| 1 | a | 1970-01-01 00:00:00.000000008 |",
808+
"| 2 | b | 1970-01-01 00:00:00.000000007 |",
809+
"| 7 | c | 1970-01-01 00:00:00.000000006 |",
810+
"| 9 | d | 1970-01-01 00:00:00.000000005 |",
811+
"| 10 | e | 1970-01-01 00:00:00.000000040 |",
812+
"| 100 | f | 1970-01-01 00:00:00.000000004 |",
813+
"| 3 | f | 1970-01-01 00:00:00.000000008 |",
814+
"| 200 | g | 1970-01-01 00:00:00.000000006 |",
815+
"| 20 | g | 1970-01-01 00:00:00.000000060 |",
816+
"| 700 | h | 1970-01-01 00:00:00.000000002 |",
817+
"| 70 | h | 1970-01-01 00:00:00.000000020 |",
818+
"| 900 | i | 1970-01-01 00:00:00.000000002 |",
819+
"| 90 | i | 1970-01-01 00:00:00.000000020 |",
820+
"| 300 | j | 1970-01-01 00:00:00.000000006 |",
821+
"| 30 | j | 1970-01-01 00:00:00.000000060 |",
822+
"+-----+---+-------------------------------+",
823+
],
824+
)
825+
.await;
826+
}
827+
828+
async fn _test_merge(partitions: &[Vec<RecordBatch>], exp: &[&str]) {
829+
let schema = partitions[0][0].schema();
770830
let sort = vec![
771831
PhysicalSortExpr {
772832
expr: col("b", &schema).unwrap(),
@@ -777,7 +837,7 @@ mod tests {
777837
options: Default::default(),
778838
},
779839
];
780-
let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
840+
let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
781841
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024));
782842

783843
let collected = collect(merge).await.unwrap();

0 commit comments

Comments
 (0)