Skip to content

Commit

Permalink
Implement config store for MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Sep 19, 2023
1 parent 45ca74f commit 245a21b
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 36 deletions.
45 changes: 15 additions & 30 deletions common/persistence/persistence-tests/configStorePersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ func (s *ConfigStorePersistenceSuite) TestFetchFromEmptyTable() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

s.DefaultTestCluster.TearDownTestDatabase()
s.DefaultTestCluster.SetupTestDatabase()

snapshot, err := s.FetchDynamicConfig(ctx)
snapshot, err := s.FetchDynamicConfig(ctx, 0)
s.Nil(snapshot)
s.NotNil(err)
}
Expand All @@ -82,15 +79,12 @@ func (s *ConfigStorePersistenceSuite) TestUpdateSimpleSuccess() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

s.DefaultTestCluster.TearDownTestDatabase()
s.DefaultTestCluster.SetupTestDatabase()

snapshot := generateRandomSnapshot(1)
err := s.UpdateDynamicConfig(ctx, snapshot)
err := s.UpdateDynamicConfig(ctx, snapshot, 1)
s.Nil(err)

retSnapshot, err := s.FetchDynamicConfig(ctx)
s.NotNil(snapshot)
retSnapshot, err := s.FetchDynamicConfig(ctx, 1)
s.NotNil(retSnapshot)
s.Nil(err)
s.Equal(snapshot.Version, retSnapshot.Version)
s.Equal(snapshot.Values.Entries[0].Name, retSnapshot.Values.Entries[0].Name)
Expand All @@ -100,14 +94,11 @@ func (s *ConfigStorePersistenceSuite) TestUpdateVersionCollisionFailure() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

s.DefaultTestCluster.TearDownTestDatabase()
s.DefaultTestCluster.SetupTestDatabase()

snapshot := generateRandomSnapshot(1)
err := s.UpdateDynamicConfig(ctx, snapshot)
err := s.UpdateDynamicConfig(ctx, snapshot, 2)
s.Nil(err)

err = s.UpdateDynamicConfig(ctx, snapshot)
err = s.UpdateDynamicConfig(ctx, snapshot, 2)
var condErr *p.ConditionFailedError
s.True(errors.As(err, &condErr))
}
Expand All @@ -116,32 +107,26 @@ func (s *ConfigStorePersistenceSuite) TestUpdateIncrementalVersionSuccess() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

s.DefaultTestCluster.TearDownTestDatabase()
s.DefaultTestCluster.SetupTestDatabase()

snapshot2 := generateRandomSnapshot(2)
err := s.UpdateDynamicConfig(ctx, snapshot2)
err := s.UpdateDynamicConfig(ctx, snapshot2, 3)
s.Nil(err)
snapshot3 := generateRandomSnapshot(3)
err = s.UpdateDynamicConfig(ctx, snapshot3)
err = s.UpdateDynamicConfig(ctx, snapshot3, 3)
s.Nil(err)
}

func (s *ConfigStorePersistenceSuite) TestFetchLatestVersionSuccess() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

s.DefaultTestCluster.TearDownTestDatabase()
s.DefaultTestCluster.SetupTestDatabase()

snapshot2 := generateRandomSnapshot(2)
err := s.UpdateDynamicConfig(ctx, snapshot2)
err := s.UpdateDynamicConfig(ctx, snapshot2, 4)
s.Nil(err)
snapshot3 := generateRandomSnapshot(3)
err = s.UpdateDynamicConfig(ctx, snapshot3)
err = s.UpdateDynamicConfig(ctx, snapshot3, 4)
s.Nil(err)

snapshot, err := s.FetchDynamicConfig(ctx)
snapshot, err := s.FetchDynamicConfig(ctx, 4)
s.NotNil(snapshot)
s.Nil(err)
s.Equal(int64(3), snapshot.Version)
Expand Down Expand Up @@ -174,8 +159,8 @@ func generateRandomSnapshot(version int64) *p.DynamicConfigSnapshot {
}
}

func (s *ConfigStorePersistenceSuite) FetchDynamicConfig(ctx context.Context) (*p.DynamicConfigSnapshot, error) {
response, err := s.ConfigStoreManager.FetchDynamicConfig(ctx, p.DynamicConfig)
func (s *ConfigStorePersistenceSuite) FetchDynamicConfig(ctx context.Context, rowType int) (*p.DynamicConfigSnapshot, error) {
response, err := s.ConfigStoreManager.FetchDynamicConfig(ctx, p.ConfigType(rowType))
if err != nil {
return nil, err
}
Expand All @@ -185,6 +170,6 @@ func (s *ConfigStorePersistenceSuite) FetchDynamicConfig(ctx context.Context) (*
return response.Snapshot, nil
}

func (s *ConfigStorePersistenceSuite) UpdateDynamicConfig(ctx context.Context, snapshot *p.DynamicConfigSnapshot) error {
return s.ConfigStoreManager.UpdateDynamicConfig(ctx, &p.UpdateDynamicConfigRequest{Snapshot: snapshot}, p.DynamicConfig)
func (s *ConfigStorePersistenceSuite) UpdateDynamicConfig(ctx context.Context, snapshot *p.DynamicConfigSnapshot, rowType int) error {
return s.ConfigStoreManager.UpdateDynamicConfig(ctx, &p.UpdateDynamicConfigRequest{Snapshot: snapshot}, p.ConfigType(rowType))
}
3 changes: 1 addition & 2 deletions common/persistence/sql/sqlConfigStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ func (m *sqlConfigStore) FetchConfig(ctx context.Context, configType persistence
func (m *sqlConfigStore) UpdateConfig(ctx context.Context, value *persistence.InternalConfigStoreEntry) error {
err := m.db.InsertConfig(ctx, value)
if err != nil {
if _, ok := err.(*nosqlplugin.ConditionFailure); ok {
if m.db.IsDupEntryError(err) {
return &persistence.ConditionFailedError{Msg: fmt.Sprintf("Version %v already exists. Condition Failed", value.Version)}
}
return convertCommonErrors(m.db, "UpdateConfig", "", err)
Expand Down
9 changes: 9 additions & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,15 @@ type (
Data []byte
}

// ClusterConfigRow represents a row in cluster_config table
ClusterConfigRow struct {
RowType int
Version int64
Timestamp time.Time
Data []byte
DataEncoding string
}

// tableCRUD defines the API for interacting with the database tables
tableCRUD interface {
InsertIntoDomain(ctx context.Context, rows *DomainRow) (sql.Result, error)
Expand Down
28 changes: 25 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,37 @@ package mysql

import (
"context"
"fmt"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

const (
_selectLatestConfigQuery = "SELECT row_type, version, timestamp, data, data_encoding FROM cluster_config WHERE row_type = ? ORDER BY version LIMIT 1;"

_insertConfigQuery = "INSERT INTO cluster_config (row_type, version, timestamp, data, data_encoding) VALUES(?, ?, ?, ?, ?)"
)

func (mdb *db) InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error {
return fmt.Errorf("not implemented")
_, err := mdb.driver.ExecContext(ctx, sqlplugin.DbDefaultShard, _insertConfigQuery, row.RowType, -1*row.Version, mdb.converter.ToMySQLDateTime(row.Timestamp), row.Values.Data, row.Values.Encoding)
return err
}

func (mdb *db) SelectLatestConfig(ctx context.Context, rowType int) (*persistence.InternalConfigStoreEntry, error) {
return nil, fmt.Errorf("not implemented")
var row sqlplugin.ClusterConfigRow
err := mdb.driver.GetContext(ctx, sqlplugin.DbDefaultShard, &row, _selectLatestConfigQuery, rowType)
if err != nil {
return nil, err
}
row.Version *= -1
return &persistence.InternalConfigStoreEntry{
RowType: row.RowType,
Version: row.Version,
Timestamp: mdb.converter.FromMySQLDateTime(row.Timestamp),
Values: &persistence.DataBlob{
Data: row.Data,
Encoding: common.EncodingType(row.DataEncoding),
},
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,11 @@ func TestMySQLQueuePersistence(t *testing.T) {
s.TestBase.Setup()
suite.Run(t, s)
}

func TestMySQLConfigPersistence(t *testing.T) {
testflags.RequireMySQL(t)
s := new(pt.ConfigStorePersistenceSuite)
s.TestBase = pt.NewTestBaseWithSQL(GetTestClusterOption())
s.TestBase.Setup()
suite.Run(t, s)
}
10 changes: 10 additions & 0 deletions schema/mysql/v8/cadence/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,13 @@ CREATE TABLE queue_metadata (
data MEDIUMBLOB NOT NULL,
PRIMARY KEY(queue_type)
);

CREATE TABLE cluster_config (
row_type INT NOT NULL,
version BIGINT NOT NULL,
--
timestamp DATETIME(6) NOT NULL,
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(16) NOT NULL,
PRIMARY KEY (row_type, version)
);
9 changes: 9 additions & 0 deletions schema/mysql/v8/cadence/versioned/v0.6/cluster_config.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE cluster_config (
row_type INT NOT NULL,
version BIGINT NOT NULL,
--
timestamp DATETIME(6) NOT NULL,
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(16) NOT NULL,
PRIMARY KEY (row_type, version)
);
8 changes: 8 additions & 0 deletions schema/mysql/v8/cadence/versioned/v0.6/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.6",
"MinCompatibleVersion": "0.6",
"Description": "create cluster config table",
"SchemaUpdateCqlFiles": [
"cluster_config.sql"
]
}
2 changes: 1 addition & 1 deletion tools/common/schema/updatetask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *UpdateTaskTestSuite) TestReadSchemaDirFromEmbeddings() {
s.NoError(err)
ans, err = readSchemaDir(fsys, "0.3", "")
s.NoError(err)
s.Equal([]string{"v0.4", "v0.5"}, ans)
s.Equal([]string{"v0.4", "v0.5", "v0.6"}, ans)

fsys, err = fs.Sub(mysql.SchemaFS, "v8/visibility/versioned")
s.NoError(err)
Expand Down

0 comments on commit 245a21b

Please sign in to comment.