From 6205de9faa8400ca531d52bf4998394223d299ca Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Wed, 29 Mar 2023 19:55:18 +0800 Subject: [PATCH] fix: timestamp column should not be auto added (#787) * fix: fix issue 779 * refactor by CR * modify ci --- .github/workflows/ci.yml | 4 +- integration_tests/Makefile | 3 + integration_tests/sdk/go/autocreatetable.go | 22 +++ integration_tests/sdk/go/issue-779.go | 28 ++++ integration_tests/sdk/go/main.go | 148 +------------------ integration_tests/sdk/go/util.go | 151 ++++++++++++++++++++ server/src/proxy/grpc/write.rs | 11 +- 7 files changed, 216 insertions(+), 151 deletions(-) create mode 100644 integration_tests/sdk/go/autocreatetable.go create mode 100644 integration_tests/sdk/go/issue-779.go create mode 100644 integration_tests/sdk/go/util.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 40c256014f..49db8157fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -206,9 +206,9 @@ jobs: nohup ./target/debug/ceresdb-server -c docs/minimal.toml > /tmp/ceresdb-stdout.log & sleep 10 - name: Run Go SDK tests - working-directory: integration_tests/sdk/go + working-directory: integration_tests run: | - go run main.go + make run-go - name: Run Java SDK tests working-directory: integration_tests run: | diff --git a/integration_tests/Makefile b/integration_tests/Makefile index aa4d4baedf..0f6c1689a3 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -48,3 +48,6 @@ run: clean build kill-old-process run-java: java -version cd sdk/java && MAVEN_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn clean compile exec:java + +run-go: + cd sdk/go && go run . diff --git a/integration_tests/sdk/go/autocreatetable.go b/integration_tests/sdk/go/autocreatetable.go new file mode 100644 index 0000000000..78f99086cb --- /dev/null +++ b/integration_tests/sdk/go/autocreatetable.go @@ -0,0 +1,22 @@ +package main + +import ( + "context" + + "github.com/CeresDB/ceresdb-client-go/ceresdb" +) + +func checkAutoAddColumns(ctx context.Context, client ceresdb.Client) error { + timestampName := "timestamp" + err := dropTable(ctx, client) + if err != nil { + return err + } + + err = writeAndQuery(ctx, client, timestampName) + if err != nil { + return err + } + + return writeAndQueryWithNewColumns(ctx, client, timestampName) +} diff --git a/integration_tests/sdk/go/issue-779.go b/integration_tests/sdk/go/issue-779.go new file mode 100644 index 0000000000..82eed0e51e --- /dev/null +++ b/integration_tests/sdk/go/issue-779.go @@ -0,0 +1,28 @@ +package main + +import ( + "context" + + "github.com/CeresDB/ceresdb-client-go/ceresdb" +) + +func checkAutoAddColumnsWithCreateTable(ctx context.Context, client ceresdb.Client) error { + timestampName := "t" + + err := dropTable(ctx, client) + if err != nil { + return err + } + + err = createTable(ctx, client, timestampName) + if err != nil { + return err + } + + err = writeAndQuery(ctx, client, timestampName) + if err != nil { + return err + } + + return writeAndQueryWithNewColumns(ctx, client, timestampName) +} diff --git a/integration_tests/sdk/go/main.go b/integration_tests/sdk/go/main.go index a4d0263f5b..028fb91148 100644 --- a/integration_tests/sdk/go/main.go +++ b/integration_tests/sdk/go/main.go @@ -11,154 +11,12 @@ import ( var endpoint = "127.0.0.1:8831" -const table = "godemo" - func init() { if v := os.Getenv("CERESDB_ADDR"); v != "" { endpoint = v } } -func write(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error { - points := make([]ceresdb.Point, 0, 2) - for i := 0; i < 2; i++ { - builder := ceresdb.NewPointBuilder(table). - SetTimestamp(ts). - AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))). - AddField("value", ceresdb.NewInt64Value(int64(i))) - - if addNewColumn { - builder = builder.AddTag("new_tag", ceresdb.NewStringValue(fmt.Sprintf("new-tag-%d", i))). - AddField("new_field", ceresdb.NewInt64Value(int64(i))) - } - - point, err := builder.Build() - - if err != nil { - return err - } - points = append(points, point) - } - - resp, err := client.Write(ctx, ceresdb.WriteRequest{ - Points: points, - }) - if err != nil { - return err - } - - if resp.Success != 2 { - return fmt.Errorf("write failed, resp: %+v", resp) - } - - return nil -} - -func ensureRow(expectedVals []ceresdb.Value, actualRow []ceresdb.Column) error { - for i, expected := range expectedVals { - if actual := actualRow[i].Value(); actual != expected { - return fmt.Errorf("expected: %+v, actual: %+v", expected, actual) - } - } - return nil - -} - -func query(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error { - resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{ - Tables: []string{table}, - SQL: fmt.Sprintf("select * from %s where timestamp = %d order by name", table, ts), - }) - if err != nil { - return err - } - - if len(resp.Rows) != 2 { - return fmt.Errorf("expect 2 rows, current: %+v", len(resp.Rows)) - } - - row0 := []ceresdb.Value{ - ceresdb.NewUint64Value(4024844655630594205), - ceresdb.NewInt64Value(ts), - ceresdb.NewStringValue("tag-0"), - ceresdb.NewInt64Value(0)} - - row1 := []ceresdb.Value{ - ceresdb.NewUint64Value(14230010170561829440), - ceresdb.NewInt64Value(ts), - ceresdb.NewStringValue("tag-1"), - ceresdb.NewInt64Value(1), - } - - if addNewColumn { - row0[0] = ceresdb.NewUint64Value(8341999341185504339) - row1[0] = ceresdb.NewUint64Value(4452331151453582498) - row0 = append(row0, ceresdb.NewInt64Value(0), ceresdb.NewStringValue("new-tag-0")) - row1 = append(row1, ceresdb.NewInt64Value(1), ceresdb.NewStringValue("new-tag-1")) - } - - if err := ensureRow(row0, - resp.Rows[0].Columns()); err != nil { - return err - } - - return ensureRow(row1, resp.Rows[1].Columns()) -} - -func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error) { - resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{ - Tables: []string{table}, - SQL: sql, - }) - if err != nil { - return 0, err - } - - return resp.AffectedRows, nil -} - -func checkAutoCreateTable(ctx context.Context, client ceresdb.Client) error { - if _, err := ddl(ctx, client, "drop table if exists "+table); err != nil { - return err - } - - ts := currentMS() - if err := write(ctx, client, ts, false); err != nil { - return err - } - - if err := query(ctx, client, ts, false); err != nil { - return err - } - - return nil -} - -func checkAutoAddColumns(ctx context.Context, client ceresdb.Client) error { - ts := currentMS() - if err := write(ctx, client, ts, true); err != nil { - return err - } - - if err := query(ctx, client, ts, true); err != nil { - return err - } - - return nil -} - -func dropTable(ctx context.Context, client ceresdb.Client) error { - affected, err := ddl(ctx, client, "drop table "+table) - if err != nil { - return err - } - - if affected != 0 { - panic(fmt.Sprintf("drop table expected 0, actual is %d", affected)) - } - return nil -} - func main() { fmt.Printf("Begin test, endpoint %s...\n", endpoint) @@ -170,15 +28,11 @@ func main() { } ctx := context.TODO() - if err = checkAutoCreateTable(ctx, client); err != nil { - panic(err) - } - if err = checkAutoAddColumns(ctx, client); err != nil { panic(err) } - if err = dropTable(ctx, client); err != nil { + if err = checkAutoAddColumnsWithCreateTable(ctx, client); err != nil { panic(err) } diff --git a/integration_tests/sdk/go/util.go b/integration_tests/sdk/go/util.go new file mode 100644 index 0000000000..90faef1371 --- /dev/null +++ b/integration_tests/sdk/go/util.go @@ -0,0 +1,151 @@ +package main + +import ( + "context" + "fmt" + + "github.com/CeresDB/ceresdb-client-go/ceresdb" +) + +const table = "godemo" + +func createTable(ctx context.Context, client ceresdb.Client, timestampName string) error { + _, err := ddl(ctx, client, fmt.Sprintf("create table %s (%s timestamp not null, name string tag, value int64,TIMESTAMP KEY(t))", table, timestampName)) + return err +} + +func write(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error { + points := make([]ceresdb.Point, 0, 2) + for i := 0; i < 2; i++ { + builder := ceresdb.NewPointBuilder(table). + SetTimestamp(ts). + AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))). + AddField("value", ceresdb.NewInt64Value(int64(i))) + + if addNewColumn { + builder = builder.AddTag("new_tag", ceresdb.NewStringValue(fmt.Sprintf("new-tag-%d", i))). + AddField("new_field", ceresdb.NewInt64Value(int64(i))) + } + + point, err := builder.Build() + + if err != nil { + return err + } + points = append(points, point) + } + + resp, err := client.Write(ctx, ceresdb.WriteRequest{ + Points: points, + }) + if err != nil { + return err + } + + if resp.Success != 2 { + return fmt.Errorf("write failed, resp: %+v", resp) + } + + return nil +} + +func ensureRow(expectedVals []ceresdb.Value, actualRow []ceresdb.Column) error { + for i, expected := range expectedVals { + if actual := actualRow[i].Value(); actual != expected { + return fmt.Errorf("expected: %+v, actual: %+v", expected, actual) + } + } + return nil + +} + +func query(ctx context.Context, client ceresdb.Client, ts int64, timestampName string, addNewColumn bool) error { + resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{ + Tables: []string{table}, + SQL: fmt.Sprintf("select * from %s where %s = %d order by name", table, timestampName, ts), + }) + if err != nil { + return err + } + + if len(resp.Rows) != 2 { + return fmt.Errorf("expect 2 rows, current: %+v", len(resp.Rows)) + } + + row0 := []ceresdb.Value{ + ceresdb.NewUint64Value(4024844655630594205), + ceresdb.NewInt64Value(ts), + ceresdb.NewStringValue("tag-0"), + ceresdb.NewInt64Value(0)} + + row1 := []ceresdb.Value{ + ceresdb.NewUint64Value(14230010170561829440), + ceresdb.NewInt64Value(ts), + ceresdb.NewStringValue("tag-1"), + ceresdb.NewInt64Value(1), + } + + if addNewColumn { + row0[0] = ceresdb.NewUint64Value(8341999341185504339) + row1[0] = ceresdb.NewUint64Value(4452331151453582498) + row0 = append(row0, ceresdb.NewInt64Value(0), ceresdb.NewStringValue("new-tag-0")) + row1 = append(row1, ceresdb.NewInt64Value(1), ceresdb.NewStringValue("new-tag-1")) + } + + if err := ensureRow(row0, + resp.Rows[0].Columns()); err != nil { + return err + } + + return ensureRow(row1, resp.Rows[1].Columns()) +} + +func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error) { + resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{ + Tables: []string{table}, + SQL: sql, + }) + if err != nil { + return 0, err + } + + return resp.AffectedRows, nil +} + +func writeAndQuery(ctx context.Context, client ceresdb.Client, timestampName string) error { + ts := currentMS() + if err := write(ctx, client, ts, false); err != nil { + return err + } + + if err := query(ctx, client, ts, timestampName, false); err != nil { + return err + } + + return nil +} + +func writeAndQueryWithNewColumns(ctx context.Context, client ceresdb.Client, timestampName string) error { + ts := currentMS() + if err := write(ctx, client, ts, true); err != nil { + return err + } + + if err := query(ctx, client, ts, timestampName, true); err != nil { + return err + } + + return nil +} + +func dropTable(ctx context.Context, client ceresdb.Client) error { + affected, err := ddl(ctx, client, "drop table if exists "+table) + if err != nil { + return err + } + + if affected != 0 { + panic(fmt.Sprintf("drop table expected 0, actual is %d", affected)) + } + return nil +} diff --git a/server/src/proxy/grpc/write.rs b/server/src/proxy/grpc/write.rs index 673ee3977e..19d90272a8 100644 --- a/server/src/proxy/grpc/write.rs +++ b/server/src/proxy/grpc/write.rs @@ -555,10 +555,17 @@ fn find_new_columns( let columns = new_schema.columns(); let old_columns = schema.columns(); + // find new columns: + // 1. timestamp column can't be a new column; + // 2. column not in old schema is a new column. let new_columns = columns .iter() - .filter(|column| !old_columns.iter().any(|c| c.name == column.name)) - .cloned() + .enumerate() + .filter(|(idx, column)| { + *idx != new_schema.timestamp_index() + && !old_columns.iter().any(|c| c.name == column.name) + }) + .map(|(_, column)| column.clone()) .collect(); Ok(new_columns) }