Skip to content

Commit

Permalink
fix: alter partition table tag column (#1304)
Browse files Browse the repository at this point in the history
## Rationale
Fix adding column in `partition table` bug. 

## Detailed Changes
1. Checking the schema inconsistency error in the write process, caused
by adding columns.
    - Note: This only works for grpc write
3. Add a Go SDK test to check adding a column in a "partition table".
4. Start ceresdb cluster to run go-sdk tests.

## Test Plan
GO SDK test.
  • Loading branch information
chunshao90 authored Nov 10, 2023
1 parent d1f3349 commit 718c230
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 19 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ jobs:
- uses: actions/checkout@v3
with:
submodules: true
- uses: actions/setup-go@v3
with:
go-version: 1.21
- run: |
rustup set auto-self-update disable
rustup toolchain install ${RUST_VERSION} --profile minimal
Expand All @@ -169,11 +172,12 @@ jobs:
run: |
sudo apt update
sudo apt install --yes protobuf-compiler
- name: Build and Run CeresDB
- name: Build and Run CeresDB-Cluster
working-directory: integration_tests
run: |
make build-debug
nohup ./target/debug/ceresdb-server -c docs/minimal.toml > /tmp/ceresdb-stdout.log &
sleep 10
make prepare
make run-ceresmeta
make run-ceresdb-cluster
- name: Run Go SDK tests
working-directory: integration_tests
run: |
Expand Down
17 changes: 15 additions & 2 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,25 @@ build-test:

build: build-ceresdb build-test

kill-old-process:
kill-old-ceresmeta:
killall ceresmeta-server | true

kill-old-ceresdb:
killall ceresdb-server | true
killall ceresmeta | true

kill-old-process: kill-old-ceresmeta kill-old-ceresdb

prepare: clean build kill-old-process

run-ceresmeta: build-meta
nohup $(CERESMETA_BINARY_PATH) --config ${CERESMETA_CONFIG_PATH} > /tmp/ceresmeta-stdout.log 2>&1 &
sleep 10

run-ceresdb-cluster: build-ceresdb
nohup ${CERESDB_BINARY_PATH} --config ${CERESDB_CONFIG_FILE_0} > ${CLUSTER_CERESDB_STDOUT_FILE_0} 2>&1 &
nohup ${CERESDB_BINARY_PATH} --config ${CERESDB_CONFIG_FILE_1} > ${CLUSTER_CERESDB_STDOUT_FILE_1} 2>&1 &
sleep 30

run: prepare build-meta
$(CERESDB_TEST_BINARY)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/config/ceresdb-cluster-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ level = "debug"

[server]
bind_addr = "0.0.0.0"
http_port = 15440
grpc_port = 18831
http_port = 5441
grpc_port = 8832
mysql_port = 13307
postgresql_port = 15433
deploy_mode = "Cluster"
Expand Down
193 changes: 193 additions & 0 deletions integration_tests/sdk/go/alteraddcolumn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package main

import (
"context"
"fmt"

"github.com/CeresDB/ceresdb-client-go/ceresdb"
)

const fieldName = "b"
const tagName = "btag"
const timestampName = "t"

func checkPartitionTableAddColumn(ctx context.Context, client ceresdb.Client) error {
err := dropTable(ctx, client, partitionTable)
if err != nil {
return err
}

_, err = ddl(ctx, client, partitionTable, fmt.Sprintf(
"CREATE TABLE `%s`( "+
"`name`string TAG,"+
"`id` int TAG,"+
"`value` int64 NOT NULL,"+
"`t` timestamp NOT NULL,"+
"TIMESTAMP KEY(t)) "+
"PARTITION BY KEY(name) PARTITIONS 4 ENGINE = Analytic", partitionTable))
if err != nil {
return err
}

_, err = ddl(ctx, client, partitionTable, fmt.Sprintf("ALTER TABLE `%s` ADD COLUMN (%s string);", partitionTable, fieldName))
if err != nil {
return err
}

ts := currentMS()

// First write will fail, because the schema is not updated yet.
// Currently, ceresdb will update the schema when write failed.
err = writePartitionTableNewField(ctx, client, ts, fieldName)
if err == nil {
panic("first write should fail")
}

if err := writePartitionTableNewField(ctx, client, ts, fieldName); err != nil {
return err
}

_, err = ddl(ctx, client, partitionTable, fmt.Sprintf("ALTER TABLE `%s` ADD COLUMN (%s string TAG);", partitionTable, tagName))
if err != nil {
return err
}

// First write will fail, because the schema is not updated yet.
// Currently, write failed will update the schema.
err = writePartitionTableNewTag(ctx, client, ts, tagName)
if err == nil {
panic("first write should fail")
}

if err := writePartitionTableNewTag(ctx, client, ts, tagName); err != nil {
return err
}

if err := queryPartitionTable(ctx, client, ts, timestampName); err != nil {
return err
}

return nil
}

func writePartitionTableNewField(ctx context.Context, client ceresdb.Client, ts int64, fieldName string) error {
points := make([]ceresdb.Point, 0, 2)
for i := 0; i < 2; i++ {
builder := ceresdb.NewPointBuilder(partitionTable).
SetTimestamp(ts).
AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))).
AddField("value", ceresdb.NewInt64Value(int64(i))).
AddField(fieldName, ceresdb.NewStringValue("ss"))

point, err := builder.Build()

if err != nil {
return err
}
points = append(points, point)
}

resp, err := client.Write(ctx, ceresdb.WriteRequest{
Points: points,
})
if err != nil {
return err
}

if resp.Success != 2 {
return fmt.Errorf("write failed, resp: %+v", resp)
}
return nil
}

func writePartitionTableNewTag(ctx context.Context, client ceresdb.Client, ts int64, tagName string) error {
points := make([]ceresdb.Point, 0, 2)
for i := 0; i < 2; i++ {
builder := ceresdb.NewPointBuilder(partitionTable).
SetTimestamp(ts).
AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))).
AddField("value", ceresdb.NewInt64Value(int64(i))).
AddTag(tagName, ceresdb.NewStringValue("sstag")).
AddField(fieldName, ceresdb.NewStringValue("ss"))

point, err := builder.Build()

if err != nil {
return err
}
points = append(points, point)
}

resp, err := client.Write(ctx, ceresdb.WriteRequest{
Points: points,
})
if err != nil {
return err
}

if resp.Success != 2 {
return fmt.Errorf("write failed, resp: %+v", resp)
}
return nil
}

func queryPartitionTable(ctx context.Context, client ceresdb.Client, ts int64, timestampName string) error {
sql := fmt.Sprintf("select t, name, value,%s,%s from %s where %s = %d order by name,%s", fieldName, tagName, partitionTable, timestampName, ts, tagName)

resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{
Tables: []string{partitionTable},
SQL: sql,
})
if err != nil {
return err
}

if len(resp.Rows) != 4 {
return fmt.Errorf("expect 2 rows, current: %+v", len(resp.Rows))
}

row0 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-0"),
ceresdb.NewInt64Value(0),
ceresdb.NewStringValue("ss"),
ceresdb.NewStringValue("sstag"),
}

row1 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-0"),
ceresdb.NewInt64Value(0),
ceresdb.NewStringValue("ss"),
}

row2 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-1"),
ceresdb.NewInt64Value(1),
ceresdb.NewStringValue("ss"),
ceresdb.NewStringValue("sstag"),
}

row3 := []ceresdb.Value{
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-1"),
ceresdb.NewInt64Value(1),
ceresdb.NewStringValue("ss"),
}

if err := ensureRow(row0,
resp.Rows[0].Columns()); err != nil {
return err
}
if err := ensureRow(row1,
resp.Rows[1].Columns()); err != nil {
return err
}
if err := ensureRow(row2,
resp.Rows[2].Columns()); err != nil {
return err
}

return ensureRow(row3, resp.Rows[3].Columns())
}
2 changes: 1 addition & 1 deletion integration_tests/sdk/go/autocreatetable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func checkAutoAddColumns(ctx context.Context, client ceresdb.Client) error {
timestampName := "timestamp"
err := dropTable(ctx, client)
err := dropTable(ctx, client, table)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions integration_tests/sdk/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ module go-sdk-test

go 1.17

require github.com/CeresDB/ceresdb-client-go v1.1.0
require github.com/CeresDB/ceresdb-client-go v1.1.3

require (
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230228090856-37ba6214b131 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions integration_tests/sdk/go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdb-client-go v1.1.0 h1:iCBb03OubOA7NCLSiYBis9lMaR3nu6s0wipUulEeonU=
github.com/CeresDB/ceresdb-client-go v1.1.0/go.mod h1:bZneg5VvvfSZhCnlKO0SjIoPY+fLKnFwhPb3gijLorE=
github.com/CeresDB/ceresdb-client-go v1.1.3 h1:Qm01ekHSSWfuAjSyMtuGYs5L2UlKl6SIXVWYHpcH5GY=
github.com/CeresDB/ceresdb-client-go v1.1.3/go.mod h1:Wg7MC22gqth8rpmJiQW85meo8UbPZDjzWMgVvWfP08w=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230228090856-37ba6214b131 h1:ePzWeIoLOT4FsdjNrmEVrrtm412H2xHM6ZNLamB0bqk=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230228090856-37ba6214b131/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
Expand Down Expand Up @@ -783,6 +785,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -822,6 +826,7 @@ github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/sdk/go/issue-779.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
func checkAutoAddColumnsWithCreateTable(ctx context.Context, client ceresdb.Client) error {
timestampName := "timestamp"

err := dropTable(ctx, client)
err := dropTable(ctx, client, table)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/sdk/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func main() {
panic(err)
}

if err = checkPartitionTableAddColumn(ctx, client); err != nil {
panic(err)
}

fmt.Println("Test done")
}

Expand Down
11 changes: 6 additions & 5 deletions integration_tests/sdk/go/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
)

const table = "godemo"
const partitionTable = "godemoPartition"

func createTable(ctx context.Context, client ceresdb.Client, timestampName string) error {
_, err := ddl(ctx, client, fmt.Sprintf("create table %s (`%s` timestamp not null, name string tag, value int64,TIMESTAMP KEY(%s))", table, timestampName, timestampName))
_, err := ddl(ctx, client, table, fmt.Sprintf("create table %s (`%s` timestamp not null, name string tag, value int64,TIMESTAMP KEY(%s))", table, timestampName, timestampName))
return err
}

Expand Down Expand Up @@ -100,9 +101,9 @@ func query(ctx context.Context, client ceresdb.Client, ts int64, timestampName s
return ensureRow(row1, resp.Rows[1].Columns())
}

func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error) {
func ddl(ctx context.Context, client ceresdb.Client, tableName string, sql string) (uint32, error) {
resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{
Tables: []string{table},
Tables: []string{tableName},
SQL: sql,
})
if err != nil {
Expand Down Expand Up @@ -138,8 +139,8 @@ func writeAndQueryWithNewColumns(ctx context.Context, client ceresdb.Client, tim
return nil
}

func dropTable(ctx context.Context, client ceresdb.Client) error {
affected, err := ddl(ctx, client, "drop table if exists "+table)
func dropTable(ctx context.Context, client ceresdb.Client, table string) error {
affected, err := ddl(ctx, client, table, "drop table if exists "+table)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 718c230

Please sign in to comment.