From 708855b7e770868e46496bdc5bd7f57c1d9ba88a Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Thu, 8 Aug 2024 16:21:33 -0700
Subject: [PATCH] feat: Add last cache create/delete to WAL

This moves the LastCacheDefinition into the WAL so that it can be serialized
there. This ended up being a pretty large refactor to get the last cache
creation to work through the WAL.

I think I also stumbled on a bug where the last cache wasn't getting
initialized from the catalog on reboot so that it wouldn't actually end up
caching values. The refactored last cache persistence test in
write_buffer/mod.rs surfaced this.

Finally, I also had to update the WAL so that it would persist if there were
only catalog updates and no writes.

Fixes #25203
---
 influxdb3_catalog/src/catalog.rs              | 147 ++----------
 influxdb3_catalog/src/serialize.rs            |   4 +-
 influxdb3_server/src/http.rs                  |   3 +-
 .../src/system_tables/last_caches.rs          |   2 +-
 influxdb3_wal/src/lib.rs                      | 135 +++++++++++
 influxdb3_wal/src/object_store.rs             |   7 +-
 influxdb3_write/src/last_cache/mod.rs         | 166 ++++++++------
 influxdb3_write/src/lib.rs                    |   5 +-
 influxdb3_write/src/write_buffer/mod.rs       | 211 +++++++++---------
 .../src/write_buffer/queryable_buffer.rs      |  67 ++++--
 ...time-to-persist-segments-after-delete.snap |  49 ----
 ...fter-last-cache-create-and-new-field.snap} |   3 +-
 ...g-immediately-after-last-cache-create.snap |   2 +-
 ...g-immediately-after-last-cache-delete.snap |   2 +-
 influxdb3_write/src/write_buffer/validator.rs |  12 +-
 15 files changed, 429 insertions(+), 386 deletions(-)
 delete mode 100644 influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-delete.snap
 rename influxdb3_write/src/write_buffer/snapshots/{influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-create.snap => influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap} (97%)

diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index c6b999b3fc7..ff6a3fb54cf 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -1,6 +1,6 @@
 //! Implementation of the Catalog that sits entirely in memory.
-use influxdb3_wal::{CatalogBatch, CatalogOp}; +use influxdb3_wal::{CatalogBatch, CatalogOp, LastCacheDefinition}; use influxdb_line_protocol::FieldValue; use observability_deps::tracing::info; use parking_lot::RwLock; @@ -32,9 +32,6 @@ pub enum Error { Catalog::NUM_DBS_LIMIT )] TooManyDbs, - - #[error("last cache size must be from 1 to 10")] - InvalidLastCacheSize, } pub type Result = std::result::Result; @@ -337,6 +334,25 @@ impl DatabaseSchema { CatalogOp::CreateDatabase(_) => { // Do nothing } + CatalogOp::CreateLastCache(definition) => { + let table_namme: Arc = definition.table.as_str().into(); + let table = tables.get_mut(table_namme.as_ref()); + match table { + Some(table) => { + table + .last_caches + .insert(definition.name.clone(), definition.clone()); + } + None => panic!("table must exist before last cache creation"), + } + } + CatalogOp::DeleteLastCache(definition) => { + let table_namme: Arc = definition.table.as_str().into(); + let table = tables.get_mut(table_namme.as_ref()); + if let Some(table) = table { + table.last_caches.remove(&definition.name); + } + } } } @@ -503,129 +519,6 @@ impl TableDefinition { } } -/// Defines a last cache in a given table and database -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] -pub struct LastCacheDefinition { - /// The table name the cache is associated with - pub table: String, - /// Given name of the cache - pub name: String, - /// Columns intended to be used as predicates in the cache - pub key_columns: Vec, - /// Columns that store values in the cache - pub value_columns: LastCacheValueColumnsDef, - /// The number of last values to hold in the cache - pub count: LastCacheSize, - /// The time-to-live (TTL) in seconds for entries in the cache - pub ttl: u64, -} - -impl LastCacheDefinition { - /// Create a new [`LastCacheDefinition`] with explicit value columns - pub fn new_with_explicit_value_columns( - table: impl Into, - name: impl Into, - key_columns: impl IntoIterator>, - value_columns: impl IntoIterator>, - count: usize, - ttl: u64, - ) -> Result { - Ok(Self { - table: table.into(), - name: name.into(), - key_columns: key_columns.into_iter().map(Into::into).collect(), - value_columns: LastCacheValueColumnsDef::Explicit { - columns: value_columns.into_iter().map(Into::into).collect(), - }, - count: count.try_into()?, - ttl, - }) - } - - /// Create a new [`LastCacheDefinition`] with explicit value columns - pub fn new_all_non_key_value_columns( - table: impl Into, - name: impl Into, - key_columns: impl IntoIterator>, - count: usize, - ttl: u64, - ) -> Result { - Ok(Self { - table: table.into(), - name: name.into(), - key_columns: key_columns.into_iter().map(Into::into).collect(), - value_columns: LastCacheValueColumnsDef::AllNonKeyColumns, - count: count.try_into()?, - ttl, - }) - } -} - -/// A last cache will either store values for an explicit set of columns, or will accept all -/// non-key columns -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum LastCacheValueColumnsDef { - /// Explicit list of column names - Explicit { columns: Vec }, - /// Stores all non-key columns - AllNonKeyColumns, -} - -/// The maximum allowed size for a last cache -pub const LAST_CACHE_MAX_SIZE: usize = 10; - -/// The size of the last cache -/// -/// Must be between 1 and [`LAST_CACHE_MAX_SIZE`] -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)] -pub struct LastCacheSize(usize); - -impl LastCacheSize { - pub fn new(size: usize) -> Result { - if 
size == 0 || size > LAST_CACHE_MAX_SIZE { - Err(Error::InvalidLastCacheSize) - } else { - Ok(Self(size)) - } - } -} - -impl TryFrom for LastCacheSize { - type Error = Error; - - fn try_from(value: usize) -> Result { - Self::new(value) - } -} - -impl From for usize { - fn from(value: LastCacheSize) -> Self { - value.0 - } -} - -impl From for u64 { - fn from(value: LastCacheSize) -> Self { - value - .0 - .try_into() - .expect("usize fits into a 64 bit unsigned integer") - } -} - -impl PartialEq for LastCacheSize { - fn eq(&self, other: &usize) -> bool { - self.0.eq(other) - } -} - -impl PartialEq for usize { - fn eq(&self, other: &LastCacheSize) -> bool { - self.eq(&other.0) - } -} - pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnType { match fv { FieldValue::I64(_) => InfluxColumnType::Field(InfluxFieldType::Integer), diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index 2e1848703ef..635762e4a7b 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -1,10 +1,10 @@ +use crate::catalog::TableDefinition; use arrow::datatypes::DataType as ArrowDataType; +use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef}; use schema::{InfluxColumnType, SchemaBuilder}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -use crate::catalog::{LastCacheDefinition, LastCacheValueColumnsDef, TableDefinition}; - impl Serialize for TableDefinition { fn serialize(&self, serializer: S) -> Result where diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 7524c410598..5b4f45e2f0d 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -20,8 +20,9 @@ use hyper::header::CONTENT_TYPE; use hyper::http::HeaderValue; use hyper::HeaderMap; use hyper::{Body, Method, Request, Response, StatusCode}; -use influxdb3_catalog::catalog::{Error as CatalogError, LastCacheDefinition}; +use influxdb3_catalog::catalog::Error as CatalogError; use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION}; +use influxdb3_wal::LastCacheDefinition; use influxdb3_write::last_cache; use influxdb3_write::persister::TrackedMemoryArrowWriter; use influxdb3_write::write_buffer::Error as WriteBufferError; diff --git a/influxdb3_server/src/system_tables/last_caches.rs b/influxdb3_server/src/system_tables/last_caches.rs index 627e4a0607f..170dbc6aa1b 100644 --- a/influxdb3_server/src/system_tables/last_caches.rs +++ b/influxdb3_server/src/system_tables/last_caches.rs @@ -4,7 +4,7 @@ use arrow::array::{GenericListBuilder, StringBuilder}; use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::{error::DataFusionError, logical_expr::Expr}; -use influxdb3_catalog::catalog::{LastCacheDefinition, LastCacheValueColumnsDef}; +use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef}; use influxdb3_write::last_cache::LastCacheProvider; use iox_system_tables::IoxSystemTable; diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index 58627e6c007..2d20474c3d6 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -44,6 +44,9 @@ pub enum Error { #[error("invalid level 0 duration {0}. 
Must be one of 1m, 5m, 10m")] InvalidLevel0Duration(String), + + #[error("last cache size must be from 1 to 10")] + InvalidLastCacheSize, } pub type Result = std::result::Result; @@ -203,6 +206,7 @@ pub enum WalOp { #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct CatalogBatch { pub database_name: Arc, + pub time_ns: i64, pub ops: Vec, } @@ -211,6 +215,8 @@ pub enum CatalogOp { CreateDatabase(DatabaseDefinition), CreateTable(TableDefinition), AddFields(FieldAdditions), + CreateLastCache(LastCacheDefinition), + DeleteLastCache(LastCacheDelete), } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -281,6 +287,135 @@ impl From for InfluxColumnType { } } +/// Defines a last cache in a given table and database +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +pub struct LastCacheDefinition { + /// The table name the cache is associated with + pub table: String, + /// Given name of the cache + pub name: String, + /// Columns intended to be used as predicates in the cache + pub key_columns: Vec, + /// Columns that store values in the cache + pub value_columns: LastCacheValueColumnsDef, + /// The number of last values to hold in the cache + pub count: LastCacheSize, + /// The time-to-live (TTL) in seconds for entries in the cache + pub ttl: u64, +} + +impl LastCacheDefinition { + /// Create a new [`LastCacheDefinition`] with explicit value columns + pub fn new_with_explicit_value_columns( + table: impl Into, + name: impl Into, + key_columns: impl IntoIterator>, + value_columns: impl IntoIterator>, + count: usize, + ttl: u64, + ) -> Result { + Ok(Self { + table: table.into(), + name: name.into(), + key_columns: key_columns.into_iter().map(Into::into).collect(), + value_columns: LastCacheValueColumnsDef::Explicit { + columns: value_columns.into_iter().map(Into::into).collect(), + }, + count: count.try_into()?, + ttl, + }) + } + + /// Create a new [`LastCacheDefinition`] with explicit value columns + pub fn new_all_non_key_value_columns( + table: impl Into, + name: impl Into, + key_columns: impl IntoIterator>, + count: usize, + ttl: u64, + ) -> Result { + Ok(Self { + table: table.into(), + name: name.into(), + key_columns: key_columns.into_iter().map(Into::into).collect(), + value_columns: LastCacheValueColumnsDef::AllNonKeyColumns, + count: count.try_into()?, + ttl, + }) + } +} + +/// A last cache will either store values for an explicit set of columns, or will accept all +/// non-key columns +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum LastCacheValueColumnsDef { + /// Explicit list of column names + Explicit { columns: Vec }, + /// Stores all non-key columns + AllNonKeyColumns, +} + +/// The maximum allowed size for a last cache +pub const LAST_CACHE_MAX_SIZE: usize = 10; + +/// The size of the last cache +/// +/// Must be between 1 and [`LAST_CACHE_MAX_SIZE`] +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)] +pub struct LastCacheSize(usize); + +impl LastCacheSize { + pub fn new(size: usize) -> Result { + if size == 0 || size > LAST_CACHE_MAX_SIZE { + Err(Error::InvalidLastCacheSize) + } else { + Ok(Self(size)) + } + } +} + +impl TryFrom for LastCacheSize { + type Error = Error; + + fn try_from(value: usize) -> Result { + Self::new(value) + } +} + +impl From for usize { + fn from(value: LastCacheSize) -> Self { + value.0 + } +} + +impl From for u64 { + fn from(value: LastCacheSize) -> Self { + value + .0 + .try_into() + .expect("usize fits into a 
64 bit unsigned integer") + } +} + +impl PartialEq for LastCacheSize { + fn eq(&self, other: &usize) -> bool { + self.0.eq(other) + } +} + +impl PartialEq for usize { + fn eq(&self, other: &LastCacheSize) -> bool { + self.eq(&other.0) + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct LastCacheDelete { + pub table: String, + pub name: String, +} + #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct WriteBatch { pub database_name: Arc, diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 4b1805202df..b30c5b010d4 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -467,7 +467,7 @@ struct WalBuffer { impl WalBuffer { fn is_empty(&self) -> bool { - self.database_to_write_batch.is_empty() + self.database_to_write_batch.is_empty() && self.catalog_batches.is_empty() } } @@ -537,6 +537,11 @@ impl WalBuffer { max_timestamp_ns = max_timestamp_ns.max(write_batch.max_time_ns); } + for catalog_batch in &self.catalog_batches { + min_timestamp_ns = min_timestamp_ns.min(catalog_batch.time_ns); + max_timestamp_ns = max_timestamp_ns.max(catalog_batch.time_ns); + } + // have the catalog ops come before any writes in ordering let mut ops = Vec::with_capacity(self.database_to_write_batch.len() + self.catalog_batches.len()); diff --git a/influxdb3_write/src/last_cache/mod.rs b/influxdb3_write/src/last_cache/mod.rs index 95928efc997..776809c4def 100644 --- a/influxdb3_write/src/last_cache/mod.rs +++ b/influxdb3_write/src/last_cache/mod.rs @@ -4,6 +4,7 @@ use std::{ time::{Duration, Instant}, }; +use arrow::datatypes::SchemaRef; use arrow::{ array::{ new_null_array, ArrayRef, BooleanBuilder, Float64Builder, GenericByteDictionaryBuilder, @@ -22,8 +23,11 @@ use datafusion::{ }; use hashbrown::{HashMap, HashSet}; use indexmap::{IndexMap, IndexSet}; -use influxdb3_catalog::catalog::{LastCacheDefinition, LastCacheSize, LastCacheValueColumnsDef}; -use influxdb3_wal::{Field, FieldData, Row, WalContents, WalOp}; +use influxdb3_catalog::catalog::InnerCatalog; +use influxdb3_wal::{ + Field, FieldData, LastCacheDefinition, LastCacheSize, LastCacheValueColumnsDef, Row, + WalContents, WalOp, +}; use iox_time::Time; use parking_lot::RwLock; use schema::{InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME}; @@ -113,10 +117,7 @@ impl LastCacheProvider { } /// Initialize a [`LastCacheProvider`] from a [`InnerCatalog`] - #[cfg(test)] - pub(crate) fn new_from_catalog( - catalog: &influxdb3_catalog::catalog::InnerCatalog, - ) -> Result { + pub(crate) fn new_from_catalog(catalog: &InnerCatalog) -> Result { let provider = LastCacheProvider::new(); for db_schema in catalog.databases() { for tbl_def in db_schema.tables() { @@ -244,59 +245,15 @@ impl LastCacheProvider { format!("{tbl_name}_{keys}_last_cache", keys = key_columns.join("_")) }); - let (value_columns, accept_new_fields) = if let Some(mut vals) = value_columns { - // if value columns are specified, check that they are present in the table schema - for name in vals.iter() { - if schema.field_by_name(name).is_none() { - return Err(Error::ValueColumnDoesNotExist { - column_name: name.into(), - }); - } - } - // double-check that time column is included - let time_col = TIME_COLUMN_NAME.to_string(); - if !vals.contains(&time_col) { - vals.push(time_col); - } - (vals, false) - } else { - // default to all non-key columns - ( - schema - .iter() - .filter_map(|(_, f)| { - if key_columns.contains(f.name()) { - None - } else { - 
Some(f.name().to_string()) - } - }) - .collect::>(), - true, - ) + let accept_new_fields = value_columns.is_none(); + let last_cache_value_columns_def = match &value_columns { + None => LastCacheValueColumnsDef::AllNonKeyColumns, + Some(cols) => LastCacheValueColumnsDef::Explicit { + columns: cols.clone(), + }, }; - let mut schema_builder = ArrowSchemaBuilder::new(); - // Add key columns first: - for (t, field) in schema - .iter() - .filter(|&(_, f)| key_columns.contains(f.name())) - { - if let InfluxColumnType::Tag = t { - // override tags with string type in the schema, because the KeyValue type stores - // them as strings, and produces them as StringArray when creating RecordBatches: - schema_builder.push(ArrowField::new(field.name(), DataType::Utf8, false)) - } else { - schema_builder.push(field.clone()); - }; - } - // Add value columns second: - for (_, field) in schema - .iter() - .filter(|&(_, f)| value_columns.contains(f.name())) - { - schema_builder.push(field.clone()); - } + let cache_schema = self.last_cache_schema_from_schema(&schema, &key_columns, value_columns); let series_key = schema .series_key() @@ -312,7 +269,7 @@ impl LastCacheProvider { count, ttl, key_columns.clone(), - Arc::new(schema_builder.finish()), + cache_schema, series_key, accept_new_fields, ); @@ -341,18 +298,91 @@ impl LastCacheProvider { table: tbl_name, name: cache_name, key_columns, - value_columns: if accept_new_fields { - LastCacheValueColumnsDef::AllNonKeyColumns - } else { - LastCacheValueColumnsDef::Explicit { - columns: value_columns, - } - }, + value_columns: last_cache_value_columns_def, count, ttl: ttl.as_secs(), })) } + fn last_cache_schema_from_schema( + &self, + schema: &Schema, + key_columns: &[String], + value_columns: Option>, + ) -> SchemaRef { + let mut schema_builder = ArrowSchemaBuilder::new(); + // Add key columns first: + for (t, field) in schema + .iter() + .filter(|&(_, f)| key_columns.contains(f.name())) + { + if let InfluxColumnType::Tag = t { + // override tags with string type in the schema, because the KeyValue type stores + // them as strings, and produces them as StringArray when creating RecordBatches: + schema_builder.push(ArrowField::new(field.name(), DataType::Utf8, false)) + } else { + schema_builder.push(field.clone()); + }; + } + // Add value columns second: + match value_columns { + Some(cols) => { + for (_, field) in schema + .iter() + .filter(|&(_, f)| cols.contains(f.name()) || f.name() == TIME_COLUMN_NAME) + { + schema_builder.push(field.clone()); + } + } + None => { + for (_, field) in schema + .iter() + .filter(|&(_, f)| !key_columns.contains(f.name())) + { + schema_builder.push(field.clone()); + } + } + } + + Arc::new(schema_builder.finish()) + } + + pub fn create_cache_from_definition( + &self, + db_name: &str, + schema: &Schema, + definition: &LastCacheDefinition, + ) { + let value_columns = match &definition.value_columns { + LastCacheValueColumnsDef::AllNonKeyColumns => None, + LastCacheValueColumnsDef::Explicit { columns } => Some(columns.clone()), + }; + let accept_new_fields = value_columns.is_none(); + let series_key = schema + .series_key() + .map(|keys| keys.into_iter().map(|s| s.to_string()).collect()); + + let schema = + self.last_cache_schema_from_schema(schema, &definition.key_columns, value_columns); + + let last_cache = LastCache::new( + definition.count, + Duration::from_secs(definition.ttl), + definition.key_columns.clone(), + schema, + series_key, + accept_new_fields, + ); + + let mut lock = self.cache_map.write(); + + 
lock.entry(db_name.to_string()) + .or_default() + .entry_ref(&definition.table) + .or_default() + .insert(definition.name.clone(), last_cache); + } + /// Delete a cache from the provider /// /// This will also clean up empty levels in the provider hierarchy, so if there are no more @@ -1545,10 +1575,8 @@ mod tests { use ::object_store::{memory::InMemory, ObjectStore}; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use data_types::NamespaceName; - use influxdb3_catalog::catalog::{ - Catalog, DatabaseSchema, LastCacheDefinition, TableDefinition, - }; - use influxdb3_wal::WalConfig; + use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition}; + use influxdb3_wal::{LastCacheDefinition, WalConfig}; use insta::assert_json_snapshot; use iox_time::{MockProvider, Time}; diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index dc11d75acbc..af2a51f4a18 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -21,8 +21,7 @@ use datafusion::execution::context::SessionState; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::Expr; use influxdb3_catalog::catalog; -use influxdb3_catalog::catalog::LastCacheDefinition; -use influxdb3_wal::WalFileSequenceNumber; +use influxdb3_wal::{LastCacheDefinition, WalFileSequenceNumber}; use iox_query::QueryChunk; use iox_time::Time; use last_cache::LastCacheProvider; @@ -364,7 +363,7 @@ mod test_helpers { lp: &str, ) -> WriteBatch { let db_name = NamespaceName::new(db_name).unwrap(); - let result = WriteValidator::initialize(db_name.clone(), catalog) + let result = WriteValidator::initialize(db_name.clone(), catalog, 0) .unwrap() .v1_parse_lines_and_update_schema(lp, false) .unwrap() diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 8a2ef476191..df19481a524 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -23,9 +23,13 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::execution::context::SessionState; use datafusion::logical_expr::Expr; use datafusion::physical_plan::SendableRecordBatchStream; -use influxdb3_catalog::catalog::{Catalog, LastCacheDefinition}; +use influxdb3_catalog::catalog::Catalog; use influxdb3_wal::object_store::WalObjectStore; -use influxdb3_wal::{Wal, WalConfig, WalFileNotifier, WalOp}; +use influxdb3_wal::CatalogOp::CreateLastCache; +use influxdb3_wal::{ + CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, Wal, WalConfig, WalFileNotifier, + WalOp, +}; use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::QueryChunk; use iox_time::{Time, TimeProvider}; @@ -114,8 +118,6 @@ impl WriteBufferImpl { executor: Arc, wal_config: WalConfig, ) -> Result { - let last_cache = Arc::new(LastCacheProvider::new()); - // load up the catalog, the snapshots, and replay the wal into the in memory buffer let catalog = persister.load_catalog().await?; let catalog = Arc::new( @@ -123,6 +125,9 @@ impl WriteBufferImpl { .map(|c| Catalog::from_inner(c.catalog)) .unwrap_or_else(Catalog::new), ); + + let last_cache = Arc::new(LastCacheProvider::new_from_catalog(&catalog.clone_inner())?); + let persisted_snapshots = persister.load_snapshots(1000).await?; let last_snapshot_wal_sequence = persisted_snapshots .first() @@ -182,9 +187,13 @@ impl WriteBufferImpl { // validated lines will update the in-memory catalog, ensuring that all write operations // past this point will be infallible - let result = 
WriteValidator::initialize(db_name.clone(), self.catalog())? - .v1_parse_lines_and_update_schema(lp, accept_partial)? - .convert_lines_to_buffer(ingest_time, self.wal_config.level_0_duration, precision); + let result = WriteValidator::initialize( + db_name.clone(), + self.catalog(), + ingest_time.timestamp_nanos(), + )? + .v1_parse_lines_and_update_schema(lp, accept_partial)? + .convert_lines_to_buffer(ingest_time, self.wal_config.level_0_duration, precision); // if there were catalog updates, ensure they get persisted to the wal, so they're // replayed on restart @@ -220,9 +229,13 @@ impl WriteBufferImpl { ) -> Result { // validated lines will update the in-memory catalog, ensuring that all write operations // past this point will be infallible - let result = WriteValidator::initialize(db_name.clone(), self.catalog())? - .v3_parse_lines_and_update_schema(lp, accept_partial)? - .convert_lines_to_buffer(ingest_time, self.wal_config.level_0_duration, precision); + let result = WriteValidator::initialize( + db_name.clone(), + self.catalog(), + ingest_time.timestamp_nanos(), + )? + .v3_parse_lines_and_update_schema(lp, accept_partial)? + .convert_lines_to_buffer(ingest_time, self.wal_config.level_0_duration, precision); // if there were catalog updates, ensure they get persisted to the wal, so they're // replayed on restart @@ -510,6 +523,7 @@ impl LastCacheManager for WriteBufferImpl { .ok_or(Error::TableDoesNotExist)? .schema() .clone(); + if let Some(info) = self.last_cache.create_cache(CreateCacheArguments { db_name: db_name.to_string(), tbl_name: tbl_name.to_string(), @@ -520,14 +534,14 @@ impl LastCacheManager for WriteBufferImpl { key_columns, value_columns, })? { - let last_wal_file_number = self.wal.last_sequence_number().await; self.catalog.add_last_cache(db_name, tbl_name, info.clone()); + let add_cache_catalog_batch = WalOp::Catalog(CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_name: Arc::clone(&db_schema.name), + ops: vec![CreateLastCache(info.clone())], + }); + self.wal.write_ops(vec![add_cache_catalog_batch]).await?; - let inner_catalog = catalog.clone_inner(); - // Force persistence to the catalog, since we aren't going through the WAL: - self.persister - .persist_catalog(last_wal_file_number, Catalog::from_inner(inner_catalog)) - .await?; Ok(Some(info)) } else { Ok(None) @@ -545,14 +559,19 @@ impl LastCacheManager for WriteBufferImpl { .delete_cache(db_name, tbl_name, cache_name)?; catalog.delete_last_cache(db_name, tbl_name, cache_name); - let last_wal_file_number = self.wal.last_sequence_number().await; // NOTE: if this fails then the cache will be gone from the running server, but will be // resurrected on server restart. 
- let inner_catalog = catalog.clone_inner(); - // Force persistence to the catalog, since we aren't going through the WAL: - self.persister - .persist_catalog(last_wal_file_number, Catalog::from_inner(inner_catalog)) + self.wal + .write_ops(vec![WalOp::Catalog(CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_name: db_name.into(), + ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete { + table: tbl_name.into(), + name: cache_name.into(), + })], + })]) .await?; + Ok(()) } } @@ -562,13 +581,11 @@ impl WriteBuffer for WriteBufferImpl {} #[cfg(test)] mod tests { use super::*; - use crate::paths::CatalogFilePath; use crate::persister::PersisterImpl; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_eq; use datafusion::assert_batches_sorted_eq; use datafusion_util::config::register_iox_object_store; - use futures_util::StreamExt; use influxdb3_wal::Level0Duration; use iox_query::exec::IOxSessionContext; use iox_time::{MockProvider, Time}; @@ -580,7 +597,7 @@ mod tests { let catalog = Arc::new(Catalog::new()); let db_name = NamespaceName::new("foo").unwrap(); let lp = "cpu,region=west user=23.2 100\nfoo f1=1i"; - WriteValidator::initialize(db_name, Arc::clone(&catalog)) + WriteValidator::initialize(db_name, Arc::clone(&catalog), 0) .unwrap() .v1_parse_lines_and_update_schema(lp, false) .unwrap() @@ -690,7 +707,7 @@ mod tests { } #[tokio::test] - async fn persists_catalog_on_last_cache_create_and_delete() { + async fn last_cache_create_and_delete_is_durable() { let (wbuf, _ctx) = setup( Time::from_timestamp_nanos(0), WalConfig { @@ -708,7 +725,7 @@ mod tests { wbuf.write_lp( NamespaceName::new(db_name).unwrap(), format!("{tbl_name},t1=a f1=true").as_str(), - Time::from_timestamp(30, 0).unwrap(), + Time::from_timestamp(20, 0).unwrap(), false, Precision::Nanosecond, ) @@ -718,19 +735,30 @@ mod tests { wbuf.create_last_cache(db_name, tbl_name, Some(cache_name), None, None, None, None) .await .unwrap(); - // Check that the catalog was persisted, without advancing time: - let object_store = wbuf.persister.object_store(); - let catalog_json = fetch_catalog_as_json( - Arc::clone(&object_store), - wbuf.persister.host_identifier_prefix(), + + // load a new write buffer to ensure its durable + let wbuf = WriteBufferImpl::new( + Arc::clone(&wbuf.persister), + Arc::clone(&wbuf.time_provider), + Arc::clone(&wbuf.buffer.executor), + WalConfig { + level_0_duration: Level0Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }, ) - .await; + .await + .unwrap(); + + let catalog_json = catalog_to_json(&wbuf.catalog); insta::assert_json_snapshot!("catalog-immediately-after-last-cache-create", catalog_json); + // Do another write that will update the state of the catalog, specifically, the table // that the last cache was created for, and add a new field to the table/cache `f2`: wbuf.write_lp( NamespaceName::new(db_name).unwrap(), - format!("{tbl_name},t1=a f1=true,f2=42i").as_str(), + format!("{tbl_name},t1=a f1=false,f2=42i").as_str(), Time::from_timestamp(30, 0).unwrap(), false, Precision::Nanosecond, @@ -738,39 +766,44 @@ mod tests { .await .unwrap(); - // do another write, which will force a snapshot of the WAL and thus the persistence of - // the catalog + // and do another replay and verification + let wbuf = WriteBufferImpl::new( + Arc::clone(&wbuf.persister), + Arc::clone(&wbuf.time_provider), + Arc::clone(&wbuf.buffer.executor), + WalConfig { + level_0_duration: Level0Duration::new_1m(), + 
max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }, + ) + .await + .unwrap(); + + let catalog_json = catalog_to_json(&wbuf.catalog); + insta::assert_json_snapshot!( + "catalog-after-last-cache-create-and-new-field", + catalog_json + ); + + // write a new data point to fill the cache wbuf.write_lp( NamespaceName::new(db_name).unwrap(), - format!("{tbl_name},t1=b f1=false").as_str(), + format!("{tbl_name},t1=a f1=true,f2=53i").as_str(), Time::from_timestamp(40, 0).unwrap(), false, Precision::Nanosecond, ) .await .unwrap(); - // Check the catalog again, to make sure it still has the last cache with the correct - // configuration: - let catalog_json = fetch_catalog_as_json( - Arc::clone(&object_store), - wbuf.persister.host_identifier_prefix(), - ) - .await; - // NOTE: the asserted snapshot is correct in-so-far as the catalog contains the last cache - // configuration; however, it is not correct w.r.t. the fields. The second write adds a new - // field `f2` to the last cache (which you can see in the query below), but the persisted - // catalog does not have `f2` in the value columns. This will need to be fixed, see - // https://github.com/influxdata/influxdb/issues/25171 - insta::assert_json_snapshot!( - "catalog-after-allowing-time-to-persist-segments-after-create", - catalog_json - ); + // Fetch record batches from the last cache directly: let expected = [ "+----+------+----------------------+----+", "| t1 | f1 | time | f2 |", "+----+------+----------------------+----+", - "| a | true | 1970-01-01T00:00:30Z | 42 |", + "| a | true | 1970-01-01T00:00:40Z | 53 |", "+----+------+----------------------+----+", ]; let actual = wbuf @@ -783,47 +816,23 @@ mod tests { wbuf.delete_last_cache(db_name, tbl_name, cache_name) .await .unwrap(); - // Catalog should be persisted, and no longer have the last cache, without advancing time: - let catalog_json = fetch_catalog_as_json( - Arc::clone(&object_store), - wbuf.persister.host_identifier_prefix(), - ) - .await; - insta::assert_json_snapshot!("catalog-immediately-after-last-cache-delete", catalog_json); - // Do another write so there is data to be persisted in the buffer: - wbuf.write_lp( - NamespaceName::new(db_name).unwrap(), - format!("{tbl_name},t1=b f1=false,f2=1337i").as_str(), - Time::from_timestamp(830, 0).unwrap(), - false, - Precision::Nanosecond, + + // do another reload and verify it's gone + let wbuf = WriteBufferImpl::new( + Arc::clone(&wbuf.persister), + Arc::clone(&wbuf.time_provider), + Arc::clone(&wbuf.buffer.executor), + WalConfig { + level_0_duration: Level0Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }, ) .await .unwrap(); - // Advance time to allow for persistence of segment data: - wbuf.time_provider - .set(Time::from_timestamp(1600, 0).unwrap()); - let mut count = 0; - loop { - count += 1; - tokio::time::sleep(Duration::from_millis(10)).await; - let files = wbuf.persisted_files.get_files(db_name, tbl_name); - if !files.is_empty() { - break; - } else if count > 9 { - panic!("not persisting"); - } - } - // Check the catalog again, to ensure the last cache is still gone: - let catalog_json = fetch_catalog_as_json( - Arc::clone(&object_store), - wbuf.persister.host_identifier_prefix(), - ) - .await; - insta::assert_json_snapshot!( - "catalog-after-allowing-time-to-persist-segments-after-delete", - catalog_json - ); + let catalog_json = catalog_to_json(&wbuf.catalog); + 
insta::assert_json_snapshot!("catalog-immediately-after-last-cache-delete", catalog_json); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1016,23 +1025,9 @@ mod tests { assert_batches_sorted_eq!(&expected, &actual); } - async fn fetch_catalog_as_json( - object_store: Arc, - host_identifier_prefix: &str, - ) -> serde_json::Value { - let mut list = object_store.list(Some(&CatalogFilePath::dir(host_identifier_prefix))); - let Some(item) = list.next().await else { - panic!("there should have been a catalog file persisted"); - }; - let item = item.expect("item from object store"); - let obj = object_store.get(&item.location).await.expect("get catalog"); - serde_json::from_slice::( - obj.bytes() - .await - .expect("get bytes from GetResult") - .as_ref(), - ) - .expect("parse bytes as JSON") + fn catalog_to_json(catalog: &Catalog) -> serde_json::Value { + let bytes = serde_json::to_vec_pretty(catalog).unwrap(); + serde_json::from_slice::(&bytes).expect("parse bytes as JSON") } async fn setup( diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 87b960a1e99..bf4dd5e6660 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -16,7 +16,7 @@ use datafusion::logical_expr::Expr; use datafusion_util::stream_from_batches; use hashbrown::HashMap; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; -use influxdb3_wal::{SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch}; +use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch}; use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::exec::Executor; use iox_query::frontend::reorg::ReorgPlanner; @@ -131,16 +131,7 @@ impl QueryableBuffer { let mut buffer = self.buffer.write(); self.last_cache_provider.evict_expired_cache_entries(); self.last_cache_provider.write_wal_contents_to_cache(&write); - - for op in write.ops { - match op { - WalOp::Write(write_batch) => buffer.add_write_batch(write_batch), - WalOp::Catalog(catalog_batch) => buffer - .catalog - .apply_catalog_batch(&catalog_batch) - .expect("catalog batch should apply"), - } - } + buffer.buffer_ops(write.ops, &self.last_cache_provider); } /// Called when the wal has written a new file and is attempting to snapshot. 
Kicks off persistence of @@ -186,15 +177,7 @@ impl QueryableBuffer { // we must buffer the ops after the snapshotting as this data should not be persisted // with this set of wal files - for op in write.ops { - match op { - WalOp::Write(write_batch) => buffer.add_write_batch(write_batch), - WalOp::Catalog(catalog_batch) => buffer - .catalog - .apply_catalog_batch(&catalog_batch) - .expect("catalog batch should apply"), - } - } + buffer.buffer_ops(write.ops, &self.last_cache_provider); persisting_chunks }; @@ -328,6 +311,50 @@ impl BufferState { } } + fn buffer_ops(&mut self, ops: Vec, last_cache_provider: &LastCacheProvider) { + for op in ops { + match op { + WalOp::Write(write_batch) => self.add_write_batch(write_batch), + WalOp::Catalog(catalog_batch) => { + self.catalog + .apply_catalog_batch(&catalog_batch) + .expect("catalog batch should apply"); + + let db_schema = self + .catalog + .db_schema(&catalog_batch.database_name) + .expect("database should exist"); + + for op in catalog_batch.ops { + match op { + CatalogOp::CreateLastCache(definition) => { + let table_schema = db_schema + .get_table_schema(&definition.table) + .expect("table should exist"); + last_cache_provider.create_cache_from_definition( + db_schema.name.as_ref(), + table_schema, + &definition, + ); + } + CatalogOp::DeleteLastCache(cache) => { + // we can ignore it if this doesn't exist for any reason + let _ = last_cache_provider.delete_cache( + db_schema.name.as_ref(), + &cache.table, + &cache.name, + ); + } + CatalogOp::AddFields(_) => (), + CatalogOp::CreateTable(_) => (), + CatalogOp::CreateDatabase(_) => (), + } + } + } + } + } + } + fn add_write_batch(&mut self, write_batch: WriteBatch) { let db_schema = self .catalog diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-delete.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-delete.snap deleted file mode 100644 index 5c2ecd0544d..00000000000 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-delete.snap +++ /dev/null @@ -1,49 +0,0 @@ ---- -source: influxdb3_write/src/write_buffer/mod.rs -expression: catalog_json ---- -{ - "databases": { - "db": { - "name": "db", - "tables": { - "table": { - "cols": { - "f1": { - "influx_type": "field", - "nullable": true, - "type": "bool" - }, - "f2": { - "influx_type": "field", - "nullable": true, - "type": "i64" - }, - "t1": { - "influx_type": "tag", - "nullable": true, - "type": { - "dict": [ - "i32", - "str" - ] - } - }, - "time": { - "influx_type": "time", - "nullable": false, - "type": { - "time": [ - "ns", - null - ] - } - } - }, - "name": "table" - } - } - } - }, - "sequence": 7 -} diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-create.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap similarity index 97% rename from influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-create.snap rename to influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap index 072943b8886..1306dfe17ed 100644 --- 
a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-allowing-time-to-persist-segments-after-create.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap @@ -1,5 +1,6 @@ --- source: influxdb3_write/src/write_buffer/mod.rs +assertion_line: 774 expression: catalog_json --- { @@ -57,5 +58,5 @@ expression: catalog_json } } }, - "sequence": 6 + "sequence": 7 } diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap index c0f5f47c0cf..da4b9204c97 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap @@ -52,5 +52,5 @@ expression: catalog_json } } }, - "sequence": 4 + "sequence": 2 } diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap index 5c2ecd0544d..f44aec3b72c 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap @@ -45,5 +45,5 @@ expression: catalog_json } } }, - "sequence": 7 + "sequence": 8 } diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index e036978cb82..1a504948cfe 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -22,6 +22,7 @@ use super::Error; pub(crate) struct WithCatalog { catalog: Arc, db_schema: Arc, + time_now_ns: i64, } /// Type state for the [`WriteValidator`] after it has parsed v1 or v3 @@ -45,10 +46,15 @@ impl WriteValidator { pub(crate) fn initialize( db_name: NamespaceName<'static>, catalog: Arc, + time_now_ns: i64, ) -> Result> { let db_schema = catalog.db_or_create(db_name.as_str())?; Ok(WriteValidator { - state: WithCatalog { catalog, db_schema }, + state: WithCatalog { + catalog, + db_schema, + time_now_ns, + }, }) } @@ -105,6 +111,7 @@ impl WriteValidator { } else { let catalog_batch = CatalogBatch { database_name: Arc::clone(&self.state.db_schema.name), + time_ns: self.state.time_now_ns, ops: catalog_updates, }; self.state.catalog.apply_catalog_batch(&catalog_batch)?; @@ -178,6 +185,7 @@ impl WriteValidator { None } else { let catalog_batch = CatalogBatch { + time_ns: self.state.time_now_ns, database_name: Arc::clone(&self.state.db_schema.name), ops: catalog_updates, }; @@ -775,7 +783,7 @@ mod tests { fn write_validator_v1() -> Result<(), Error> { let namespace = NamespaceName::new("test").unwrap(); let catalog = Arc::new(Catalog::new()); - let result = WriteValidator::initialize(namespace.clone(), catalog)? + let result = WriteValidator::initialize(namespace.clone(), catalog, 0)? .v1_parse_lines_and_update_schema("cpu,tag1=foo val1=\"bar\" 1234", false)? .convert_lines_to_buffer( Time::from_timestamp_nanos(0),