feat: Add last cache create/delete to WAL
This moves the LastCacheDefinition into the WAL so that it can be serialized there. This ended up being a pretty large refactor to get the last cache creation to work through the WAL.

I think I also stumbled on a bug where the last cache wasn't getting initialized from the catalog on reboot, so it wouldn't actually end up caching values. The refactored last cache persistence test in write_buffer/mod.rs surfaced this.

Finally, I also had to update the WAL so that it would persist if there were only catalog updates and no writes.

Fixes #25203
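
To make the shape of this concrete, here is a minimal sketch (not from the commit) of building a last cache definition and wrapping it in a WAL catalog batch, using the types this change adds to influxdb3_wal. The database, table, cache name, and timestamp are made up for illustration, and how the batch is actually handed to the WAL (via the write buffer) is not shown here.

use std::sync::Arc;
use influxdb3_wal::{CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete};

// Cache the most recent value of every non-key column in "cpu", keyed on "host".
let definition = LastCacheDefinition::new_all_non_key_value_columns(
    "cpu",                 // table the cache is attached to
    "cpu_host_last_cache", // cache name (illustrative)
    ["host"],              // key columns
    1,                     // number of last values to keep per key
    14400,                 // TTL in seconds (4 hours)
)
.expect("count is within the 1..=10 limit");

// Catalog changes travel through the WAL as ops inside a CatalogBatch.
let create = CatalogBatch {
    database_name: Arc::from("my_db"),
    time_ns: 0, // illustrative; real batches carry the current time in nanoseconds
    ops: vec![CatalogOp::CreateLastCache(definition)],
};

// Deleting a cache is the same pattern with the delete op.
let delete = CatalogBatch {
    database_name: Arc::from("my_db"),
    time_ns: 0,
    ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete {
        table: "cpu".to_string(),
        name: "cpu_host_last_cache".to_string(),
    })],
};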
pauldix committed Aug 8, 2024
1 parent 2082135 commit 708855b
Showing 15 changed files with 429 additions and 386 deletions.
147 changes: 20 additions & 127 deletions influxdb3_catalog/src/catalog.rs
@@ -1,6 +1,6 @@
//! Implementation of the Catalog that sits entirely in memory.

use influxdb3_wal::{CatalogBatch, CatalogOp};
use influxdb3_wal::{CatalogBatch, CatalogOp, LastCacheDefinition};
use influxdb_line_protocol::FieldValue;
use observability_deps::tracing::info;
use parking_lot::RwLock;
@@ -32,9 +32,6 @@ pub enum Error {
Catalog::NUM_DBS_LIMIT
)]
TooManyDbs,

#[error("last cache size must be from 1 to 10")]
InvalidLastCacheSize,
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -337,6 +334,25 @@ impl DatabaseSchema {
CatalogOp::CreateDatabase(_) => {
// Do nothing
}
CatalogOp::CreateLastCache(definition) => {
let table_name: Arc<str> = definition.table.as_str().into();
let table = tables.get_mut(table_name.as_ref());
match table {
Some(table) => {
table
.last_caches
.insert(definition.name.clone(), definition.clone());
}
None => panic!("table must exist before last cache creation"),
}
}
CatalogOp::DeleteLastCache(definition) => {
let table_name: Arc<str> = definition.table.as_str().into();
let table = tables.get_mut(table_name.as_ref());
if let Some(table) = table {
table.last_caches.remove(&definition.name);
}
}
}
}

@@ -503,129 +519,6 @@ impl TableDefinition {
}
}

/// Defines a last cache in a given table and database
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct LastCacheDefinition {
/// The table name the cache is associated with
pub table: String,
/// Given name of the cache
pub name: String,
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<String>,
/// Columns that store values in the cache
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
pub count: LastCacheSize,
/// The time-to-live (TTL) in seconds for entries in the cache
pub ttl: u64,
}

impl LastCacheDefinition {
/// Create a new [`LastCacheDefinition`] with explicit value columns
pub fn new_with_explicit_value_columns(
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
value_columns: impl IntoIterator<Item: Into<String>>,
count: usize,
ttl: u64,
) -> Result<Self, Error> {
Ok(Self {
table: table.into(),
name: name.into(),
key_columns: key_columns.into_iter().map(Into::into).collect(),
value_columns: LastCacheValueColumnsDef::Explicit {
columns: value_columns.into_iter().map(Into::into).collect(),
},
count: count.try_into()?,
ttl,
})
}

/// Create a new [`LastCacheDefinition`] that stores all non-key columns as value columns
pub fn new_all_non_key_value_columns(
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
count: usize,
ttl: u64,
) -> Result<Self, Error> {
Ok(Self {
table: table.into(),
name: name.into(),
key_columns: key_columns.into_iter().map(Into::into).collect(),
value_columns: LastCacheValueColumnsDef::AllNonKeyColumns,
count: count.try_into()?,
ttl,
})
}
}

/// A last cache will either store values for an explicit set of columns, or will accept all
/// non-key columns
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column names
Explicit { columns: Vec<String> },
/// Stores all non-key columns
AllNonKeyColumns,
}

/// The maximum allowed size for a last cache
pub const LAST_CACHE_MAX_SIZE: usize = 10;

/// The size of the last cache
///
/// Must be between 1 and [`LAST_CACHE_MAX_SIZE`]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)]
pub struct LastCacheSize(usize);

impl LastCacheSize {
pub fn new(size: usize) -> Result<Self, Error> {
if size == 0 || size > LAST_CACHE_MAX_SIZE {
Err(Error::InvalidLastCacheSize)
} else {
Ok(Self(size))
}
}
}

impl TryFrom<usize> for LastCacheSize {
type Error = Error;

fn try_from(value: usize) -> Result<Self, Self::Error> {
Self::new(value)
}
}

impl From<LastCacheSize> for usize {
fn from(value: LastCacheSize) -> Self {
value.0
}
}

impl From<LastCacheSize> for u64 {
fn from(value: LastCacheSize) -> Self {
value
.0
.try_into()
.expect("usize fits into a 64 bit unsigned integer")
}
}

impl PartialEq<usize> for LastCacheSize {
fn eq(&self, other: &usize) -> bool {
self.0.eq(other)
}
}

impl PartialEq<LastCacheSize> for usize {
fn eq(&self, other: &LastCacheSize) -> bool {
self.eq(&other.0)
}
}

pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnType {
match fv {
FieldValue::I64(_) => InfluxColumnType::Field(InfluxFieldType::Integer),
4 changes: 2 additions & 2 deletions influxdb3_catalog/src/serialize.rs
@@ -1,10 +1,10 @@
use crate::catalog::TableDefinition;
use arrow::datatypes::DataType as ArrowDataType;
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
use schema::{InfluxColumnType, SchemaBuilder};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

use crate::catalog::{LastCacheDefinition, LastCacheValueColumnsDef, TableDefinition};

impl Serialize for TableDefinition {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
3 changes: 2 additions & 1 deletion influxdb3_server/src/http.rs
@@ -20,8 +20,9 @@ use hyper::header::CONTENT_TYPE;
use hyper::http::HeaderValue;
use hyper::HeaderMap;
use hyper::{Body, Method, Request, Response, StatusCode};
use influxdb3_catalog::catalog::{Error as CatalogError, LastCacheDefinition};
use influxdb3_catalog::catalog::Error as CatalogError;
use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION};
use influxdb3_wal::LastCacheDefinition;
use influxdb3_write::last_cache;
use influxdb3_write::persister::TrackedMemoryArrowWriter;
use influxdb3_write::write_buffer::Error as WriteBufferError;
2 changes: 1 addition & 1 deletion influxdb3_server/src/system_tables/last_caches.rs
@@ -4,7 +4,7 @@ use arrow::array::{GenericListBuilder, StringBuilder};
use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::{error::DataFusionError, logical_expr::Expr};
use influxdb3_catalog::catalog::{LastCacheDefinition, LastCacheValueColumnsDef};
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
use influxdb3_write::last_cache::LastCacheProvider;
use iox_system_tables::IoxSystemTable;

135 changes: 135 additions & 0 deletions influxdb3_wal/src/lib.rs
@@ -44,6 +44,9 @@ pub enum Error {

#[error("invalid level 0 duration {0}. Must be one of 1m, 5m, 10m")]
InvalidLevel0Duration(String),

#[error("last cache size must be from 1 to 10")]
InvalidLastCacheSize,
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -203,6 +206,7 @@ pub enum WalOp {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct CatalogBatch {
pub database_name: Arc<str>,
pub time_ns: i64,
pub ops: Vec<CatalogOp>,
}

@@ -211,6 +215,8 @@ pub enum CatalogOp {
CreateDatabase(DatabaseDefinition),
CreateTable(TableDefinition),
AddFields(FieldAdditions),
CreateLastCache(LastCacheDefinition),
DeleteLastCache(LastCacheDelete),
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@@ -281,6 +287,135 @@ impl From<FieldDataType> for InfluxColumnType {
}
}

/// Defines a last cache in a given table and database
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct LastCacheDefinition {
/// The table name the cache is associated with
pub table: String,
/// Given name of the cache
pub name: String,
/// Columns intended to be used as predicates in the cache
pub key_columns: Vec<String>,
/// Columns that store values in the cache
pub value_columns: LastCacheValueColumnsDef,
/// The number of last values to hold in the cache
pub count: LastCacheSize,
/// The time-to-live (TTL) in seconds for entries in the cache
pub ttl: u64,
}

impl LastCacheDefinition {
/// Create a new [`LastCacheDefinition`] with explicit value columns
pub fn new_with_explicit_value_columns(
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
value_columns: impl IntoIterator<Item: Into<String>>,
count: usize,
ttl: u64,
) -> Result<Self, Error> {
Ok(Self {
table: table.into(),
name: name.into(),
key_columns: key_columns.into_iter().map(Into::into).collect(),
value_columns: LastCacheValueColumnsDef::Explicit {
columns: value_columns.into_iter().map(Into::into).collect(),
},
count: count.try_into()?,
ttl,
})
}

/// Create a new [`LastCacheDefinition`] that stores all non-key columns as value columns
pub fn new_all_non_key_value_columns(
table: impl Into<String>,
name: impl Into<String>,
key_columns: impl IntoIterator<Item: Into<String>>,
count: usize,
ttl: u64,
) -> Result<Self, Error> {
Ok(Self {
table: table.into(),
name: name.into(),
key_columns: key_columns.into_iter().map(Into::into).collect(),
value_columns: LastCacheValueColumnsDef::AllNonKeyColumns,
count: count.try_into()?,
ttl,
})
}
}

/// A last cache will either store values for an explicit set of columns, or will accept all
/// non-key columns
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LastCacheValueColumnsDef {
/// Explicit list of column names
Explicit { columns: Vec<String> },
/// Stores all non-key columns
AllNonKeyColumns,
}

/// The maximum allowed size for a last cache
pub const LAST_CACHE_MAX_SIZE: usize = 10;

/// The size of the last cache
///
/// Must be between 1 and [`LAST_CACHE_MAX_SIZE`]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Copy)]
pub struct LastCacheSize(usize);

impl LastCacheSize {
pub fn new(size: usize) -> Result<Self, Error> {
if size == 0 || size > LAST_CACHE_MAX_SIZE {
Err(Error::InvalidLastCacheSize)
} else {
Ok(Self(size))
}
}
}

impl TryFrom<usize> for LastCacheSize {
type Error = Error;

fn try_from(value: usize) -> Result<Self, Self::Error> {
Self::new(value)
}
}

impl From<LastCacheSize> for usize {
fn from(value: LastCacheSize) -> Self {
value.0
}
}

impl From<LastCacheSize> for u64 {
fn from(value: LastCacheSize) -> Self {
value
.0
.try_into()
.expect("usize fits into a 64 bit unsigned integer")
}
}

impl PartialEq<usize> for LastCacheSize {
fn eq(&self, other: &usize) -> bool {
self.0.eq(other)
}
}

impl PartialEq<LastCacheSize> for usize {
fn eq(&self, other: &LastCacheSize) -> bool {
self.eq(&other.0)
}
}
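
As a quick aside (not part of the diff), this is how the size validation above behaves; the values are illustrative:

use influxdb3_wal::{Error, LastCacheSize, LAST_CACHE_MAX_SIZE};

// Sizes from 1 through LAST_CACHE_MAX_SIZE (10) are accepted.
let ok = LastCacheSize::new(5).unwrap();
assert_eq!(ok, 5usize); // compares via the PartialEq<usize> impl

// Zero and anything above the maximum are rejected.
assert!(matches!(LastCacheSize::new(0), Err(Error::InvalidLastCacheSize)));
assert!(matches!(
    LastCacheSize::new(LAST_CACHE_MAX_SIZE + 1),
    Err(Error::InvalidLastCacheSize)
));

// TryFrom<usize> routes through the same check, which is what the
// LastCacheDefinition constructors use for their `count` argument.
let _three: LastCacheSize = 3usize.try_into().unwrap();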

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct LastCacheDelete {
pub table: String,
pub name: String,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct WriteBatch {
pub database_name: Arc<str>,
7 changes: 6 additions & 1 deletion influxdb3_wal/src/object_store.rs
@@ -467,7 +467,7 @@ struct WalBuffer {

impl WalBuffer {
fn is_empty(&self) -> bool {
self.database_to_write_batch.is_empty()
self.database_to_write_batch.is_empty() && self.catalog_batches.is_empty()
}
}

@@ -537,6 +537,11 @@ impl WalBuffer {
max_timestamp_ns = max_timestamp_ns.max(write_batch.max_time_ns);
}

for catalog_batch in &self.catalog_batches {
min_timestamp_ns = min_timestamp_ns.min(catalog_batch.time_ns);
max_timestamp_ns = max_timestamp_ns.max(catalog_batch.time_ns);
}

// have the catalog ops come before any writes in ordering
let mut ops =
Vec::with_capacity(self.database_to_write_batch.len() + self.catalog_batches.len());