Skip to content

Commit

Permalink
feat(clickhouse sink): Add timestamp_format field (#1634)
Browse files Browse the repository at this point in the history
* Add timestamp_format field to clickhouse sink

Signed-off-by: Ashley Jeffs <ash@jeffail.uk>

* Nest clickhouse ts format under encoding

Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
  • Loading branch information
Jeffail authored Feb 24, 2020
1 parent 9e963e5 commit 992b493
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 4 deletions.
17 changes: 17 additions & 0 deletions .meta/sinks/clickhouse.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,20 @@ type = "string"
common = true
examples = ["mydatabase"]
description = "The database that contains the table that data will be inserted into."

[sinks.clickhouse.options.encoding]
type = "table"
common = true
required = false
description = "Customize how events are encoded."

[sinks.clickhouse.options.encoding.children.timestamp_format]
type = "string"
common = true
required = false
default = "rfc3339"
description = "How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime; however, this loses precision as DateTimes are defined in seconds."

[sinks.clickhouse.options.encoding.children.timestamp_format.enum]
rfc3339 = "Format as an RFC3339 string"
unix = "Format as a unix timestamp, can be parsed as a Clickhouse DateTime"
16 changes: 16 additions & 0 deletions config/vector.spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2951,6 +2951,22 @@ end
when_full = "block"
when_full = "drop_newest"

#
# Encoding
#

[sinks.clickhouse.encoding]
# How to format event timestamps. Formats such as unix can be parsed as a
# Clickhouse DateTime; however, this loses precision as DateTimes are defined
# in seconds.
#
# * optional
# * default: "rfc3339"
# * type: string
# * enum: "rfc3339" or "unix"
timestamp_format = "rfc3339"
timestamp_format = "unix"

#
# Request
#
Expand Down
98 changes: 94 additions & 4 deletions src/sinks/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
dns::Resolver,
event::Event,
event::{Event, Value},
sinks::util::{
http::{https_client, Auth, HttpRetryLogic, HttpService, Response},
retries::{RetryAction, RetryLogic},
Expand All @@ -17,13 +17,27 @@ use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

/// Wire format used for event timestamps when encoding rows for Clickhouse.
///
/// With `rename_all = "lowercase"` the accepted config values are `"unix"`
/// and `"rfc3339"`.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TimestampFormat {
    /// Seconds since the Unix epoch; can be parsed as a Clickhouse DateTime.
    Unix,
    /// RFC 3339 string — the default serialization of a timestamp.
    RFC3339,
}

/// Encoding options for the Clickhouse sink (the `[sinks.<id>.encoding]`
/// table in the config).
///
/// `deny_unknown_fields` makes a misspelled key under `encoding` a hard
/// deserialization error instead of being silently ignored.
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfig {
// `None` means RFC 3339, the default serialization of a timestamp
// (see the `TimestampFormat::RFC3339 | None` arm in `encode_event`).
pub timestamp_format: Option<TimestampFormat>,
}

/// Configuration for the Clickhouse sink.
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
pub host: String,
pub table: String,
pub database: Option<String>,
pub compression: Option<Compression>,
// NOTE(review): unlike `batch` below, this field has no #[serde(default)],
// so a config with no `[encoding]` table fails to deserialize even though
// `timestamp_format` documents a default — confirm and add the attribute
// (this matches the commit comment reporting `encoding` became required).
pub encoding: EncodingConfig,
#[serde(default)]
pub batch: BatchBytesConfig,
pub auth: Option<Auth>,
Expand Down Expand Up @@ -109,12 +123,27 @@ fn clickhouse(config: ClickhouseConfig, cx: SinkContext) -> crate::Result<super:
cx.acker(),
)
.batched_with_min(Buffer::new(gzip), &batch)
.with_flat_map(move |event: Event| iter_ok(encode_event(event)));
.with_flat_map(move |event: Event| iter_ok(encode_event(&config, event)));

Ok(Box::new(sink))
}

fn encode_event(event: Event) -> Option<Vec<u8>> {
/// Serializes one event as a newline-terminated JSON object for Clickhouse's
/// JSONEachRow insert format, first rewriting timestamp fields according to
/// `config.encoding.timestamp_format`.
fn encode_event(config: &ClickhouseConfig, mut event: Event) -> Option<Vec<u8>> {
match config.encoding.timestamp_format {
Some(TimestampFormat::Unix) => {
// Collect replacements first: we cannot mutate the log while
// iterating over `all_fields()` borrows it.
let mut unix_timestamps = Vec::new();
for (k, v) in event.as_log().all_fields() {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
}
// NOTE(review): `.pop()` returns an Option, so this loop runs at
// most once — only the LAST collected timestamp field is converted.
// If an event carries several timestamp fields the rest stay
// RFC 3339. Likely intended: `for (k, v) in unix_timestamps`.
for (k, v) in unix_timestamps.pop() {
event.as_mut_log().insert(k, v);
}
}
// RFC3339 is the default serialization of a timestamp.
Some(TimestampFormat::RFC3339) | None => {}
}
let mut body =
serde_json::to_vec(&event.as_log().all_fields()).expect("Events should be valid json!");
body.push(b'\n');
Expand Down Expand Up @@ -226,6 +255,7 @@ mod tests {
mod integration_tests {
use super::*;
use crate::{
event,
event::Event,
test_util::{random_string, runtime},
topology::config::{SinkConfig, SinkContext},
Expand Down Expand Up @@ -276,6 +306,67 @@ mod integration_tests {
assert_eq!(expected, output.data[0]);
}

// Integration test: requires a local Clickhouse at http://localhost:8123.
// Verifies that with `timestamp_format = "unix"` the sink emits an integer
// timestamp that Clickhouse accepts into a DateTime column.
#[test]
fn insert_events_unix_timestamps() {
crate::test_util::trace_init();
let mut rt = runtime();

let table = gen_table();
let host = String::from("http://localhost:8123");

let config = ClickhouseConfig {
host: host.clone(),
table: table.clone(),
compression: Some(Compression::None),
encoding: EncodingConfig {
timestamp_format: Some(TimestampFormat::Unix),
},
// max_size of 1 byte forces an immediate flush of the single event.
batch: BatchBytesConfig {
max_size: Some(1),
timeout_secs: None,
},
request: TowerRequestConfig {
retry_attempts: Some(1),
..Default::default()
},
..Default::default()
};

// DateTime column: Clickhouse parses the unix integer into this type.
let client = ClickhouseClient::new(host);
client.create_table(
&table,
"host String, timestamp DateTime('Europe/London'), message String",
);

let (sink, _hc) = config.build(SinkContext::new_test(rt.executor())).unwrap();

let mut input_event = Event::from("raw log line");
input_event.as_mut_log().insert("host", "example.com");

let pump = sink.send(input_event.clone());
rt.block_on(pump).unwrap();

let output = client.select_all(&table);
assert_eq!(1, output.rows);

// Build the expected row: Clickhouse renders the stored DateTime back
// as "%Y-%m-%d %H:%M:%S", so format the original timestamp the same way
// before comparing JSON values.
let exp_event = input_event.as_mut_log();
exp_event.insert(
event::log_schema().timestamp_key().clone(),
format!(
"{}",
exp_event
.get(&event::log_schema().timestamp_key())
.unwrap()
.as_timestamp()
.unwrap()
.format("%Y-%m-%d %H:%M:%S")
),
);

let expected = serde_json::to_value(exp_event.all_fields()).unwrap();
assert_eq!(expected, output.data[0]);
}

#[test]
fn no_retry_on_incorrect_data() {
crate::test_util::trace_init();
Expand Down Expand Up @@ -334,7 +425,6 @@ mod integration_tests {
"CREATE TABLE {}
({})
ENGINE = MergeTree()
PARTITION BY substring(timestamp, 1, 7)
ORDER BY (host, timestamp);",
table, schema
))
Expand Down
58 changes: 58 additions & 0 deletions website/docs/reference/sinks/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ import CodeHeader from '@site/src/components/CodeHeader';

# OPTIONAL - requests
compression = "none" # default, enum

# OPTIONAL - Encoding
[sinks.my_sink_id.encoding]
timestamp_format = "rfc3339" # default, enum
```

</TabItem>
Expand Down Expand Up @@ -98,6 +102,10 @@ import CodeHeader from '@site/src/components/CodeHeader';
# REQUIRED
max_size = 104900000 # example, bytes, relevant when type = "disk"

# OPTIONAL - Encoding
[sinks.my_sink_id.encoding]
timestamp_format = "rfc3339" # default, enum

# OPTIONAL - Request
[sinks.my_sink_id.request]
in_flight_limit = 5 # default, requests
Expand Down Expand Up @@ -475,6 +483,56 @@ The database that contains the table that data will be inserted into.
</Field>


<Field
common={true}
defaultValue={null}
enumValues={null}
examples={[]}
groups={[]}
name={"encoding"}
path={null}
relevantWhen={null}
required={false}
templateable={false}
type={"table"}
unit={null}
>
### encoding

Customize how events are encoded.

<Fields filters={false}>


<Field
common={true}
defaultValue={"rfc3339"}
enumValues={{"rfc3339":"Format as an RFC3339 string","unix":"Format as a unix timestamp, can be parsed as a Clickhouse DateTime"}}
examples={["rfc3339","unix"]}
groups={[]}
name={"timestamp_format"}
path={"encoding"}
relevantWhen={null}
required={false}
templateable={false}
type={"string"}
unit={null}
>
#### timestamp_format

How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime; however, this loses precision as DateTimes are defined in seconds.


</Field>


</Fields>

</Field>


<Field
common={true}
defaultValue={true}
Expand Down

1 comment on commit 992b493

@binarylogic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Jeffail it appears this change required the encoding table for the clickhouse sink even though the option specifies a default?

Please sign in to comment.