diff --git a/internal/spanner/services/services.go b/internal/spanner/services/services.go index 92c8127..1b46724 100644 --- a/internal/spanner/services/services.go +++ b/internal/spanner/services/services.go @@ -1080,6 +1080,7 @@ func (s *SpannerService) CreateSpannerTable(ctx context.Context, parent string, if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) // Check if we have any PROTO columns and create the necessary proto bundles for _, column := range table.Schema.Columns { @@ -1121,9 +1122,24 @@ func (s *SpannerService) CreateSpannerTable(ctx context.Context, parent string, } // Create table - err = db.Table(tableId).Migrator().CreateTable(&structInstance) + _, err = utils.Retry(3, 5*time.Second, func() (interface{}, error) { + err = db.Table(tableId).Migrator().CreateTable(&structInstance) + if err != nil { + if status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Unavailable { + return nil, err + } + + return nil, utils.NonRetryableError(err) + } + + return nil, nil + }) if err != nil { - return nil, status.Errorf(codes.Internal, "Error creating table: %v", err) + if status.Code(err) == codes.FailedPrecondition && strings.Contains(err.Error(), fmt.Sprintf("Duplicate name in schema: %s", tableId)) { + return nil, status.Errorf(codes.AlreadyExists, "Table (%s) already exists", table.Name) + } + + return nil, err } if err := UpdateColumnMetadata(ctx, db, tableId, table.Schema.Columns); err != nil { @@ -1178,6 +1194,7 @@ func (s *SpannerService) GetSpannerTable(ctx context.Context, name string) (*Spa if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) columnTypes, err := db.Migrator().ColumnTypes(tableId) if err != nil { @@ -1454,6 +1471,7 @@ func (s *SpannerService) ListSpannerTables(ctx context.Context, parent string) ( if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) tableNames, err := db.Migrator().GetTables() if err != nil { @@ -1597,6 +1615,7 @@ func (s *SpannerService) UpdateSpannerTable(ctx context.Context, table *SpannerT if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) // Check if we have any PROTO columns and create the necessary proto bundles for _, column := range table.Schema.Columns { @@ -1778,6 +1797,7 @@ func (s *SpannerService) DeleteSpannerTable(ctx context.Context, name string) (* if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) // Drop table err = db.Migrator().DropTable(tableId) @@ -1905,6 +1925,7 @@ func (s *SpannerService) GetTableIamBinding(ctx context.Context, parent string, if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) var rows []*TablePermissionsRow res := db.Raw("SELECT * FROM INFORMATION_SCHEMA.TABLE_PRIVILEGES WHERE table_name = ? AND grantee = ?", tableId, role).Scan(&rows) @@ -2063,6 +2084,7 @@ func (s *SpannerService) CreateSpannerTableIndex(ctx context.Context, parent str if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) // Get parent table _, err = s.GetSpannerTable(ctx, parent) @@ -2123,6 +2145,7 @@ func (s *SpannerService) GetSpannerTableIndex(ctx context.Context, parent string if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) indexes, err := GetIndexes(db, tableId) if err != nil { @@ -2175,6 +2198,7 @@ func (s *SpannerService) ListSpannerTableIndices(ctx context.Context, parent str if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) indexes, err := GetIndexes(db, tableId) if err != nil { @@ -2229,6 +2253,7 @@ func (s *SpannerService) DeleteSpannerTableIndex(ctx context.Context, parent str if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) err = db.Migrator().DropIndex(tableId, indexName) if err != nil { @@ -2305,6 +2330,7 @@ func (s *SpannerService) CreateSpannerTableForeignKeyConstraint(ctx context.Cont if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) sqlStatement := fmt.Sprintf("ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s(%s)", tableId, constraint.Name, constraint.Column, constraint.ReferencedTable, constraint.ReferencedColumn) if constraint.OnDelete != SpannerTableForeignKeyConstraintActionUnspecified { @@ -2354,6 +2380,7 @@ func (s *SpannerService) GetSpannerTableForeignKeyConstraint(ctx context.Context if err != nil { return nil, status.Errorf(codes.Internal, "Error connecting to database: %v", err) } + db = db.WithContext(ctx) sqlStatement := ` SELECT