Skip to content

Commit

Permalink
encryption: refine key manager watcher loop (#4111) (#4155)
Browse files Browse the repository at this point in the history
* encryption: refine key manager watcher loop

Signed-off-by: HunDunDM <hundundm@gmail.com>
  • Loading branch information
ti-chi-bot authored Nov 30, 2021
1 parent c0fece4 commit 32885c6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 32 deletions.
79 changes: 48 additions & 31 deletions server/encryptionkm/key_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ func (m *KeyManager) keysRevision() int64 {
func (m *KeyManager) StartBackgroundLoop(ctx context.Context) {
// Setup key dictionary watcher
watcher := clientv3.NewWatcher(m.etcdClient)
watchChan := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision()))
watcherEnabled := true
defer watcher.Close()
// Check data key rotation every min(dataKeyRotationPeriod, keyRotationCheckPeriod).
checkPeriod := m.dataKeyRotationPeriod
Expand All @@ -216,54 +214,73 @@ func (m *KeyManager) StartBackgroundLoop(ctx context.Context) {
}
ticker := time.NewTicker(checkPeriod)
defer ticker.Stop()
// Loop

for {
select {
// Reload encryption keys updated by PD leader (could be ourselves).
case resp := <-watchChan:
if resp.Canceled {
// If the watcher failed, we fallback to reload every 10 minutes.
log.Warn("encryption key watcher canceled")
watcherEnabled = false
continue
}
for _, event := range resp.Events {
if event.Type != mvccpb.PUT {
log.Warn("encryption keys is deleted unexpectedly")
continue
var (
resp clientv3.WatchResponse
ok bool
)
rch := watcher.Watch(ctx, EncryptionKeysPath, clientv3.WithRev(m.keysRevision()))

keyWatchLoop:
for {
select {
case resp, ok = <-rch:
if !ok || resp.Err() != nil {
// If chan is closed or canceled, exit watch loop
// Ref https://etcd.io/docs/v3.4/learning/api/#watch-streams
break keyWatchLoop
}
_, err := m.loadKeysFromKV(event.Kv)
if err != nil {
log.Warn("fail to get encryption keys from watcher result", errs.ZapError(err))
for _, event := range resp.Events {
if event.Type != mvccpb.PUT {
log.Warn("encryption keys is deleted unexpectedly")
continue
}
_, err := m.loadKeysFromKV(event.Kv)
if err != nil {
log.Warn("fail to get encryption keys from watcher result", errs.ZapError(err))
}
}
m.helper.eventAfterReloadByWatcher()
case <-m.helper.tick(ticker):
m.checkOnTick()
m.helper.eventAfterTicker()
}
m.helper.eventAfterReloadByWatcher()
case <-m.helper.tick(ticker):
m.checkOnTick(watcherEnabled)
m.helper.eventAfterTicker()
}

select {
case <-ctx.Done():
// Server shutdown.
return
default:
}

if resp.CompactRevision != 0 {
// meet compacted error
log.Warn("revision has been compacted, the watcher will watch again",
zap.Int64("revision", m.keysRevision()),
zap.Int64("compact-revision", resp.CompactRevision))
} else {
// other error
log.Error("encryption key watcher canceled, the watcher will watch again",
errs.ZapError(errs.ErrEncryptionKeysWatcher, resp.Err()))
}

if _, err := m.loadKeys(); err != nil {
log.Error("encryption key reload failed", errs.ZapError(err))
}
}
}

// checkOnTick perform key rotation and key reload on timer tick, if necessary.
func (m *KeyManager) checkOnTick(watcherEnabled bool) {
func (m *KeyManager) checkOnTick() {
m.mu.Lock()
defer m.mu.Unlock()
// Check data key rotation in case we are the PD leader.
err := m.rotateKeyIfNeeded(false /*forceUpdate*/)
if err != nil {
log.Warn("fail to rotate data encryption key", errs.ZapError(err))
}
// Fallback mechanism to reload keys if watcher failed.
if !watcherEnabled {
_, err = m.loadKeysImpl()
if err != nil {
log.Warn("fail to reload keys after watcher failed", errs.ZapError(err))
}
}
}

// loadKeysFromKVImpl reload keys from etcd result.
Expand Down
2 changes: 1 addition & 1 deletion server/encryptionkm/key_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestKeyManager(t *testing.T) {

type testKeyManagerSuite struct{}

var _ = Suite(&testKeyManagerSuite{})
var _ = SerialSuites(&testKeyManagerSuite{})

const (
testMasterKey = "8fd7e3e917c170d92f3e51a981dd7bc8fba11f3df7d8df994842f6e86f69b530"
Expand Down

0 comments on commit 32885c6

Please sign in to comment.