Skip to content

Commit

Permalink
[exporter/clickhouse] Update table schema (open-telemetry#12664)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanjm authored Aug 6, 2022
1 parent e79cf2b commit 40f6af2
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 58 deletions.
5 changes: 3 additions & 2 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.2.0 // indirect
github.com/DataDog/agent-payload/v5 v5.0.26 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.38.0-rc.3.0.20220804102556-2fec6abdb5f7 // indirect
github.com/DataDog/datadog-agent/pkg/otlp/model v0.38.0-rc.3.0.20220804102556-2fec6abdb5f7 // indirect
Expand Down Expand Up @@ -105,7 +105,6 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/checkpoint-restore/go-criu/v5 v5.3.0 // indirect
github.com/cilium/ebpf v0.8.1 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/cloudfoundry-incubator/uaago v0.0.0-20190307164349-8136b7bbe76e // indirect
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc // indirect
Expand Down Expand Up @@ -460,6 +459,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/paulmach/orb v0.7.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
Expand All @@ -483,6 +483,7 @@ require (
github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.3.1 // indirect
github.com/shirou/gopsutil/v3 v3.22.7 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.3 // indirect
github.com/signalfx/gohistogram v0.0.0-20160107210732-1ccfd2ff5083 // indirect
github.com/signalfx/golib/v3 v3.3.37 // indirect
Expand Down
20 changes: 17 additions & 3 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 35 additions & 42 deletions exporter/clickhouseexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go" // For register database driver.
_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -73,27 +73,30 @@ func (e *clickhouseExporter) pushLogsData(ctx context.Context, ld plog.Logs) err
defer func() {
_ = statement.Close()
}()
var serviceName string
for i := 0; i < ld.ResourceLogs().Len(); i++ {
logs := ld.ResourceLogs().At(i)
res := logs.Resource()
resourceKeys, resourceValues := attributesToSlice(res.Attributes())
resAttr := attributesToMap(res.Attributes())
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.StringVal()
}
for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
attrKeys, attrValues := attributesToSlice(r.Attributes())
logAttr := attributesToMap(r.Attributes())
_, err = statement.ExecContext(ctx,
r.Timestamp().AsTime(),
r.TraceID().HexString(),
r.SpanID().HexString(),
r.Flags(),
r.SeverityText(),
r.SeverityNumber(),
int32(r.SeverityNumber()),
serviceName,
r.Body().AsString(),
resourceKeys,
resourceValues,
attrKeys,
attrValues,
resAttr,
logAttr,
)
if err != nil {
return fmt.Errorf("ExecContext:%w", err)
Expand All @@ -104,53 +107,45 @@ func (e *clickhouseExporter) pushLogsData(ctx context.Context, ld plog.Logs) err
return nil
})
duration := time.Since(start)
e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()),
e.logger.Info("insert logs", zap.Int("records", ld.LogRecordCount()),
zap.String("cost", duration.String()))
return err
}

func attributesToSlice(attributes pcommon.Map) ([]string, []string) {
keys := make([]string, 0, attributes.Len())
values := make([]string, 0, attributes.Len())
func attributesToMap(attributes pcommon.Map) map[string]string {
m := make(map[string]string, attributes.Len())
attributes.Range(func(k string, v pcommon.Value) bool {
keys = append(keys, formatKey(k))
values = append(values, v.AsString())
m[k] = v.StringVal()
return true
})
return keys, values
}

func formatKey(k string) string {
return strings.ReplaceAll(k, ".", "_")
return m
}

const (
// language=ClickHouse SQL
createLogsTableSQL = `
CREATE TABLE IF NOT EXISTS %s (
Timestamp DateTime CODEC(Delta, ZSTD(1)),
Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
SpanId String CODEC(ZSTD(1)),
TraceFlags UInt32,
TraceFlags UInt32 CODEC(ZSTD(1)),
SeverityText LowCardinality(String) CODEC(ZSTD(1)),
SeverityNumber Int32,
SeverityNumber Int32 CODEC(ZSTD(1)),
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
Body String CODEC(ZSTD(1)),
ResourceAttributes Nested
(
Key LowCardinality(String),
Value String
) CODEC(ZSTD(1)),
LogAttributes Nested
(
Key LowCardinality(String),
Value String
) CODEC(ZSTD(1)),
INDEX idx_attr_keys ResourceAttributes.Key TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX idx_res_keys LogAttributes.Key TYPE bloom_filter(0.01) GRANULARITY 64
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
LogAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_log_attr_value mapValues(LogAttributes) TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
) ENGINE MergeTree()
%s
PARTITION BY toDate(Timestamp)
ORDER BY (toUnixTimestamp(Timestamp));
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
`
// language=ClickHouse SQL
insertLogsSQLTemplate = `INSERT INTO %s (
Expand All @@ -160,11 +155,10 @@ ORDER BY (toUnixTimestamp(Timestamp));
TraceFlags,
SeverityText,
SeverityNumber,
ServiceName,
Body,
ResourceAttributes.Key,
ResourceAttributes.Value,
LogAttributes.Key,
LogAttributes.Value
ResourceAttributes,
LogAttributes
) VALUES (
?,
?,
Expand All @@ -175,7 +169,6 @@ ORDER BY (toUnixTimestamp(Timestamp));
?,
?,
?,
?,
?
)`
)
Expand All @@ -194,7 +187,7 @@ func newClickhouseClient(cfg *Config) (*sql.DB, error) {
if cfg.TTLDays > 0 {
query = fmt.Sprintf(createLogsTableSQL,
cfg.LogsTableName,
fmt.Sprintf(`TTL Timestamp + INTERVAL %d DAY`, cfg.TTLDays))
fmt.Sprintf(`TTL toDateTime(Timestamp) + toIntervalDay(%d)`, cfg.TTLDays))
}
if _, err := db.Exec(query); err != nil {
return nil, fmt.Errorf("exec create table sql: %w", err)
Expand Down
8 changes: 6 additions & 2 deletions exporter/clickhouseexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ require (
require go.uber.org/multierr v1.8.0

require (
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/ClickHouse/clickhouse-go/v2 v2.2.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector/pdata v0.57.2
go.opentelemetry.io/collector/semconv v0.57.2
)

require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf v1.4.2 // indirect
github.com/kr/pretty v0.3.0 // indirect
Expand All @@ -32,9 +33,12 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/paulmach/orb v0.7.1 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/metric v0.31.0 // indirect
Expand Down
Loading

0 comments on commit 40f6af2

Please sign in to comment.