Skip to content

Commit

Permalink
Merge pull request #18843 from ghouscht/defrag-fixes-backport-3.4
Browse files Browse the repository at this point in the history
[3.4] fix(defrag): handle errors during defrag
  • Loading branch information
ahrtr authored Nov 6, 2024
2 parents 463615d + 77c7c84 commit 76e2580
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 5 deletions.
23 changes: 18 additions & 5 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (

"github.com/coreos/pkg/capnslog"
humanize "github.com/dustin/go-humanize"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
)

var (
Expand Down Expand Up @@ -449,22 +450,21 @@ func (b *backend) defrag() error {
b.readTx.Lock()
defer b.readTx.Unlock()

b.batchTx.unsafeCommit(true)

b.batchTx.tx = nil

// Create a temporary file to ensure we start with a clean slate.
// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
dir := filepath.Dir(b.db.Path())
temp, err := ioutil.TempFile(dir, "db.tmp.*")
if err != nil {
return err
}

options := bolt.Options{}
if boltOpenOptions != nil {
options = *boltOpenOptions
}
options.OpenFile = func(path string, i int, mode os.FileMode) (file *os.File, err error) {
// gofail: var defragOpenFileError string
// return nil, fmt.Errorf(defragOpenFileError)
return temp, nil
}
tdbp := temp.Name()
Expand All @@ -485,6 +485,11 @@ func (b *backend) defrag() error {
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
)
}

// Commit/stop and then reset current transactions (including the readTx)
b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil

// gofail: var defragBeforeCopy struct{}
err = defragdb(b.db, tmpdb, defragLimit)
if err != nil {
Expand All @@ -496,6 +501,11 @@ func (b *backend) defrag() error {
plog.Fatalf("failed to remove db.tmp after defragmentation completed: %v", rmErr)
}
}

// restore the bbolt transactions if defragmentation fails
b.batchTx.tx = b.unsafeBegin(true)
b.readTx.tx = b.unsafeBegin(false)

return err
}

Expand Down Expand Up @@ -564,6 +574,9 @@ func (b *backend) defrag() error {
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// gofail: var defragdbFail string
// return fmt.Errorf(defragdbFail)

// open a tx on tmpdb for writes
tmptx, err := tmpdb.Begin(true)
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions tests/e2e/defrag_no_space_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestDefragNoSpace(t *testing.T) {
tests := []struct {
name string
failpoint string
err string
}{
{
name: "no space (#18810) - can't open/create new bbolt db",
failpoint: "defragOpenFileError",
err: "no space",
},
{
name: "defragdb failure",
failpoint: "defragdbFail",
err: "some random error",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
clus, err := newEtcdProcessCluster(t,
&etcdProcessClusterConfig{
clusterSize: 1,
debug: true,
goFailEnabled: true,
},
)
require.NoError(t, err)
t.Cleanup(func() { clus.Stop() })

member := clus.procs[0]
etcdctl := member.Etcdctl(clientNonTLS, false, false)

require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), tc.failpoint, fmt.Sprintf(`return("%s")`, tc.err)))
require.ErrorContains(t, etcdctl.Defragment(time.Minute), tc.err)

// Make sure etcd continues to run even after the failed defrag attempt
require.NoError(t, etcdctl.Put("foo", "bar"))
value, err := etcdctl.Get("foo")
require.NoError(t, err)
require.Len(t, value.Kvs, 1)
require.Equal(t, "bar", string(value.Kvs[0].Value))
})
}
}
10 changes: 10 additions & 0 deletions tests/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"go.etcd.io/etcd/clientv3"
)
Expand Down Expand Up @@ -141,6 +142,15 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
return nil, spawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev))
}

// Defragment invokes the "defrag" subcommand using the arguments produced by
// ctl.cmdArgs() and waits for the success message to appear in the output.
//
// A non-zero timeout is passed along as --command-timeout; a zero timeout
// leaves the flag off entirely. The returned error is whatever
// spawnWithExpect reports.
func (ctl *Etcdctl) Defragment(timeout time.Duration) error {
	// Output line that signals a successful defragmentation.
	const wantOutput = "Finished defragmenting etcd member"

	cmd := append(ctl.cmdArgs(), "defrag")
	if timeout != 0 {
		timeoutFlag := fmt.Sprintf("--command-timeout=%s", timeout)
		cmd = append(cmd, timeoutFlag)
	}

	return spawnWithExpect(cmd, wantOutput)
}

func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string
Expand Down

0 comments on commit 76e2580

Please sign in to comment.