Skip to content

Commit

Permalink
feat: Add ClickHouse driver to sql inputs/outputs plugins (#9671)
Browse files Browse the repository at this point in the history
  • Loading branch information
nahsi authored Jan 28, 2022
1 parent 74357c5 commit 531d7bb
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 20 deletions.
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ following works:
- github.com/Azure/go-amqp [MIT License](https://github.com/Azure/go-amqp/blob/master/LICENSE)
- github.com/Azure/go-autorest [Apache License 2.0](https://github.com/Azure/go-autorest/blob/master/LICENSE)
- github.com/Azure/go-ntlmssp [MIT License](https://github.com/Azure/go-ntlmssp/blob/master/LICENSE)
- github.com/ClickHouse/clickhouse-go [MIT License](https://github.com/ClickHouse/clickhouse-go/blob/master/LICENSE)
- github.com/Mellanox/rdmamap [Apache License 2.0](https://github.com/Mellanox/rdmamap/blob/master/LICENSE)
- github.com/Microsoft/go-winio [MIT License](https://github.com/Microsoft/go-winio/blob/master/LICENSE)
- github.com/Shopify/sarama [MIT License](https://github.com/Shopify/sarama/blob/master/LICENSE)
Expand Down
19 changes: 10 additions & 9 deletions docs/SQL_DRIVERS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
This is a list of available drivers for the SQL input plugin. The data-source-name (DSN) is driver specific and
might change between versions. Please check the driver documentation for available options and the format.

database | driver | aliases | example DSN | comment
---------------------| ------------------------------------------------------| --------------- | -------------------------------------------------------------------------------------- | -------
CockroachDB | [cockroach](https://github.com/jackc/pgx) | postgres or pgx | see _postgres_ driver | uses PostgresQL driver
MariaDB | [maria](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver
Microsoft SQL Server | [sqlserver](https://github.com/denisenkom/go-mssqldb) | mssql | `username:password@host/instance?param1=value&param2=value` | uses newer _sqlserver_ driver
MySQL | [mysql](https://github.com/go-sql-driver/mysql) | | `[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]` | see [driver docs](https://github.com/go-sql-driver/mysql) for more information
PostgreSQL | [postgres](https://github.com/jackc/pgx) | pgx | `[user[:password]@][netloc][:port][,...][/dbname][?param1=value1&...]` | see [postgres docs](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) for more information
SQLite | [sqlite](https://gitlab.com/cznic/sqlite) | | `filename` | see [driver docu](https://pkg.go.dev/modernc.org/sqlite) for more information
TiDB | [tidb](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver
| database | driver | aliases | example DSN | comment |
| -------------------- | --------------------------------------------------------- | --------------- | -------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- |
| CockroachDB | [cockroach](https://github.com/jackc/pgx) | postgres or pgx | see _postgres_ driver | uses PostgresQL driver |
| MariaDB | [maria](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver |
| Microsoft SQL Server | [sqlserver](https://github.com/denisenkom/go-mssqldb) | mssql | `username:password@host/instance?param1=value&param2=value` | uses newer _sqlserver_ driver |
| MySQL | [mysql](https://github.com/go-sql-driver/mysql) | | `[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]` | see [driver docs](https://github.com/go-sql-driver/mysql) for more information |
| PostgreSQL | [postgres](https://github.com/jackc/pgx) | pgx | `[user[:password]@][netloc][:port][,...][/dbname][?param1=value1&...]` | see [postgres docs](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) for more information |
| SQLite | [sqlite](https://gitlab.com/cznic/sqlite) | | `filename` | see [driver docu](https://pkg.go.dev/modernc.org/sqlite) for more information |
| TiDB | [tidb](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver |
| ClickHouse | [clickhouse](https://github.com/ClickHouse/clickhouse-go) | | `tcp://host:port[?param1=value&...&paramN=value]"` | see [clickhouse-go docs](https://github.com/ClickHouse/clickhouse-go#dsn) for more information |

## Comments

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/Azure/go-autorest/autorest/adal v0.9.16
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/BurntSushi/toml v0.4.1
github.com/ClickHouse/clickhouse-go v1.5.1
github.com/Mellanox/rdmamap v0.0.0-20191106181932-7c3c4763a6ee
github.com/Shopify/sarama v1.29.1
github.com/aerospike/aerospike-client-go v1.27.0
Expand Down Expand Up @@ -202,6 +203,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/containerd/cgroups v1.0.1 // indirect
github.com/containerd/containerd v1.5.9 // indirect
github.com/couchbase/gomemcached v0.1.3 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/clickhouse-go v1.5.1 h1:I8zVFZTz80crCs0FFEBJooIxsPcV0xfthzK1YrkpJTc=
github.com/ClickHouse/clickhouse-go v1.5.1/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
Expand Down Expand Up @@ -432,6 +434,8 @@ github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2io
github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/bkielbasa/cyclop v1.2.0/go.mod h1:qOI0yy6A7dYC4Zgsa72Ppm9kONl0RoIlPbzot9mhmeI=
Expand Down Expand Up @@ -494,6 +498,8 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/sql/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sql

import (
// Blank imports to register the drivers
_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/denisenkom/go-mssqldb"
_ "github.com/go-sql-driver/mysql"
_ "github.com/jackc/pgx/v4/stdlib"
Expand Down
111 changes: 111 additions & 0 deletions plugins/inputs/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,114 @@ func TestPostgreSQL(t *testing.T) {
})
}
}

func TestClickHouse(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

logger := testutil.Logger{}

addr := "127.0.0.1"
port := "9000"
user := "default"

if *spinup {
logger.Infof("Spinning up container...")

// Determine the test-data mountpoint
testdata, err := filepath.Abs("testdata/clickhouse")
require.NoError(t, err, "determining absolute path of test-data failed")

// Spin-up the container
ctx := context.Background()
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "yandex/clickhouse-server",
BindMounts: map[string]string{
testdata: "/docker-entrypoint-initdb.d",
},
ExposedPorts: []string{"9000/tcp", "8123/tcp"},
WaitingFor: wait.NewHTTPStrategy("/").WithPort("8123/tcp"),
},
Started: true,
}
container, err := testcontainers.GenericContainer(ctx, req)
require.NoError(t, err, "starting container failed")
defer func() {
require.NoError(t, container.Terminate(ctx), "terminating container failed")
}()

// Get the connection details from the container
addr, err = container.Host(ctx)
require.NoError(t, err, "getting container host address failed")
p, err := container.MappedPort(ctx, "9000/tcp")
require.NoError(t, err, "getting container host port failed")
port = p.Port()
}

// Define the testset
var testset = []struct {
name string
queries []Query
expected []telegraf.Metric
}{
{
name: "metric_one",
queries: []Query{
{
Query: "SELECT * FROM default.metric_one",
TagColumnsInclude: []string{"tag_*"},
FieldColumnsExclude: []string{"tag_*", "timestamp"},
TimeColumn: "timestamp",
TimeFormat: "unix",
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"sql",
map[string]string{
"tag_one": "tag1",
"tag_two": "tag2",
},
map[string]interface{}{
"int64_one": int64(1234),
"int64_two": int64(2345),
},
time.Unix(1621289085, 0),
),
},
},
}

for _, tt := range testset {
t.Run(tt.name, func(t *testing.T) {
// Setup the plugin-under-test
plugin := &SQL{
Driver: "clickhouse",
Dsn: fmt.Sprintf("tcp://%v:%v?username=%v", addr, port, user),
Queries: tt.queries,
Log: logger,
}

var acc testutil.Accumulator

// Startup the plugin
err := plugin.Init()
require.NoError(t, err)
err = plugin.Start(&acc)
require.NoError(t, err)

// Gather
err = plugin.Gather(&acc)
require.NoError(t, err)
require.Len(t, acc.Errors, 0)

// Stopping the plugin
plugin.Stop()

// Do the comparison
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics())
})
}
}
15 changes: 15 additions & 0 deletions plugins/inputs/sql/testdata/clickhouse/expected.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS default.metric_one (
tag_one String,
tag_two String,
int64_one Int64,
int64_two Int64,
timestamp Int64
) ENGINE MergeTree() ORDER BY timestamp;

INSERT INTO default.metric_one (
tag_one,
tag_two,
int64_one,
int64_two,
timestamp
) VALUES ('tag1', 'tag2', 1234, 2345, 1621289085);
18 changes: 17 additions & 1 deletion plugins/outputs/sql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ through the convert settings.
[[outputs.sql]]
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com)
## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse)
# driver = ""

## Data source name
Expand Down Expand Up @@ -147,6 +147,22 @@ FreeBSD, and other Linux and Darwin platforms.
The DSN is a filename or url with scheme "file:". See the [driver
docs](https://modernc.org/sqlite) for details.

### clickhouse

Use this metric type to SQL type conversion:

```toml
[outputs.sql.convert]
integer = "Int64"
text = "String"
timestamp = "DateTime"
defaultvalue = "String"
unsigned = "UInt64"
bool = "Uint8"
```

See [ClickHouse data types](https://clickhouse.com/docs/en/sql-reference/data-types/) for more info.

### denisenkom/go-mssqldb

Telegraf doesn't have unit tests for go-mssqldb so it should be
Expand Down
44 changes: 34 additions & 10 deletions plugins/outputs/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"strings"

//Register sql drivers
_ "github.com/denisenkom/go-mssqldb" // mssql (sql server)
_ "github.com/go-sql-driver/mysql" // mysql
_ "github.com/jackc/pgx/v4/stdlib" // pgx (postgres)
_ "github.com/snowflakedb/gosnowflake" // snowflake
_ "github.com/ClickHouse/clickhouse-go" // clickhouse
_ "github.com/denisenkom/go-mssqldb" // mssql (sql server)
_ "github.com/go-sql-driver/mysql" // mysql
_ "github.com/jackc/pgx/v4/stdlib" // pgx (postgres)
_ "github.com/snowflakedb/gosnowflake" // snowflake

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (p *SQL) deriveDatatype(value interface{}) string {
var sampleConfig = `
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com)
## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse)
# driver = ""
## Data source name
Expand Down Expand Up @@ -223,6 +224,8 @@ func (p *SQL) tableExists(tableName string) bool {
}

func (p *SQL) Write(metrics []telegraf.Metric) error {
var err error

for _, metric := range metrics {
tablename := metric.Name()

Expand Down Expand Up @@ -255,12 +258,33 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
}

sql := p.generateInsert(tablename, columns)
_, err := p.db.Exec(sql, values...)

if err != nil {
// check if insert error was caused by column mismatch
p.Log.Errorf("Error during insert: %v, %v", err, sql)
return err
switch p.Driver {
case "clickhouse":
// ClickHouse needs to batch inserts with prepared statements
tx, err := p.db.Begin()
if err != nil {
return fmt.Errorf("begin failed: %v", err)
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("prepare failed: %v", err)
}
defer stmt.Close() //nolint:revive // We cannot do anything about a failing close.

_, err = stmt.Exec(values...)

This comment has been minimized.

Copy link
@xtrime-ru

xtrime-ru Jul 3, 2023

Looks like this will insert single row, which is very bad for clickhouse perfomance.
clickhouse-go suggest to do begin->prepare->(in loop exec)->commit.
Is it bug or done by purpose?

if err != nil {
return fmt.Errorf("execution failed: %v", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit failed: %v", err)
}
default:
_, err = p.db.Exec(sql, values...)
if err != nil {
return fmt.Errorf("execution failed: %v", err)
}
}
}
return nil
Expand Down
Loading

0 comments on commit 531d7bb

Please sign in to comment.