Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
}
if opts.WALFailover != nil {
walOpts.Secondary = opts.WALFailover.Secondary
walOpts.Secondary.ID = opts.WALFailover.Secondary.ID
// Lock the secondary WAL directory, if distinct from the data directory
// and primary WAL directory.
if secondaryWalDirName != dirname && secondaryWalDirName != walDirname {
Expand Down Expand Up @@ -424,10 +425,19 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
}
}

if opts.WALFailover != nil {
walDir, err := wal.ValidateOrInitWALDir(walOpts.Secondary)
if err != nil {
return nil, err
}
walOpts.Secondary = walDir
opts.WALFailover.Secondary.ID = walDir.ID
}
walManager, err := wal.Init(walOpts, retainedWALs)
if err != nil {
return nil, err
}

defer maybeCleanUp(walManager.Close)
d.mu.log.manager = walManager

Expand Down
3 changes: 3 additions & 0 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ func TestOpen_WALFailover(t *testing.T) {
if o.FS == nil {
return "no path"
}
// Set a constant identifier for testing to avoid flaky tests
wal.SetGenerateStableIdentifierForTesting("9f69f2c3ffb3c247767290a9b3215fc5")
defer wal.ResetGenerateStableIdentifierForTesting()
d, err := Open(dataDir, o)
if err != nil {
return err.Error()
Expand Down
57 changes: 57 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/crlib/fifo"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/humanize"
Expand Down Expand Up @@ -1863,6 +1864,9 @@ func (o *Options) String() string {
fmt.Fprintf(&buf, "\n")
fmt.Fprintf(&buf, "[WAL Failover]\n")
fmt.Fprintf(&buf, " secondary_dir=%s\n", o.WALFailover.Secondary.Dirname)
if o.WALFailover.Secondary.ID != "" {
fmt.Fprintf(&buf, " secondary_identifier=%s\n", o.WALFailover.Secondary.ID)
}
fmt.Fprintf(&buf, " primary_dir_probe_interval=%s\n", o.WALFailover.FailoverOptions.PrimaryDirProbeInterval)
fmt.Fprintf(&buf, " healthy_probe_latency_threshold=%s\n", o.WALFailover.FailoverOptions.HealthyProbeLatencyThreshold)
fmt.Fprintf(&buf, " healthy_interval=%s\n", o.WALFailover.FailoverOptions.HealthyInterval)
Expand Down Expand Up @@ -2317,6 +2321,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
switch key {
case "secondary_dir":
o.WALFailover.Secondary = wal.Dir{Dirname: value, FS: vfs.Default}
case "secondary_identifier":
o.WALFailover.Secondary.ID = value
case "primary_dir_probe_interval":
o.WALFailover.PrimaryDirProbeInterval, err = time.ParseDuration(value)
case "healthy_probe_latency_threshold":
Expand Down Expand Up @@ -2435,6 +2441,21 @@ func (e ErrMissingWALRecoveryDir) Error() string {
return fmt.Sprintf("directory %q may contain relevant WALs but is not in WALRecoveryDirs%s", e.Dir, e.ExtraInfo)
}

// ErrSecondaryIdentifierMismatch reports that the identifier of a secondary
// directory differs from the identifier that was expected. A mismatch
// indicates that the wrong disk may have been mounted at the expected path.
type ErrSecondaryIdentifierMismatch struct {
	// ExpectedIdentifier is the identifier that was expected.
	ExpectedIdentifier string
	// ActualIdentifier is the identifier the secondary directory has.
	ActualIdentifier string
	// SecondaryDir is the secondary directory the identifiers refer to.
	SecondaryDir string
}

// Error implements error. The message quotes the directory, then the actual
// identifier, then the expected identifier.
func (e ErrSecondaryIdentifierMismatch) Error() string {
	const format = "secondary directory %q has identifier %q but expected %q - wrong disk may be mounted"
	dir, actual, expected := e.SecondaryDir, e.ActualIdentifier, e.ExpectedIdentifier
	return fmt.Sprintf(format, dir, actual, expected)
}

// CheckCompatibility verifies the options are compatible with the previous options
// serialized by Options.String(). For example, the Comparer and Merger must be
// the same, or data will not be able to be properly read from the DB.
Expand Down Expand Up @@ -2502,6 +2523,12 @@ func (o *Options) checkWALDir(storeDir, walDir, errContext string) error {
for _, d := range o.WALRecoveryDirs {
// TODO(radu): should we also check that d.FS is the same as walDir's FS?
if walPath == resolveStorePath(storeDir, d.Dirname) {
if d.ID != "" {
if err := o.validateWALRecoveryDirIdentifier(d); err != nil {
return err
}

}
return nil
}
}
Expand Down Expand Up @@ -2789,3 +2816,33 @@ func resolveStorePath(storeDir, path string) string {
}
return path
}

// validateWALRecoveryDirIdentifier validates that the identifier stored in
// the "failover_identifier" file inside d.Dirname matches d.ID, to ensure
// that we're using the correct directory.
//
// It returns nil when the identifier file does not exist, since there is
// nothing to compare against. Any other error opening the file is returned
// as-is; an error reading it is wrapped with the directory name. A mismatch
// between the (whitespace-trimmed) file contents and d.ID yields an error
// naming both identifiers.
func (o *Options) validateWALRecoveryDirIdentifier(d wal.Dir) error {
	identifierFile := d.FS.PathJoin(d.Dirname, "failover_identifier")
	f, err := d.FS.Open(identifierFile)
	if err != nil {
		if oserror.IsNotExist(err) {
			// The directory has no identifier file, so there is nothing to
			// validate against.
			return nil
		}
		return err
	}
	defer f.Close()

	data, err := io.ReadAll(f)
	if err != nil {
		return errors.Wrapf(err, "failed to read secondary identifier from WALRecoveryDir %q",
			d.Dirname)
	}

	// Trim once and use the trimmed value for both the comparison and the
	// error message, so the reported identifier is exactly what was compared.
	existingIdentifier := strings.TrimSpace(string(data))
	if existingIdentifier != d.ID {
		return errors.Newf("WALRecoveryDir %q has identifier %q but expected %q",
			d.Dirname, existingIdentifier, d.ID)
	}
	return nil
}
49 changes: 48 additions & 1 deletion options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"bytes"
"fmt"
"io"
"math/rand/v2"
"runtime"
"strings"
Expand Down Expand Up @@ -259,7 +260,7 @@ func TestOptionsCheckCompatibility(t *testing.T) {

// Check that an OPTIONS file that configured an explicit WALDir that will
// no longer be used errors if it's not also present in WALRecoveryDirs.
//require.Equal(t, ErrMissingWALRecoveryDir{Dir: "external-wal-dir"},
// require.Equal(t, ErrMissingWALRecoveryDir{Dir: "external-wal-dir"},
err := DefaultOptions().CheckCompatibility(storeDir, `
[Options]
wal_dir=external-wal-dir
Expand Down Expand Up @@ -339,6 +340,52 @@ func TestOptionsCheckCompatibility(t *testing.T) {
`))
}

// TestWALRecoveryDirValidation verifies that checkWALDir returns an error for
// a WALRecoveryDir whose failover_identifier file holds a different
// identifier than the ID configured for that directory.
func TestWALRecoveryDirValidation(t *testing.T) {
	const (
		storeDir     = "/mnt/foo"
		recoveryDir  = "/mnt/wrong-disk-dir"
		onDiskID     = "11111111111111111111111111111111"
		configuredID = "22222222222222222222222222222222"
	)

	mem := vfs.NewMem()
	require.NoError(t, mem.MkdirAll(recoveryDir, 0755))

	// Write a failover_identifier file whose contents differ from the
	// configured ID.
	identifierPath := mem.PathJoin(recoveryDir, "failover_identifier")
	require.NoError(t, writeTestIdentifierToFile(mem, identifierPath, onDiskID))

	opts := &Options{
		FS: mem,
		WALRecoveryDirs: []wal.Dir{
			{FS: mem, Dirname: recoveryDir, ID: configuredID},
		},
	}
	opts.EnsureDefaults()

	checkErr := opts.checkWALDir(storeDir, recoveryDir, "test context")
	require.Error(t, checkErr)
	require.Contains(t, checkErr.Error(), "has identifier \"11111111111111111111111111111111\" but expected \"22222222222222222222222222222222\"")
}

// writeTestIdentifierToFile is a test helper that creates filename on fs,
// writes identifier into it and syncs the file. The file is closed when the
// helper returns; the result of that Close is not reported.
func writeTestIdentifierToFile(fs vfs.FS, filename, identifier string) error {
	file, createErr := fs.Create(filename, "pebble-wal")
	if createErr != nil {
		return createErr
	}
	defer file.Close()

	if _, writeErr := io.WriteString(file, identifier); writeErr != nil {
		return writeErr
	}
	return file.Sync()
}

type testCleaner struct{}

func (testCleaner) Clean(fs vfs.FS, fileType base.FileType, path string) error {
Expand Down
1 change: 1 addition & 0 deletions testdata/open_wal_failover
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ list path=(a,data)
grep-between path=(a,data/OPTIONS-000007) start=(\[WAL Failover\]) end=^$
----
secondary_dir=secondary-wals
secondary_identifier=9f69f2c3ffb3c247767290a9b3215fc5
primary_dir_probe_interval=1s
healthy_probe_latency_threshold=25ms
healthy_interval=15s
Expand Down
117 changes: 115 additions & 2 deletions wal/failover_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ package wal

import (
"cmp"
crand "crypto/rand"
"fmt"
"io"
"math/rand/v2"
mathrand "math/rand/v2"
"os"
"slices"
"strings"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/vfs"
Expand Down Expand Up @@ -52,6 +55,71 @@ const probeHistoryLength = 128
// Large value.
const failedProbeDuration = 24 * 60 * 60 * time.Second

// generateStableIdentifierForTesting, when non-empty, is returned by
// generateStableIdentifier in place of a freshly generated random value.
// It exists for testing only.
var generateStableIdentifierForTesting string

// SetGenerateStableIdentifierForTesting makes generateStableIdentifier return
// the given constant identifier. Passing the empty string disables the
// override. This should only be used in tests to avoid flaky behavior.
func SetGenerateStableIdentifierForTesting(identifier string) {
	generateStableIdentifierForTesting = identifier
}

// ResetGenerateStableIdentifierForTesting clears the testing override so that
// identifiers are randomly generated again. This should only be used in
// tests.
func ResetGenerateStableIdentifierForTesting() {
	SetGenerateStableIdentifierForTesting("")
}

// generateStableIdentifier returns the hex encoding of 16 bytes read from
// crypto/rand, or the testing override when one is set.
func generateStableIdentifier() (string, error) {
	// A non-empty testing override takes precedence over random generation.
	if override := generateStableIdentifierForTesting; override != "" {
		return override, nil
	}

	var raw [16]byte
	_, err := crand.Read(raw[:])
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", raw[:]), nil
}

// readSecondaryIdentifier returns the contents of identifierFile with
// surrounding whitespace removed. A file that does not exist is not treated
// as an error: it yields an empty identifier and a nil error. Any other open
// or read error is returned unchanged.
func readSecondaryIdentifier(fs vfs.FS, identifierFile string) (string, error) {
	f, err := fs.Open(identifierFile)
	switch {
	case err == nil:
		// Opened successfully; continue below the switch.
	case oserror.IsNotExist(err):
		return "", nil
	default:
		return "", err
	}
	defer f.Close()

	contents, err := io.ReadAll(f)
	if err != nil {
		return "", err
	}
	identifier := strings.TrimSpace(string(contents))
	return identifier, nil
}

// writeSecondaryIdentifier creates identifierFile on fs and writes identifier
// into it. On a successful write the file is synced and then closed, and any
// errors from those two steps are combined into the returned error.
func writeSecondaryIdentifier(fs vfs.FS, identifierFile string, identifier string) error {
	file, createErr := fs.Create(identifierFile, "pebble-wal")
	if createErr != nil {
		return createErr
	}

	_, writeErr := io.WriteString(file, identifier)
	if writeErr != nil {
		// Close the file but report only the write error.
		file.Close()
		return writeErr
	}

	// Sync before Close, and surface failures from both.
	syncErr := file.Sync()
	closeErr := file.Close()
	return errors.CombineErrors(syncErr, closeErr)
}

// init takes a stopper in order to connect the dirProber's long-running
// goroutines with the stopper's wait group, but the dirProber has its own
// stop() method that should be invoked to trigger the shutdown.
Expand All @@ -73,7 +141,7 @@ func (p *dirProber) init(
}
// Random bytes for writing, to defeat any FS compression optimization.
for i := range p.buf {
p.buf[i] = byte(rand.Uint32())
p.buf[i] = byte(mathrand.Uint32())
}
// dirProber has an explicit stop() method instead of listening on
// stopper.shouldQuiesce. This structure helps negotiate the shutdown
Expand Down Expand Up @@ -538,6 +606,46 @@ func (wm *failoverManager) init(o Options, initial Logs) error {
return nil
}

// ValidateOrInitWALDir manages the secondary directory identifier for
// failover validation. It ensures the correct secondary directory is mounted
// by validating or generating a stable identifier stored in the
// "failover_identifier" file inside walDir.Dirname.
//
// The behavior depends on walDir.ID and on the identifier found on disk:
//   - walDir.ID is set: the on-disk identifier must equal it, otherwise an
//     error is returned (a missing file reads as the empty identifier and
//     therefore also mismatches).
//   - walDir.ID is empty and an identifier exists on disk: that identifier
//     is adopted.
//   - walDir.ID is empty and no identifier exists on disk: a new identifier
//     is generated and written to the identifier file.
//
// On success it returns walDir with ID populated. On failure it returns a
// zero Dir and an error that wraps the underlying cause, so callers can still
// inspect it with errors.Is / errors.As.
func ValidateOrInitWALDir(walDir Dir) (Dir, error) {
	identifierFile := walDir.FS.PathJoin(walDir.Dirname, "failover_identifier")

	// Every path needs the on-disk identifier, so read it once up front.
	existingIdentifier, err := readSecondaryIdentifier(walDir.FS, identifierFile)
	if err != nil {
		return Dir{}, errors.Wrap(err, "failed to read secondary identifier")
	}

	switch {
	case walDir.ID != "":
		// We have an identifier from the OPTIONS file; it must match what's
		// in the directory. Not the same identifier means the wrong disk may
		// be mounted.
		if existingIdentifier != walDir.ID {
			return Dir{}, errors.Newf("secondary directory %q has identifier %q but expected %q - wrong disk may be mounted",
				walDir.Dirname, existingIdentifier, walDir.ID)
		}
	case existingIdentifier != "":
		// No identifier in the OPTIONS file, but the directory already has
		// one: adopt it.
		walDir.ID = existingIdentifier
	default:
		// No identifier anywhere: generate a new one and persist it.
		identifier, err := generateStableIdentifier()
		if err != nil {
			return Dir{}, errors.Wrap(err, "failed to generate UUID")
		}
		if err := writeSecondaryIdentifier(walDir.FS, identifierFile, identifier); err != nil {
			return Dir{}, errors.Wrap(err, "failed to write secondary identifier")
		}
		walDir.ID = identifier
	}
	return walDir, nil
}

// List implements Manager.
func (wm *failoverManager) List() Logs {
wm.mu.Lock()
Expand Down Expand Up @@ -843,6 +951,11 @@ func (wm *failoverManager) logCreator(
return logFile, 0, err
}

// Opts implements Manager. It returns the Options held in wm.opts.
func (wm *failoverManager) Opts() Options {
return wm.opts
}

type stopper struct {
quiescer chan struct{} // Closed when quiescing
wg sync.WaitGroup
Expand Down
5 changes: 5 additions & 0 deletions wal/standalone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ func (m *StandaloneManager) Close() error {
return err
}

// Opts implements Manager. It returns the Options held in m.o.
func (m *StandaloneManager) Opts() Options {
return m.o
}

// RecyclerForTesting implements Manager.
func (m *StandaloneManager) RecyclerForTesting() *LogRecycler {
return &m.recycler
Expand Down
Loading