Skip to content

Commit 7fc7ee9

Browse files
committed
fix: rootfs direct provider
1 parent 3e7dfeb commit 7fc7ee9

File tree

16 files changed

+386
-73
lines changed

16 files changed

+386
-73
lines changed

packages/orchestrator/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,4 +253,5 @@ require (
253253
google.golang.org/genproto/googleapis/api v0.0.0-20250908214217-97024824d090 // indirect
254254
google.golang.org/genproto/googleapis/rpc v0.0.0-20250908214217-97024824d090 // indirect
255255
gopkg.in/yaml.v3 v3.0.1 // indirect
256+
lukechampine.com/blake3 v1.4.1
256257
)

packages/orchestrator/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/orchestrator/internal/cfg/model.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type BuilderConfig struct {
2727
TemplateCacheDir string `env:"TEMPLATE_CACHE_DIR,expand" envDefault:"${ORCHESTRATOR_BASE_PATH}/template"`
2828

2929
NetworkConfig network.Config
30+
31+
RootfsChecksumVerification bool `env:"ROOTFS_CHECKSUM_VERIFICATION" envDefault:"false"`
3032
}
3133

3234
func (bc BuilderConfig) GetSandboxCacheDir() string { return bc.SandboxCacheDir }

packages/orchestrator/internal/sandbox/block/cache.go

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,10 @@ import (
1212
"sync/atomic"
1313
"syscall"
1414

15-
"github.com/bits-and-blooms/bitset"
1615
"github.com/edsrzf/mmap-go"
1716
"go.opentelemetry.io/otel"
18-
"go.uber.org/zap"
1917
"golang.org/x/sys/unix"
2018

21-
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
2219
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
2320
)
2421

@@ -118,42 +115,18 @@ func (m *Cache) ExportToDiff(ctx context.Context, out io.Writer) (*header.DiffMe
118115
return nil, fmt.Errorf("error flushing mmap: %w", err)
119116
}
120117

121-
dirty := bitset.New(uint(header.TotalBlocks(m.size, m.blockSize)))
122-
empty := bitset.New(0)
118+
builder := header.NewDiffMetadataBuilder(m.size, m.blockSize)
123119

124-
for _, key := range m.dirtySortedKeys() {
125-
blockIdx := header.BlockIdx(key, m.blockSize)
120+
for _, offset := range m.dirtySortedKeys() {
121+
block := (*m.mmap)[offset : offset+m.blockSize]
126122

127-
block := (*m.mmap)[key : key+m.blockSize]
128-
isEmpty, err := header.IsEmptyBlock(block, m.blockSize)
123+
err := builder.Process(ctx, block, out, offset)
129124
if err != nil {
130-
return nil, fmt.Errorf("error checking empty block: %w", err)
131-
}
132-
if isEmpty {
133-
empty.Set(uint(blockIdx))
134-
135-
continue
136-
}
137-
138-
dirty.Set(uint(blockIdx))
139-
n, err := out.Write(block)
140-
if err != nil {
141-
logger.L().Error(ctx, "error writing to out", zap.Error(err))
142-
143-
return nil, err
144-
}
145-
146-
if int64(n) != m.blockSize {
147-
return nil, fmt.Errorf("short write: %d != %d", int64(n), m.blockSize)
125+
return nil, fmt.Errorf("error processing block %d: %w", offset, err)
148126
}
149127
}
150128

151-
return &header.DiffMetadata{
152-
Dirty: dirty,
153-
Empty: empty,
154-
155-
BlockSize: m.blockSize,
156-
}, nil
129+
return builder.Build()
157130
}
158131

159132
func (m *Cache) ReadAt(b []byte, off int64) (int, error) {
@@ -281,10 +254,6 @@ func (m *Cache) dirtySortedKeys() []int64 {
281254
return keys
282255
}
283256

284-
func (m *Cache) MarkAllAsDirty() {
285-
m.setIsCached(0, m.size)
286-
}
287-
288257
// FileSize returns the size of the cache on disk.
289258
// The size might differ from the dirty size, as it may not be fully on disk.
290259
func (m *Cache) FileSize() (int64, error) {

packages/orchestrator/internal/sandbox/build/build.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro
7171
// We will use this to handle base builds that are already diffs.
7272
// The passed slice p must start as empty, otherwise we would need to copy the empty values there.
7373
if *buildID == uuid.Nil {
74+
isEmpty, err := header.IsEmptyBlock(p[n:int64(n)+readLength], int64(b.header.Metadata.BlockSize))
75+
if err != nil {
76+
return 0, fmt.Errorf("failed to check if block is empty: %w", err)
77+
}
78+
if !isEmpty {
79+
return 0, fmt.Errorf("block is not empty")
80+
}
81+
7482
n += int(readLength)
7583

7684
continue

packages/orchestrator/internal/sandbox/rootfs/direct.go

Lines changed: 124 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,78 @@ package rootfs
22

33
import (
44
"context"
5+
"encoding/hex"
56
"errors"
67
"fmt"
78
"io"
9+
"os"
810
"sync/atomic"
911

12+
"github.com/edsrzf/mmap-go"
1013
"go.uber.org/zap"
14+
"golang.org/x/sys/unix"
1115

16+
"github.com/e2b-dev/infra/packages/orchestrator/internal/cfg"
1217
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
1318
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
1419
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
1520
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
1621
)
1722

1823
type DirectProvider struct {
19-
cache *block.Cache
20-
path string
24+
config cfg.BuilderConfig
25+
26+
header *header.Header
27+
28+
path string
29+
blockSize int64
2130

2231
// TODO: Remove when the snapshot flow is improved
2332
finishedOperations chan struct{}
2433
// TODO: Remove when the snapshot flow is improved
2534
exporting atomic.Bool
35+
36+
mmap *mmap.MMap
2637
}
2738

28-
func NewDirectProvider(rootfs block.ReadonlyDevice, path string) (Provider, error) {
29-
size, err := rootfs.Size()
39+
func NewDirectProvider(config cfg.BuilderConfig, rootfs block.ReadonlyDevice, path string) (Provider, error) {
40+
blockSize := rootfs.BlockSize()
41+
42+
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o644)
3043
if err != nil {
31-
return nil, fmt.Errorf("error getting device size: %w", err)
44+
return nil, fmt.Errorf("error opening file: %w", err)
3245
}
46+
defer f.Close()
3347

34-
blockSize := rootfs.BlockSize()
48+
size, err := rootfs.Size()
49+
if err != nil {
50+
return nil, fmt.Errorf("error getting size: %w", err)
51+
}
3552

36-
cache, err := block.NewCache(size, blockSize, path, true)
53+
mm, err := mmap.MapRegion(f, int(size), unix.PROT_READ|unix.PROT_WRITE, 0, 0)
3754
if err != nil {
38-
return nil, fmt.Errorf("error creating cache: %w", err)
55+
return nil, fmt.Errorf("error mapping region: %w", err)
3956
}
4057

4158
return &DirectProvider{
42-
cache: cache,
43-
path: path,
59+
config: config,
60+
61+
header: rootfs.Header(),
62+
63+
path: path,
64+
blockSize: blockSize,
4465

4566
finishedOperations: make(chan struct{}, 1),
67+
68+
mmap: &mm,
4669
}, nil
4770
}
4871

72+
func (o *DirectProvider) Verify(ctx context.Context) error {
73+
// No verification needed for direct provider for now
74+
return nil
75+
}
76+
4977
func (o *DirectProvider) Start(_ context.Context) error {
5078
return nil
5179
}
@@ -54,15 +82,16 @@ func (o *DirectProvider) ExportDiff(
5482
ctx context.Context,
5583
out io.Writer,
5684
stopSandbox func(context.Context) error,
57-
) (h *header.DiffMetadata, e error) {
85+
) (*header.DiffMetadata, error) {
5886
ctx, childSpan := tracer.Start(ctx, "direct-provider-export")
5987
defer childSpan.End()
6088

6189
o.exporting.CompareAndSwap(false, true)
6290

6391
defer func() {
64-
if e != nil {
65-
e = errors.Join(e, o.cache.Close())
92+
err := o.mmap.Unmap()
93+
if err != nil {
94+
logger.L().Error(ctx, "error unmapping mmap", zap.Error(err))
6695
}
6796
}()
6897

@@ -81,10 +110,22 @@ func (o *DirectProvider) ExportDiff(
81110
}
82111
telemetry.ReportEvent(ctx, "sandbox stopped")
83112

84-
o.cache.MarkAllAsDirty()
85-
m, err := o.cache.ExportToDiff(ctx, out)
113+
m, err := o.exportToDiff(ctx, out)
86114
if err != nil {
87-
return nil, fmt.Errorf("error exporting cache: %w", err)
115+
return nil, fmt.Errorf("error building diff metadata: %w", err)
116+
}
117+
118+
if o.config.RootfsChecksumVerification {
119+
checksums, err := calculateChecksums(ctx, o.path, o.blockSize)
120+
if err != nil {
121+
return nil, fmt.Errorf("error calculating checksum: %w", err)
122+
}
123+
124+
logger.L().Debug(ctx, "exported rootfs checksum direct",
125+
zap.String("checksum", hex.EncodeToString(checksums.Checksum[:])),
126+
)
127+
128+
m.Checksums = &checksums
88129
}
89130

90131
telemetry.ReportEvent(ctx, "cache exported")
@@ -99,13 +140,78 @@ func (o *DirectProvider) Close(ctx context.Context) error {
99140
return nil
100141
}
101142

102-
return errors.Join(o.sync(ctx), o.cache.Close())
143+
return errors.Join(o.sync(ctx), o.mmap.Unmap())
103144
}
104145

105146
func (o *DirectProvider) Path() (string, error) {
106147
return o.path, nil
107148
}
108149

150+
func (o *DirectProvider) exportToDiff(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) {
151+
err := o.sync(ctx)
152+
if err != nil {
153+
return nil, fmt.Errorf("error flushing path: %w", err)
154+
}
155+
156+
builder := header.NewDiffMetadataBuilder(int64(o.header.Metadata.Size), o.blockSize)
157+
158+
f, err := os.Open(o.path)
159+
if err != nil {
160+
return nil, fmt.Errorf("error opening path: %w", err)
161+
}
162+
defer f.Close()
163+
164+
block := make([]byte, o.blockSize)
165+
for i := int64(0); i < int64(o.header.Metadata.Size); i += o.blockSize {
166+
n, err := f.ReadAt(block, i)
167+
if err != nil {
168+
return nil, fmt.Errorf("error reading from file: %w", err)
169+
}
170+
171+
err = builder.Process(ctx, block[:n], out, i)
172+
if err != nil {
173+
return nil, fmt.Errorf("error processing block %d: %w", i, err)
174+
}
175+
}
176+
177+
m, err := builder.Build()
178+
if err != nil {
179+
return nil, fmt.Errorf("error building diff metadata: %w", err)
180+
}
181+
182+
return m, nil
183+
}
184+
109185
func (o *DirectProvider) sync(ctx context.Context) error {
110-
return errors.Join(o.cache.Sync(), flush(ctx, o.path))
186+
err := o.mmap.Flush()
187+
if err != nil {
188+
return fmt.Errorf("error flushing mmap: %w", err)
189+
}
190+
191+
return flush(ctx, o.path)
192+
}
193+
194+
type FileCtx struct {
195+
*os.File
196+
}
197+
198+
func (f *FileCtx) ReadAt(_ context.Context, p []byte, off int64) (int, error) {
199+
return f.File.ReadAt(p, off)
200+
}
201+
202+
func calculateChecksums(ctx context.Context, path string, blockSize int64) (header.Checksums, error) {
203+
f, err := os.Open(path)
204+
if err != nil {
205+
return header.Checksums{}, fmt.Errorf("error opening path: %w", err)
206+
}
207+
defer f.Close()
208+
209+
size, err := f.Stat()
210+
if err != nil {
211+
return header.Checksums{}, fmt.Errorf("error getting file size: %w", err)
212+
}
213+
214+
fctx := &FileCtx{File: f}
215+
216+
return CalculateChecksumsReader(ctx, fctx, int64(size.Size()), blockSize)
111217
}

0 commit comments

Comments
 (0)