-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathconvert2.go
147 lines (131 loc) · 3.09 KB
/
convert2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package etcd
import (
"context"
"encoding/json"
"fmt"
"github.com/cybozu-go/log"
"github.com/cybozu-go/sabakan/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
func (d *driver) convertTo2Machines(ctx context.Context, mu *concurrency.Mutex, ipam *sabakan.IPAMConfig) error {
const limitMachines = 20
key := KeyMachines
endKey := clientv3.GetPrefixRangeEnd(KeyMachines)
resp, err := d.client.Get(ctx, key,
clientv3.WithRange(endKey),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(limitMachines),
)
if err != nil {
return err
}
rev := resp.Header.Revision // to retrieve following pages at the same revision.
kvs := resp.Kvs
var ops []clientv3.Op
REDO:
if len(kvs) == 0 {
return nil
}
ops = make([]clientv3.Op, len(kvs))
for i, kv := range kvs {
var m sabakan.Machine
err = json.Unmarshal(kv.Value, &m)
if err != nil {
return fmt.Errorf("failed to unmarshal %s: %v", string(kv.Key[len(KeyMachines):]), err)
}
// fill Machine.Info
ipam.GenerateIP(&m)
data, err := json.Marshal(m)
if err != nil {
return err
}
ops[i] = clientv3.OpPut(KeyMachines+m.Spec.Serial, string(data))
}
tresp, err := d.client.Txn(ctx).If(mu.IsOwner()).
Then(ops...).
Commit()
if err != nil {
return err
}
if !tresp.Succeeded {
return errLostOwner
}
if resp.More {
resp, err = d.client.Get(ctx, string(resp.Kvs[len(resp.Kvs)-1].Key),
clientv3.WithRange(endKey),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(limitMachines),
clientv3.WithRev(rev),
)
if err != nil {
return err
}
// ignore the first key
kvs = resp.Kvs[1:]
goto REDO
}
return nil
}
func (d *driver) convertTo2(ctx context.Context, mu *concurrency.Mutex) error {
// we cannot use getDHCPConfig / getIPAMConfig before starting stateless watcher.
resp, err := d.client.Get(ctx, KeyIPAM)
if err != nil {
return err
}
if resp.Count == 0 {
// not initialized
return nil
}
ipam := new(sabakan.IPAMConfig)
err = json.Unmarshal(resp.Kvs[0].Value, ipam)
if err != nil {
return err
}
// copy gateway-offset from dhcp.json to ipam.json
resp, err = d.client.Get(ctx, KeyDHCP)
if err != nil {
return err
}
if resp.Count > 0 {
dc := new(sabakan.DHCPConfig)
err := json.Unmarshal(resp.Kvs[0].Value, dc)
if err != nil {
return err
}
ipam.NodeGatewayOffset = dc.GatewayOffset
data, err := json.Marshal(ipam)
if err != nil {
return err
}
tresp, err := d.client.Txn(ctx).If(mu.IsOwner()).
Then(clientv3.OpPut(KeyIPAM, string(data))).
Commit()
if err != nil {
return err
}
if !tresp.Succeeded {
return errLostOwner
}
}
// update Machine.Info
err = d.convertTo2Machines(ctx, mu, ipam)
if err != nil {
return err
}
// update schema version
const thisVersion = "2"
tresp, err := d.client.Txn(ctx).If(mu.IsOwner()).
Then(clientv3.OpPut(KeyVersion, thisVersion)).
Commit()
if err != nil {
return err
}
if !tresp.Succeeded {
return errLostOwner
}
log.Info("updated schema version", map[string]interface{}{
"to": thisVersion,
})
return nil
}