Skip to content

Commit 75bc386

Browse files
authored
chore: address remaining feedback of #244 (feat!(insert): RowBinaryWithNamesAndTypes) (#302)
2 parents b5eaa6f + a3932a6 commit 75bc386

File tree

12 files changed

+792
-152
lines changed

12 files changed

+792
-152
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ jobs:
5959
- run: rustup override set nightly
6060
- run: rustup show active-toolchain -v
6161
# Serde 1.0.227 fails to build with `--cfg docsrs`, only pass it to our own packages
62-
- run: cargo rustdoc -p clickhouse-derive -- -D warnings --cfg docsrs
63-
- run: cargo rustdoc -p clickhouse-types -- -D warnings --cfg docsrs
64-
- run: cargo rustdoc -p clickhouse -- -D warnings --cfg docsrs
62+
- run: cargo rustdoc -p clickhouse-derive --all-features -- -D warnings --cfg docsrs
63+
- run: cargo rustdoc -p clickhouse-types --all-features -- -D warnings --cfg docsrs
64+
- run: cargo rustdoc -p clickhouse --all-features -- -D warnings --cfg docsrs
6565

6666
test:
6767
runs-on: ubuntu-latest

docker-compose.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
name: clickhouse-rs
22
services:
33
clickhouse:
4-
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}'
4+
# Note: use of a fully qualified url makes Podman happy without need for further configuration.
5+
image: 'docker.io/clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}'
56
container_name: 'clickhouse-rs-clickhouse-server'
67
environment:
78
CLICKHOUSE_SKIP_USER_SETUP: 1

src/insert.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use bytes::{Bytes, BytesMut};
1212
use clickhouse_types::put_rbwnat_columns_header;
1313
use hyper::{self, Request};
1414
use replace_with::replace_with_or_abort;
15-
use std::sync::Arc;
1615
use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Duration};
1716
use tokio::{
1817
task::JoinHandle,
@@ -34,11 +33,21 @@ const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capac
3433
/// Otherwise, the whole `INSERT` will be aborted.
3534
///
3635
/// Rows are being sent progressively to spread network load.
36+
///
37+
/// # Note: Metadata is Cached
38+
/// If [validation is enabled][Client::with_validation],
39+
/// this helper will query the metadata for the target table to learn the column names and types.
40+
///
41+
/// To avoid querying this metadata every time, it is cached within the [`Client`].
42+
///
43+
/// Any concurrent changes to the table schema may cause insert failures if the metadata
44+
/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`]
45+
/// after any changes to the current database schema.
3746
#[must_use]
3847
pub struct Insert<T> {
3948
state: InsertState,
4049
buffer: BytesMut,
41-
row_metadata: Option<Arc<RowMetadata>>,
50+
row_metadata: Option<RowMetadata>,
4251
#[cfg(feature = "lz4")]
4352
compression: Compression,
4453
send_timeout: Option<Duration>,
@@ -121,7 +130,7 @@ macro_rules! timeout {
121130
}
122131

123132
impl<T> Insert<T> {
124-
pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<Arc<RowMetadata>>) -> Self
133+
pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
125134
where
126135
T: Row,
127136
{

src/lib.rs

Lines changed: 125 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ pub use self::{
99
row::{Row, RowOwned, RowRead, RowWrite},
1010
};
1111
use self::{error::Result, http_client::HttpClient};
12-
use crate::row_metadata::RowMetadata;
13-
use crate::sql::Identifier;
12+
use crate::row_metadata::{AccessType, ColumnDefaultKind, InsertMetadata, RowMetadata};
1413

1514
#[doc = include_str!("row_derive.md")]
1615
pub use clickhouse_derive::Row;
17-
use clickhouse_types::parse_rbwnat_columns_header;
16+
use clickhouse_types::{Column, DataTypeNode};
1817

18+
use crate::_priv::row_insert_metadata_query;
1919
use std::{collections::HashMap, fmt::Display, sync::Arc};
2020
use tokio::sync::RwLock;
2121

@@ -61,7 +61,7 @@ pub struct Client {
6161
headers: HashMap<String, String>,
6262
products_info: Vec<ProductInfo>,
6363
validation: bool,
64-
row_metadata_cache: Arc<RowMetadataCache>,
64+
insert_metadata_cache: Arc<InsertMetadataCache>,
6565

6666
#[cfg(feature = "test-util")]
6767
mocked: bool,
@@ -107,13 +107,8 @@ impl Default for Client {
107107

108108
/// Cache for [`RowMetadata`] to avoid allocating it for the same struct more than once
109109
/// during the application lifecycle. Key: fully qualified table name (e.g. `database.table`).
110-
pub(crate) struct RowMetadataCache(RwLock<HashMap<String, Arc<RowMetadata>>>);
111-
112-
impl Default for RowMetadataCache {
113-
fn default() -> Self {
114-
RowMetadataCache(RwLock::new(HashMap::default()))
115-
}
116-
}
110+
#[derive(Default)]
111+
pub(crate) struct InsertMetadataCache(RwLock<HashMap<String, Arc<InsertMetadata>>>);
117112

118113
impl Client {
119114
/// Creates a new client with a specified underlying HTTP client.
@@ -130,33 +125,55 @@ impl Client {
130125
headers: HashMap::new(),
131126
products_info: Vec::default(),
132127
validation: true,
133-
row_metadata_cache: Arc::new(RowMetadataCache::default()),
128+
insert_metadata_cache: Arc::new(InsertMetadataCache::default()),
134129
#[cfg(feature = "test-util")]
135130
mocked: false,
136131
}
137132
}
138133

139134
/// Specifies ClickHouse's url. Should point to HTTP endpoint.
140135
///
136+
/// Automatically [clears the metadata cache][Self::clear_cached_metadata]
137+
/// for this instance only.
138+
///
141139
/// # Examples
142140
/// ```
143141
/// # use clickhouse::Client;
144142
/// let client = Client::default().with_url("http://localhost:8123");
145143
/// ```
146144
pub fn with_url(mut self, url: impl Into<String>) -> Self {
147145
self.url = url.into();
146+
147+
// `with_mock()` didn't exist previously, so to not break existing usages,
148+
// we need to be able to detect a mocked server using nothing but the URL.
149+
#[cfg(feature = "test-util")]
150+
if let Some(url) = test::Mock::mocked_url_to_real(&self.url) {
151+
self.url = url;
152+
self.mocked = true;
153+
}
154+
155+
// Assume our cached metadata is invalid.
156+
self.insert_metadata_cache = Default::default();
157+
148158
self
149159
}
150160

151161
/// Specifies a database name.
152162
///
163+
/// Automatically [clears the metadata cache][Self::clear_cached_metadata]
164+
/// for this instance only.
165+
///
153166
/// # Examples
154167
/// ```
155168
/// # use clickhouse::Client;
156169
/// let client = Client::default().with_database("test");
157170
/// ```
158171
pub fn with_database(mut self, database: impl Into<String>) -> Self {
159172
self.database = Some(database.into());
173+
174+
// Assume our cached metadata is invalid.
175+
self.insert_metadata_cache = Default::default();
176+
160177
self
161178
}
162179

@@ -347,8 +364,12 @@ impl Client {
347364
/// If `T` has unnamed fields, e.g. tuples.
348365
pub async fn insert<T: Row>(&self, table: &str) -> Result<insert::Insert<T>> {
349366
if self.get_validation() {
350-
let metadata = self.get_row_metadata_for_insert::<T>(table).await?;
351-
return Ok(insert::Insert::new(self, table, Some(metadata)));
367+
let metadata = self.get_insert_metadata(table).await?;
368+
return Ok(insert::Insert::new(
369+
self,
370+
table,
371+
Some(metadata.to_row::<T>()),
372+
));
352373
}
353374
Ok(insert::Insert::new(self, table, None))
354375
}
@@ -382,11 +403,36 @@ impl Client {
382403
/// in your specific use case. Additionally, writing smoke tests to ensure that
383404
/// the row types match the ClickHouse schema is highly recommended,
384405
/// if you plan to disable validation in your application.
406+
///
407+
/// # Note: Mocking
408+
/// When using [`test::Mock`] with the `test-util` feature, validation is forced off.
409+
///
410+
/// This applies either when using [`Client::with_mock()`], or [`Client::with_url()`]
411+
/// with a URL from [`test::Mock::url()`].
412+
///
413+
/// As of writing, the mocking facilities are unable to generate the `RowBinaryWithNamesAndTypes`
414+
/// header required for validation to function.
385415
pub fn with_validation(mut self, enabled: bool) -> Self {
386416
self.validation = enabled;
387417
self
388418
}
389419

420+
/// Clear table metadata that was previously received and cached.
421+
///
422+
/// [`Insert`][crate::insert::Insert] uses cached metadata when sending data with validation.
423+
/// If the table schema changes, this metadata needs to re-fetched.
424+
///
425+
/// This method clears the metadata cache, causing future insert queries to re-fetch metadata.
426+
/// This applies to all cloned instances of this `Client` (using the same URL and database)
427+
/// as well.
428+
///
429+
/// This may need to wait to acquire a lock if a query is concurrently writing into the cache.
430+
///
431+
/// Cancel-safe.
432+
pub async fn clear_cached_metadata(&self) {
433+
self.insert_metadata_cache.0.write().await.clear();
434+
}
435+
390436
/// Used internally to check if the validation mode is enabled,
391437
/// as it takes into account the `test-util` feature flag.
392438
#[inline]
@@ -413,43 +459,58 @@ impl Client {
413459
/// which is pointless in that kind of tests.
414460
#[cfg(feature = "test-util")]
415461
pub fn with_mock(mut self, mock: &test::Mock) -> Self {
416-
self.url = mock.url().to_string();
462+
self.url = mock.real_url().to_string();
417463
self.mocked = true;
418464
self
419465
}
420466

421-
async fn get_row_metadata_for_insert<T: Row>(
422-
&self,
423-
table_name: &str,
424-
) -> Result<Arc<RowMetadata>> {
425-
let read_lock = self.row_metadata_cache.0.read().await;
426-
match read_lock.get(table_name) {
427-
Some(metadata) => Ok(metadata.clone()),
428-
None => {
429-
drop(read_lock);
430-
// TODO: should it be moved to a cold function?
431-
let mut write_lock = self.row_metadata_cache.0.write().await;
432-
let db = match self.database {
433-
Some(ref db) => db,
434-
None => "default",
435-
};
436-
let mut bytes_cursor = self
437-
.query("SELECT * FROM ? LIMIT 0")
438-
.bind(Identifier(table_name))
439-
// don't allow to override the client database set in the client instance
440-
// with a `.with_option("database", "some_other_db")` call on the app side
441-
.with_option("database", db)
442-
.fetch_bytes("RowBinaryWithNamesAndTypes")?;
443-
let mut buffer = Vec::<u8>::new();
444-
while let Some(chunk) = bytes_cursor.next().await? {
445-
buffer.extend_from_slice(&chunk);
446-
}
447-
let columns = parse_rbwnat_columns_header(&mut buffer.as_slice())?;
448-
let metadata = Arc::new(RowMetadata::new_for_insert::<T>(columns));
449-
write_lock.insert(table_name.to_string(), metadata.clone());
450-
Ok(metadata)
467+
async fn get_insert_metadata(&self, table_name: &str) -> Result<Arc<InsertMetadata>> {
468+
{
469+
let read_lock = self.insert_metadata_cache.0.read().await;
470+
471+
// FIXME: `table_name` is not necessarily fully qualified here
472+
if let Some(metadata) = read_lock.get(table_name) {
473+
return Ok(metadata.clone());
451474
}
452475
}
476+
477+
// TODO: should it be moved to a cold function?
478+
let mut write_lock = self.insert_metadata_cache.0.write().await;
479+
let db = match self.database {
480+
Some(ref db) => db,
481+
None => "default",
482+
};
483+
484+
let mut columns_cursor = self
485+
.query(&row_insert_metadata_query(db, table_name))
486+
.fetch::<(String, String, String)>()?;
487+
488+
let mut columns = Vec::new();
489+
let mut column_default_kinds = Vec::new();
490+
let mut column_lookup = HashMap::new();
491+
492+
while let Some((name, type_, default_kind)) = columns_cursor.next().await? {
493+
let data_type = DataTypeNode::new(&type_)?;
494+
let default_kind = default_kind.parse::<ColumnDefaultKind>()?;
495+
496+
column_lookup.insert(name.clone(), columns.len());
497+
498+
columns.push(Column { name, data_type });
499+
500+
column_default_kinds.push(default_kind);
501+
}
502+
503+
let metadata = Arc::new(InsertMetadata {
504+
row_metadata: RowMetadata {
505+
columns,
506+
access_type: AccessType::WithSeqAccess, // ignored on insert
507+
},
508+
column_default_kinds,
509+
column_lookup,
510+
});
511+
512+
write_lock.insert(table_name.to_string(), metadata.clone());
513+
Ok(metadata)
453514
}
454515
}
455516

@@ -463,6 +524,25 @@ pub mod _priv {
463524
pub fn lz4_compress(uncompressed: &[u8]) -> super::Result<bytes::Bytes> {
464525
crate::compression::lz4::compress(uncompressed)
465526
}
527+
528+
// Also needed by `it::insert::cache_row_metadata()`
529+
pub fn row_insert_metadata_query(db: &str, table: &str) -> String {
530+
let mut out = "SELECT \
531+
name, \
532+
type, \
533+
default_kind \
534+
FROM system.columns \
535+
WHERE database = "
536+
.to_string();
537+
538+
crate::sql::escape::string(db, &mut out).unwrap();
539+
540+
out.push_str(" AND table = ");
541+
542+
crate::sql::escape::string(table, &mut out).unwrap();
543+
544+
out
545+
}
466546
}
467547

468548
#[cfg(test)]
@@ -643,32 +723,6 @@ mod client_tests {
643723
);
644724
}
645725

646-
#[tokio::test]
647-
async fn cache_row_metadata() {
648-
let client = Client::default()
649-
.with_url("http://localhost:8123")
650-
.with_database("system");
651-
652-
let metadata = client
653-
.get_row_metadata_for_insert::<SystemRolesRow>("roles")
654-
.await
655-
.unwrap();
656-
657-
assert_eq!(metadata.columns, SystemRolesRow::columns());
658-
assert_eq!(metadata.access_type, AccessType::WithSeqAccess);
659-
660-
// we can now use a dummy client, cause the metadata is cached,
661-
// and no calls to the database will be made
662-
client
663-
.with_url("whatever")
664-
.get_row_metadata_for_insert::<SystemRolesRow>("roles")
665-
.await
666-
.unwrap();
667-
668-
assert_eq!(metadata.columns, SystemRolesRow::columns());
669-
assert_eq!(metadata.access_type, AccessType::WithSeqAccess);
670-
}
671-
672726
#[test]
673727
fn it_does_follow_previous_configuration() {
674728
let client = Client::default().with_option("async_insert", "1");

0 commit comments

Comments
 (0)