Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDNC-4831] Added option to pass consistency level in the cassandra schema versio… #5327

Merged
merged 6 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/server/cadence/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"strings"
"syscall"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/urfave/cli"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -69,7 +71,7 @@ func startHandler(c *cli.Context) {
log.Fatalf("config validation failed: %v", err)
}
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence); err != nil {
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence, gocql.All); err != nil {
agautam478 marked this conversation as resolved.
Show resolved Hide resolved
log.Fatal("cassandra schema version compatibility check failed: ", err)
}
// sql schema version validation
Expand Down
4 changes: 3 additions & 1 deletion cmd/server/cadence/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/uber/cadence/testflags"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -94,7 +96,7 @@ func (s *ServerSuite) TestServerStartup() {
log.Fatalf("config validation failed: %v", err)
}
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence); err != nil {
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence, gocql.All); err != nil {
log.Fatal("cassandra schema version compatibility check failed: ", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"testing"
"time"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -92,7 +94,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() {
TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit),
ErrorInjectionRate: dynamicconfig.GetFloatPropertyFn(0),
}
s.NoError(cassandra.VerifyCompatibleVersion(cfg))
s.NoError(cassandra.VerifyCompatibleVersion(cfg, gocql.All))
}

func (s *VersionTestSuite) TestCheckCompatibleVersion() {
Expand Down Expand Up @@ -121,7 +123,7 @@ func (s *VersionTestSuite) createKeyspace(keyspace string) func() {
NumReplicas: 1,
ProtoVersion: environment.GetCassandraProtoVersion(),
}
client, err := cassandra.NewCQLClient(cfg)
client, err := cassandra.NewCQLClient(cfg, gocql.All)
s.NoError(err)

err = client.CreateKeyspace(keyspace)
Expand Down Expand Up @@ -167,7 +169,7 @@ func (s *VersionTestSuite) runCheckCompatibleVersion(
Password: environment.GetCassandraPassword(),
Keyspace: keyspace,
}
err := cassandra.CheckCompatibleVersion(cfg, expected)
err := cassandra.CheckCompatibleVersion(cfg, expected, gocql.All)
if len(errStr) > 0 {
s.Error(err)
s.Contains(err.Error(), errStr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package tests

import (
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/environment"
"github.com/uber/cadence/tools/cassandra"
)
Expand All @@ -39,7 +40,7 @@ func NewTestCQLClient(keyspace string) (cassandra.CqlClient, error) {
AllowedAuthenticators: environment.GetCassandraAllowedAuthenticators(),
NumReplicas: 1,
ProtoVersion: environment.GetCassandraProtoVersion(),
})
}, gocql.All)
}

func CreateTestCQLFileContent() string {
Expand Down
8 changes: 6 additions & 2 deletions tools/cassandra/cqlclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ const (
var _ schema.SchemaClient = (*CqlClientImpl)(nil)

// NewCQLClient returns a new instance of CQLClient
func NewCQLClient(cfg *CQLClientConfig) (CqlClient, error) {
func NewCQLClient(cfg *CQLClientConfig, expectedConsistency gocql.Consistency) (CqlClient, error) {
var err error

cqlClient := new(CqlClientImpl)
cqlClient.cfg = cfg
cqlClient.nReplicas = cfg.NumReplicas
consistency := gocql.All
if expectedConsistency == gocql.Quorum {
agautam478 marked this conversation as resolved.
Show resolved Hide resolved
consistency = gocql.Quorum
}
cqlClient.session, err = gocql.GetRegisteredClient().CreateSession(gocql.ClusterConfig{
Hosts: cfg.Hosts,
Port: cfg.Port,
Expand All @@ -129,7 +133,7 @@ func NewCQLClient(cfg *CQLClientConfig) (CqlClient, error) {
Timeout: time.Duration(cfg.Timeout) * time.Second,
ConnectTimeout: time.Duration(cfg.ConnectTimeout) * time.Second,
ProtoVersion: cfg.ProtoVersion,
Consistency: gocql.All,
Consistency: consistency,
})
if err != nil {
return nil, err
Expand Down
28 changes: 16 additions & 12 deletions tools/cassandra/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"fmt"
"log"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/urfave/cli"

"github.com/uber/cadence/common/config"
Expand All @@ -45,16 +47,17 @@ type SetupSchemaConfig struct {
// rollback, the code version (expected version) would fall lower than the actual version in
// cassandra.
func VerifyCompatibleVersion(
cfg config.Persistence,
cfg config.Persistence, expectedConsistency gocql.Consistency,
) error {

if ds, ok := cfg.DataStores[cfg.DefaultStore]; ok {
if err := verifyCompatibleVersion(ds, cassandra.Version); err != nil {
if err := verifyCompatibleVersion(ds, cassandra.Version, expectedConsistency); err != nil {
return err
}
}

if ds, ok := cfg.DataStores[cfg.VisibilityStore]; ok {
if err := verifyCompatibleVersion(ds, cassandra.VisibilityVersion); err != nil {
if err := verifyCompatibleVersion(ds, cassandra.VisibilityVersion, expectedConsistency); err != nil {
return err
}
}
Expand All @@ -64,14 +67,14 @@ func VerifyCompatibleVersion(

func verifyCompatibleVersion(
ds config.DataStore,
expectedCassandraVersion string,
expectedCassandraVersion string, expectedConsistency gocql.Consistency,
) error {
if ds.NoSQL != nil {
return verifyPluginVersion(ds.NoSQL, expectedCassandraVersion)
return verifyPluginVersion(ds.NoSQL, expectedCassandraVersion, expectedConsistency)
}
if ds.ShardedNoSQL != nil {
for shardName, connection := range ds.ShardedNoSQL.Connections {
err := verifyPluginVersion(connection.NoSQLPlugin, expectedCassandraVersion)
err := verifyPluginVersion(connection.NoSQLPlugin, expectedCassandraVersion, expectedConsistency)
if err != nil {
return fmt.Errorf("Failed to verify version for DB shard: %v. Error: %v", shardName, err.Error())
}
Expand All @@ -82,21 +85,22 @@ func verifyCompatibleVersion(
return nil
}

func verifyPluginVersion(plugin *config.NoSQL, expectedCassandraVersion string) error {
func verifyPluginVersion(plugin *config.NoSQL, expectedCassandraVersion string, expectedConsistency gocql.Consistency) error {
// Use hardcoded instead of constant because of cycle dependency issue.
// However, this file will be refactor to support NoSQL soon. After the refactoring, cycle dependency issue
// should be gone and we can use constant at that time
if plugin.PluginName != "cassandra" {
return fmt.Errorf("unknown NoSQL plugin name: %q", plugin.PluginName)
}

return CheckCompatibleVersion(*plugin, expectedCassandraVersion)
return CheckCompatibleVersion(*plugin, expectedCassandraVersion, expectedConsistency)
}

// CheckCompatibleVersion check the version compatibility
func CheckCompatibleVersion(
cfg config.Cassandra,
expectedVersion string,
expectedConsistency gocql.Consistency,
) error {

client, err := NewCQLClient(&CQLClientConfig{
Expand All @@ -110,7 +114,7 @@ func CheckCompatibleVersion(
ConnectTimeout: DefaultConnectTimeout,
TLS: cfg.TLS,
ProtoVersion: cfg.ProtoVersion,
})
}, expectedConsistency)
if err != nil {
return fmt.Errorf("creating CQL client: %w", err)
}
Expand All @@ -127,7 +131,7 @@ func setupSchema(cli *cli.Context) error {
if err != nil {
return handleErr(schema.NewConfigError(err.Error()))
}
client, err := NewCQLClient(config)
client, err := NewCQLClient(config, gocql.All)
if err != nil {
return handleErr(err)
}
Expand All @@ -145,7 +149,7 @@ func updateSchema(cli *cli.Context) error {
if err != nil {
return handleErr(schema.NewConfigError(err.Error()))
}
client, err := NewCQLClient(config)
client, err := NewCQLClient(config, gocql.All)
if err != nil {
return handleErr(err)
}
Expand Down Expand Up @@ -176,7 +180,7 @@ func createKeyspace(cli *cli.Context) error {

func doCreateKeyspace(cfg CQLClientConfig, name string, datacenter string) error {
cfg.Keyspace = SystemKeyspace
client, err := NewCQLClient(&cfg)
client, err := NewCQLClient(&cfg, gocql.All)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion tools/cassandra/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package cassandra
import (
"os"

"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"

"github.com/urfave/cli"

"github.com/uber/cadence/tools/common/schema"
Expand All @@ -39,7 +41,7 @@ func SetupSchema(config *SetupSchemaConfig) error {
if err := validateCQLClientConfig(&config.CQLClientConfig); err != nil {
return err
}
db, err := NewCQLClient(&config.CQLClientConfig)
db, err := NewCQLClient(&config.CQLClientConfig, gocql.All)
if err != nil {
return err
}
Expand Down