From c5a7d457e82c5a643b4460d345ecbb7258276bdb Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Tue, 27 Jun 2023 17:14:52 -0400 Subject: [PATCH] feat(bigtable): add change stream config to create and update table --- bigtable/admin.go | 71 ++++++++++++++++++++-------- bigtable/admin_test.go | 92 +++++++++++++++++++++++++++++++++--- bigtable/integration_test.go | 87 ++++++++++++++++++++++++++++++++++ 3 files changed, 224 insertions(+), 26 deletions(-) diff --git a/bigtable/admin.go b/bigtable/admin.go index c070826d4974..d62dea9288d1 100644 --- a/bigtable/admin.go +++ b/bigtable/admin.go @@ -41,6 +41,7 @@ import ( btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/types/known/durationpb" field_mask "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -211,6 +212,11 @@ func (ac *AdminClient) Tables(ctx context.Context) ([]string, error) { return names, nil } +// ChangeStreamRetention indicates how long bigtable should retain change data. +// Minimum is 1 day. Maximum is 7. nil to not change the retention period. 0 to +// disable change stream retention. +type ChangeStreamRetention optional.Duration + // DeletionProtection indicates whether the table is protected against data loss // i.e. when set to protected, deleting the table, the column families in the table, // and the instance containing the table would be prohibited. @@ -233,13 +239,14 @@ type TableConf struct { Families map[string]GCPolicy // DeletionProtection can be none, protected or unprotected // set to protected to make the table protected against data loss - DeletionProtection DeletionProtection + DeletionProtection DeletionProtection + ChangeStreamRetention ChangeStreamRetention } // CreateTable creates a new table in the instance. // This method may return before the table's creation is complete. func (ac *AdminClient) CreateTable(ctx context.Context, table string) error { - return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, DeletionProtection: None}) + return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, ChangeStreamRetention: nil, DeletionProtection: None}) } // CreatePresplitTable creates a new table in the instance. @@ -248,7 +255,7 @@ func (ac *AdminClient) CreateTable(ctx context.Context, table string) error { // spanning the key ranges: [, s1), [s1, s2), [s2, ). // This method may return before the table's creation is complete. func (ac *AdminClient) CreatePresplitTable(ctx context.Context, table string, splitKeys []string) error { - return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys, DeletionProtection: None}) + return ac.CreateTableFromConf(ctx, &TableConf{TableID: table, SplitKeys: splitKeys, ChangeStreamRetention: nil, DeletionProtection: None}) } // CreateTableFromConf creates a new table in the instance from the given configuration. @@ -269,6 +276,10 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf) } else if conf.DeletionProtection == Unprotected { tbl.DeletionProtection = false } + if conf.ChangeStreamRetention != nil && conf.ChangeStreamRetention.(time.Duration) != 0 { + tbl.ChangeStreamConfig = &btapb.ChangeStreamConfig{} + tbl.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.ChangeStreamRetention.(time.Duration)) + } if conf.Families != nil { tbl.ColumnFamilies = make(map[string]*btapb.ColumnFamily) for fam, policy := range conf.Families { @@ -307,12 +318,23 @@ type UpdateTableConf struct { tableID string // deletionProtection can be unset, true or false // set to true to make the table protected against data loss - deletionProtection DeletionProtection + deletionProtection DeletionProtection + changeStreamRetention ChangeStreamRetention +} + +// UpdateTableDisableChangeStream updates a table to disable change stream for table ID. +func (ac *AdminClient) UpdateTableDisableChangeStream(ctx context.Context, tableID string) error { + return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, time.Duration(0)}) +} + +// UpdateTableWithChangeStream updates a table to with the given table ID and change stream config. +func (ac *AdminClient) UpdateTableWithChangeStream(ctx context.Context, tableID string, changeStreamRetention ChangeStreamRetention) error { + return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, changeStreamRetention}) } // UpdateTableWithDeletionProtection updates a table with the given table ID and deletion protection parameter. func (ac *AdminClient) UpdateTableWithDeletionProtection(ctx context.Context, tableID string, deletionProtection DeletionProtection) error { - return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, deletionProtection}) + return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, deletionProtection, nil}) } // updateTableWithConf updates a table in the instance from the given configuration. @@ -323,30 +345,34 @@ func (ac *AdminClient) updateTableWithConf(ctx context.Context, conf *UpdateTabl return errors.New("TableID is required") } - if conf.deletionProtection == None { - return errors.New("deletion protection is required") - } - ctx = mergeOutgoingMetadata(ctx, ac.md) updateMask := &field_mask.FieldMask{ - Paths: []string{ - "deletion_protection", - }, - } - - deletionProtection := true - if conf.deletionProtection == Unprotected { - deletionProtection = false + Paths: []string{}, } prefix := ac.instancePrefix() req := &btapb.UpdateTableRequest{ Table: &btapb.Table{ - Name: prefix + "/tables/" + conf.tableID, - DeletionProtection: deletionProtection, + Name: prefix + "/tables/" + conf.tableID, }, UpdateMask: updateMask, } + + if conf.deletionProtection != None { + updateMask.Paths = append(updateMask.Paths, "deletion_protection") + req.Table.DeletionProtection = conf.deletionProtection != Unprotected + } + + if conf.changeStreamRetention != nil { + if conf.changeStreamRetention.(time.Duration) == time.Duration(0) { + updateMask.Paths = append(updateMask.Paths, "change_stream_config") + } else { + updateMask.Paths = append(updateMask.Paths, "change_stream_config.retention_period") + req.Table.ChangeStreamConfig = &btapb.ChangeStreamConfig{} + req.Table.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.changeStreamRetention.(time.Duration)) + } + } + lro, err := ac.tClient.UpdateTable(ctx, req) if err != nil { return fmt.Errorf("error from update: %w", err) @@ -394,7 +420,8 @@ type TableInfo struct { // DeletionProtection indicates whether the table is protected against data loss // DeletionProtection could be None depending on the table view // for example when using NAME_ONLY, the response does not contain DeletionProtection and the value should be None - DeletionProtection DeletionProtection + DeletionProtection DeletionProtection + ChangeStreamRetention ChangeStreamRetention } // FamilyInfo represents information about a column family. @@ -450,6 +477,10 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo, } else { ti.DeletionProtection = Unprotected } + if res.ChangeStreamConfig != nil && res.ChangeStreamConfig.RetentionPeriod != nil { + ti.ChangeStreamRetention = res.ChangeStreamConfig.RetentionPeriod.AsDuration() + } + return ti, nil } diff --git a/bigtable/admin_test.go b/bigtable/admin_test.go index 908e45fa0485..bd3225f2aff6 100644 --- a/bigtable/admin_test.go +++ b/bigtable/admin_test.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "testing" + "time" longrunning "cloud.google.com/go/longrunning/autogen/longrunningpb" "github.com/google/go-cmp/cmp" @@ -101,6 +102,45 @@ func TestTableAdmin_CreateTableFromConf_DeletionProtection_Unprotected(t *testin } } +func TestTableAdmin_CreateTableFromConf_ChangeStream_Valid(t *testing.T) { + mock := &mockTableAdminClock{} + c := setupTableClient(t, mock) + + changeStreamRetention, err := time.ParseDuration("24h") + if err != nil { + t.Fatalf("ChangeStreamRetention not valid: %v", err) + } + err = c.CreateTableFromConf(context.Background(), &TableConf{TableID: "My-table", ChangeStreamRetention: changeStreamRetention}) + if err != nil { + t.Fatalf("CreateTableFromConf failed: %v", err) + } + createTableReq := mock.createTableReq + if !cmp.Equal(createTableReq.TableId, "My-table") { + t.Errorf("Unexpected table ID: %v, expected %v", createTableReq.TableId, "My-table") + } + if !cmp.Equal(createTableReq.Table.ChangeStreamConfig.RetentionPeriod.Seconds, int64(changeStreamRetention.Seconds())) { + t.Errorf("Unexpected table change stream retention: %v, expected %v", createTableReq.Table.ChangeStreamConfig.RetentionPeriod.Seconds, changeStreamRetention.Seconds()) + } +} + +func TestTableAdmin_CreateTableFromConf_ChangeStream_Disable(t *testing.T) { + mock := &mockTableAdminClock{} + c := setupTableClient(t, mock) + + changeStreamRetention := time.Duration(0) + err := c.CreateTableFromConf(context.Background(), &TableConf{TableID: "My-table", ChangeStreamRetention: changeStreamRetention}) + if err != nil { + t.Fatalf("CreateTableFromConf failed: %v", err) + } + createTableReq := mock.createTableReq + if !cmp.Equal(createTableReq.TableId, "My-table") { + t.Errorf("Unexpected table ID: %v, expected %v", createTableReq.TableId, "My-table") + } + if createTableReq.Table.ChangeStreamConfig != nil { + t.Errorf("Unexpected table change stream retention: %v should be empty", createTableReq.Table.ChangeStreamConfig) + } +} + func TestTableAdmin_UpdateTableWithDeletionProtection(t *testing.T) { mock := &mockTableAdminClock{} c := setupTableClient(t, mock) @@ -118,6 +158,9 @@ func TestTableAdmin_UpdateTableWithDeletionProtection(t *testing.T) { if !cmp.Equal(updateTableReq.Table.DeletionProtection, true) { t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name) } + if !cmp.Equal(len(updateTableReq.UpdateMask.Paths), 1) { + t.Errorf("UpdateTableRequest does not match, UpdateMask has length of %d, expected 1", len(updateTableReq.UpdateMask.Paths)) + } if !cmp.Equal(updateTableReq.UpdateMask.Paths[0], "deletion_protection") { t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name) } @@ -148,16 +191,53 @@ func TestTableAdmin_UpdateTable_TableID_NotProvided(t *testing.T) { } } -func TestTableAdmin_UpdateTable_DeletionProtection_NotProvided(t *testing.T) { +func TestTableAdmin_UpdateTableWithChangeStreamRetention(t *testing.T) { mock := &mockTableAdminClock{} c := setupTableClient(t, mock) - deletionProtection := None + changeStreamRetention, err := time.ParseDuration("24h") + if err != nil { + t.Fatalf("ChangeStreamRetention not valid: %v", err) + } - // Check if the update fails when deletion protection is not provided - err := c.UpdateTableWithDeletionProtection(context.Background(), "My-table", deletionProtection) + err = c.UpdateTableWithChangeStream(context.Background(), "My-table", changeStreamRetention) + if err != nil { + t.Fatalf("UpdateTableWithChangeStream failed: %v", err) + } + updateTableReq := mock.updateTableReq + if !cmp.Equal(updateTableReq.Table.Name, "projects/my-cool-project/instances/my-cool-instance/tables/My-table") { + t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name) + } + if !cmp.Equal(updateTableReq.Table.ChangeStreamConfig.RetentionPeriod.Seconds, int64(changeStreamRetention.Seconds())) { + t.Errorf("UpdateTableRequest does not match, ChangeStreamConfig: %v", updateTableReq.Table.ChangeStreamConfig) + } + if !cmp.Equal(len(updateTableReq.UpdateMask.Paths), 1) { + t.Errorf("UpdateTableRequest does not match, UpdateMask has length of %d, expected 1", len(updateTableReq.UpdateMask.Paths)) + } + if !cmp.Equal(updateTableReq.UpdateMask.Paths[0], "change_stream_config.retention_period") { + t.Errorf("UpdateTableRequest does not match, UpdateMask: %v", updateTableReq.UpdateMask.Paths[0]) + } +} - if fmt.Sprint(err) != "deletion protection is required" { - t.Fatalf("UpdateTable failed: %v", err) +func TestTableAdmin_UpdateTableDisableChangeStream(t *testing.T) { + mock := &mockTableAdminClock{} + c := setupTableClient(t, mock) + + err := c.UpdateTableDisableChangeStream(context.Background(), "My-table") + if err != nil { + t.Fatalf("UpdateTableDisableChangeStream failed: %v", err) + } + updateTableReq := mock.updateTableReq + if !cmp.Equal(updateTableReq.Table.Name, "projects/my-cool-project/instances/my-cool-instance/tables/My-table") { + t.Errorf("UpdateTableRequest does not match, TableID: %v", updateTableReq.Table.Name) + } + if updateTableReq.Table.ChangeStreamConfig != nil { + t.Errorf("UpdateTableRequest does not match, ChangeStreamConfig: %v should be empty", updateTableReq.Table.ChangeStreamConfig) + } + if !cmp.Equal(len(updateTableReq.UpdateMask.Paths), 1) { + t.Errorf("UpdateTableRequest does not match, UpdateMask has length of %d, expected 1", len(updateTableReq.UpdateMask.Paths)) + } + if !cmp.Equal(updateTableReq.UpdateMask.Paths[0], "change_stream_config") { + t.Errorf("UpdateTableRequest does not match, UpdateMask: %v", updateTableReq.UpdateMask.Paths[0]) } } diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 9dd2eb9a1d83..a801c268c669 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -1535,6 +1535,93 @@ func TestIntegration_TableDeletionProtection(t *testing.T) { } } +// testing if change stream works properly i.e. can create table with change +// stream and disable change stream on existing table and delete fails if change +// stream is enabled. +func TestIntegration_EnableChangeStream(t *testing.T) { + testEnv, err := NewIntegrationEnv() + if err != nil { + t.Fatalf("IntegrationEnv: %v", err) + } + defer testEnv.Close() + + timeout := 2 * time.Second + if testEnv.Config().UseProd { + timeout = 5 * time.Minute + } + ctx, _ := context.WithTimeout(context.Background(), timeout) + + adminClient, err := testEnv.NewAdminClient() + if err != nil { + t.Fatalf("NewAdminClient: %v", err) + } + defer adminClient.Close() + + changeStreamRetention, err := time.ParseDuration("24h") + if err != nil { + t.Fatalf("ChangeStreamRetention not valid: %v", err) + } + + tableConf := TableConf{ + TableID: myTableName, + Families: map[string]GCPolicy{ + "fam1": MaxVersionsPolicy(1), + "fam2": MaxVersionsPolicy(2), + }, + ChangeStreamRetention: changeStreamRetention, + } + + if err := adminClient.CreateTableFromConf(ctx, &tableConf); err != nil { + t.Fatalf("Create table from config: %v", err) + } + + table, err := adminClient.TableInfo(ctx, myTableName) + if err != nil { + t.Fatalf("Getting table info: %v", err) + } + + if table.ChangeStreamRetention != changeStreamRetention { + t.Errorf("Expect table change stream to be enabled for table: %v has info: %v", myTableName, table) + } + + // Update retention + changeStreamRetention, err = time.ParseDuration("70h") + if err != nil { + t.Fatalf("ChangeStreamRetention not valid: %v", err) + } + + if err := adminClient.UpdateTableWithChangeStream(ctx, myTableName, changeStreamRetention); err != nil { + t.Fatalf("Update table from config: %v", err) + } + + table, err = adminClient.TableInfo(ctx, myTableName) + if err != nil { + t.Fatalf("Getting table info: %v", err) + } + + if table.ChangeStreamRetention != changeStreamRetention { + t.Errorf("Expect table change stream to be enabled for table: %v has info: %v", myTableName, table) + } + + // Disable change stream + if err := adminClient.UpdateTableDisableChangeStream(ctx, myTableName); err != nil { + t.Fatalf("Update table from config: %v", err) + } + + table, err = adminClient.TableInfo(ctx, myTableName) + if err != nil { + t.Fatalf("Getting table info: %v", err) + } + + if table.ChangeStreamRetention != nil { + t.Errorf("Expect table change stream to be disabled for table: %v has info: %v", myTableName, table) + } + + if err = adminClient.DeleteTable(ctx, tableConf.TableID); err != nil { + t.Errorf("Deleting the table failed when change stream is disabled: %v", err) + } +} + func TestIntegration_Admin(t *testing.T) { testEnv, err := NewIntegrationEnv() if err != nil {