Refactoring of persistence unit tests (uber#1111)
* Implement metadata persistence in SQL

This change (sqlMetadataPersistence.go) implements the
following "v2" MetadataManager interface, sketched below:

- CreateDomain
- GetDomain
- UpdateDomain
- DeleteDomain
- DeleteDomainByName
- ListDomains
- GetMetadata
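
For reference, a minimal sketch of that interface, with signatures inferred from how the diff further down uses the persistence package (request/response struct fields elided; Close() also appears on the implementations in the diff):

    // Sketch of the "v2" MetadataManager interface.
    type MetadataManager interface {
        Close()
        CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error)
        GetDomain(request *GetDomainRequest) (*GetDomainResponse, error)
        UpdateDomain(request *UpdateDomainRequest) error
        DeleteDomain(request *DeleteDomainRequest) error
        DeleteDomainByName(request *DeleteDomainByNameRequest) error
        ListDomains(request *ListDomainsRequest) (*ListDomainsResponse, error)
        GetMetadata() (*GetMetadataResponse, error)
    }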

The test creates and deletes the table in "sql/domains.sql" by itself;
you just need to stand up a MySQL instance with a
database named "catalyst_test".
At the moment everything is hardcoded to
use uber:uber@localhost:3306
with database "catalyst_test" (see the sketch below); this should be generalized.
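
A minimal sketch of what that hardcoded setup amounts to, assuming the go-sql-driver/mysql DSN format and jmoiron/sqlx (which the branch later moved to); newTestDB is illustrative, not the commit's actual helper:

    package main

    import (
        _ "github.com/go-sql-driver/mysql" // registers the "mysql" driver
        "github.com/jmoiron/sqlx"
    )

    // newTestDB opens the hardcoded test connection described above.
    // Generalizing means lifting user, password, host, port, and database
    // name out into configuration.
    func newTestDB() (*sqlx.DB, error) {
        return sqlx.Connect("mysql", "uber:uber@tcp(localhost:3306)/catalyst_test")
    }

    func main() {
        db, err := newTestDB()
        if err != nil {
            panic(err)
        }
        defer db.Close()
    }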

* Test GetDomain with blank id and name (uber#2)

* Big rename (uber#3)

Put Cassandra and SQL implementations and the common tests into separate packages under `persistence`

* Shard persistence, some executions persistence (uber#4)

* Pass TestPersistenceStartWorkflow, TestGetWorkflow, TestCreateGetUpdateGetShard
* Moved SQL scripts into one file

* Pass some more tests (uber#5)

* Pass TestUpdateWorkflow

* Pass TestGetCurrentWorkflow

* Pass TestPersistenceStartWorkflowWithReplicationState

Perturb the test slightly to force ShardOwnershipLost instead of WorkflowAlreadyExists

* Pass TestDeleteWorkflow

* TestTransferTasksThroughUpdate

* Pass TestCancelTransferTaskTasks

* Pass TestSignalTransferTaskTasks, TestCreateTask

* Pass TestLeaseAndUpdateTaskList

* add stubs

* Put transferTaskTypeTransferTargetRunID back in cassandra; remove superfluous WHERE conditions

* More readable variable names

* Add an error if getTaskList fails with an error other than errNoRows (sketched below)
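
The pattern is the standard database/sql one — a sketch, with getTaskList, insertTaskList, and taskListRow as stand-ins for this package's actual helpers, and errNoRows presumably aliasing sql.ErrNoRows:

    // Only a "no rows" result may fall through to the create path;
    // any other error must be returned, not swallowed.
    func leaseTaskList(db *sqlx.DB, domainID, name string, taskType int) (*taskListRow, error) {
        row, err := getTaskList(db, domainID, name, taskType)
        if err == sql.ErrNoRows {
            return insertTaskList(db, domainID, name, taskType) // first lease: create it
        }
        if err != nil {
            return nil, fmt.Errorf("getTaskList failed: %v", err)
        }
        return row, nil
    }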

* Add field names for struct literals

* Add logger member to task persistence so we can panic, like the Cassandra impl does

* typo: forgot to add logger argument after adding logger parameter to constructor

* TestLeaseAndUpdateTaskList_Sticky, TestGetDecisionTasks, TestComplete… (uber#7)

* TestLeaseAndUpdateTaskList_Sticky, TestGetDecisionTasks, TestCompleteDecisionTask, TestReplicationTasks, TestTimerTasks
* Add & to struct arguments for named SQL queries
* Add error handling on type casts; this stops one of the tests from panicking (see the sketch after this list)
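
Both of those last two items are easy to get wrong in Go — a sketch, with illustrative identifiers rather than the commit's actual code:

    // sqlx named queries bind parameters from struct fields, so the
    // argument must be the struct (or a pointer to it):
    _, err := db.NamedExec(createTaskQuery, &taskRow{TaskID: taskID, DomainID: domainID})

    // A checked type assertion yields (value, ok) instead of panicking;
    // the unchecked form `info := raw.(*ActivityInfo)` is the kind of
    // cast that made one of the tests panic.
    info, ok := raw.(*ActivityInfo)
    if !ok {
        return fmt.Errorf("unexpected type %T for activity info", raw)
    }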

* ContinueAsNew
==
Squashed commit of the following:

commit 7a626d9
Author: Quincy Lam <qlam@uber.com>
Date:   Mon Jul 30 19:20:19 2018 -0700

    Delete panics

commit 14cb2c3
Author: Quincy Lam <qlam@uber.com>
Date:   Mon Jul 30 19:17:51 2018 -0700

    Pass

commit 04ae7b9
Author: Quincy Lam <qlam@uber.com>
Date:   Mon Jul 30 17:33:03 2018 -0700

    almost

* TestCreateGetShard_Backfill

Squashed commit of the following:

commit 05b0ddd
Author: Quincy Lam <qlam@uber.com>
Date:   Mon Jul 30 21:12:40 2018 -0700

    Pass

* TestWorkflowReplicationState

Squashed commit of the following:

commit 5ac28b4
Author: Quincy Lam <qlam@uber.com>
Date:   Tue Jul 31 13:48:09 2018 -0700

    Pass

* History persistence

Squashed commit of the following:

commit a86050e
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 14:38:16 2018 -0700

    Use error message to inspect duplicate key insert failure
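
Matching on the message text is fragile; with github.com/go-sql-driver/mysql the same condition can be detected from the error number instead (a sketch — this commit itself inspects the message string):

    // MySQL reports duplicate-key INSERT failures as error number 1062,
    // so the typed check is:
    if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1062 {
        return &p.ConditionFailedError{Msg: "history events already exist"}
    }
    // The string-matching equivalent looks for "Duplicate entry" in
    // err.Error(), which is what "use error message" refers to here.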

commit d60de82
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 14:13:50 2018 -0700

    reorder imports

commit 774fe2a
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 14:10:34 2018 -0700

    Const queries

commit da7ab8d
Author: Quincy Lam <qlam@uber.com>
Date:   Wed Aug 1 22:15:27 2018 -0700

    Pass TestDeleteHistoryEvents

commit c2978a2
Author: Quincy Lam <qlam@uber.com>
Date:   Wed Aug 1 22:07:33 2018 -0700

    Pass TestGetHistoryEvents, TestGetHistoryEventsCompatibility

commit c775b53
Author: Quincy Lam <qlam@uber.com>
Date:   Wed Aug 1 21:44:27 2018 -0700

    TestAppendHistoryEvents

* Mutable state; fixed possible regression with TestWorkflowReplicationState

Squashed commit of the following:

commit a0f75b1
Author: Quincy Lam <qlam@uber.com>
Date:   Fri Aug 3 01:25:43 2018 -0700

    Pass TestResetMutableState

commit ef41d12
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 23:51:13 2018 -0700

    Fix regression with GetWorkflow (need to check if execution row exists before locking it; otherwise return EntityNotExists in order to pass DeleteWorkflow test)
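
In sqlx terms the fix looks roughly like this (a sketch; table, column, and variable names are illustrative):

    // Probe for the row first so a missing execution becomes
    // EntityNotExistsError rather than a lock on a nonexistent row.
    var runID string
    err := tx.Get(&runID, "SELECT run_id FROM executions WHERE domain_id = ? AND workflow_id = ?", domainID, workflowID)
    if err == sql.ErrNoRows {
        return nil, &workflow.EntityNotExistsError{Message: "workflow execution not found"}
    }
    if err != nil {
        return nil, err
    }
    // Only now is it safe to take the row lock.
    err = tx.Get(&execRow, "SELECT * FROM executions WHERE domain_id = ? AND workflow_id = ? FOR UPDATE", domainID, workflowID)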

commit fd485d9
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 23:35:35 2018 -0700

    Pass TestWorkflowMutableState_SignalRequested

commit d0a2c79
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 22:35:26 2018 -0700

    Pass TestWorkflowMutableState_BufferedReplicationTasks

commit 6d220db
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 21:47:22 2018 -0700

    Pass TestWorkflowMutableState_SignalInfo

commit 2115127
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 21:17:00 2018 -0700

    Request cancel info

commit 79733c2
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 20:42:49 2018 -0700

    parent executions; TestWorkflowMutableState_ChildExecutions

commit 2751bf1
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 18:02:54 2018 -0700

    Pass TestWorkflowMutableState_Timers

commit 72b99de
Author: Quincy Lam <qlam@uber.com>
Date:   Thu Aug 2 16:08:56 2018 -0700

    Make some better query templates

commit 6ec3532
Author: Quincy Lam <qlam@uber.com>
Date:   Tue Jul 31 23:11:47 2018 -0700

    Pass TestWorkflowMutableState_Activities

* go fmt, rename GetVisilibityTS

* Add branches with sql stuff

* deleted and rebuilt glide.lock

* typos with the branches

* Add metadata switch

* Add history branch

* Pass shard persistence tests

* hello world with sql executions+metadata+history+shards works

* Some debugging prompts

* childworkflow works

The bug was that createTimerTasks within the ContinueAsNew logic was being called on the transfer tasks. There should be a test to make sure this typo doesn't recur (see the sketch below).
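
The shape of the bug, sketched with stand-in names — both slices satisfy the same loose parameter type, so the compiler cannot catch the swap:

    createTimerTasks(tx, transferTasks) // BUG: transfer tasks passed to the timer path
    createTimerTasks(tx, timerTasks)    // the intended call
    // A test asserting that timer tasks actually land in the timer-task
    // table would catch this; so would giving the two task kinds distinct
    // static types instead of a shared interface slice.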

* Added MapperFunc to sql db field mapping
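
For context, jmoiron/sqlx exposes this as a hook on the DB handle — a minimal sketch, assuming all-lowercase column names:

    // Map Go field names to column names once, globally, instead of
    // tagging every struct field with `db:"..."`.
    db.MapperFunc(func(field string) string {
        return strings.ToLower(field) // e.g. WorkflowID -> workflowid
    })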

* Fixed syntax errors

* Moved from hmgle/sqlx to jmoiron/sqlx with batch update patch
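
The "batch update patch" extends named queries to slice arguments; newer releases of jmoiron/sqlx support this natively — a sketch with an illustrative table and row struct:

    // NamedExec with a slice expands the VALUES clause once per element,
    // producing a single multi-row INSERT.
    rows := []taskRow{{TaskID: 1}, {TaskID: 2}}
    _, err := db.NamedExec("INSERT INTO tasks (task_id) VALUES (:task_id)", rows)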

* Renamed TestBase.CassandraTestCluster to PersistenceTestCluster

* Refactored persistence unit tests to support shared tests for various backends

* Renamed TestBaseOptions.Cluster... to DB...

* Updated CassandraTestCluster methods to more generic names

* Refactored cassandra test initialization out of persistence-tests

* Removed SQL implementation

* Linter fixes

* Removed config changes

* Renamed persistence to p prefix

* Renamed persistence to p prefix in cassandra package

* Renamed persistence to p prefix in service packages

* PR comments

* Added InitTestSuiteWithMetadata and fixed Integration_domain_failover_test
mfateev authored and samarabbas committed Sep 14, 2018
1 parent a36c849 commit 1a22c01
Showing 42 changed files with 2,050 additions and 1,623 deletions.
@@ -18,18 +18,18 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-package persistence
+package cassandra
 
 import (
     "encoding/json"
     "fmt"
 
     "github.com/gocql/gocql"
     "github.com/uber-common/bark"
 
     workflow "github.com/uber/cadence/.gen/go/shared"
     "github.com/uber/cadence/common"
-    "github.com/uber/cadence/common/logging"
+    p "github.com/uber/cadence/common/persistence"
 )
 
 const (
@@ -65,13 +65,13 @@ type (
     cassandraHistoryPersistence struct {
         session           *gocql.Session
         logger            bark.Logger
-        serializerFactory HistorySerializerFactory
+        serializerFactory p.HistorySerializerFactory
     }
 )
 
-// NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation
-func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string,
-    numConns int, logger bark.Logger) (HistoryManager,
+// NewHistoryPersistence is used to create an instance of HistoryManager implementation
+func NewHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string,
+    numConns int, logger bark.Logger) (p.HistoryManager,
     error) {
     cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
     cluster.Keyspace = keyspace
@@ -86,7 +86,7 @@ func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc s
         return nil, err
     }
 
-    return &cassandraHistoryPersistence{session: session, logger: logger, serializerFactory: NewHistorySerializerFactory()}, nil
+    return &cassandraHistoryPersistence{session: session, logger: logger, serializerFactory: p.NewHistorySerializerFactory()}, nil
 }
 
 // Close gracefully releases the resources held by this object
@@ -96,7 +96,7 @@ func (h *cassandraHistoryPersistence) Close() {
     }
 }
 
-func (h *cassandraHistoryPersistence) AppendHistoryEvents(request *AppendHistoryEventsRequest) error {
+func (h *cassandraHistoryPersistence) AppendHistoryEvents(request *p.AppendHistoryEventsRequest) error {
     var query *gocql.Query
 
     if request.Overwrite {
@@ -137,24 +137,24 @@ func (h *cassandraHistoryPersistence) AppendHistoryEvents(request *AppendHistory
     } else if isTimeoutError(err) {
         // Write may have succeeded, but we don't know
         // return this info to the caller so they have the option of trying to find out by executing a read
-        return &TimeoutError{Msg: fmt.Sprintf("AppendHistoryEvents timed out. Error: %v", err)}
+        return &p.TimeoutError{Msg: fmt.Sprintf("AppendHistoryEvents timed out. Error: %v", err)}
     }
     return &workflow.InternalServiceError{
         Message: fmt.Sprintf("AppendHistoryEvents operation failed. Error: %v", err),
     }
 }
 
 if !applied {
-    return &ConditionFailedError{
+    return &p.ConditionFailedError{
         Msg: "Failed to append history events.",
     }
 }
 
 return nil
}
 
-func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest) (
-    *GetWorkflowExecutionHistoryResponse, error) {
+func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.GetWorkflowExecutionHistoryRequest) (
+    *p.GetWorkflowExecutionHistoryResponse, error) {
     execution := request.Execution
     token, err := h.deserializeToken(request)
     if err != nil {
@@ -180,7 +180,7 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *GetWo
     eventBatchVersionPointer := new(int64)
     eventBatchVersion := common.EmptyVersion
     lastFirstEventID := common.EmptyEventID // first_event_id of last batch
-    eventBatch := SerializedHistoryEventBatch{}
+    eventBatch := p.SerializedHistoryEventBatch{}
     history := &workflow.History{}
     for iter.Scan(nil, &eventBatchVersionPointer, &eventBatch.Data, &eventBatch.EncodingType, &eventBatch.Version) {
         found = true
@@ -216,7 +216,7 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *GetWo
 
         eventBatchVersionPointer = new(int64)
         eventBatchVersion = common.EmptyVersion
-        eventBatch = SerializedHistoryEventBatch{}
+        eventBatch = p.SerializedHistoryEventBatch{}
     }
 
     data, err := h.serializeToken(token)
@@ -238,7 +238,7 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *GetWo
         }
     }
 
-    response := &GetWorkflowExecutionHistoryResponse{
+    response := &p.GetWorkflowExecutionHistoryResponse{
         NextPageToken:    data,
         History:          history,
         LastFirstEventID: lastFirstEventID,
@@ -247,14 +247,14 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *GetWo
     return response, nil
 }
 
-func (h *cassandraHistoryPersistence) deserializeEvents(e *SerializedHistoryEventBatch) (*HistoryEventBatch, error) {
-    SetSerializedHistoryDefaults(e)
+func (h *cassandraHistoryPersistence) deserializeEvents(e *p.SerializedHistoryEventBatch) (*p.HistoryEventBatch, error) {
+    p.SetSerializedHistoryDefaults(e)
     s, _ := h.serializerFactory.Get(e.EncodingType)
     return s.Deserialize(e)
 }
 
 func (h *cassandraHistoryPersistence) DeleteWorkflowExecutionHistory(
-    request *DeleteWorkflowExecutionHistoryRequest) error {
+    request *p.DeleteWorkflowExecutionHistoryRequest) error {
     execution := request.Execution
     query := h.session.Query(templateDeleteWorkflowExecutionHistory,
         request.DomainID,
@@ -288,7 +288,7 @@ func (h *cassandraHistoryPersistence) serializeToken(token *historyToken) ([]byt
     return data, nil
 }
 
-func (h *cassandraHistoryPersistence) deserializeToken(request *GetWorkflowExecutionHistoryRequest) (*historyToken, error) {
+func (h *cassandraHistoryPersistence) deserializeToken(request *p.GetWorkflowExecutionHistoryRequest) (*historyToken, error) {
     token := &historyToken{
         LastEventBatchVersion: common.EmptyVersion,
         LastEventID:           request.FirstEventID - 1,
@@ -18,16 +18,17 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-package persistence
+package cassandra
 
 import (
     "fmt"
 
-    "github.com/gocql/gocql"
-    "github.com/uber-common/bark"
-
     workflow "github.com/uber/cadence/.gen/go/shared"
     "github.com/uber/cadence/common"
+    p "github.com/uber/cadence/common/persistence"
+
+    "github.com/gocql/gocql"
+    "github.com/uber-common/bark"
 )
 
 const (
@@ -97,9 +98,9 @@ type (
     }
 )
 
-// NewCassandraMetadataPersistence is used to create an instance of HistoryManager implementation
-func NewCassandraMetadataPersistence(hosts string, port int, user, password, dc string, keyspace string,
-    currentClusterName string, logger bark.Logger) (MetadataManager,
+// NewMetadataPersistence is used to create an instance of HistoryManager implementation
+func NewMetadataPersistence(hosts string, port int, user, password, dc string, keyspace string,
+    currentClusterName string, logger bark.Logger) (p.MetadataManager,
     error) {
     cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
     cluster.Keyspace = keyspace
@@ -131,7 +132,7 @@ func (m *cassandraMetadataPersistence) Close() {
 // 'Domains' table and then do a conditional insert into domains_by_name table. If the conditional write fails we
 // delete the orphaned entry from domains table. There is a chance delete entry could fail and we never delete the
 // orphaned entry from domains table. We might need a background job to delete those orphaned record.
-func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error) {
+func (m *cassandraMetadataPersistence) CreateDomain(request *p.CreateDomainRequest) (*p.CreateDomainResponse, error) {
     query := m.session.Query(templateCreateDomainQuery, request.Info.ID, request.Info.Name)
     applied, err := query.ScanCAS()
     if err != nil {
@@ -156,7 +157,7 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest
         request.Config.Retention,
         request.Config.EmitMetric,
         request.ReplicationConfig.ActiveClusterName,
-        serializeClusterConfigs(request.ReplicationConfig.Clusters),
+        p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
         request.IsGlobalDomain,
         request.ConfigVersion,
         request.FailoverVersion,
@@ -189,15 +190,15 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest
         }
     }
 
-    return &CreateDomainResponse{ID: request.Info.ID}, nil
+    return &p.CreateDomainResponse{ID: request.Info.ID}, nil
 }
 
-func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*GetDomainResponse, error) {
+func (m *cassandraMetadataPersistence) GetDomain(request *p.GetDomainRequest) (*p.GetDomainResponse, error) {
     var query *gocql.Query
     var err error
-    info := &DomainInfo{}
-    config := &DomainConfig{}
-    replicationConfig := &DomainReplicationConfig{}
+    info := &p.DomainInfo{}
+    config := &p.DomainConfig{}
+    replicationConfig := &p.DomainReplicationConfig{}
     var replicationClusters []map[string]interface{}
     var dbVersion int64
     var failoverVersion int64
@@ -260,11 +261,11 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
         return nil, handleError(request.Name, request.ID, err)
     }
 
-    replicationConfig.ActiveClusterName = GetOrUseDefaultActiveCluster(m.currentClusterName, replicationConfig.ActiveClusterName)
-    replicationConfig.Clusters = deserializeClusterConfigs(replicationClusters)
-    replicationConfig.Clusters = GetOrUseDefaultClusters(m.currentClusterName, replicationConfig.Clusters)
+    replicationConfig.ActiveClusterName = p.GetOrUseDefaultActiveCluster(m.currentClusterName, replicationConfig.ActiveClusterName)
+    replicationConfig.Clusters = p.DeserializeClusterConfigs(replicationClusters)
+    replicationConfig.Clusters = p.GetOrUseDefaultClusters(m.currentClusterName, replicationConfig.Clusters)
 
-    return &GetDomainResponse{
+    return &p.GetDomainResponse{
         Info:              info,
         Config:            config,
         ReplicationConfig: replicationConfig,
@@ -275,7 +276,7 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
     }, nil
 }
 
-func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest) error {
+func (m *cassandraMetadataPersistence) UpdateDomain(request *p.UpdateDomainRequest) error {
     var nextVersion int64 = 1
     var currentVersion *int64
     if request.NotificationVersion > 0 {
@@ -292,7 +293,7 @@ func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest
         request.Config.Retention,
         request.Config.EmitMetric,
         request.ReplicationConfig.ActiveClusterName,
-        serializeClusterConfigs(request.ReplicationConfig.Clusters),
+        p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
         request.ConfigVersion,
         request.FailoverVersion,
         nextVersion,
@@ -315,7 +316,7 @@ func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest
     return nil
 }
 
-func (m *cassandraMetadataPersistence) DeleteDomain(request *DeleteDomainRequest) error {
+func (m *cassandraMetadataPersistence) DeleteDomain(request *p.DeleteDomainRequest) error {
     var name string
     query := m.session.Query(templateGetDomainQuery, request.ID)
     err := query.Scan(&name)
@@ -329,7 +330,7 @@ func (m *cassandraMetadataPersistence) DeleteDomain(request *DeleteDomainRequest
     return m.deleteDomain(name, request.ID)
 }
 
-func (m *cassandraMetadataPersistence) DeleteDomainByName(request *DeleteDomainByNameRequest) error {
+func (m *cassandraMetadataPersistence) DeleteDomainByName(request *p.DeleteDomainByNameRequest) error {
     var ID string
     query := m.session.Query(templateGetDomainByNameQuery, request.Name)
     err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
@@ -342,11 +343,11 @@ func (m *cassandraMetadataPersistence) DeleteDomainByName(request *DeleteDomainB
     return m.deleteDomain(request.Name, ID)
 }
 
-func (m *cassandraMetadataPersistence) ListDomains(request *ListDomainsRequest) (*ListDomainsResponse, error) {
+func (m *cassandraMetadataPersistence) ListDomains(request *p.ListDomainsRequest) (*p.ListDomainsResponse, error) {
     panic("cassandraMetadataPersistence do not support list domain operation.")
 }
 
-func (m *cassandraMetadataPersistence) GetMetadata() (*GetMetadataResponse, error) {
+func (m *cassandraMetadataPersistence) GetMetadata() (*p.GetMetadataResponse, error) {
     panic("cassandraMetadataPersistence do not support get metadata operation.")
 }
 
@@ -367,21 +368,3 @@ func (m *cassandraMetadataPersistence) deleteDomain(name, ID string) error {
 
     return nil
 }
-
-func serializeClusterConfigs(replicationConfigs []*ClusterReplicationConfig) []map[string]interface{} {
-    seriaizedReplicationConfigs := []map[string]interface{}{}
-    for index := range replicationConfigs {
-        seriaizedReplicationConfigs = append(seriaizedReplicationConfigs, replicationConfigs[index].serialize())
-    }
-    return seriaizedReplicationConfigs
-}
-
-func deserializeClusterConfigs(replicationConfigs []map[string]interface{}) []*ClusterReplicationConfig {
-    deseriaizedReplicationConfigs := []*ClusterReplicationConfig{}
-    for index := range replicationConfigs {
-        deseriaizedReplicationConfig := &ClusterReplicationConfig{}
-        deseriaizedReplicationConfig.deserialize(replicationConfigs[index])
-        deseriaizedReplicationConfigs = append(deseriaizedReplicationConfigs, deseriaizedReplicationConfig)
-    }
-    return deseriaizedReplicationConfigs
-}