Skip to content

Commit

Permalink
Persistence refactoring to be pluggable for any database (uber#2836)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Dec 5, 2019
1 parent ac457cd commit a76b957
Show file tree
Hide file tree
Showing 54 changed files with 1,240 additions and 1,224 deletions.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,12 @@ install-schema-mysql: bins
./cadence-sql-tool --ep 127.0.0.1 --db cadence_visibility update-schema -d ./schema/mysql/v57/visibility/versioned

install-schema-postgres: bins
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres create --db cadence
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence setup -v 0.0
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence update-schema -d ./schema/postgres/cadence/versioned
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres create --db cadence_visibility
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence_visibility setup-schema -v 0.0
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence_visibility update-schema -d ./schema/postgres/visibility/versioned
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres create --db cadence
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres --db cadence setup -v 0.0
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres --db cadence update-schema -d ./schema/postgres/cadence/versioned
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres create --db cadence_visibility
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres --db cadence_visibility setup-schema -v 0.0
./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --pl postgres --db cadence_visibility update-schema -d ./schema/postgres/visibility/versioned

start: bins
./cadence-server start
Expand Down
7 changes: 3 additions & 4 deletions cmd/server/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ import (
"strings"
"syscall"

"github.com/urfave/cli"

"github.com/uber/cadence/common"
_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/tools/cassandra"
"github.com/uber/cadence/tools/sql"
_ "github.com/uber/cadence/tools/sql-extensions/mysql" // needed to load mysql extensions
_ "github.com/uber/cadence/tools/sql-extensions/postgres" // needed to load postgres extensions

"github.com/urfave/cli"
)

// validServices is the list of all valid cadence services
Expand Down
3 changes: 1 addition & 2 deletions cmd/tools/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ package main
import (
"os"

_ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin
"github.com/uber/cadence/tools/sql"
_ "github.com/uber/cadence/tools/sql-extensions/mysql" // needed to load mysql extensions
_ "github.com/uber/cadence/tools/sql-extensions/postgres" // needed to load postgres extensions
)

func main() {
Expand Down
17 changes: 9 additions & 8 deletions common/persistence/cassandra/cassandraPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import (
)

const (
testUser = ""
testPassword = ""
testSchemaDir = "schema/cassandra/"
)

Expand All @@ -52,7 +50,7 @@ type TestCluster struct {
}

// NewTestCluster returns a new cassandra test cluster
func NewTestCluster(keyspace string, port int, schemaDir string) *TestCluster {
func NewTestCluster(keyspace, username, password, host string, port int, schemaDir string) *TestCluster {
var result TestCluster
result.keyspace = keyspace
if port == 0 {
Expand All @@ -61,11 +59,14 @@ func NewTestCluster(keyspace string, port int, schemaDir string) *TestCluster {
if schemaDir == "" {
schemaDir = testSchemaDir
}
if host == "" {
host = environment.GetCassandraAddress()
}
result.schemaDir = schemaDir
result.cfg = config.Cassandra{
User: testUser,
Password: testPassword,
Hosts: environment.GetCassandraAddress(),
User: username,
Password: password,
Hosts: host,
Port: port,
MaxConns: 2,
Keyspace: keyspace,
Expand Down Expand Up @@ -120,8 +121,8 @@ func (s *TestCluster) CreateSession() {
s.cluster = cassandra.NewCassandraCluster(config.Cassandra{
Hosts: s.cfg.Hosts,
Port: s.cfg.Port,
User: testUser,
Password: testPassword,
User: s.cfg.User,
Password: s.cfg.Password,
})
s.cluster.Consistency = gocql.Consistency(1)
s.cluster.Keyspace = "system"
Expand Down
18 changes: 14 additions & 4 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ type (

// TestBaseOptions options to configure workflow test base.
TestBaseOptions struct {
SQLDBPluginName string
DBName string
DBUsername string
DBPassword string
DBHost string
DBPort int `yaml:"-"`
StoreType string `yaml:"-"`
SchemaDir string `yaml:"-"`
Expand Down Expand Up @@ -85,7 +89,6 @@ type (
DatabaseName() string
SetupTestDatabase()
TearDownTestDatabase()
CreateSession()
DropDatabase()
Config() config.Persistence
LoadSchema(fileNames []string, schemaDir string)
Expand All @@ -107,16 +110,16 @@ func NewTestBaseWithCassandra(options *TestBaseOptions) TestBase {
if options.DBName == "" {
options.DBName = "test_" + GenerateRandomDBName(10)
}
testCluster := cassandra.NewTestCluster(options.DBName, options.DBPort, options.SchemaDir)
testCluster := cassandra.NewTestCluster(options.DBName, options.DBUsername, options.DBPassword, options.DBHost, options.DBPort, options.SchemaDir)
return newTestBase(options, testCluster)
}

// NewTestBaseWithSQL returns a new persistence test base backed by SQL
func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
if options.DBName == "" {
options.DBName = GenerateRandomDBName(10)
options.DBName = "test_" + GenerateRandomDBName(10)
}
testCluster := sql.NewTestCluster(options.DBName, options.DBPort, options.SchemaDir)
testCluster := sql.NewTestCluster(options.SQLDBPluginName, options.DBName, options.DBUsername, options.DBPassword, options.DBHost, options.DBPort, options.SchemaDir)
return newTestBase(options, testCluster)
}

Expand Down Expand Up @@ -1261,6 +1264,13 @@ func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID

// TearDownWorkflowStore to cleanup
func (s *TestBase) TearDownWorkflowStore() {
s.ExecutionMgrFactory.Close()
// TODO VisibilityMgr/Store is created with a separated code path, this is incorrect and may cause leaking connection
// And Postgres requires all connection to be closed before dropping a database
// https://github.com/uber/cadence/issues/2854
// Remove the below line after the issue is fix
s.VisibilityMgr.Close()

s.DefaultTestCluster.TearDownTestDatabase()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
s.Equal(1, len(resp.Executions))
s.assertOpenExecutionEquals(startReq1, resp.Executions[0])

// TODO: See if it is possible in Cassandra to not return non empty token which is going to return empty result
if s.ExecutionManager.GetName() == "cassandra" {
// It is possible to not return non empty token which is going to return empty result
if len(resp.NextPageToken) != 0 {
// Now should get empty result by using token
resp, err4 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
Expand All @@ -249,8 +249,6 @@ func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
})
s.Nil(err4)
s.Equal(0, len(resp.Executions))
} else {
s.Equal(0, len(resp.NextPageToken))
}
}

Expand Down Expand Up @@ -684,7 +682,7 @@ func (s *VisibilityPersistenceSuite) assertClosedExecutionEquals(

func (s *VisibilityPersistenceSuite) assertOpenExecutionEquals(
req *p.RecordWorkflowExecutionStartedRequest, resp *gen.WorkflowExecutionInfo) {
s.Equal(req.Execution.RunId, resp.Execution.RunId)
s.Equal(req.Execution.GetRunId(), resp.Execution.GetRunId())
s.Equal(req.Execution.WorkflowId, resp.Execution.WorkflowId)
s.Equal(req.WorkflowTypeName, resp.GetType().GetName())
s.Equal(s.nanosToMillis(req.StartTimestamp), s.nanosToMillis(resp.GetStartTime()))
Expand Down
19 changes: 4 additions & 15 deletions common/persistence/sql/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,20 @@ import (
"encoding/gob"
"fmt"

"github.com/go-sql-driver/mysql"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

// TODO: Rename all SQL Managers to Stores
type sqlStore struct {
db sqldb.Interface
db sqlplugin.DB
logger log.Logger
}

func (m *sqlStore) GetName() string {
return m.db.DriverName()
return m.db.PluginName()
}

func (m *sqlStore) Close() {
Expand All @@ -50,7 +48,7 @@ func (m *sqlStore) Close() {
}
}

func (m *sqlStore) txExecute(operation string, f func(tx sqldb.Tx) error) error {
func (m *sqlStore) txExecute(operation string, f func(tx sqlplugin.Tx) error) error {
tx, err := m.db.BeginTx()
if err != nil {
return &workflow.InternalServiceError{
Expand Down Expand Up @@ -82,15 +80,6 @@ func (m *sqlStore) txExecute(operation string, f func(tx sqldb.Tx) error) error
return nil
}

// ErrDupEntry MySQL Error 1062 indicates a duplicate primary key i.e. the row already exists,
// so we don't do the insert and return a ConditionalUpdate error.
const ErrDupEntry = 1062

func isDupEntry(err error) bool {
sqlErr, ok := err.(*mysql.MySQLError)
return ok && sqlErr.Number == ErrDupEntry
}

func gobSerialize(x interface{}) ([]byte, error) {
b := bytes.Buffer{}
e := gob.NewEncoder(&b)
Expand Down
24 changes: 14 additions & 10 deletions common/persistence/sql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
package sql

import (
"fmt"
"sync"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/storage"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
"github.com/uber/cadence/common/service/config"
)

Expand All @@ -45,7 +45,7 @@ type (
// additional reference counting
dbConn struct {
sync.Mutex
sqldb.Interface
sqlplugin.DB
refCnt int
cfg *config.SQL
}
Expand Down Expand Up @@ -138,15 +138,15 @@ func newRefCountedDBConn(cfg *config.SQL) dbConn {
// get returns a mysql db connection and increments a reference count
// this method will create a new connection, if an existing connection
// does not exist
func (c *dbConn) get() (sqldb.Interface, error) {
func (c *dbConn) get() (sqlplugin.DB, error) {
c.Lock()
defer c.Unlock()
if c.refCnt == 0 {
conn, err := storage.NewSQLDB(c.cfg)
conn, err := NewSQLDB(c.cfg)
if err != nil {
return nil, err
}
c.Interface = conn
c.DB = conn
}
c.refCnt++
return c, nil
Expand All @@ -156,8 +156,12 @@ func (c *dbConn) get() (sqldb.Interface, error) {
func (c *dbConn) forceClose() {
c.Lock()
defer c.Unlock()
if c.Interface != nil {
c.Interface.Close()
if c.DB != nil {
err := c.DB.Close()
if err != nil {
fmt.Println("failed to close database connection, may leak some connection", err)
}

}
c.refCnt = 0
}
Expand All @@ -168,8 +172,8 @@ func (c *dbConn) Close() error {
defer c.Unlock()
c.refCnt--
if c.refCnt == 0 {
err := c.Interface.Close()
c.Interface = nil
err := c.DB.Close()
c.DB = nil
return err
}
return nil
Expand Down
Loading

0 comments on commit a76b957

Please sign in to comment.