Skip to content

Commit

Permalink
feat: Support DataTypes of Date and Time (apache#657)
Browse files Browse the repository at this point in the history
* add date type

* support date

* suppport date

* dev finish, ready to test

* impl time

* proto dependency

* import specify proto

* add some test

* remove blank lines

* remove blank lines

* remove blank lines

* fmt

* add test case

* fmt

* addd test case

* fmt

* udpate ut

* fix

* fix

* Update analytic_engine/src/table/data.rs

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>

* fix

* add some comment

* Update common_types/src/datum.rs

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>

* Update common_types/src/datum.rs

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>

* support negative time

* Update common_types/src/datum.rs

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>

* add some test

* support time

* test

* support mysql time

* format date  time

* add some test

* Update common_types/src/datum.rs

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>

* Update common_types/src/datum.rs

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>

* add some test

* Update common_types/src/datum.rs

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>

---------

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
  • Loading branch information
MichaelLeeHZ and ShiKaiWi authored Feb 24, 2023
1 parent bbe664b commit 6551a7e
Show file tree
Hide file tree
Showing 22 changed files with 824 additions and 152 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 25 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,11 +1152,32 @@ mod tests {

#[test]
fn test_split_record_batch_with_time_ranges() {
let rows0 = vec![build_row(b"binary key", 20, 10.0, "string value")];
let rows1 = vec![build_row(b"binary key1", 120, 11.0, "string value 1")];
let rows0 = vec![build_row(
b"binary key",
20,
10.0,
"string value",
1000,
1_000_000,
)];
let rows1 = vec![build_row(
b"binary key1",
120,
11.0,
"string value 1",
1000,
1_000_000,
)];
let rows2 = vec![
build_row_opt(b"binary key2", 220, None, Some("string value 2")),
build_row_opt(b"binary key3", 250, Some(13.0), None),
build_row_opt(
b"binary key2",
220,
None,
Some("string value 2"),
Some(1000),
None,
),
build_row_opt(b"binary key3", 250, Some(13.0), None, None, Some(1_000_000)),
];

let rows = vec![rows0.clone(), rows1.clone(), rows2.clone()]
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ mod tests {
}
builder
.add_normal_column(
column_schema::Builder::new("field3".to_string(), DatumKind::String)
column_schema::Builder::new("field5".to_string(), DatumKind::String)
.build()
.expect("should succeed build column schema"),
)
Expand Down
70 changes: 49 additions & 21 deletions analytic_engine/src/memtable/skiplist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ mod tests {
reverse: false,
},
vec![
build_row(b"a", 1, 10.0, "v1"),
build_row(b"b", 2, 10.0, "v2"),
build_row(b"c", 3, 10.0, "v3"),
build_row(b"d", 4, 10.0, "v4"),
build_row(b"e", 5, 10.0, "v5"),
build_row(b"f", 6, 10.0, "v6"),
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
build_row(b"b", 2, 10.0, "v2", 2000, 2_000_000),
build_row(b"c", 3, 10.0, "v3", 3000, 3_000_000),
build_row(b"d", 4, 10.0, "v4", 4000, 4_000_000),
build_row(b"e", 5, 10.0, "v5", 5000, 5_000_000),
build_row(b"f", 6, 10.0, "v6", 6000, 6_000_000),
],
),
(
Expand All @@ -225,10 +225,10 @@ mod tests {
reverse: false,
},
vec![
build_row(b"a", 1, 10.0, "v1"),
build_row(b"b", 2, 10.0, "v2"),
build_row(b"c", 3, 10.0, "v3"),
build_row(b"d", 4, 10.0, "v4"),
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
build_row(b"b", 2, 10.0, "v2", 2000, 2_000_000),
build_row(b"c", 3, 10.0, "v3", 3000, 3_000_000),
build_row(b"d", 4, 10.0, "v4", 4000, 4_000_000),
],
),
(
Expand All @@ -243,9 +243,9 @@ mod tests {
reverse: false,
},
vec![
build_row(b"a", 1, 10.0, "v1"),
build_row(b"b", 2, 10.0, "v2"),
build_row(b"c", 3, 10.0, "v3"),
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
build_row(b"b", 2, 10.0, "v2", 2000, 2_000_000),
build_row(b"c", 3, 10.0, "v3", 3000, 3_000_000),
],
),
];
Expand Down Expand Up @@ -303,17 +303,45 @@ mod tests {

let mut ctx = PutContext::new(IndexInWriterSchema::for_same_schema(schema.num_columns()));
let input = vec![
(KeySequence::new(1, 1), build_row(b"a", 1, 10.0, "v1")),
(KeySequence::new(1, 2), build_row(b"b", 2, 10.0, "v2")),
(
KeySequence::new(1, 1),
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
),
(
KeySequence::new(1, 2),
build_row(b"b", 2, 10.0, "v2", 2000, 2_000_000),
),
(
KeySequence::new(1, 3),
build_row(b"c", 3, 10.0, "primary_key same with next row"),
build_row(
b"c",
3,
10.0,
"primary_key same with next row",
3000,
3_000_000,
),
),
(
KeySequence::new(1, 4),
build_row(b"c", 3, 10.0, "v3", 3000, 3_000_000),
),
(
KeySequence::new(2, 1),
build_row(b"d", 4, 10.0, "v4", 4000, 4_000_000),
),
(
KeySequence::new(2, 1),
build_row(b"e", 5, 10.0, "v5", 5000, 5_000_000),
),
(
KeySequence::new(2, 3),
build_row(b"f", 6, 10.0, "v6", 6000, 6_000_000),
),
(
KeySequence::new(3, 4),
build_row(b"g", 7, 10.0, "v7", 7000, 7_000_000),
),
(KeySequence::new(1, 4), build_row(b"c", 3, 10.0, "v3")),
(KeySequence::new(2, 1), build_row(b"d", 4, 10.0, "v4")),
(KeySequence::new(2, 1), build_row(b"e", 5, 10.0, "v5")),
(KeySequence::new(2, 3), build_row(b"f", 6, 10.0, "v6")),
(KeySequence::new(3, 4), build_row(b"g", 7, 10.0, "v7")),
];

for (seq, row) in input {
Expand Down
30 changes: 24 additions & 6 deletions analytic_engine/src/row_iter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,22 @@ mod tests {
async fn test_chain_multiple_streams() {
let testcases = vec![
// (sequence, rows)
(10, vec![build_row(b"key4", 1000000, 10.0, "v4")]),
(20, vec![build_row(b"key2", 1000000, 10.0, "v2")]),
(100, vec![build_row(b"key3", 1000000, 10.0, "v3")]),
(1, vec![build_row(b"key1", 1000000, 10.0, "v1")]),
(
10,
vec![build_row(b"key4", 1000000, 10.0, "v4", 1000, 1_000_000)],
),
(
20,
vec![build_row(b"key2", 1000000, 10.0, "v2", 2000, 2_000_000)],
),
(
100,
vec![build_row(b"key3", 1000000, 10.0, "v3", 3000, 3_000_000)],
),
(
1,
vec![build_row(b"key1", 1000000, 10.0, "v1", 4000, 4_000_000)],
),
];
run_and_check(testcases).await;
}
Expand All @@ -364,10 +376,16 @@ mod tests {
async fn test_chain_half_empty_streams() {
let testcases = vec![
// (sequence, rows)
(10, vec![build_row(b"key4", 1000000, 10.0, "v4")]),
(
10,
vec![build_row(b"key4", 1000000, 10.0, "v4", 1000, 1_000_000)],
),
(20, vec![]),
(100, vec![]),
(1, vec![build_row(b"key1", 1000000, 10.0, "v1")]),
(
1,
vec![build_row(b"key1", 1000000, 10.0, "v1", 1000, 1_000_000)],
),
];
run_and_check(testcases).await;
}
Expand Down
22 changes: 11 additions & 11 deletions analytic_engine/src/row_iter/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,18 @@ mod tests {
build_record_batch_with_key(
schema.clone(),
vec![
build_row(b"a", 1, 10.0, "v1"),
build_row(b"a", 1, 10.0, "v"),
build_row(b"a", 2, 10.0, "v2"),
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
build_row(b"a", 1, 10.0, "v", 1000, 1_000_000),
build_row(b"a", 2, 10.0, "v2", 2000, 2_000_000),
],
),
build_record_batch_with_key(
schema,
vec![
build_row(b"a", 2, 10.0, "v"),
build_row(b"a", 3, 10.0, "v3"),
build_row(b"a", 3, 10.0, "v"),
build_row(b"a", 4, 10.0, "v4"),
build_row(b"a", 2, 10.0, "v", 2000, 2_000_000),
build_row(b"a", 3, 10.0, "v3", 3000, 3_000_000),
build_row(b"a", 3, 10.0, "v", 3000, 3_000_000),
build_row(b"a", 4, 10.0, "v4", 4000, 4_000_000),
],
),
],
Expand All @@ -233,10 +233,10 @@ mod tests {
check_iterator(
&mut iter,
vec![
build_row(b"a", 1, 10.0, "v1"),
build_row(b"a", 2, 10.0, "v2"),
build_row(b"a", 3, 10.0, "v3"),
build_row(b"a", 4, 10.0, "v4"),
build_row(b"a", 1, 10.0, "v1", 1000, 1_000_000),
build_row(b"a", 2, 10.0, "v2", 2000, 2_000_000),
build_row(b"a", 3, 10.0, "v3", 3000, 3_000_000),
build_row(b"a", 4, 10.0, "v4", 4000, 4_000_000),
],
)
.await;
Expand Down
Loading

0 comments on commit 6551a7e

Please sign in to comment.