Skip to content

Commit

Permalink
Implement config store for PostgresSQL (uber#5405)
Browse files Browse the repository at this point in the history
* Implement config store for PostgresSQL

* Fix config store client
  • Loading branch information
Shaddoll authored Sep 26, 2023
1 parent b624c35 commit 2df19da
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 11 deletions.
9 changes: 3 additions & 6 deletions common/dynamicconfig/configstore/config_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,16 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig,
return nil, errors.New("persistence cfg is nil")
}

store, ok := persistenceCfg.DataStores[persistenceCfg.DefaultStore]
ds, ok := persistenceCfg.DataStores[persistenceCfg.DefaultStore]
if !ok {
return nil, errors.New("default persistence config missing")
}

if store.NoSQL == nil && store.ShardedNoSQL == nil {
return nil, errors.New("NoSQL and ShardedNoSQL structs are nil")
}

if err := validateClientConfig(clientCfg); err != nil {
logger.Warn("invalid ClientConfig values, using default values")
clientCfg = defaultConfigValues
}

ds := persistenceCfg.DataStores[persistenceCfg.DefaultStore]
client, err := newConfigStoreClient(clientCfg, &ds, logger, configType)
if err != nil {
return nil, err
Expand Down Expand Up @@ -138,6 +133,7 @@ func newConfigStoreClient(
}
store, err = sql.NewSQLConfigStore(db, logger, nil)
default:
return nil, errors.New("both NoSQL and SQL store are not provided")
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -376,6 +372,7 @@ func (csc *configStoreClient) Stop() {
return
}
close(csc.doneCh)
csc.configStoreManager.Close()
}

func (csc *configStoreClient) Start() {
Expand Down
28 changes: 25 additions & 3 deletions common/persistence/sql/sqlplugin/postgres/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,37 @@ package postgres

import (
"context"
"fmt"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

const (
_selectLatestConfigQuery = "SELECT row_type, version, timestamp, data, data_encoding FROM cluster_config WHERE row_type = $1 ORDER BY version LIMIT 1;"

_insertConfigQuery = "INSERT INTO cluster_config (row_type, version, timestamp, data, data_encoding) VALUES($1, $2, $3, $4, $5)"
)

func (pdb *db) InsertConfig(ctx context.Context, row *persistence.InternalConfigStoreEntry) error {
return fmt.Errorf("not implemented")
_, err := pdb.driver.ExecContext(ctx, sqlplugin.DbDefaultShard, _insertConfigQuery, row.RowType, -1*row.Version, pdb.converter.ToPostgresDateTime(row.Timestamp), row.Values.Data, row.Values.Encoding)
return err
}

func (pdb *db) SelectLatestConfig(ctx context.Context, rowType int) (*persistence.InternalConfigStoreEntry, error) {
return nil, fmt.Errorf("not implemented")
var row sqlplugin.ClusterConfigRow
err := pdb.driver.GetContext(ctx, sqlplugin.DbDefaultShard, &row, _selectLatestConfigQuery, rowType)
if err != nil {
return nil, err
}
row.Version *= -1
return &persistence.InternalConfigStoreEntry{
RowType: row.RowType,
Version: row.Version,
Timestamp: pdb.converter.FromPostgresDateTime(row.Timestamp),
Values: &persistence.DataBlob{
Data: row.Data,
Encoding: common.EncodingType(row.DataEncoding),
},
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,11 @@ FAIL: TestPostgresSQLQueuePersistence/TestDomainReplicationQueue (0.26s)
// s.TestBase.Setup()
// suite.Run(t, s)
//}

func TestPostgresSQLConfigPersistence(t *testing.T) {
testflags.RequirePostgres(t)
s := new(pt.ConfigStorePersistenceSuite)
s.TestBase = pt.NewTestBaseWithSQL(GetTestClusterOption())
s.TestBase.Setup()
suite.Run(t, s)
}
10 changes: 10 additions & 0 deletions schema/postgres/cadence/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,13 @@ CREATE TABLE queue_metadata (
data BYTEA NOT NULL,
PRIMARY KEY(queue_type)
);

CREATE TABLE cluster_config (
row_type INTEGER NOT NULL,
version BIGINT NOT NULL,
--
timestamp TIMESTAMP NOT NULL,
data BYTEA NOT NULL,
data_encoding VARCHAR(16) NOT NULL,
PRIMARY KEY (row_type, version)
);
9 changes: 9 additions & 0 deletions schema/postgres/cadence/versioned/v0.5/cluster_config.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE cluster_config (
row_type INTEGER NOT NULL,
version BIGINT NOT NULL,
--
timestamp TIMESTAMP NOT NULL,
data BYTEA NOT NULL,
data_encoding VARCHAR(16) NOT NULL,
PRIMARY KEY (row_type, version)
);
8 changes: 8 additions & 0 deletions schema/postgres/cadence/versioned/v0.5/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.5",
"MinCompatibleVersion": "0.5",
"Description": "create cluster config table",
"SchemaUpdateCqlFiles": [
"cluster_config.sql"
]
}
2 changes: 1 addition & 1 deletion schema/postgres/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package postgres

// Version is the Postgres database release version
// Cadence supports both MySQL and Postgres officially, so upgrade should be perform for both MySQL and Postgres
const Version = "0.4"
const Version = "0.5"

// VisibilityVersion is the Postgres visibility database release version
// Cadence supports both MySQL and Postgres officially, so upgrade should be perform for both MySQL and Postgres
Expand Down
2 changes: 1 addition & 1 deletion tools/common/schema/updatetask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *UpdateTaskTestSuite) TestReadSchemaDirFromEmbeddings() {
s.NoError(err)
ans, err = readSchemaDir(fsys, "0.3", "")
s.NoError(err)
s.Equal([]string{"v0.4"}, ans)
s.Equal([]string{"v0.4", "v0.5"}, ans)

fsys, err = fs.Sub(postgres.SchemaFS, "visibility/versioned")
s.NoError(err)
Expand Down

0 comments on commit 2df19da

Please sign in to comment.