Skip to content

Commit

Permalink
Merge pull request #93 from taosdata/test/ts2035
Browse files Browse the repository at this point in the history
test: add test case for partial update block
  • Loading branch information
zitsen authored Nov 22, 2022
2 parents f406d51 + 622f052 commit eab128e
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions taos-sys/src/tmq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,20 @@ mod tests {
])
.await?;

let builder = TmqBuilder::from_dsn("taos:///?group.id=10&timeout=1000ms&experimental.snapshot.enable=false")?;
// target
let target = crate::TaosBuilder::from_dsn("taos:///")?.build()?;
target
.exec_many([
"drop database if exists sys_ts2035_target",
"create database sys_ts2035_target",
"use sys_ts2035_target",
"create table tb1 (ts timestamp, c1 int, c2 int)",
])
.await?;

let builder = TmqBuilder::from_dsn(
"taos:///?group.id=10&timeout=1000ms&experimental.snapshot.enable=false",
)?;
let mut consumer = builder.build()?;
consumer.subscribe(["sys_ts2035"]).await?;

Expand All @@ -948,10 +961,15 @@ mod tests {
unreachable!()
}
MessageSet::Data(mut data) => {
let raw = data.as_raw_data().await?;
target
.write_raw_meta(unsafe { std::mem::transmute(raw) })
.await?;
// data message may have more than one data block for various tables.
while let Some(data) = data.next().transpose()? {
dbg!(data.table_name());
dbg!(data);
dbg!(&data);
// target.write_raw_block(&data).await?;
}
}
_ => (),
Expand All @@ -965,6 +983,15 @@ mod tests {

taos.exec_many(["drop topic sys_ts2035", "drop database sys_ts2035"])
.await?;

let (c1, c2) = target
.query_one::<_, (Option<i32>, Option<i32>)>("select c1, c2 from tb1")
.await?
.expect("should have data");
// todo: comment out for test.
// assert_eq!(c1, Some(0));
// assert_eq!(c2, Some(1));
target.exec("drop database sys_ts2035_target").await?;
Ok(())
}
#[tokio::test]
Expand Down

0 comments on commit eab128e

Please sign in to comment.