25
25
clickhouseUsername string
26
26
clickhousePassword string
27
27
clickhouseDatabase string
28
- clickhouseWriteTimeout string
29
- clickhouseReadTimeout string
28
+ clickhouseDialTimeout string
29
+ clickhouseConnMaxLifetime string
30
+ clickhouseMaxIdleConns int
31
+ clickhouseMaxOpenConns int
30
32
clickhouseAsyncInsert bool
31
33
clickhouseWaitForAsyncInsert bool
32
34
clickhouseBatchSize int64
@@ -65,14 +67,32 @@ func init() {
65
67
defaultClickHouseDatabase = os .Getenv ("CLICKHOUSE_DATABASE" )
66
68
}
67
69
68
- defaultClickHouseWriteTimeout := "10 "
69
- if os .Getenv ("CLICKHOUSE_WRITE_TIMEOUT " ) != "" {
70
- defaultClickHouseWriteTimeout = os .Getenv ("CLICKHOUSE_WRITE_TIMEOUT " )
70
+ defaultClickHouseDialTimeout := "10s "
71
+ if os .Getenv ("CLICKHOUSE_DIAL_TIMEOUT " ) != "" {
72
+ defaultClickHouseDialTimeout = os .Getenv ("CLICKHOUSE_DIAL_TIMEOUT " )
71
73
}
72
74
73
- defaultClickHouseReadTimeout := "10"
74
- if os .Getenv ("CLICKHOUSE_READ_TIMEOUT" ) != "" {
75
- defaultClickHouseReadTimeout = os .Getenv ("CLICKHOUSE_READ_TIMEOUT" )
75
+ defaultClickHouseConnMaxLifetime := "1h"
76
+ if os .Getenv ("CLICKHOUSE_CONN_MAX_LIFETIME" ) != "" {
77
+ defaultClickHouseConnMaxLifetime = os .Getenv ("CLICKHOUSE_CONN_MAX_LIFETIME" )
78
+ }
79
+
80
+ defaultClickHouseMaxIdleConns := 5
81
+ if os .Getenv ("CLICKHOUSE_MAX_IDLE_CONNS" ) != "" {
82
+ defaultClickHouseMaxIdleConnsString := os .Getenv ("CLICKHOUSE_MAX_IDLE_CONNS" )
83
+ defaultClickHouseMaxIdleConnsParsed , err := strconv .Atoi (defaultClickHouseMaxIdleConnsString )
84
+ if err == nil && defaultClickHouseMaxIdleConnsParsed > 0 {
85
+ defaultClickHouseMaxIdleConns = defaultClickHouseMaxIdleConnsParsed
86
+ }
87
+ }
88
+
89
+ defaultClickHouseMaxOpenConns := 10
90
+ if os .Getenv ("CLICKHOUSE_MAX_OPEN_CONNS" ) != "" {
91
+ defaultClickHouseMaxOpenConnsString := os .Getenv ("CLICKHOUSE_MAX_OPEN_CONNS" )
92
+ defaultClickHouseMaxOpenConnsParsed , err := strconv .Atoi (defaultClickHouseMaxOpenConnsString )
93
+ if err == nil && defaultClickHouseMaxOpenConnsParsed > 0 {
94
+ defaultClickHouseMaxOpenConns = defaultClickHouseMaxOpenConnsParsed
95
+ }
76
96
}
77
97
78
98
defaultClickHouseAsyncInsert := false
@@ -156,8 +176,10 @@ func init() {
156
176
flag .StringVar (& clickhouseUsername , "clickhouse.username" , defaultClickHouseUsername , "ClickHouse username for the connection." )
157
177
flag .StringVar (& clickhousePassword , "clickhouse.password" , defaultClickHousePassword , "ClickHouse password for the connection." )
158
178
flag .StringVar (& clickhouseDatabase , "clickhouse.database" , defaultClickHouseDatabase , "ClickHouse database name." )
159
- flag .StringVar (& clickhouseWriteTimeout , "clickhouse.write-timeout" , defaultClickHouseWriteTimeout , "ClickHouse write timeout for the connection." )
160
- flag .StringVar (& clickhouseReadTimeout , "clickhouse.read-timeout" , defaultClickHouseReadTimeout , "ClickHouse read timeout for the connection." )
179
+ flag .StringVar (& clickhouseDialTimeout , "clickhouse.dial-timeout" , defaultClickHouseDialTimeout , "ClickHouse dial timeout." )
180
+ flag .StringVar (& clickhouseConnMaxLifetime , "clickhouse.conn-max-lifetime" , defaultClickHouseConnMaxLifetime , "ClickHouse maximum connection lifetime." )
181
+ flag .IntVar (& clickhouseMaxIdleConns , "clickhouse.max-idle-conns" , defaultClickHouseMaxIdleConns , "ClickHouse maximum number of idle connections." )
182
+ flag .IntVar (& clickhouseMaxOpenConns , "clickhouse.max-open-conns" , defaultClickHouseMaxOpenConns , "ClickHouse maximum number of open connections." )
161
183
flag .BoolVar (& clickhouseAsyncInsert , "clickhouse.async-insert" , defaultClickHouseAsyncInsert , "Enable async inserts." )
162
184
flag .BoolVar (& clickhouseWaitForAsyncInsert , "clickhouse.wait-for-async-insert" , defaultClickHouseWaitForAsyncInsert , "Wait for async inserts." )
163
185
flag .Int64Var (& clickhouseBatchSize , "clickhouse.batch-size" , defaultClickHouseBatchSize , "The size for how many log lines should be buffered, before they are written to ClickHouse." )
@@ -201,7 +223,7 @@ func main() {
201
223
},
202
224
}
203
225
204
- logger , err := zapConfig .Build ()
226
+ logger , err := zapConfig .Build (zap . AddCaller (), zap . AddCallerSkip ( 1 ) )
205
227
if err != nil {
206
228
panic (err )
207
229
}
@@ -225,7 +247,7 @@ func main() {
225
247
226
248
log .Info (nil , "Version information" , version .Info ()... )
227
249
log .Info (nil , "Build context" , version .BuildContext ()... )
228
- log .Info (nil , "Clickhouse configuration" , zap .String ("clickhouseAddress" , clickhouseAddress ), zap .String ("clickhouseUsername" , clickhouseUsername ), zap .String ("clickhousePassword" , "*****" ), zap .String ("clickhouseDatabase" , clickhouseDatabase ), zap .String ("clickhouseWriteTimeout " , clickhouseWriteTimeout ), zap .String ("clickhouseReadTimeout " , clickhouseReadTimeout ), zap .Int64 ("clickhouseBatchSize" , clickhouseBatchSize ), zap .Duration ("clickhouseFlushInterval" , clickhouseFlushInterval ))
250
+ log .Info (nil , "Clickhouse configuration" , zap .String ("clickhouseAddress" , clickhouseAddress ), zap .String ("clickhouseUsername" , clickhouseUsername ), zap .String ("clickhousePassword" , "*****" ), zap .String ("clickhouseDatabase" , clickhouseDatabase ), zap .String ("clickhouseDialTimeout " , clickhouseDialTimeout ), zap .String ("clickhouseConnMaxLifetime " , clickhouseConnMaxLifetime ), zap . Int ( "clickhouseMaxIdleConns" , clickhouseMaxIdleConns ), zap . Int ( "clickhouseMaxOpenConns" , clickhouseMaxOpenConns ), zap .Int64 ("clickhouseBatchSize" , clickhouseBatchSize ), zap .Duration ("clickhouseFlushInterval" , clickhouseFlushInterval ))
229
251
log .Info (nil , "Kafka configuration" , zap .String ("kafkaBrokers" , kafkaBrokers ), zap .String ("kafkaGroup" , kafkaGroup ), zap .String ("kafkaVersion" , kafkaVersion ), zap .String ("kafkaTopics" , kafkaTopics ))
230
252
231
253
// Create a http server, which can be used for the liveness and readiness probe in Kubernetes. The server also
@@ -246,7 +268,7 @@ func main() {
246
268
// Create a new client for the configured ClickHouse instance. Then pass the ClickHouse client to the Run function
247
269
// of the Kafka package, which listens for message in the configured Kafka instance. These messages are then written
248
270
// to ClickHouse via the created ClickHouse client.
249
- client , err := clickhouse .NewClient (clickhouseAddress , clickhouseUsername , clickhousePassword , clickhouseDatabase , clickhouseWriteTimeout , clickhouseReadTimeout , clickhouseAsyncInsert , clickhouseWaitForAsyncInsert )
271
+ client , err := clickhouse .NewClient (clickhouseAddress , clickhouseUsername , clickhousePassword , clickhouseDatabase , clickhouseDialTimeout , clickhouseConnMaxLifetime , clickhouseMaxIdleConns , clickhouseMaxOpenConns , clickhouseAsyncInsert , clickhouseWaitForAsyncInsert )
250
272
if err != nil {
251
273
log .Fatal (nil , "Could not create ClickHouse client" , zap .Error (err ))
252
274
}
0 commit comments