Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/orchestrator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ require (
google.golang.org/grpc v1.75.1
google.golang.org/protobuf v1.36.9
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
lukechampine.com/blake3 v1.4.1
)

require (
Expand Down
2 changes: 2 additions & 0 deletions packages/orchestrator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/orchestrator/internal/cfg/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type BuilderConfig struct {
TemplateCacheDir string `env:"TEMPLATE_CACHE_DIR,expand" envDefault:"${ORCHESTRATOR_BASE_PATH}/template"`

NetworkConfig network.Config

RootfsChecksumVerification bool `env:"ROOTFS_CHECKSUM_VERIFICATION" envDefault:"false"`
}

func (bc BuilderConfig) GetSandboxCacheDir() string { return bc.SandboxCacheDir }
Expand Down
43 changes: 6 additions & 37 deletions packages/orchestrator/internal/sandbox/block/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@ import (
"sync/atomic"
"syscall"

"github.com/bits-and-blooms/bitset"
"github.com/edsrzf/mmap-go"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"golang.org/x/sys/unix"

"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

Expand Down Expand Up @@ -118,42 +115,18 @@ func (m *Cache) ExportToDiff(ctx context.Context, out io.Writer) (*header.DiffMe
return nil, fmt.Errorf("error flushing mmap: %w", err)
}

dirty := bitset.New(uint(header.TotalBlocks(m.size, m.blockSize)))
empty := bitset.New(0)
builder := header.NewDiffMetadataBuilder(m.size, m.blockSize)

for _, key := range m.dirtySortedKeys() {
blockIdx := header.BlockIdx(key, m.blockSize)
for _, offset := range m.dirtySortedKeys() {
block := (*m.mmap)[offset : offset+m.blockSize]

block := (*m.mmap)[key : key+m.blockSize]
isEmpty, err := header.IsEmptyBlock(block, m.blockSize)
err := builder.Process(ctx, block, out, offset)
if err != nil {
return nil, fmt.Errorf("error checking empty block: %w", err)
}
if isEmpty {
empty.Set(uint(blockIdx))

continue
}

dirty.Set(uint(blockIdx))
n, err := out.Write(block)
if err != nil {
logger.L().Error(ctx, "error writing to out", zap.Error(err))

return nil, err
}

if int64(n) != m.blockSize {
return nil, fmt.Errorf("short write: %d != %d", int64(n), m.blockSize)
return nil, fmt.Errorf("error processing block %d: %w", offset, err)
}
}

return &header.DiffMetadata{
Dirty: dirty,
Empty: empty,

BlockSize: m.blockSize,
}, nil
return builder.Build()
}

func (m *Cache) ReadAt(b []byte, off int64) (int, error) {
Expand Down Expand Up @@ -281,10 +254,6 @@ func (m *Cache) dirtySortedKeys() []int64 {
return keys
}

func (m *Cache) MarkAllAsDirty() {
m.setIsCached(0, m.size)
}

// FileSize returns the size of the cache on disk.
// The size might differ from the dirty size, as it may not be fully on disk.
func (m *Cache) FileSize() (int64, error) {
Expand Down
8 changes: 8 additions & 0 deletions packages/orchestrator/internal/sandbox/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro
// We will use this to handle base builds that are already diffs.
// The passed slice p must start as empty, otherwise we would need to copy the empty values there.
if *buildID == uuid.Nil {
isEmpty, err := header.IsEmptyBlock(p[n:int64(n)+readLength], int64(b.header.Metadata.BlockSize))
Copy link
Contributor Author

@dobrac dobrac Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might not be needed, it's just check to be sure that the block is empty

if err != nil {
return 0, fmt.Errorf("failed to check if block is empty: %w", err)
}
if !isEmpty {
return 0, fmt.Errorf("block is not empty")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Empty block check fails for non-blockSize reads

The IsEmptyBlock function compares the input slice against a pre-allocated empty buffer of exactly blockSize bytes using bytes.Equal. However, the slice passed here has length readLength (from p[n:int64(n)+readLength]), which can differ from blockSize when reads span multiple mappings or aren't block-aligned. When readLength != blockSize, bytes.Equal will return false due to the length mismatch, incorrectly reporting non-empty blocks and causing spurious errors even when the slice contains all zeros.

Fix in Cursor Fix in Web


n += int(readLength)

continue
Expand Down
142 changes: 124 additions & 18 deletions packages/orchestrator/internal/sandbox/rootfs/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,78 @@ package rootfs

import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"sync/atomic"

"github.com/edsrzf/mmap-go"
"go.uber.org/zap"
"golang.org/x/sys/unix"

"github.com/e2b-dev/infra/packages/orchestrator/internal/cfg"
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
)

type DirectProvider struct {
cache *block.Cache
path string
config cfg.BuilderConfig

header *header.Header

path string
blockSize int64

// TODO: Remove when the snapshot flow is improved
finishedOperations chan struct{}
// TODO: Remove when the snapshot flow is improved
exporting atomic.Bool

mmap *mmap.MMap
}

func NewDirectProvider(rootfs block.ReadonlyDevice, path string) (Provider, error) {
size, err := rootfs.Size()
func NewDirectProvider(config cfg.BuilderConfig, rootfs block.ReadonlyDevice, path string) (Provider, error) {
blockSize := rootfs.BlockSize()

f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o644)
if err != nil {
return nil, fmt.Errorf("error getting device size: %w", err)
return nil, fmt.Errorf("error opening file: %w", err)
}
defer f.Close()

blockSize := rootfs.BlockSize()
size, err := rootfs.Size()
if err != nil {
return nil, fmt.Errorf("error getting size: %w", err)
}

cache, err := block.NewCache(size, blockSize, path, true)
mm, err := mmap.MapRegion(f, int(size), unix.PROT_READ|unix.PROT_WRITE, 0, 0)
if err != nil {
return nil, fmt.Errorf("error creating cache: %w", err)
return nil, fmt.Errorf("error mapping region: %w", err)
}

return &DirectProvider{
cache: cache,
path: path,
config: config,

header: rootfs.Header(),

path: path,
blockSize: blockSize,

finishedOperations: make(chan struct{}, 1),

mmap: &mm,
}, nil
}

func (o *DirectProvider) Verify(_ context.Context) error {
// No verification needed for direct provider for now
return nil
}

func (o *DirectProvider) Start(_ context.Context) error {
return nil
}
Expand All @@ -54,15 +82,16 @@ func (o *DirectProvider) ExportDiff(
ctx context.Context,
out io.Writer,
stopSandbox func(context.Context) error,
) (h *header.DiffMetadata, e error) {
) (*header.DiffMetadata, error) {
ctx, childSpan := tracer.Start(ctx, "direct-provider-export")
defer childSpan.End()

o.exporting.CompareAndSwap(false, true)

defer func() {
if e != nil {
e = errors.Join(e, o.cache.Close())
err := o.mmap.Unmap()
if err != nil {
logger.L().Error(ctx, "error unmapping mmap", zap.Error(err))
}
}()

Expand All @@ -81,10 +110,22 @@ func (o *DirectProvider) ExportDiff(
}
telemetry.ReportEvent(ctx, "sandbox stopped")

o.cache.MarkAllAsDirty()
m, err := o.cache.ExportToDiff(ctx, out)
m, err := o.exportToDiff(ctx, out)
if err != nil {
return nil, fmt.Errorf("error exporting cache: %w", err)
return nil, fmt.Errorf("error building diff metadata: %w", err)
}

if o.config.RootfsChecksumVerification {
checksums, err := calculateChecksums(ctx, o.path, o.blockSize)
if err != nil {
return nil, fmt.Errorf("error calculating checksum: %w", err)
}

logger.L().Debug(ctx, "exported rootfs checksum direct",
zap.String("checksum", hex.EncodeToString(checksums.Checksum[:])),
)

m.Checksums = &checksums
}

telemetry.ReportEvent(ctx, "cache exported")
Expand All @@ -99,13 +140,78 @@ func (o *DirectProvider) Close(ctx context.Context) error {
return nil
}

return errors.Join(o.sync(ctx), o.cache.Close())
return errors.Join(o.sync(ctx), o.mmap.Unmap())
}

func (o *DirectProvider) Path() (string, error) {
return o.path, nil
}

func (o *DirectProvider) exportToDiff(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) {
err := o.sync(ctx)
if err != nil {
return nil, fmt.Errorf("error flushing path: %w", err)
}

builder := header.NewDiffMetadataBuilder(int64(o.header.Metadata.Size), o.blockSize)

f, err := os.Open(o.path)
if err != nil {
return nil, fmt.Errorf("error opening path: %w", err)
}
defer f.Close()

block := make([]byte, o.blockSize)
for i := int64(0); i < int64(o.header.Metadata.Size); i += o.blockSize {
n, err := f.ReadAt(block, i)
if err != nil {
return nil, fmt.Errorf("error reading from file: %w", err)
}

err = builder.Process(ctx, block[:n], out, i)
if err != nil {
return nil, fmt.Errorf("error processing block %d: %w", i, err)
}
}

m, err := builder.Build()
if err != nil {
return nil, fmt.Errorf("error building diff metadata: %w", err)
}

return m, nil
}
Comment on lines +177 to +183
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do the same as in the Cache, also Build() can't return error, so that can be also simplified

Suggested change
m, err := builder.Build()
if err != nil {
return nil, fmt.Errorf("error building diff metadata: %w", err)
}
return m, nil
}
return builder.Build()
}


func (o *DirectProvider) sync(ctx context.Context) error {
return errors.Join(o.cache.Sync(), flush(ctx, o.path))
err := o.mmap.Flush()
if err != nil {
return fmt.Errorf("error flushing mmap: %w", err)
}

return flush(ctx, o.path)
}

type FileCtx struct {
*os.File
}

func (f *FileCtx) ReadAt(_ context.Context, p []byte, off int64) (int, error) {
return f.File.ReadAt(p, off)
}

func calculateChecksums(ctx context.Context, path string, blockSize int64) (header.Checksums, error) {
f, err := os.Open(path)
if err != nil {
return header.Checksums{}, fmt.Errorf("error opening path: %w", err)
}
defer f.Close()

size, err := f.Stat()
if err != nil {
return header.Checksums{}, fmt.Errorf("error getting file size: %w", err)
}

fctx := &FileCtx{File: f}

return CalculateChecksumsReader(ctx, fctx, size.Size(), blockSize)
}
Loading