Skip to content

Commit c50b0ba

Browse files
pkg/wal: Implement a Write-Ahead-Log
This is a simple implementation of a Write-Ahead-Log (WAL). A mechanism typically used in databases to ensure durability of data. The Cilium agent sometimes needs to persist data across restarts. A naive solution is to write the full state to disk. However, re-writing the full state is expensive and slow. You can do it periodically instead of on every change to lower the cost, but this risks losing recent changes in the event of a crash, which is unacceptable in certain scenarios. A WAL solves this problem by appending changes in a log like manner. By flushing the changes to disk we ensure the latest events are recorded at a fraction of the cost. A side effect is that you need to "replay" the events to process modifications and deletions. To prevent the log from growing indefinitely, it needs to be compacted periodically. This WAL is implemented currently as a single file containing length- value encoded events. An event is a byte slice, marshalling and unmarshaling of these is up to the caller. Signed-off-by: Dylan Reimerink <dylan.reimerink@isovalent.com>
1 parent 23db74a commit c50b0ba

File tree

3 files changed

+514
-0
lines changed

3 files changed

+514
-0
lines changed

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,7 @@ Makefile* @cilium/build
649649
/pkg/util/ @cilium/sig-datapath
650650
/pkg/version/ @cilium/sig-agent
651651
/pkg/versioncheck/ @cilium/sig-agent
652+
/pkg/wal/ @cilium/sig-foundations
652653
/pkg/wireguard @cilium/wireguard
653654
/pkg/xds/ @cilium/envoy
654655
/plugins/cilium-cni/ @cilium/sig-k8s

pkg/wal/wal.go

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Authors of Cilium
3+
4+
package wal
5+
6+
import (
7+
"bytes"
8+
"encoding"
9+
"encoding/binary"
10+
"fmt"
11+
"io"
12+
"iter"
13+
"os"
14+
"strconv"
15+
"strings"
16+
17+
"github.com/cilium/cilium/pkg/lock"
18+
)
19+
20+
type Event interface {
21+
encoding.BinaryMarshaler
22+
}
23+
24+
// Read reads all events from the WAL at logPath using the provided unmarshaller function.
25+
func Read[T Event](logPath string, unmarshaller func(data []byte) (T, error)) (iter.Seq2[T, error], error) {
26+
file, err := os.OpenFile(logPath, os.O_RDONLY, 0600)
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
r := &lvReader{r: file}
32+
return func(yield func(T, error) bool) {
33+
defer file.Close()
34+
35+
for data := range r.Events() {
36+
e, err := unmarshaller(data)
37+
if !yield(e, err) {
38+
return
39+
}
40+
}
41+
}, nil
42+
}
43+
44+
type Writer[T Event] struct {
45+
logPath string
46+
47+
mu lock.Mutex
48+
log *os.File
49+
}
50+
51+
// NewWriter creates a new WAL writer for events of type T at the specified logPath.
52+
// The log file is created if it does not exist, and truncated if it does.
53+
func NewWriter[T Event](logPath string) (*Writer[T], error) {
54+
w := &Writer[T]{
55+
logPath: logPath,
56+
}
57+
58+
// Open the log file, create it if it doesn't exist, and truncate it to start fresh.
59+
if log, err := w.open(logPath, true); err != nil {
60+
return nil, err
61+
} else {
62+
w.log = log
63+
}
64+
65+
return w, nil
66+
}
67+
68+
func (w *Writer[T]) open(path string, truncate bool) (*os.File, error) {
69+
flags := os.O_WRONLY | os.O_CREATE | os.O_APPEND
70+
if truncate {
71+
flags |= os.O_TRUNC
72+
}
73+
74+
log, err := os.OpenFile(path, flags, 0600)
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
return log, nil
80+
}
81+
82+
type BatchError struct {
83+
Index int
84+
Err error
85+
}
86+
87+
func (be BatchError) Error() string {
88+
return strconv.Itoa(be.Index) + ": " + be.Err.Error()
89+
}
90+
91+
type BatchErrors []BatchError
92+
93+
func (be BatchErrors) Error() string {
94+
var builder strings.Builder
95+
for i, e := range be {
96+
if i > 0 {
97+
builder.WriteString("; ")
98+
}
99+
builder.WriteString(e.Error())
100+
}
101+
return builder.String()
102+
}
103+
104+
// Write appends an event to the WAL. Data is flushed to disk before returning.
105+
func (w *Writer[T]) Write(e ...T) error {
106+
w.mu.Lock()
107+
defer w.mu.Unlock()
108+
109+
if w.log == nil {
110+
return fmt.Errorf("wal closed")
111+
}
112+
113+
var ba BatchErrors
114+
for i, e := range e {
115+
data, err := e.MarshalBinary()
116+
if err != nil {
117+
ba = append(ba, BatchError{Index: i, Err: err})
118+
continue
119+
}
120+
121+
lv := &lvWriter{w: w.log}
122+
if err := lv.Write(data); err != nil {
123+
ba = append(ba, BatchError{Index: i, Err: err})
124+
continue
125+
}
126+
}
127+
128+
// Ensure the data is flushed to disk.
129+
if err := w.log.Sync(); err != nil {
130+
return err
131+
}
132+
133+
if len(ba) > 0 {
134+
return ba
135+
}
136+
return nil
137+
}
138+
139+
// Compact rewrites the WAL to contain only the provided events, removing any redundant or obsolete entries.
140+
func (w *Writer[T]) Compact(all iter.Seq[T]) error {
141+
w.mu.Lock()
142+
defer w.mu.Unlock()
143+
144+
// Create a new temporary log file.
145+
tmpPath := w.logPath + ".tmp"
146+
tmpLog, err := w.open(tmpPath, true)
147+
if err != nil {
148+
return err
149+
}
150+
151+
// Write all events to the temporary log file.
152+
lv := &lvWriter{w: tmpLog}
153+
for e := range all {
154+
data, err := e.MarshalBinary()
155+
if err != nil {
156+
tmpLog.Close()
157+
os.Remove(tmpLog.Name())
158+
return err
159+
}
160+
161+
if err := lv.Write(data); err != nil {
162+
tmpLog.Close()
163+
os.Remove(tmpLog.Name())
164+
return err
165+
}
166+
}
167+
168+
// Ensure the temporary log file is flushed to disk.
169+
if err := tmpLog.Sync(); err != nil {
170+
tmpLog.Close()
171+
os.Remove(tmpLog.Name())
172+
return err
173+
}
174+
175+
// Close the temporary log file.
176+
if err := tmpLog.Close(); err != nil {
177+
os.Remove(tmpLog.Name())
178+
return err
179+
}
180+
181+
// Close the current log file.
182+
if err := w.close(); err != nil {
183+
return err
184+
}
185+
186+
// Replace the current log file with the temporary log file.
187+
if err := os.Rename(tmpPath, w.logPath); err != nil {
188+
return err
189+
}
190+
191+
// Re-open the log file for appending.
192+
log, err := w.open(w.logPath, false)
193+
if err != nil {
194+
return err
195+
}
196+
w.log = log
197+
return nil
198+
}
199+
200+
func (w *Writer[T]) close() error {
201+
var err error
202+
if w.log != nil {
203+
err = w.log.Close()
204+
w.log = nil
205+
}
206+
return err
207+
}
208+
209+
func (w *Writer[T]) Close() error {
210+
w.mu.Lock()
211+
defer w.mu.Unlock()
212+
213+
return w.close()
214+
}
215+
216+
// Length-Value writer
217+
type lvWriter struct {
218+
w io.Writer
219+
}
220+
221+
func (w *lvWriter) Write(e []byte) error {
222+
var buf [8]byte
223+
binary.LittleEndian.PutUint64(buf[:], uint64(len(e)))
224+
n, err := w.w.Write(buf[:])
225+
if err != nil {
226+
return err
227+
}
228+
if n != len(buf) {
229+
return io.ErrShortWrite
230+
}
231+
232+
_, err = io.Copy(w.w, bytes.NewReader(e))
233+
return err
234+
}
235+
236+
// Length-Value reader
237+
type lvReader struct {
238+
r io.Reader
239+
}
240+
241+
func (r *lvReader) Events() iter.Seq[[]byte] {
242+
return func(yield func([]byte) bool) {
243+
for {
244+
var buf [8]byte
245+
n, err := r.r.Read(buf[:])
246+
if err != nil {
247+
return
248+
}
249+
if n != len(buf) {
250+
return
251+
}
252+
253+
dataLen := int(binary.LittleEndian.Uint64(buf[:]))
254+
if dataLen == 0 {
255+
if !yield([]byte{}) {
256+
return
257+
}
258+
continue
259+
}
260+
261+
dataBuf := make([]byte, dataLen)
262+
var read int
263+
for {
264+
n, err = r.r.Read(dataBuf[read:])
265+
if err != nil {
266+
if err == io.EOF {
267+
break
268+
}
269+
return
270+
}
271+
if n == 0 {
272+
return
273+
}
274+
read += n
275+
if read >= dataLen {
276+
break
277+
}
278+
}
279+
if !yield(dataBuf) {
280+
return
281+
}
282+
}
283+
}
284+
}

0 commit comments

Comments
 (0)