From 992b4930b9d3052714cf90b66bfa6ead1d7855b2 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Mon, 24 Feb 2020 17:13:10 +0000 Subject: [PATCH] feat(clickhouse sink): Add timestamp_format field (#1634) * Add timestamp_format field to clickhouse sink Signed-off-by: Ashley Jeffs * Nest clickhouse ts format under encoding Signed-off-by: Ashley Jeffs --- .meta/sinks/clickhouse.toml | 17 ++++ config/vector.spec.toml | 16 ++++ src/sinks/clickhouse.rs | 98 +++++++++++++++++++++- website/docs/reference/sinks/clickhouse.md | 58 +++++++++++++ 4 files changed, 185 insertions(+), 4 deletions(-) diff --git a/.meta/sinks/clickhouse.toml b/.meta/sinks/clickhouse.toml index a8488e2365d3e..1bf7ed1bb4c03 100644 --- a/.meta/sinks/clickhouse.toml +++ b/.meta/sinks/clickhouse.toml @@ -93,3 +93,20 @@ type = "string" common = true examples = ["mydatabase"] description = "The database that contains the stable that data will be inserted into." + +[sinks.clickhouse.options.encoding] +type = "table" +common = true +required = false +description = "Customize how events are encoded." + +[sinks.clickhouse.options.encoding.children.timestamp_format] +type = "string" +common = true +required = false +default = "rfc3339" +description = "How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime, however, this loses precision as DateTimes are defined in seconds." + +[sinks.clickhouse.options.encoding.children.timestamp_format.enum] +rfc3339 = "Format as an RFC3339 string" +unix = "Format as a unix timestamp, can be parsed as a Clickhouse DateTime" diff --git a/config/vector.spec.toml b/config/vector.spec.toml index 2931f9b8bbdf0..afd6e5036beec 100644 --- a/config/vector.spec.toml +++ b/config/vector.spec.toml @@ -2951,6 +2951,22 @@ end when_full = "block" when_full = "drop_newest" + # + # Encoding + # + + [sinks.clickhouse.encoding] + # How to format event timestamps. 
Formats such as unix can be parsed as a + # Clickhouse DateTime, however, this loses precision as DateTimes are defined + # in seconds. + # + # * optional + # * default: "rfc3339" + # * type: string + # * enum: "rfc3339" or "unix" + timestamp_format = "rfc3339" + timestamp_format = "unix" + # # Request # diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index ed659427efc80..84c9aa8e45f9e 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -1,6 +1,6 @@ use crate::{ dns::Resolver, - event::Event, + event::{Event, Value}, sinks::util::{ http::{https_client, Auth, HttpRetryLogic, HttpService, Response}, retries::{RetryAction, RetryLogic}, @@ -17,6 +17,19 @@ use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use snafu::ResultExt; +#[derive(Serialize, Deserialize, Debug, Copy, Clone)] +#[serde(rename_all = "lowercase")] +pub enum TimestampFormat { + Unix, + RFC3339, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct EncodingConfig { + pub timestamp_format: Option<TimestampFormat>, +} + #[derive(Deserialize, Serialize, Debug, Clone, Default)] #[serde(deny_unknown_fields)] pub struct ClickhouseConfig { @@ -24,6 +37,7 @@ pub struct ClickhouseConfig { pub table: String, pub database: Option<String>, pub compression: Option<Compression>, + pub encoding: EncodingConfig, #[serde(default)] pub batch: BatchBytesConfig, pub auth: Option<Auth>, @@ -109,12 +123,27 @@ fn clickhouse(config: ClickhouseConfig, cx: SinkContext) -> crate::Result Option> { +fn encode_event(config: &ClickhouseConfig, mut event: Event) -> Option<Vec<u8>> { + match config.encoding.timestamp_format { + Some(TimestampFormat::Unix) => { + let mut unix_timestamps = Vec::new(); + for (k, v) in event.as_log().all_fields() { + if let Value::Timestamp(ts) = v { + unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp()))); + } + } + for (k, v) in unix_timestamps { + event.as_mut_log().insert(k, v); + } + } + // RFC3339 is the default serialization of a
timestamp. + Some(TimestampFormat::RFC3339) | None => {} + } let mut body = serde_json::to_vec(&event.as_log().all_fields()).expect("Events should be valid json!"); body.push(b'\n'); @@ -226,6 +255,7 @@ mod tests { mod integration_tests { use super::*; use crate::{ + event, event::Event, test_util::{random_string, runtime}, topology::config::{SinkConfig, SinkContext}, @@ -276,6 +306,67 @@ mod integration_tests { assert_eq!(expected, output.data[0]); } + #[test] + fn insert_events_unix_timestamps() { + crate::test_util::trace_init(); + let mut rt = runtime(); + + let table = gen_table(); + let host = String::from("http://localhost:8123"); + + let config = ClickhouseConfig { + host: host.clone(), + table: table.clone(), + compression: Some(Compression::None), + encoding: EncodingConfig { + timestamp_format: Some(TimestampFormat::Unix), + }, + batch: BatchBytesConfig { + max_size: Some(1), + timeout_secs: None, + }, + request: TowerRequestConfig { + retry_attempts: Some(1), + ..Default::default() + }, + ..Default::default() + }; + + let client = ClickhouseClient::new(host); + client.create_table( + &table, + "host String, timestamp DateTime('UTC'), message String", + ); + + let (sink, _hc) = config.build(SinkContext::new_test(rt.executor())).unwrap(); + + let mut input_event = Event::from("raw log line"); + input_event.as_mut_log().insert("host", "example.com"); + + let pump = sink.send(input_event.clone()); + rt.block_on(pump).unwrap(); + + let output = client.select_all(&table); + assert_eq!(1, output.rows); + + let exp_event = input_event.as_mut_log(); + exp_event.insert( + event::log_schema().timestamp_key().clone(), + format!( + "{}", + exp_event + .get(&event::log_schema().timestamp_key()) + .unwrap() + .as_timestamp() + .unwrap() + .format("%Y-%m-%d %H:%M:%S") + ), + ); + + let expected = serde_json::to_value(exp_event.all_fields()).unwrap(); + assert_eq!(expected, output.data[0]); + } + #[test] fn no_retry_on_incorrect_data() {
crate::test_util::trace_init(); @@ -334,7 +425,6 @@ mod integration_tests { "CREATE TABLE {} ({}) ENGINE = MergeTree() - PARTITION BY substring(timestamp, 1, 7) ORDER BY (host, timestamp);", table, schema )) diff --git a/website/docs/reference/sinks/clickhouse.md b/website/docs/reference/sinks/clickhouse.md index e9dc61f05ebdc..6f3f17c082302 100644 --- a/website/docs/reference/sinks/clickhouse.md +++ b/website/docs/reference/sinks/clickhouse.md @@ -55,6 +55,10 @@ import CodeHeader from '@site/src/components/CodeHeader'; # OPTIONAL - requests compression = "none" # default, enum + + # OPTIONAL - Encoding + [sinks.my_sink_id.encoding] + timestamp_format = "rfc3339" # default, enum ``` @@ -98,6 +102,10 @@ import CodeHeader from '@site/src/components/CodeHeader'; # REQUIRED max_size = 104900000 # example, bytes, relevant when type = "disk" + # OPTIONAL - Encoding + [sinks.my_sink_id.encoding] + timestamp_format = "rfc3339" # default, enum + # OPTIONAL - Request [sinks.my_sink_id.request] in_flight_limit = 5 # default, requests @@ -475,6 +483,56 @@ The database that contains the stable that data will be inserted into. + + +### encoding + +Customize how events are encoded. + + + + + + +#### timestamp_format + +How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime, however, this loses precision as DateTimes are defined in seconds. + + + + + + + + + +