Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creation time is added to wal records #17233

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ mvccpb.KeyValue.value: ""
mvccpb.KeyValue.version: ""
walpb.Record: ""
walpb.Record.crc: ""
walpb.Record.created_at: "3.6"
walpb.Record.data: ""
walpb.Record.type: ""
walpb.Snapshot: ""
Expand Down
2 changes: 1 addition & 1 deletion server/storage/wal/repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestRepairWriteTearLast(t *testing.T) {
}
return f.Truncate(offset)
}
testRepair(t, makeEnts(50), corruptf, 40)
testRepair(t, makeEnts(50), corruptf, 29)
}

// TestRepairWriteTearMiddle repairs the WAL when there is write tearing
Expand Down
25 changes: 18 additions & 7 deletions server/storage/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"github.com/jonboulle/clockwork"

"go.uber.org/zap"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
Expand Down Expand Up @@ -92,12 +94,14 @@ type WAL struct {

locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
fp *filePipeline

clock clockwork.Clock
}

// Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved with ReadAll
// after the file is Open.
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
func Create(lg *zap.Logger, dirpath string, metadata []byte, walClock ...clockwork.Clock) (*WAL, error) {
if Exist(dirpath) {
return nil, os.ErrExist
}
Expand Down Expand Up @@ -158,6 +162,12 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
dir: dirpath,
metadata: metadata,
}
if walClock != nil {
w.clock = walClock[0]
} else {
w.clock = clockwork.NewRealClock()
}

w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
Expand All @@ -166,7 +176,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
if err = w.saveCrc(0); err != nil {
return nil, err
}
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata}); err != nil {
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata, CreatedAt: w.clock.Now().Unix()}); err != nil {
return nil, err
}
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
Expand Down Expand Up @@ -354,6 +364,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
decoder: NewDecoder(rs...),
readClose: closer,
locks: ls,
clock: clockwork.NewRealClock(),
}

if write {
Expand Down Expand Up @@ -748,7 +759,7 @@ func (w *WAL) cut() error {
return err
}

if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata}); err != nil {
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata, CreatedAt: w.clock.Now().Unix()}); err != nil {
return err
}

Expand Down Expand Up @@ -905,7 +916,7 @@ func (w *WAL) Close() error {
func (w *WAL) saveEntry(e *raftpb.Entry) error {
// TODO: add MustMarshalTo to reduce one allocation.
b := pbutil.MustMarshal(e)
rec := &walpb.Record{Type: EntryType, Data: b}
rec := &walpb.Record{Type: EntryType, Data: b, CreatedAt: w.clock.Now().Unix()}
if err := w.encoder.encode(rec); err != nil {
return err
}
Expand All @@ -919,7 +930,7 @@ func (w *WAL) saveState(s *raftpb.HardState) error {
}
w.state = *s
b := pbutil.MustMarshal(s)
rec := &walpb.Record{Type: StateType, Data: b}
rec := &walpb.Record{Type: StateType, Data: b, CreatedAt: w.clock.Now().Unix()}
return w.encoder.encode(rec)
}

Expand Down Expand Up @@ -971,7 +982,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
w.mu.Lock()
defer w.mu.Unlock()

rec := &walpb.Record{Type: SnapshotType, Data: b}
rec := &walpb.Record{Type: SnapshotType, Data: b, CreatedAt: w.clock.Now().Unix()}
if err := w.encoder.encode(rec); err != nil {
return err
}
Expand All @@ -983,7 +994,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
}

func (w *WAL) saveCrc(prevCrc uint32) error {
return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc})
return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc, CreatedAt: w.clock.Now().Unix()})
}

func (w *WAL) tail() *fileutil.LockedFile {
Expand Down
13 changes: 8 additions & 5 deletions server/storage/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"strings"
"testing"

"github.com/jonboulle/clockwork"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
Expand All @@ -49,7 +51,7 @@ var (
func TestNew(t *testing.T) {
p := t.TempDir()

w, err := Create(zaptest.NewLogger(t), p, []byte("somedata"))
w, err := Create(zaptest.NewLogger(t), p, []byte("somedata"), clockwork.NewFakeClock())
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
Expand All @@ -75,17 +77,18 @@ func TestNew(t *testing.T) {

var wb bytes.Buffer
e := newEncoder(&wb, 0, 0)
err = e.encode(&walpb.Record{Type: CrcType, Crc: 0})
err = e.encode(&walpb.Record{Type: CrcType, Crc: 0, CreatedAt: w.clock.Now().Unix()})
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata")})
err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata"), CreatedAt: w.clock.Now().Unix()})
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
r := &walpb.Record{
Type: SnapshotType,
Data: pbutil.MustMarshal(&walpb.Snapshot{}),
Type: SnapshotType,
Data: pbutil.MustMarshal(&walpb.Snapshot{}),
CreatedAt: w.clock.Now().Unix(),
}
if err = e.encode(r); err != nil {
t.Fatalf("err = %v, want nil", err)
Expand Down
64 changes: 46 additions & 18 deletions server/storage/wal/walpb/record.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions server/storage/wal/walpb/record.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package walpb;

import "gogoproto/gogo.proto";
import "raftpb/raft.proto";
import "etcd/api/versionpb/version.proto";

option go_package = "go.etcd.io/etcd/server/v3/storage/wal/walpb";

Expand All @@ -15,6 +16,7 @@ message Record {
optional int64 type = 1 [(gogoproto.nullable) = false];
optional uint32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
optional int64 created_at = 4 [(gogoproto.nullable) = false, (versionpb.etcd_version_field)="3.6"];
}

// Keep in sync with raftpb.SnapshotMetadata.
Expand Down