diff --git a/.meta/sinks/clickhouse.toml b/.meta/sinks/clickhouse.toml index 8ec3c2b0242b99..77107bec911f85 100644 --- a/.meta/sinks/clickhouse.toml +++ b/.meta/sinks/clickhouse.toml @@ -94,11 +94,19 @@ common = true examples = ["mydatabase"] description = "The database that contains the stable that data will be inserted into." -[sinks.clickhouse.options.timestamp_format] +[sinks.clickhouse.options.encoding] +type = "table" +common = true +required = false +description = "Customize how events are encoded." + +[sinks.clickhouse.options.encoding.children.timestamp_format] type = "string" common = true required = false -description = "Optionally convert timestamps into a format that can be parsed as a Clickhouse DateTime. This loses precision as DateTimes are defined in seconds." +default = "rfc3339" +description = "How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime, however, this loses precision as DateTimes are defined in seconds." -[sinks.clickhouse.options.timestamp_format.enum] -unix = "Format as a unix timestamp" +[sinks.clickhouse.options.encoding.children.timestamp_format.enum] +rfc3339 = "Format as an RFC3339 string" +unix = "Format as a unix timestamp, can be parsed as a Clickhouse DateTime" diff --git a/config/vector.spec.toml b/config/vector.spec.toml index 73799c87ab8339..37f8feefbf9cc6 100644 --- a/config/vector.spec.toml +++ b/config/vector.spec.toml @@ -2841,15 +2841,6 @@ end healthcheck = true healthcheck = false - # Optionally convert timestamps into a format that can be parsed as a - # Clickhouse DateTime. This loses precision as DateTimes are defined in seconds. - # - # * optional - # * no default - # * type: string - # * must be: "unix" (if supplied) - timestamp_format = "unix" - # # requests # @@ -2953,6 +2944,22 @@ end when_full = "block" when_full = "drop_newest" + # + # Encoding + # + + [sinks.clickhouse.encoding] + # How to format event timestamps. Formats such as unix can be parsed as a + # Clickhouse DateTime, however, this loses precision as DateTimes are defined + # in seconds. + # + # * optional + # * default: "rfc3339" + # * type: string + # * enum: "rfc3339" or "unix" + timestamp_format = "rfc3339" + timestamp_format = "unix" + # # Request # diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index 3b062cb5130aa9..84c9aa8e45f9e2 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -1,6 +1,6 @@ use crate::{ dns::Resolver, - event::{Event, Value, TIMESTAMP}, + event::{Event, Value}, sinks::util::{ http::{https_client, Auth, HttpRetryLogic, HttpService, Response}, retries::{RetryAction, RetryLogic}, @@ -21,6 +21,13 @@ use snafu::ResultExt; #[serde(rename_all = "lowercase")] pub enum TimestampFormat { Unix, + RFC3339, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct EncodingConfig { + pub timestamp_format: Option, } #[derive(Deserialize, Serialize, Debug, Clone, Default)] @@ -30,7 +37,7 @@ pub struct ClickhouseConfig { pub table: String, pub database: Option, pub compression: Option, - pub timestamp_format: Option, + pub encoding: EncodingConfig, #[serde(default)] pub batch: BatchBytesConfig, pub auth: Option, @@ -122,16 +129,20 @@ fn clickhouse(config: ClickhouseConfig, cx: SinkContext) -> crate::Result Option> { - match config.timestamp_format { + match config.encoding.timestamp_format { Some(TimestampFormat::Unix) => { - if let Some(unix) = match event.as_log().get(&TIMESTAMP) { - Some(Value::Timestamp(ts)) => Some(ts.timestamp()), - _ => None, - } { - event.as_mut_log().insert(TIMESTAMP.clone(), unix); + let mut unix_timestamps = Vec::new(); + for (k, v) in event.as_log().all_fields() { + if let Value::Timestamp(ts) = v { + unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp()))); + } + } + for (k, v) in unix_timestamps.pop() { + event.as_mut_log().insert(k, v); } } - None => {} + // RFC3339 is the default serialization of a timestamp. + Some(TimestampFormat::RFC3339) | None => {} } let mut body = serde_json::to_vec(&event.as_log().all_fields()).expect("Events should be valid json!"); @@ -244,6 +255,7 @@ mod tests { mod integration_tests { use super::*; use crate::{ + event, event::Event, test_util::{random_string, runtime}, topology::config::{SinkConfig, SinkContext}, @@ -306,7 +318,9 @@ mod integration_tests { host: host.clone(), table: table.clone(), compression: Some(Compression::None), - timestamp_format: Some(TimestampFormat::Unix), + encoding: EncodingConfig { + timestamp_format: Some(TimestampFormat::Unix), + }, batch: BatchBytesConfig { max_size: Some(1), timeout_secs: None, @@ -337,11 +351,11 @@ mod integration_tests { let exp_event = input_event.as_mut_log(); exp_event.insert( - TIMESTAMP.clone(), + event::log_schema().timestamp_key().clone(), format!( "{}", exp_event - .get(&TIMESTAMP) + .get(&event::log_schema().timestamp_key()) .unwrap() .as_timestamp() .unwrap() diff --git a/website/docs/reference/sinks/clickhouse.md b/website/docs/reference/sinks/clickhouse.md index add8974299a51d..74cf1e3d669eee 100644 --- a/website/docs/reference/sinks/clickhouse.md +++ b/website/docs/reference/sinks/clickhouse.md @@ -52,10 +52,13 @@ import CodeHeader from '@site/src/components/CodeHeader'; # OPTIONAL - General database = "mydatabase" # example, no default healthcheck = true # default - timestamp_format = "unix" # no default, must be: "unix" (if supplied) # OPTIONAL - requests compression = "none" # default, enum + + # OPTIONAL - Encoding + [sinks.my_sink_id.encoding] + timestamp_format = "rfc3339" # default, enum ``` @@ -74,7 +77,6 @@ import CodeHeader from '@site/src/components/CodeHeader'; # OPTIONAL - General database = "mydatabase" # example, no default healthcheck = true # default - timestamp_format = "unix" # no default, must be: "unix" (if supplied) # OPTIONAL - requests compression = "none" # default, enum @@ -100,6 +102,10 @@ import CodeHeader from '@site/src/components/CodeHeader'; # REQUIRED max_size = 104900000 # example, bytes, relevant when type = "disk" + # OPTIONAL - Encoding + [sinks.my_sink_id.encoding] + timestamp_format = "rfc3339" # default, enum + # OPTIONAL - Request [sinks.my_sink_id.request] in_flight_limit = 5 # default, requests @@ -477,6 +483,56 @@ The database that contains the stable that data will be inserted into. + + +### encoding + +Specify how events are encoded. + + + + + + +#### timestamp_format + +How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime, however, this loses precision as DateTimes are defined in seconds. + + + + + + + + + + - - -### timestamp_format - -Optionally convert timestamps into a format that can be parsed as a Clickhouse DateTime. This loses precision as DateTimes are defined in seconds. - - - - -