Skip to content

Commit 831c211

Browse files
elhimovdmyger
authored andcommitted
cluster: support switching master over TcS
Prior to this patch `failover switch` and `failover switch-status` commands worked only with etcd config storage. Now they also work with Tarantool Config Storage. Closes #1193 @TarantoolBot document Title: `tt cluster failover` commands support TcS as config storage
1 parent 1e02f07 commit 831c211

File tree

13 files changed

+610
-143
lines changed

13 files changed

+610
-143
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1111

1212
- `tt create`: add template for Tarantool Config Storage.
1313
- `tt create`: add template for non-vshard Cluster.
14+
- `tt cluster failover`: support Tarantool Config Storage.
1415

1516
### Changed
1617

cli/cluster/cmd/failover.go

Lines changed: 26 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"github.com/google/uuid"
1010
libcluster "github.com/tarantool/tt/lib/cluster"
1111
"github.com/tarantool/tt/lib/connect"
12-
"go.etcd.io/etcd/api/v3/mvccpb"
13-
clientv3 "go.etcd.io/etcd/client/v3"
1412
"gopkg.in/yaml.v2"
1513
)
1614

@@ -63,36 +61,22 @@ type SwitchStatusCtx struct {
6361
TaskID string
6462
}
6563

66-
func makeEtcdOpts(uriOpts connect.UriOpts) libcluster.EtcdOpts {
67-
opts := libcluster.EtcdOpts{
68-
Endpoints: []string{uriOpts.Endpoint},
69-
Username: uriOpts.Username,
70-
Password: uriOpts.Password,
71-
KeyFile: uriOpts.KeyFile,
72-
CertFile: uriOpts.CertFile,
73-
CaPath: uriOpts.CaPath,
74-
CaFile: uriOpts.CaFile,
75-
SkipHostVerify: uriOpts.SkipHostVerify,
76-
Timeout: uriOpts.Timeout,
77-
}
78-
79-
return opts
80-
}
81-
8264
// Switch master instance.
8365
func Switch(url string, switchCtx SwitchCtx) error {
8466
uriOpts, err := connect.CreateUriOpts(url)
8567
if err != nil {
8668
return fmt.Errorf("invalid URL %q: %w", url, err)
8769
}
70+
connOpts := libcluster.ConnectOpts{
71+
Username: switchCtx.Username,
72+
Password: switchCtx.Password,
73+
}
8874

89-
opts := makeEtcdOpts(uriOpts)
90-
91-
etcd, err := libcluster.ConnectEtcd(opts)
75+
conn, err := libcluster.ConnectCStorage(uriOpts, connOpts)
9276
if err != nil {
93-
return fmt.Errorf("unable to connect to etcd: %w", err)
77+
return fmt.Errorf("unable to connect to config storage: %w", err)
9478
}
95-
defer etcd.Close()
79+
defer conn.Close()
9680

9781
cmd := switchCmd{
9882
Command: "switch",
@@ -109,54 +93,40 @@ func Switch(url string, switchCtx SwitchCtx) error {
10993
key := uriOpts.Prefix + failoverPath + uuid
11094

11195
if switchCtx.Wait {
112-
ctx, cancel_watch := context.WithTimeout(context.Background(),
96+
ctxWatch, cancelWatch := context.WithTimeout(context.Background(),
11397
time.Duration(switchCtx.Timeout)*time.Second+cmdAdditionalWait)
114-
outputChan := make(chan *clientv3.Event)
115-
defer cancel_watch()
116-
117-
go func() {
118-
waitChan := etcd.Watch(ctx, key)
119-
defer close(outputChan)
120-
121-
for resp := range waitChan {
122-
for _, ev := range resp.Events {
123-
switch ev.Type {
124-
case mvccpb.PUT:
125-
outputChan <- ev
126-
}
127-
}
128-
}
129-
}()
98+
defer cancelWatch()
99+
watchChan := conn.Watch(ctxWatch, key)
130100

131-
ctx_put, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
132-
_, err = etcd.Put(ctx_put, key, string(yamlCmd))
101+
ctx, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
102+
err = conn.Put(ctx, key, string(yamlCmd))
133103
cancel()
134104

135105
if err != nil {
136106
return err
137107
}
138108

139-
for ev := range outputChan {
140-
result := switchCmdResult{}
141-
err = yaml.Unmarshal(ev.Kv.Value, &result)
109+
for ev := range watchChan {
110+
var result switchCmdResult
111+
err = yaml.Unmarshal(ev.Value, &result)
142112
if err != nil {
143113
return err
144114
}
145-
fmt.Printf("%s", ev.Kv.Value)
115+
fmt.Printf("%s", ev.Value)
146116
if result.Status == "success" || result.Status == "failed" {
147117
return nil
148118
}
149119
}
150-
if ctx.Err() == context.DeadlineExceeded {
120+
if ctxWatch.Err() == context.DeadlineExceeded {
151121
log.Info("Timeout for command execution reached.")
152122
return nil
153123
}
154124

155-
return ctx.Err()
125+
return fmt.Errorf("unexpected problem with watch context: %w", ctxWatch.Err())
156126
}
157127

158128
ctx, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
159-
_, err = etcd.Put(ctx, key, string(yamlCmd))
129+
err = conn.Put(ctx, key, string(yamlCmd))
160130
cancel()
161131

162132
if err != nil {
@@ -177,30 +147,28 @@ func SwitchStatus(url string, switchCtx SwitchStatusCtx) error {
177147
if err != nil {
178148
return fmt.Errorf("invalid URL %q: %w", url, err)
179149
}
180-
181-
opts := makeEtcdOpts(uriOpts)
182-
183-
etcd, err := libcluster.ConnectEtcd(opts)
150+
var connOpts libcluster.ConnectOpts
151+
conn, err := libcluster.ConnectCStorage(uriOpts, connOpts)
184152
if err != nil {
185-
return fmt.Errorf("unable to connect to etcd: %w", err)
153+
return fmt.Errorf("unable to connect to config storage: %w", err)
186154
}
187-
defer etcd.Close()
155+
defer conn.Close()
188156

189157
key := uriOpts.Prefix + failoverPath + switchCtx.TaskID
190158

191159
ctx, cancel := context.WithTimeout(context.Background(), defaultEtcdTimeout)
192-
result, err := etcd.Get(ctx, key, clientv3.WithLimit(1))
160+
result, err := conn.Get(ctx, key)
193161
cancel()
194162

195163
if err != nil {
196164
return err
197165
}
198166

199-
if len(result.Kvs) != 1 {
167+
if len(result) != 1 {
200168
return fmt.Errorf("task with id `%s` is not found", switchCtx.TaskID)
201169
}
202170

203-
fmt.Print(string(result.Kvs[0].Value))
171+
fmt.Print(string(result[0].Value))
204172

205173
return nil
206174
}

cli/cmd/cluster.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,10 @@ environment variables < command flags < URL credentials.`,
8282
})
8383

8484
failoverUriHelp = libconnect.MakeURLHelp(map[string]any{
85-
"service": "etcd",
86-
"prefix": "a base path to Tarantool configuration in etcd",
85+
"service": "etcd or tarantool config storage",
86+
"prefix": "a base path to Tarantool configuration in" +
87+
" etcd or tarantool config storage",
88+
"env_TT_CLI_auth": "Tarantool",
8789
"env_TT_CLI_ETCD_auth": "Etcd",
8890
"footer": `The priority of credentials:
8991
environment variables < command flags < URL credentials.`,
@@ -235,9 +237,9 @@ func newClusterFailoverCmd() *cobra.Command {
235237
}
236238

237239
switchCmd.Flags().StringVarP(&switchCtx.Username, "username", "u", "",
238-
"username (used as etcd credentials)")
240+
"username (used as etcd/tarantool config storage credentials)")
239241
switchCmd.Flags().StringVarP(&switchCtx.Password, "password", "p", "",
240-
"password (used as etcd credentials)")
242+
"password (used as etcd/tarantool config storage credentials)")
241243
switchCmd.Flags().Uint64VarP(&switchCtx.Timeout, "timeout", "t", defaultSwitchTimeout,
242244
"timeout for command execution")
243245
switchCmd.Flags().BoolVarP(&switchCtx.Wait, "wait", "w", false,

lib/cluster/configstorage.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package cluster
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
libconnect "github.com/tarantool/tt/lib/connect"
8+
)
9+
10+
type CSWatchEvent struct {
11+
Key string
12+
Value []byte
13+
}
14+
15+
// CSConnection interface is to be used to implement access to config storage.
16+
type CSConnection interface {
17+
// Close closes connection.
18+
Close() error
19+
// Get retrieves value for key.
20+
Get(ctx context.Context, key string) ([]Data, error)
21+
// Put puts a key-value pair into config storage.
22+
Put(ctx context.Context, key, value string) error
23+
// Watch watches on a key and return watched events through the returned channel.
24+
Watch(ctx context.Context, key string) <-chan CSWatchEvent
25+
}
26+
27+
// ConnectCStorage connects to config storage according to connection options.
28+
func ConnectCStorage(
29+
uriOpts libconnect.UriOpts,
30+
connOpts ConnectOpts,
31+
) (CSConnection, error) {
32+
sc, errEtcd := connectEtcdCS(uriOpts, connOpts)
33+
if errEtcd == nil {
34+
return sc, nil
35+
}
36+
37+
sc, errTarantool := connectTarantoolCS(uriOpts, connOpts)
38+
if errTarantool == nil {
39+
return sc, nil
40+
}
41+
42+
return nil, fmt.Errorf("failed to establish a connection to tarantool or etcd: %w, %w",
43+
errTarantool, errEtcd)
44+
}

lib/cluster/etcd.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/tarantool/go-tarantool/v2"
1515
libconnect "github.com/tarantool/tt/lib/connect"
16+
"go.etcd.io/etcd/api/v3/mvccpb"
1617
"go.etcd.io/etcd/client/pkg/v3/transport"
1718
clientv3 "go.etcd.io/etcd/client/v3"
1819
"go.uber.org/zap"
@@ -882,3 +883,78 @@ func MakeEtcdOptsFromUriOpts(src libconnect.UriOpts) EtcdOpts {
882883
Timeout: src.Timeout,
883884
}
884885
}
886+
887+
type etcdCSConnection struct {
888+
cli *clientv3.Client
889+
}
890+
891+
func connectEtcdCS(uriOpts libconnect.UriOpts, connOpts ConnectOpts) (CSConnection, error) {
892+
cli, err := ConnectEtcdUriOpts(uriOpts, connOpts)
893+
if err != nil {
894+
return nil, err
895+
}
896+
return &etcdCSConnection{
897+
cli: cli,
898+
}, nil
899+
}
900+
901+
// Close implements ConnectCStorage interface.
902+
func (c *etcdCSConnection) Close() error {
903+
return c.cli.Close()
904+
}
905+
906+
// Get implements ConnectCStorage interface.
907+
func (c *etcdCSConnection) Get(ctx context.Context, key string) ([]Data, error) {
908+
resp, err := c.cli.Get(ctx, key)
909+
if err != nil {
910+
return nil, fmt.Errorf("failed to fetch data from etcd: %w", err)
911+
}
912+
913+
switch {
914+
case len(resp.Kvs) == 0:
915+
// It should not happen, but we need to be sure to avoid a null pointer
916+
// dereference.
917+
return nil, fmt.Errorf("a configuration data not found in etcd for key %q",
918+
key)
919+
case len(resp.Kvs) > 1:
920+
return nil, fmt.Errorf("too many responses (%v) from etcd for key %q",
921+
resp.Kvs, key)
922+
}
923+
924+
collected := []Data{
925+
{
926+
Source: string(resp.Kvs[0].Key),
927+
Value: resp.Kvs[0].Value,
928+
Revision: resp.Kvs[0].ModRevision,
929+
},
930+
}
931+
return collected, nil
932+
}
933+
934+
// Put implements ConnectCStorage interface.
935+
func (c *etcdCSConnection) Put(ctx context.Context, key, value string) error {
936+
_, err := c.cli.Put(ctx, key, value)
937+
return err
938+
}
939+
940+
// Watch implements ConnectCStorage interface.
941+
func (c *etcdCSConnection) Watch(ctx context.Context, key string) <-chan CSWatchEvent {
942+
ch := make(chan CSWatchEvent)
943+
innerCh := c.cli.Watch(ctx, key)
944+
go func() {
945+
defer close(ch)
946+
947+
for resp := range innerCh {
948+
for _, ev := range resp.Events {
949+
switch ev.Type {
950+
case mvccpb.PUT:
951+
ch <- CSWatchEvent{
952+
Key: string(ev.Kv.Value),
953+
Value: ev.Kv.Value,
954+
}
955+
}
956+
}
957+
}
958+
}()
959+
return ch
960+
}

0 commit comments

Comments
 (0)