From 8cbc6a246fb7c8a18ea89f8575380eeaab44f63d Mon Sep 17 00:00:00 2001
From: Ivan Yurochko
Date: Thu, 14 Mar 2024 07:16:22 +0200
Subject: [PATCH 1/3] Ignore materialized and alias cols infered during HTTP prepare batch (#1214)

* ignore materialized and alias cols

during instantiation of the batch insert we construct all the columns, there is no need to do it for materialized and alias columns as they are non insertable

* fix indent

* add test case

* Update and rename mat_cols_test.go to materialized_column_test.go

---------

Co-authored-by: Kuba Kaflik
---
 conn_http_batch.go                    | 13 +++--
 tests/std/materialized_column_test.go | 81 +++++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 4 deletions(-)
 create mode 100644 tests/std/materialized_column_test.go

diff --git a/conn_http_batch.go b/conn_http_batch.go
index b1713ccc6e..6fbf3f1100 100644
--- a/conn_http_batch.go
+++ b/conn_http_batch.go
@@ -63,14 +63,19 @@ func (h *httpConnect) prepareBatch(ctx context.Context, query string, opts drive
 	var colNames []string
 	for r.Next() {
 		var (
-			colName string
-			colType string
-			ignore  string
+			colName     string
+			colType     string
+			defaultType string
+			ignore      string
 		)
 
-		if err = r.Scan(&colName, &colType, &ignore, &ignore, &ignore, &ignore, &ignore); err != nil {
+		if err = r.Scan(&colName, &colType, &defaultType, &ignore, &ignore, &ignore, &ignore); err != nil {
 			return nil, err
 		}
+		// these column types cannot be specified in INSERT queries
+		if defaultType == "MATERIALIZED" || defaultType == "ALIAS" {
+			continue
+		}
 		colNames = append(colNames, colName)
 		columns[colName] = colType
 	}
diff --git a/tests/std/materialized_column_test.go b/tests/std/materialized_column_test.go
new file mode 100644
index 0000000000..1ba1575345
--- /dev/null
+++ b/tests/std/materialized_column_test.go
@@ -0,0 +1,81 @@
+// Licensed to ClickHouse, Inc. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. ClickHouse, Inc. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package std
+
+import (
+	"fmt"
+	"math/big"
+	"strconv"
+	"testing"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestMaterializedColumnInsert checks that an INSERT prepared without a column list works on a
+// table that has a MATERIALIZED column, over both the Native and the HTTP protocol.
+func TestMaterializedColumnInsert(t *testing.T) {
+	dsns := map[string]clickhouse.Protocol{"Native": clickhouse.Native, "Http": clickhouse.HTTP}
+	useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false"))
+	require.NoError(t, err)
+	for name, protocol := range dsns {
+		t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) {
+			if conn, err := GetStdDSNConnection(protocol, useSSL, nil); assert.NoError(t, err) {
+				if !CheckMinServerVersion(conn, 21, 12, 0) {
+					t.Skip(fmt.Errorf("unsupported clickhouse version"))
+					return
+				}
+				const ddl = `
+				CREATE TABLE test_mat_cols (
+					  Col1 Int128
+					, Col2 MATERIALIZED Col1 * 2
+				) Engine MergeTree() ORDER BY tuple()
+				`
+				defer func() {
+					conn.Exec("DROP TABLE test_mat_cols")
+				}()
+				_, err := conn.Exec(ddl)
+				require.NoError(t, err)
+				scope, err := conn.Begin()
+				require.NoError(t, err)
+				batch, err := scope.Prepare("INSERT INTO test_mat_cols")
+				require.NoError(t, err)
+				var (
+					col1Data = big.NewInt(128)
+					col2Data = big.NewInt(128 * 2)
+				)
+				// only Col1 is supplied: Col2 is MATERIALIZED, so it cannot be part of the INSERT
+				_, err = batch.Exec(col1Data)
+				require.NoError(t, err)
+				require.NoError(t, scope.Commit())
+				var (
+					col1 big.Int
+					col2 big.Int
+				)
+				// a single Scan destination is used here: the test expects SELECT * to return Col1 only
+				require.NoError(t, conn.QueryRow("SELECT * FROM test_mat_cols").Scan(&col1))
+				assert.Equal(t, *col1Data, col1)
+				require.NoError(t, conn.QueryRow("SELECT Col1, Col2 from test_mat_cols").Scan(&col1, &col2))
+				assert.Equal(t, *col1Data, col1)
+				assert.Equal(t, *col2Data, col2)
+			}
+		})
+	}
+}

From 22a33513eabfc376a9e4ef9b421cb24e6bf93223 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Thu, 14 Mar 2024 06:16:36 +0100
Subject: [PATCH 2/3] Bump google.golang.org/protobuf from 1.31.0 to 1.33.0 (#1232)

Bumps google.golang.org/protobuf from 1.31.0 to 1.33.0.

---
updated-dependencies:
- dependency-name: google.golang.org/protobuf
  dependency-type: indirect
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index 35973ac2e1..1792a31889 100644
--- a/go.mod
+++ b/go.mod
@@ -73,5 +73,5 @@ require (
 	golang.org/x/tools v0.14.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
 	google.golang.org/grpc v1.58.3 // indirect
-	google.golang.org/protobuf v1.31.0 // indirect
+	google.golang.org/protobuf v1.33.0 // indirect
 )
diff --git a/go.sum b/go.sum
index 442a8e52f1..d45da47e96 100644
--- a/go.sum
+++ b/go.sum
@@ -253,8 +253,8 @@ google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSs
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.31.0
h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
-google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
+google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

From 9491310e41542906087e531aed4128913f74f98e Mon Sep 17 00:00:00 2001
From: Kuba Kaflik
Date: Thu, 14 Mar 2024 11:40:35 +0100
Subject: [PATCH 3/3] Experimental feature of rows blocks be appended directly to the batch (#1233)

---
 conn_batch.go             | 37 ++++++++++++++++++
 tests/batch_block_test.go | 80 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+)
 create mode 100644 tests/batch_block_test.go

diff --git a/conn_batch.go b/conn_batch.go
index 7d42a6aba3..246d9118f3 100644
--- a/conn_batch.go
+++ b/conn_batch.go
@@ -133,6 +133,13 @@ func (b *batch) Append(v ...any) error {
 	if b.err != nil {
 		return b.err
 	}
+
+	if len(v) > 0 {
+		if r, ok := v[0].(*rows); ok {
+			return b.appendRowsBlocks(r)
+		}
+	}
+
 	if err := b.block.Append(v...); err != nil {
 		b.err = errors.Wrap(ErrBatchInvalid, err.Error())
 		b.release(err)
@@ -141,6 +148,36 @@ func (b *batch) Append(v ...any) error {
 	return nil
 }
 
+// appendRowsBlocks is an experimental feature that allows the blocks read by r to be appended directly to the batch.
+// It returns the first Flush error, or the error that stopped r. This API is not stable and may be changed in the future.
+// See: tests/batch_block_test.go
+func (b *batch) appendRowsBlocks(r *rows) error {
+	var lastReadBlock *proto.Block
+	var blockNum int
+
+	for r.Next() {
+		if lastReadBlock == nil { // make sure the first block is logged
+			b.conn.debugf("[batch.appendRowsBlocks] blockNum = %d", blockNum)
+		}
+
+		// rows.Next() will read the next block from the server only if the current block is exhausted,
+		// so the current block is flushed only once a new block has become available;
+		// the last block will be handled by the batch.Send() method
+		if lastReadBlock != nil && lastReadBlock != r.block {
+			if err := b.Flush(); err != nil {
+				return err
+			}
+			blockNum++
+			b.conn.debugf("[batch.appendRowsBlocks] blockNum = %d", blockNum)
+		}
+
+		b.block = r.block
+		lastReadBlock = r.block
+	}
+
+	return r.Err() // Next() also stops on a read error; report it instead of a silent partial copy
+}
+
 func (b *batch) AppendStruct(v any) error {
 	if b.err != nil {
 		return b.err
diff --git a/tests/batch_block_test.go b/tests/batch_block_test.go
new file mode 100644
index 0000000000..41d2fe61cf
--- /dev/null
+++ b/tests/batch_block_test.go
@@ -0,0 +1,80 @@
+// Licensed to ClickHouse, Inc. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. ClickHouse, Inc. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tests
+
+import (
+	"sync/atomic"
+	"testing"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/net/context"
+)
+
+// TestBatchAppendRows tests experimental batch rows blocks append feature.
+// This API is not stable and may be changed in the future.
+func TestBatchAppendRows(t *testing.T) {
+	te, err := GetTestEnvironment(testSet)
+	require.NoError(t, err)
+	blocksRead := atomic.Uint64{}
+	opts := ClientOptionsFromEnv(te, clickhouse.Settings{})
+	opts.Debug = true
+	opts.Debugf = func(format string, args ...interface{}) {
+		if format == "[batch.appendRowsBlocks] blockNum = %d" {
+			blocksRead.Store(uint64(args[0].(int))) // store the last block number read from rows
+		}
+	}
+
+	conn, err := GetConnectionWithOptions(&opts)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+
+	// given we have two tables and a million rows in the source table
+	var tables = []string{"source", "target"}
+	for _, table := range tables {
+		require.NoError(t, conn.Exec(context.Background(), "create table if not exists "+table+" (number1 Int, number2 String, number3 Tuple(String, Int), number4 DateTime) engine = MergeTree() order by tuple()"))
+		defer conn.Exec(context.Background(), "drop table if exists "+table)
+	}
+
+	require.NoError(t, conn.Exec(ctx, "INSERT INTO source SELECT number, 'string', tuple('foo', number), now() FROM system.numbers LIMIT 1000000"))
+
+	// when we append the source rows (read in blocks of 1000 rows) directly to a batch
+
+	selectCtx := clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
+		"max_block_size": 1000,
+	}))
+
+	sourceRows, err := conn.Query(selectCtx, "SELECT * FROM source")
+	require.NoError(t, err)
+	defer sourceRows.Close()
+
+	b, err := conn.PrepareBatch(ctx, "INSERT INTO target")
+	require.NoError(t, err)
+	require.NoError(t, b.Append(sourceRows))
+	require.NoError(t, b.Send())
+
+	// then we should be able to see the data in the target table (not in source, which we filled ourselves)
+	row := conn.QueryRow(ctx, "SELECT count() FROM target")
+	require.NoError(t, row.Err())
+	var count uint64
+	require.NoError(t, row.Scan(&count))
+	assert.Equal(t, uint64(1000000), count)
+	assert.Equal(t, uint64(999), blocksRead.Load())
+}