Skip to content

Commit

Permalink
Merge pull request #96 from taosdata/fix/ts-2035-partially-update-one…
Browse files Browse the repository at this point in the history
…-record-multitimes

fix: use new write_raw_block_with_fields API if avaliable
  • Loading branch information
zitsen authored Nov 29, 2022
2 parents 7f41d79 + 1efbb66 commit 2063d9f
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 79 deletions.
58 changes: 0 additions & 58 deletions taos-optin/benches/is_null.rs

This file was deleted.

2 changes: 1 addition & 1 deletion taos-query/src/common/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ impl RawBlock {
Layout::from_bits(self.layout.borrow().as_inner()).unwrap()
}

fn fields(&self) -> Vec<Field> {
pub fn fields(&self) -> Vec<Field> {
self.schemas()
.iter()
.zip(self.field_names())
Expand Down
3 changes: 2 additions & 1 deletion taos-query/src/common/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ impl Timestamp {
Timestamp::Microseconds(raw) => chrono::Duration::microseconds(*raw),
Timestamp::Nanoseconds(raw) => chrono::Duration::nanoseconds(*raw),
};
chrono::NaiveDateTime::from_timestamp(0, 0)
chrono::NaiveDateTime::from_timestamp_opt(0, 0)
.expect("timestamp value could always be mapped to a chrono::NaiveDateTime")
.checked_add_signed(duration)
.unwrap()
}
Expand Down
11 changes: 10 additions & 1 deletion taos-sys/build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::env;
use std::ffi::OsString;
use std::ffi::{c_int, OsString};
use std::fmt::Display;

fn get_env(name: &str) -> Option<OsString> {
Expand Down Expand Up @@ -58,6 +58,15 @@ fn taos_version() -> String {
unreachable!("the current os is not supported");
};
let lib = unsafe { libloading::Library::new(lib_name).unwrap() };
if unsafe {
lib.get::<libloading::Symbol<unsafe extern "C" fn() -> c_int>>(
b"taos_write_raw_block_with_fields\0",
)
}
.is_ok()
{
println!("cargo:rustc-cfg=taos_write_raw_block_with_fields");
}
let version = unsafe {
let version: libloading::Symbol<unsafe extern "C" fn() -> *const std::os::raw::c_char> =
lib.get(b"taos_get_client_info\0").unwrap();
Expand Down
37 changes: 30 additions & 7 deletions taos-sys/src/conn/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::{ffi::CStr, os::raw::*};

use cfg_if::cfg_if;
use itertools::Itertools;
use taos_query::common::raw_data_t;
use taos_query::prelude::{Code, RawError as Error};
use taos_query::RawBlock;

use crate::tmq::taos_write_raw_block;
use crate::tmq::*;
use crate::{err_or, into_c_str::IntoCStr, query::QueryFuture};
use crate::{ffi::*, tmq::ffi::tmq_write_raw, RawRes, ResultSet};

Expand Down Expand Up @@ -143,12 +145,33 @@ impl RawTaos {
.table_name()
.ok_or_else(|| Error::new(Code::Failed, "raw block should have table name"))?;
let ptr = block.as_raw_bytes().as_ptr();
err_or!(taos_write_raw_block(
self.as_ptr(),
nrows as _,
ptr as _,
name.into_c_str().as_ptr()
))
// block;

let fields = block
.fields()
.into_iter()
.map(|field| field.into())
.collect_vec();

cfg_if! {
if #[cfg(taos_write_raw_block_with_fields)] {
err_or!(taos_write_raw_block_with_fields(
self.as_ptr(),
nrows as _,
ptr as _,
name.into_c_str().as_ptr(),
fields.as_ptr(),
fields.len() as _,
))
} else {
err_or!(taos_write_raw_block(
self.as_ptr(),
nrows as _,
ptr as _,
name.into_c_str().as_ptr()
))
}
}
}

#[inline]
Expand Down
16 changes: 15 additions & 1 deletion taos-sys/src/tmq/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::{borrow::Cow, os::raw::*};
use taos_macros::c_cfg;
use taos_query::{common::raw_data_t, prelude::RawError};

use crate::ffi::{TAOS, TAOS_RES};
use crate::{
ffi::{TAOS, TAOS_RES},
types::TAOS_FIELD,
};

#[repr(transparent)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -141,6 +144,17 @@ extern "C" {
pub fn tmq_get_res_type(res: *mut TAOS_RES) -> tmq_res_t;
}

#[cfg(taos_write_raw_block_with_fields)]
extern "C" {
pub fn taos_write_raw_block_with_fields(
taos: *mut TAOS,
nrows: i32,
ptr: *const c_char,
tbname: *const c_char,
fields: *const TAOS_FIELD,
num_of_fields: i32,
) -> i32;
}
#[cfg(not(taos_tmq))]
pub unsafe fn tmq_get_res_type(res: *mut TAOS_RES) -> tmq_res_t {
tmq_res_t::TMQ_RES_INVALID
Expand Down
24 changes: 14 additions & 10 deletions taos-sys/src/tmq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,7 @@ mod tests {
Ok(())
}

/// Partial update a record with different columns.
#[tokio::test(flavor = "multi_thread")]
async fn test_ts2035() -> anyhow::Result<()> {
use taos_query::prelude::*;
Expand Down Expand Up @@ -962,14 +963,14 @@ mod tests {
}
MessageSet::Data(mut data) => {
let raw = data.as_raw_data().await?;
target
.write_raw_meta(unsafe { std::mem::transmute(raw) })
.await?;
// target
// .write_raw_meta(unsafe { std::mem::transmute(raw) })
// .await?;
// data message may have more than one data block for various tables.
while let Some(data) = data.next().transpose()? {
dbg!(data.table_name());
dbg!(&data);
// target.write_raw_block(&data).await?;
target.write_raw_block(&data).await?;
}
}
_ => (),
Expand All @@ -981,16 +982,19 @@ mod tests {

consumer.unsubscribe().await;

taos.exec_many(["drop topic sys_ts2035", "drop database sys_ts2035"])
.await?;

let (c1, c2) = target
.query_one::<_, (Option<i32>, Option<i32>)>("select c1, c2 from tb1")
.await?
.expect("should have data");
// todo: comment out for test.
// assert_eq!(c1, Some(0));
// assert_eq!(c2, Some(1));

assert_eq!(c1, Some(0));
assert_eq!(c2, Some(1));

taos.exec_many([
"drop topic sys_ts2035", // drop topic before dropping database
"drop database sys_ts2035",
])
.await?;
target.exec("drop database sys_ts2035_target").await?;
Ok(())
}
Expand Down
18 changes: 18 additions & 0 deletions taos-sys/src/types/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,21 @@ pub fn from_raw_fields(ptr: *const TAOS_FIELD, len: usize) -> Vec<Field> {
.map(Into::into)
.collect()
}

impl From<Field> for TAOS_FIELD {
fn from(value: Field) -> Self {
// let name = value.name().into_c_str().into_owned();
let name = value.name().as_bytes();
let mut field = TAOS_FIELD {
name: [0; 65],
type_: value.ty() as _,
bytes: value.bytes() as _,
};

unsafe {
std::ptr::copy_nonoverlapping(name.as_ptr(), field.name.as_mut_ptr() as _, name.len());
}

field
}
}

0 comments on commit 2063d9f

Please sign in to comment.