Skip to content

Commit

Permalink
Merge pull request #101 from nktks/fix/parse-change-stream
Browse files Browse the repository at this point in the history
modify database ApplyDDL to skip CREATE CHANGE STREAM
  • Loading branch information
kazegusuri authored Feb 13, 2024
2 parents 2f2af35 + ed617e9 commit a33cef5
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ You can also operate databases or instances as Cloud Spanner supports. Please al
* Long running operations
* Replace
* wrong behavior on conflict
* [Change Stream Schema](https://cloud.google.com/spanner/docs/change-streams)
* only can parse in [Database operations](#Database-operations) with `schema` flag.

## Implementation

Expand Down
3 changes: 3 additions & 0 deletions fake/testdata/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ CREATE TABLE ArrayTypes (
ArrayFloat ARRAY<FLOAT64>,
ArrayDate ARRAY<DATE>,
) PRIMARY KEY(Id);

CREATE CHANGE STREAM EverythingStream
FOR ALL;
3 changes: 3 additions & 0 deletions server/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ func (d *database) ApplyDDL(ctx context.Context, ddl ast.DDL) error {
case *ast.AlterTable:
return status.Errorf(codes.Unimplemented, "Alter Table is not supported yet")

case *ast.CreateChangeStream:
// skip CreateChangeStream
return nil
default:
return status.Errorf(codes.Unknown, "unknown DDL statement: %v", val)
}
Expand Down
8 changes: 7 additions & 1 deletion server/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

var (
allSchema = []string{schemaSimple, schemaInterleaved, schemaInterleavedCascade, schemaInterleavedNoAction, schemaForeignCascade, schemaForeignNoAction, schemaCompositePrimaryKeys, schemaFullTypes, schemaArrayTypes, schemaJoinA, schemaJoinB, schemaFromTable, schemaGeneratedValues, schemaGeneratedColumn, schemaDefaultValues}
allSchema = []string{schemaSimple, schemaInterleaved, schemaInterleavedCascade, schemaInterleavedNoAction, schemaForeignCascade, schemaForeignNoAction, schemaCompositePrimaryKeys, schemaFullTypes, schemaArrayTypes, schemaJoinA, schemaJoinB, schemaFromTable, schemaGeneratedValues, schemaGeneratedColumn, schemaDefaultValues, schemaChangeStream}
schemaSimple = `CREATE TABLE Simple (
Id INT64 NOT NULL,
Value STRING(MAX) NOT NULL,
Expand Down Expand Up @@ -195,6 +195,9 @@ CREATE INDEX FullTypesByTimestamp ON FullTypes(FTTimestamp);
) PRIMARY KEY(Id);
`

schemaChangeStream = `CREATE CHANGE STREAM EverythingStream
FOR ALL;
`
compositePrimaryKeysKeys = []string{
"Id", "PKey1", "PKey2", "Error", "X", "Y", "Z",
}
Expand Down Expand Up @@ -639,6 +642,9 @@ func TestApplyDDL(t *testing.T) {
{
ddl: schemaDefaultValues,
},
{
ddl: schemaChangeStream,
},
}

for _, tt := range table {
Expand Down

0 comments on commit a33cef5

Please sign in to comment.