-
Notifications
You must be signed in to change notification settings - Fork 203
chore: refactor rootfs direct provider and add checksums #1543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,6 +71,14 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro | |
| // We will use this to handle base builds that are already diffs. | ||
| // The passed slice p must start as empty, otherwise we would need to copy the empty values there. | ||
| if *buildID == uuid.Nil { | ||
| isEmpty, err := header.IsEmptyBlock(p[n:int64(n)+readLength], int64(b.header.Metadata.BlockSize)) | ||
| if err != nil { | ||
| return 0, fmt.Errorf("failed to check if block is empty: %w", err) | ||
| } | ||
| if !isEmpty { | ||
| return 0, fmt.Errorf("block is not empty") | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Empty block check fails for non-blockSize readsThe |
||
|
|
||
| n += int(readLength) | ||
|
|
||
| continue | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,50 +2,78 @@ package rootfs | |||||||||||||||||||
|
|
||||||||||||||||||||
| import ( | ||||||||||||||||||||
| "context" | ||||||||||||||||||||
| "encoding/hex" | ||||||||||||||||||||
| "errors" | ||||||||||||||||||||
| "fmt" | ||||||||||||||||||||
| "io" | ||||||||||||||||||||
| "os" | ||||||||||||||||||||
| "sync/atomic" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| "github.com/edsrzf/mmap-go" | ||||||||||||||||||||
| "go.uber.org/zap" | ||||||||||||||||||||
| "golang.org/x/sys/unix" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| "github.com/e2b-dev/infra/packages/orchestrator/internal/cfg" | ||||||||||||||||||||
| "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block" | ||||||||||||||||||||
| "github.com/e2b-dev/infra/packages/shared/pkg/logger" | ||||||||||||||||||||
| "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" | ||||||||||||||||||||
| "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| type DirectProvider struct { | ||||||||||||||||||||
| cache *block.Cache | ||||||||||||||||||||
| path string | ||||||||||||||||||||
| config cfg.BuilderConfig | ||||||||||||||||||||
|
|
||||||||||||||||||||
| header *header.Header | ||||||||||||||||||||
|
|
||||||||||||||||||||
| path string | ||||||||||||||||||||
| blockSize int64 | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // TODO: Remove when the snapshot flow is improved | ||||||||||||||||||||
| finishedOperations chan struct{} | ||||||||||||||||||||
| // TODO: Remove when the snapshot flow is improved | ||||||||||||||||||||
| exporting atomic.Bool | ||||||||||||||||||||
|
|
||||||||||||||||||||
| mmap *mmap.MMap | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func NewDirectProvider(rootfs block.ReadonlyDevice, path string) (Provider, error) { | ||||||||||||||||||||
| size, err := rootfs.Size() | ||||||||||||||||||||
| func NewDirectProvider(config cfg.BuilderConfig, rootfs block.ReadonlyDevice, path string) (Provider, error) { | ||||||||||||||||||||
| blockSize := rootfs.BlockSize() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o644) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error getting device size: %w", err) | ||||||||||||||||||||
| return nil, fmt.Errorf("error opening file: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| defer f.Close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| blockSize := rootfs.BlockSize() | ||||||||||||||||||||
| size, err := rootfs.Size() | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error getting size: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| cache, err := block.NewCache(size, blockSize, path, true) | ||||||||||||||||||||
| mm, err := mmap.MapRegion(f, int(size), unix.PROT_READ|unix.PROT_WRITE, 0, 0) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error creating cache: %w", err) | ||||||||||||||||||||
| return nil, fmt.Errorf("error mapping region: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return &DirectProvider{ | ||||||||||||||||||||
| cache: cache, | ||||||||||||||||||||
| path: path, | ||||||||||||||||||||
| config: config, | ||||||||||||||||||||
|
|
||||||||||||||||||||
| header: rootfs.Header(), | ||||||||||||||||||||
|
|
||||||||||||||||||||
| path: path, | ||||||||||||||||||||
| blockSize: blockSize, | ||||||||||||||||||||
|
|
||||||||||||||||||||
| finishedOperations: make(chan struct{}, 1), | ||||||||||||||||||||
|
|
||||||||||||||||||||
| mmap: &mm, | ||||||||||||||||||||
| }, nil | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (o *DirectProvider) Verify(_ context.Context) error { | ||||||||||||||||||||
| // No verification needed for direct provider for now | ||||||||||||||||||||
| return nil | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (o *DirectProvider) Start(_ context.Context) error { | ||||||||||||||||||||
| return nil | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
@@ -54,15 +82,16 @@ func (o *DirectProvider) ExportDiff( | |||||||||||||||||||
| ctx context.Context, | ||||||||||||||||||||
| out io.Writer, | ||||||||||||||||||||
| stopSandbox func(context.Context) error, | ||||||||||||||||||||
| ) (h *header.DiffMetadata, e error) { | ||||||||||||||||||||
| ) (*header.DiffMetadata, error) { | ||||||||||||||||||||
| ctx, childSpan := tracer.Start(ctx, "direct-provider-export") | ||||||||||||||||||||
| defer childSpan.End() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| o.exporting.CompareAndSwap(false, true) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| defer func() { | ||||||||||||||||||||
| if e != nil { | ||||||||||||||||||||
| e = errors.Join(e, o.cache.Close()) | ||||||||||||||||||||
| err := o.mmap.Unmap() | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| logger.L().Error(ctx, "error unmapping mmap", zap.Error(err)) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
@@ -81,10 +110,22 @@ func (o *DirectProvider) ExportDiff( | |||||||||||||||||||
| } | ||||||||||||||||||||
| telemetry.ReportEvent(ctx, "sandbox stopped") | ||||||||||||||||||||
|
|
||||||||||||||||||||
| o.cache.MarkAllAsDirty() | ||||||||||||||||||||
| m, err := o.cache.ExportToDiff(ctx, out) | ||||||||||||||||||||
| m, err := o.exportToDiff(ctx, out) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error exporting cache: %w", err) | ||||||||||||||||||||
| return nil, fmt.Errorf("error building diff metadata: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if o.config.RootfsChecksumVerification { | ||||||||||||||||||||
| checksums, err := calculateChecksums(ctx, o.path, o.blockSize) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error calculating checksum: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| logger.L().Debug(ctx, "exported rootfs checksum direct", | ||||||||||||||||||||
| zap.String("checksum", hex.EncodeToString(checksums.Checksum[:])), | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| m.Checksums = &checksums | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| telemetry.ReportEvent(ctx, "cache exported") | ||||||||||||||||||||
|
|
@@ -99,13 +140,78 @@ func (o *DirectProvider) Close(ctx context.Context) error { | |||||||||||||||||||
| return nil | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return errors.Join(o.sync(ctx), o.cache.Close()) | ||||||||||||||||||||
| return errors.Join(o.sync(ctx), o.mmap.Unmap()) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (o *DirectProvider) Path() (string, error) { | ||||||||||||||||||||
| return o.path, nil | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (o *DirectProvider) exportToDiff(ctx context.Context, out io.Writer) (*header.DiffMetadata, error) { | ||||||||||||||||||||
| err := o.sync(ctx) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error flushing path: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| builder := header.NewDiffMetadataBuilder(int64(o.header.Metadata.Size), o.blockSize) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| f, err := os.Open(o.path) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error opening path: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| defer f.Close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| block := make([]byte, o.blockSize) | ||||||||||||||||||||
| for i := int64(0); i < int64(o.header.Metadata.Size); i += o.blockSize { | ||||||||||||||||||||
| n, err := f.ReadAt(block, i) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error reading from file: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| err = builder.Process(ctx, block[:n], out, i) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error processing block %d: %w", i, err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| m, err := builder.Build() | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return nil, fmt.Errorf("error building diff metadata: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return m, nil | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
Comment on lines
+177
to
+183
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can do the same as in the Cache, also
Suggested change
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| func (o *DirectProvider) sync(ctx context.Context) error { | ||||||||||||||||||||
| return errors.Join(o.cache.Sync(), flush(ctx, o.path)) | ||||||||||||||||||||
| err := o.mmap.Flush() | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return fmt.Errorf("error flushing mmap: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return flush(ctx, o.path) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| type FileCtx struct { | ||||||||||||||||||||
| *os.File | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (f *FileCtx) ReadAt(_ context.Context, p []byte, off int64) (int, error) { | ||||||||||||||||||||
| return f.File.ReadAt(p, off) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func calculateChecksums(ctx context.Context, path string, blockSize int64) (header.Checksums, error) { | ||||||||||||||||||||
| f, err := os.Open(path) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return header.Checksums{}, fmt.Errorf("error opening path: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
| defer f.Close() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| size, err := f.Stat() | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| return header.Checksums{}, fmt.Errorf("error getting file size: %w", err) | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| fctx := &FileCtx{File: f} | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return CalculateChecksumsReader(ctx, fctx, size.Size(), blockSize) | ||||||||||||||||||||
| } | ||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this might not be needed, it's just check to be sure that the block is empty