Skip to content

Commit

Permalink
Nest clickhouse ts format under encoding
Browse files Browse the repository at this point in the history
Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
  • Loading branch information
Jeffail committed Feb 24, 2020
1 parent 558d0e4 commit 4f6d794
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 50 deletions.
16 changes: 12 additions & 4 deletions .meta/sinks/clickhouse.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,19 @@ common = true
examples = ["mydatabase"]
description = "The database that contains the stable that data will be inserted into."

[sinks.clickhouse.options.timestamp_format]
[sinks.clickhouse.options.encoding]
type = "table"
common = true
required = false
description = "Customize how events are encoded."

[sinks.clickhouse.options.encoding.children.timestamp_format]
type = "string"
common = true
required = false
description = "Optionally convert timestamps into a format that can be parsed as a Clickhouse DateTime. This loses precision as DateTimes are defined in seconds."
default = "rfc3339"
description = "How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime, however, this loses precision as DateTimes are defined in seconds."

[sinks.clickhouse.options.timestamp_format.enum]
unix = "Format as a unix timestamp"
[sinks.clickhouse.options.encoding.children.timestamp_format.enum]
rfc3339 = "Format as an RFC3339 string"
unix = "Format as a unix timestamp, can be parsed as a Clickhouse DateTime"
25 changes: 16 additions & 9 deletions config/vector.spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2841,15 +2841,6 @@ end
healthcheck = true
healthcheck = false

# Optionally convert timestamps into a format that can be parsed as a
# Clickhouse DateTime. This loses precision as DateTimes are defined in seconds.
#
# * optional
# * no default
# * type: string
# * must be: "unix" (if supplied)
timestamp_format = "unix"

#
# requests
#
Expand Down Expand Up @@ -2953,6 +2944,22 @@ end
when_full = "block"
when_full = "drop_newest"

#
# Encoding
#

[sinks.clickhouse.encoding]
# How to format event timestamps. Formats such as unix can be parsed as a
# Clickhouse DateTime, however, this loses precision as DateTimes are defined
# in seconds.
#
# * optional
# * default: "rfc3339"
# * type: string
# * enum: "rfc3339" or "unix"
timestamp_format = "rfc3339"
timestamp_format = "unix"

#
# Request
#
Expand Down
38 changes: 26 additions & 12 deletions src/sinks/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
dns::Resolver,
event::{Event, Value, TIMESTAMP},
event::{Event, Value},
sinks::util::{
http::{https_client, Auth, HttpRetryLogic, HttpService, Response},
retries::{RetryAction, RetryLogic},
Expand All @@ -21,6 +21,13 @@ use snafu::ResultExt;
#[serde(rename_all = "lowercase")]
pub enum TimestampFormat {
Unix,
RFC3339,
}

#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfig {
pub timestamp_format: Option<TimestampFormat>,
}

#[derive(Deserialize, Serialize, Debug, Clone, Default)]
Expand All @@ -30,7 +37,7 @@ pub struct ClickhouseConfig {
pub table: String,
pub database: Option<String>,
pub compression: Option<Compression>,
pub timestamp_format: Option<TimestampFormat>,
pub encoding: EncodingConfig,
#[serde(default)]
pub batch: BatchBytesConfig,
pub auth: Option<Auth>,
Expand Down Expand Up @@ -122,16 +129,20 @@ fn clickhouse(config: ClickhouseConfig, cx: SinkContext) -> crate::Result<super:
}

fn encode_event(config: &ClickhouseConfig, mut event: Event) -> Option<Vec<u8>> {
match config.timestamp_format {
match config.encoding.timestamp_format {
Some(TimestampFormat::Unix) => {
if let Some(unix) = match event.as_log().get(&TIMESTAMP) {
Some(Value::Timestamp(ts)) => Some(ts.timestamp()),
_ => None,
} {
event.as_mut_log().insert(TIMESTAMP.clone(), unix);
let mut unix_timestamps = Vec::new();
for (k, v) in event.as_log().all_fields() {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
}
for (k, v) in unix_timestamps.pop() {
event.as_mut_log().insert(k, v);
}
}
None => {}
// RFC3339 is the default serialization of a timestamp.
Some(TimestampFormat::RFC3339) | None => {}
}
let mut body =
serde_json::to_vec(&event.as_log().all_fields()).expect("Events should be valid json!");
Expand Down Expand Up @@ -244,6 +255,7 @@ mod tests {
mod integration_tests {
use super::*;
use crate::{
event,
event::Event,
test_util::{random_string, runtime},
topology::config::{SinkConfig, SinkContext},
Expand Down Expand Up @@ -306,7 +318,9 @@ mod integration_tests {
host: host.clone(),
table: table.clone(),
compression: Some(Compression::None),
timestamp_format: Some(TimestampFormat::Unix),
encoding: EncodingConfig {
timestamp_format: Some(TimestampFormat::Unix),
},
batch: BatchBytesConfig {
max_size: Some(1),
timeout_secs: None,
Expand Down Expand Up @@ -337,11 +351,11 @@ mod integration_tests {

let exp_event = input_event.as_mut_log();
exp_event.insert(
TIMESTAMP.clone(),
event::log_schema().timestamp_key().clone(),
format!(
"{}",
exp_event
.get(&TIMESTAMP)
.get(&event::log_schema().timestamp_key())
.unwrap()
.as_timestamp()
.unwrap()
Expand Down
83 changes: 58 additions & 25 deletions website/docs/reference/sinks/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ import CodeHeader from '@site/src/components/CodeHeader';
# OPTIONAL - General
database = "mydatabase" # example, no default
healthcheck = true # default
timestamp_format = "unix" # no default, must be: "unix" (if supplied)

# OPTIONAL - requests
compression = "none" # default, enum

# OPTIONAL - Encoding
[sinks.my_sink_id.encoding]
timestamp_format = "rfc3339" # default, enum
```

</TabItem>
Expand All @@ -74,7 +77,6 @@ import CodeHeader from '@site/src/components/CodeHeader';
# OPTIONAL - General
database = "mydatabase" # example, no default
healthcheck = true # default
timestamp_format = "unix" # no default, must be: "unix" (if supplied)

# OPTIONAL - requests
compression = "none" # default, enum
Expand All @@ -100,6 +102,10 @@ import CodeHeader from '@site/src/components/CodeHeader';
# REQUIRED
max_size = 104900000 # example, bytes, relevant when type = "disk"

# OPTIONAL - Encoding
[sinks.my_sink_id.encoding]
timestamp_format = "rfc3339" # default, enum

# OPTIONAL - Request
[sinks.my_sink_id.request]
in_flight_limit = 5 # default, requests
Expand Down Expand Up @@ -477,6 +483,56 @@ The database that contains the stable that data will be inserted into.
</Field>


<Field
common={true}
defaultValue={null}
enumValues={null}
examples={[]}
groups={[]}
name={"encoding"}
path={null}
relevantWhen={null}
required={false}
templateable={false}
type={"table"}
unit={null}
>
### encoding

Specify how events are encoded.

<Fields filters={false}>


<Field
common={true}
defaultValue={"rfc3339"}
enumValues={{"rfc3339":"Format as an RFC3339 string","unix":"Format as a unix timestamp, can be parsed as a Clickhouse DateTime"}}
examples={["rfc3339","unix"]}
groups={[]}
name={"timestamp_format"}
path={"encoding"}
relevantWhen={null}
required={false}
templateable={false}
type={"string"}
unit={null}
>
#### timestamp_format

How to format event timestamps. Formats such as unix can be parsed as a Clickhouse DateTime, however, this loses precision as DateTimes are defined in seconds.


</Field>


</Fields>

</Field>


<Field
common={true}
defaultValue={true}
Expand Down Expand Up @@ -734,29 +790,6 @@ The table that data will be inserted into.
</Field>


<Field
common={true}
defaultValue={null}
enumValues={{"unix":"Format as a unix timestamp"}}
examples={["unix"]}
groups={[]}
name={"timestamp_format"}
path={null}
relevantWhen={null}
required={false}
templateable={false}
type={"string"}
unit={null}
>
### timestamp_format

Optionally convert timestamps into a format that can be parsed as a Clickhouse DateTime. This loses precision as DateTimes are defined in seconds.


</Field>


<Field
common={false}
defaultValue={null}
Expand Down

0 comments on commit 4f6d794

Please sign in to comment.