-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
config.go
173 lines (142 loc) · 5.25 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
import (
"database/sql"
"errors"
"fmt"
"net/url"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)
// Config defines configuration for Elastic exporter.
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
// Endpoint is the clickhouse endpoint.
Endpoint string `mapstructure:"endpoint"`
// Username is the authentication username.
Username string `mapstructure:"username"`
// Password is the authentication password.
Password configopaque.String `mapstructure:"password"`
// Database is the database name to export.
Database string `mapstructure:"database"`
// ConnectionParams is the extra connection parameters with map format. for example compression/dial_timeout
ConnectionParams map[string]string `mapstructure:"connection_params"`
// LogsTableName is the table name for logs. default is `otel_logs`.
LogsTableName string `mapstructure:"logs_table_name"`
// TracesTableName is the table name for traces. default is `otel_traces`.
TracesTableName string `mapstructure:"traces_table_name"`
// MetricsTableName is the table name for metrics. default is `otel_metrics`.
MetricsTableName string `mapstructure:"metrics_table_name"`
// TTL is The data time-to-live example 30m, 48h. 0 means no ttl.
TTL time.Duration `mapstructure:"ttl"`
// TableEngine is the table engine to use. default is `MergeTree()`.
TableEngine TableEngine `mapstructure:"table_engine"`
// ClusterName if set will append `ON CLUSTER` with the provided name when creating tables.
ClusterName string `mapstructure:"cluster_name"`
// CreateSchema if set to true will run the DDL for creating the database and tables. default is true.
CreateSchema *bool `mapstructure:"create_schema"`
}
// TableEngine defines the ENGINE string value when creating the table.
type TableEngine struct {
Name string `mapstructure:"name"`
Params string `mapstructure:"params"`
}
const defaultDatabase = "default"
const defaultTableEngineName = "MergeTree"
var (
errConfigNoEndpoint = errors.New("endpoint must be specified")
errConfigInvalidEndpoint = errors.New("endpoint must be url format")
)
// Validate the ClickHouse server configuration.
func (cfg *Config) Validate() (err error) {
if cfg.Endpoint == "" {
err = errors.Join(err, errConfigNoEndpoint)
}
dsn, e := cfg.buildDSN(cfg.Database)
if e != nil {
err = errors.Join(err, e)
}
// Validate DSN with clickhouse driver.
// Last chance to catch invalid config.
if _, e := clickhouse.ParseDSN(dsn); e != nil {
err = errors.Join(err, e)
}
return err
}
func (cfg *Config) buildDSN(database string) (string, error) {
dsnURL, err := url.Parse(cfg.Endpoint)
if err != nil {
return "", fmt.Errorf("%w: %s", errConfigInvalidEndpoint, err.Error())
}
queryParams := dsnURL.Query()
// Add connection params to query params.
for k, v := range cfg.ConnectionParams {
queryParams.Set(k, v)
}
// Enable TLS if scheme is https. This flag is necessary to support https connections.
if dsnURL.Scheme == "https" {
queryParams.Set("secure", "true")
}
// Override database if specified in config.
if cfg.Database != "" {
dsnURL.Path = cfg.Database
}
// Override database if specified in database param.
if database != "" {
dsnURL.Path = database
}
// Use default database if not specified in any other place.
if database == "" && cfg.Database == "" && dsnURL.Path == "" {
dsnURL.Path = defaultDatabase
}
// Override username and password if specified in config.
if cfg.Username != "" {
dsnURL.User = url.UserPassword(cfg.Username, string(cfg.Password))
}
dsnURL.RawQuery = queryParams.Encode()
return dsnURL.String(), nil
}
func (cfg *Config) buildDB(database string) (*sql.DB, error) {
dsn, err := cfg.buildDSN(database)
if err != nil {
return nil, err
}
// ClickHouse sql driver will read clickhouse settings from the DSN string.
// It also ensures defaults.
// See https://github.com/ClickHouse/clickhouse-go/blob/08b27884b899f587eb5c509769cd2bdf74a9e2a1/clickhouse_std.go#L189
conn, err := sql.Open(driverName, dsn)
if err != nil {
return nil, err
}
return conn, nil
}
// shouldCreateSchema returns true if the exporter should run the DDL for creating database/tables.
func (cfg *Config) shouldCreateSchema() bool {
if cfg.CreateSchema == nil {
return true // default to true
}
return *cfg.CreateSchema
}
// tableEngineString generates the ENGINE string.
func (cfg *Config) tableEngineString() string {
engine := cfg.TableEngine.Name
params := cfg.TableEngine.Params
if cfg.TableEngine.Name == "" {
engine = defaultTableEngineName
params = ""
}
return fmt.Sprintf("%s(%s)", engine, params)
}
// clusterString generates the ON CLUSTER string. Returns empty string if not set.
func (cfg *Config) clusterString() string {
if cfg.ClusterName == "" {
return ""
}
return fmt.Sprintf("ON CLUSTER %s", cfg.ClusterName)
}