Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ClickHouse driver to sql inputs/outputs plugins #9671

Merged
merged 21 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ following works:
- github.com/Azure/go-amqp [MIT License](https://github.com/Azure/go-amqp/blob/master/LICENSE)
- github.com/Azure/go-autorest [Apache License 2.0](https://github.com/Azure/go-autorest/blob/master/LICENSE)
- github.com/Azure/go-ntlmssp [MIT License](https://github.com/Azure/go-ntlmssp/blob/master/LICENSE)
- github.com/ClickHouse/clickhouse-go [MIT License](https://github.com/ClickHouse/clickhouse-go/blob/master/LICENSE)
- github.com/Mellanox/rdmamap [Apache License 2.0](https://github.com/Mellanox/rdmamap/blob/master/LICENSE)
- github.com/Microsoft/go-winio [MIT License](https://github.com/Microsoft/go-winio/blob/master/LICENSE)
- github.com/Shopify/sarama [MIT License](https://github.com/Shopify/sarama/blob/master/LICENSE)
Expand Down
19 changes: 10 additions & 9 deletions docs/SQL_DRIVERS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
This is a list of available drivers for the SQL input plugin. The data-source-name (DSN) is driver specific and
might change between versions. Please check the driver documentation for available options and the format.

database | driver | aliases | example DSN | comment
---------------------| ------------------------------------------------------| --------------- | -------------------------------------------------------------------------------------- | -------
CockroachDB | [cockroach](https://github.com/jackc/pgx) | postgres or pgx | see _postgres_ driver | uses PostgresQL driver
MariaDB | [maria](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver
Microsoft SQL Server | [sqlserver](https://github.com/denisenkom/go-mssqldb) | mssql | `username:password@host/instance?param1=value&param2=value` | uses newer _sqlserver_ driver
MySQL | [mysql](https://github.com/go-sql-driver/mysql) | | `[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]` | see [driver docs](https://github.com/go-sql-driver/mysql) for more information
PostgreSQL | [postgres](https://github.com/jackc/pgx) | pgx | `[user[:password]@][netloc][:port][,...][/dbname][?param1=value1&...]` | see [postgres docs](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) for more information
SQLite | [sqlite](https://gitlab.com/cznic/sqlite) | | `filename` | see [driver docu](https://pkg.go.dev/modernc.org/sqlite) for more information
TiDB | [tidb](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver
| database | driver | aliases | example DSN | comment |
| -------------------- | --------------------------------------------------------- | --------------- | -------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- |
| CockroachDB | [cockroach](https://github.com/jackc/pgx) | postgres or pgx | see _postgres_ driver | uses PostgresQL driver |
| MariaDB | [maria](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver |
| Microsoft SQL Server | [sqlserver](https://github.com/denisenkom/go-mssqldb) | mssql | `username:password@host/instance?param1=value&param2=value` | uses newer _sqlserver_ driver |
| MySQL | [mysql](https://github.com/go-sql-driver/mysql) | | `[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]` | see [driver docs](https://github.com/go-sql-driver/mysql) for more information |
| PostgreSQL | [postgres](https://github.com/jackc/pgx) | pgx | `[user[:password]@][netloc][:port][,...][/dbname][?param1=value1&...]` | see [postgres docs](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) for more information |
| SQLite | [sqlite](https://gitlab.com/cznic/sqlite) | | `filename` | see [driver docu](https://pkg.go.dev/modernc.org/sqlite) for more information |
| TiDB | [tidb](https://github.com/go-sql-driver/mysql) | mysql | see _mysql_ driver | uses MySQL driver |
| ClickHouse | [clickhouse](https://github.com/ClickHouse/clickhouse-go) | | `tcp://host:port[?param1=value&...&paramN=value]"` | see [clickhouse-go docs](https://github.com/ClickHouse/clickhouse-go#dsn) for more information |

## Comments

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/Azure/go-autorest/autorest/adal v0.9.16
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/BurntSushi/toml v0.4.1
github.com/ClickHouse/clickhouse-go v1.5.1
github.com/Mellanox/rdmamap v0.0.0-20191106181932-7c3c4763a6ee
github.com/Shopify/sarama v1.29.1
github.com/aerospike/aerospike-client-go v1.27.0
Expand Down Expand Up @@ -199,6 +200,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/containerd/cgroups v1.0.1 // indirect
github.com/containerd/containerd v1.5.7 // indirect
github.com/couchbase/gomemcached v0.1.3 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/clickhouse-go v1.5.1 h1:I8zVFZTz80crCs0FFEBJooIxsPcV0xfthzK1YrkpJTc=
github.com/ClickHouse/clickhouse-go v1.5.1/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
Expand Down Expand Up @@ -423,6 +425,8 @@ github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2io
github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/bkielbasa/cyclop v1.2.0/go.mod h1:qOI0yy6A7dYC4Zgsa72Ppm9kONl0RoIlPbzot9mhmeI=
Expand Down Expand Up @@ -483,6 +487,8 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/sql/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sql

import (
// Blank imports to register the drivers
_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/denisenkom/go-mssqldb"
_ "github.com/go-sql-driver/mysql"
_ "github.com/jackc/pgx/v4/stdlib"
Expand Down
111 changes: 111 additions & 0 deletions plugins/inputs/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,114 @@ func TestPostgreSQL(t *testing.T) {
})
}
}

func TestClickHouse(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

logger := testutil.Logger{}

addr := "127.0.0.1"
port := "9000"
user := "default"

if *spinup {
logger.Infof("Spinning up container...")

// Determine the test-data mountpoint
testdata, err := filepath.Abs("testdata/clickhouse")
require.NoError(t, err, "determining absolute path of test-data failed")

// Spin-up the container
ctx := context.Background()
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "yandex/clickhouse-server",
BindMounts: map[string]string{
testdata: "/docker-entrypoint-initdb.d",
},
ExposedPorts: []string{"9000/tcp", "8123/tcp"},
WaitingFor: wait.NewHTTPStrategy("/").WithPort("8123/tcp"),
},
Started: true,
}
container, err := testcontainers.GenericContainer(ctx, req)
require.NoError(t, err, "starting container failed")
defer func() {
require.NoError(t, container.Terminate(ctx), "terminating container failed")
}()

// Get the connection details from the container
addr, err = container.Host(ctx)
require.NoError(t, err, "getting container host address failed")
p, err := container.MappedPort(ctx, "9000/tcp")
require.NoError(t, err, "getting container host port failed")
port = p.Port()
}

// Define the testset
var testset = []struct {
name string
queries []Query
expected []telegraf.Metric
}{
{
name: "metric_one",
queries: []Query{
{
Query: "SELECT * FROM default.metric_one",
TagColumnsInclude: []string{"tag_*"},
FieldColumnsExclude: []string{"tag_*", "timestamp"},
TimeColumn: "timestamp",
TimeFormat: "unix",
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"sql",
map[string]string{
"tag_one": "tag1",
"tag_two": "tag2",
},
map[string]interface{}{
"int64_one": int64(1234),
"int64_two": int64(2345),
},
time.Unix(1621289085, 0),
),
},
},
}

for _, tt := range testset {
t.Run(tt.name, func(t *testing.T) {
// Setup the plugin-under-test
plugin := &SQL{
Driver: "clickhouse",
Dsn: fmt.Sprintf("tcp://%v:%v?username=%v", addr, port, user),
Queries: tt.queries,
Log: logger,
}

var acc testutil.Accumulator

// Startup the plugin
err := plugin.Init()
require.NoError(t, err)
err = plugin.Start(&acc)
require.NoError(t, err)

// Gather
err = plugin.Gather(&acc)
require.NoError(t, err)
require.Len(t, acc.Errors, 0)

// Stopping the plugin
plugin.Stop()

// Do the comparison
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics())
})
}
}
15 changes: 15 additions & 0 deletions plugins/inputs/sql/testdata/clickhouse/expected.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS default.metric_one (
tag_one String,
tag_two String,
int64_one Int64,
int64_two Int64,
timestamp Int64
) ENGINE MergeTree() ORDER BY timestamp;

INSERT INTO default.metric_one (
tag_one,
tag_two,
int64_one,
int64_two,
timestamp
) VALUES ('tag1', 'tag2', 1234, 2345, 1621289085);
18 changes: 17 additions & 1 deletion plugins/outputs/sql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ through the convert settings.
[[outputs.sql]]
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com)
## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse)
# driver = ""

## Data source name
Expand Down Expand Up @@ -140,6 +140,22 @@ FreeBSD, and other Linux and Darwin platforms.
The DSN is a filename or url with scheme "file:". See the [driver
docs](https://modernc.org/sqlite) for details.

### clickhouse

Use this metric type to SQL type conversion:

```toml
[outputs.sql.convert]
integer = "Int64"
text = "String"
timestamp = "DateTime"
defaultvalue = "String"
unsigned = "UInt64"
bool = "Uint8"
```

See [ClickHouse data types](https://clickhouse.com/docs/en/sql-reference/data-types/) for more info.

### denisenkom/go-mssqldb

Telegraf doesn't have unit tests for go-mssqldb so it should be
Expand Down
46 changes: 35 additions & 11 deletions plugins/outputs/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"strings"

//Register sql drivers
_ "github.com/denisenkom/go-mssqldb" // mssql (sql server)
_ "github.com/go-sql-driver/mysql" // mysql
_ "github.com/jackc/pgx/v4/stdlib" // pgx (postgres)
_ "github.com/snowflakedb/gosnowflake" // snowflake
_ "github.com/ClickHouse/clickhouse-go" // clickhouse
_ "github.com/denisenkom/go-mssqldb" // mssql (sql server)
_ "github.com/go-sql-driver/mysql" // mysql
_ "github.com/jackc/pgx/v4/stdlib" // pgx (postgres)
_ "github.com/snowflakedb/gosnowflake" // snowflake

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (p *SQL) deriveDatatype(value interface{}) string {
var sampleConfig = `
## Database driver
## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
## sqlite (SQLite3), snowflake (snowflake.com)
## sqlite (SQLite3), snowflake (snowflake.com) clickhouse (ClickHouse)
# driver = ""

## Data source name
Expand Down Expand Up @@ -216,6 +217,8 @@ func (p *SQL) tableExists(tableName string) bool {
}

func (p *SQL) Write(metrics []telegraf.Metric) error {
var err error

for _, metric := range metrics {
tablename := metric.Name()

Expand Down Expand Up @@ -248,15 +251,36 @@ func (p *SQL) Write(metrics []telegraf.Metric) error {
}

sql := p.generateInsert(tablename, columns)
_, err := p.db.Exec(sql, values...)

if err != nil {
// check if insert error was caused by column mismatch
p.Log.Errorf("Error during insert: %v, %v", err, sql)
return err
switch p.Driver {
case "clickhouse":
// ClickHouse needs to batch inserts with prepared statements
tx, err := p.db.Begin()
if err != nil {
return fmt.Errorf(": %v", err)
nahsi marked this conversation as resolved.
Show resolved Hide resolved
}
stmt, err := tx.Prepare(sql)
if err != nil {
return fmt.Errorf("Error during prepare: %v", err)
nahsi marked this conversation as resolved.
Show resolved Hide resolved
}
defer stmt.Close() //nolint
nahsi marked this conversation as resolved.
Show resolved Hide resolved

_, err = stmt.Exec(values...)
if err != nil {
return fmt.Errorf("Error during execution: %v", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("Error during commit: %v", err)
}
default:
_, err = p.db.Exec(sql, values...)
if err != nil {
return fmt.Errorf("Error during execution: %v", err)
}
}
}
return nil
return err
nahsi marked this conversation as resolved.
Show resolved Hide resolved
}

func init() {
Expand Down
Loading