Make Tree objects topologically sorted
I have submitted the following PR against remote-apis, which adds a hint
to ActionResult to indicate that the Directory messages contained in a
Tree are topologically sorted:

bazelbuild/remote-apis#230

The advantage of having Tree objects in this form is that it makes it
possible to instantiate them on a local file system using a simple
forward scan.

As this change hasn't been merged yet, this commit only adds the logic
for generating topologically sorted Tree objects. I will add the code
for announcing the hint once the linked PR gets merged.
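
To illustrate that advantage, the following is a minimal sketch (not part of
this change) of how a consumer could instantiate such a Tree with a single
forward scan: because every parent Directory precedes its children, all paths
at which a child must be created are known by the time its message is visited.
The placeFile callback and the use of SHA-256 with deterministic marshaling in
directoryKey are assumptions made purely for the example.

import (
	"crypto/sha256"
	"encoding/hex"
	"os"
	"path/filepath"

	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
	"google.golang.org/protobuf/proto"
)

// directoryKey hashes a Directory message so that it can be matched
// against the digests stored in its parents' DirectoryNode entries.
// For simplicity this assumes SHA-256 as the digest function and
// deterministic marshaling; a real consumer would hash the raw bytes
// it received.
func directoryKey(d *remoteexecution.Directory) (string, error) {
	data, err := proto.Marshal(d)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:]), nil
}

// instantiateTree materializes a topologically sorted Tree underneath
// outputPath in a single forward scan. placeFile is a caller-provided
// hook that would fetch a file's contents from the CAS.
func instantiateTree(tree *remoteexecution.Tree, outputPath string, placeFile func(path string, d *remoteexecution.Digest, isExecutable bool) error) error {
	// Paths at which the Directory with a given digest still needs to
	// be created. Because every parent precedes its children, this map
	// is fully populated for a directory by the time it is visited.
	pendingPaths := map[string][]string{}
	directories := append([]*remoteexecution.Directory{tree.Root}, tree.Children...)
	for i, d := range directories {
		paths := []string{outputPath}
		if i > 0 {
			key, err := directoryKey(d)
			if err != nil {
				return err
			}
			paths = pendingPaths[key]
		}
		for _, dirPath := range paths {
			if err := os.MkdirAll(dirPath, 0o755); err != nil {
				return err
			}
			for _, f := range d.Files {
				if err := placeFile(filepath.Join(dirPath, f.Name), f.Digest, f.IsExecutable); err != nil {
					return err
				}
			}
			for _, s := range d.Symlinks {
				if err := os.Symlink(s.Target, filepath.Join(dirPath, s.Name)); err != nil {
					return err
				}
			}
			for _, child := range d.Directories {
				hash := child.Digest.GetHash()
				pendingPaths[hash] = append(pendingPaths[hash], filepath.Join(dirPath, child.Name))
			}
		}
	}
	return nil
}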
exbucks committed Oct 14, 2022
1 parent c3f0df6 commit 5ecce31
Showing 2 changed files with 131 additions and 83 deletions.
1 change: 1 addition & 0 deletions pkg/builder/BUILD.bazel
@@ -60,6 +60,7 @@ go_library(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//encoding/protowire",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/durationpb",
213 changes: 130 additions & 83 deletions pkg/builder/output_hierarchy.go
@@ -7,13 +7,15 @@ import (

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/filesystem"
"github.com/buildbarn/bb-storage/pkg/filesystem/path"
"github.com/buildbarn/bb-storage/pkg/util"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
)

@@ -201,97 +203,57 @@ func (s *uploadOutputsState) saveError(err error) {
}
}

// UploadDirectory is called to upload a single directory. Elements in
// the directory are stored in a remoteexecution.Directory, so that they
// can be placed in a remoteexecution.Tree.
func (s *uploadOutputsState) uploadDirectory(d UploadableDirectory, dPath *path.Trace, children map[digest.Digest]*remoteexecution.Directory) *remoteexecution.Directory {
files, err := d.ReadDir()
if err != nil {
s.saveError(util.StatusWrapf(err, "Failed to read output directory %#v", dPath.String()))
return &remoteexecution.Directory{}
}

var directory remoteexecution.Directory
for _, file := range files {
name := file.Name()
childPath := dPath.Append(name)
switch fileType := file.Type(); fileType {
case filesystem.FileTypeRegularFile:
if childDigest, err := d.UploadFile(s.context, name, s.digestFunction); err == nil {
directory.Files = append(directory.Files, &remoteexecution.FileNode{
Name: name.String(),
Digest: childDigest.GetProto(),
IsExecutable: file.IsExecutable(),
})
} else {
s.saveError(util.StatusWrapf(err, "Failed to store output file %#v", childPath.String()))
}
case filesystem.FileTypeDirectory:
if childDirectory, err := d.EnterUploadableDirectory(name); err == nil {
child := s.uploadDirectory(childDirectory, dPath, children)
childDirectory.Close()

// Compute digest of the child directory. This requires serializing it.
if data, err := proto.Marshal(child); err == nil {
digestGenerator := s.digestFunction.NewGenerator()
if _, err := digestGenerator.Write(data); err == nil {
childDigest := digestGenerator.Sum()
children[childDigest] = child
directory.Directories = append(directory.Directories, &remoteexecution.DirectoryNode{
Name: name.String(),
Digest: childDigest.GetProto(),
})
} else {
s.saveError(util.StatusWrapf(err, "Failed to compute digest of output directory %#v", childPath.String()))
}
} else {
s.saveError(util.StatusWrapf(err, "Failed to marshal output directory %#v", childPath.String()))
}
} else {
s.saveError(util.StatusWrapf(err, "Failed to enter output directory %#v", childPath.String()))
}
case filesystem.FileTypeSymlink:
if target, err := d.Readlink(name); err == nil {
directory.Symlinks = append(directory.Symlinks, &remoteexecution.SymlinkNode{
Name: name.String(),
Target: target,
})
} else {
s.saveError(util.StatusWrapf(err, "Failed to read output symlink %#v", childPath.String()))
}
}
}
return &directory
}

// UploadOutputDirectoryEntered is called to upload a single output
// directory as a remoteexecution.Tree. The root directory is assumed to
// already be opened.
func (s *uploadOutputsState) uploadOutputDirectoryEntered(d UploadableDirectory, dPath *path.Trace, paths []string) {
children := map[digest.Digest]*remoteexecution.Directory{}
tree := &remoteexecution.Tree{
Root: s.uploadDirectory(d, dPath, children),
}

childDigests := digest.NewSetBuilder()
for childDigest := range children {
childDigests.Add(childDigest)
}
for _, childDigest := range childDigests.Build().Items() {
tree.Children = append(tree.Children, children[childDigest])
dState := uploadOutputDirectoryState{
uploadOutputsState: s,
directoriesSeen: map[digest.Digest]struct{}{},
}
if rootDirectory, err := dState.uploadDirectory(d, dPath); err == nil {
// Approximate the size of the resulting Tree object, so
// that we may allocate all space at once.
directories := append(dState.directories, rootDirectory)
maximumTreeSizeBytes := 0
for _, directory := range directories {
maximumTreeSizeBytes += len(directory)
}
maximumTreeSizeBytes += len(directories) * (1 + protowire.SizeVarint(uint64(maximumTreeSizeBytes)))

// Construct the Tree object. We don't want to use
// proto.Marshal() for this, as it would require us to
// marshal all of the directories a second time.
treeData := make([]byte, 0, maximumTreeSizeBytes)
tag := byte(blobstore.TreeRootFieldNumber<<3) | byte(protowire.BytesType)
for i := len(directories); i > 0; i-- {
directory := directories[i-1]
treeData = append(treeData, tag)
treeData = protowire.AppendVarint(treeData, uint64(len(directory)))
treeData = append(treeData, directory...)
tag = byte(blobstore.TreeChildrenFieldNumber<<3) | byte(protowire.BytesType)
}

if treeDigest, err := blobstore.CASPutProto(s.context, s.contentAddressableStorage, tree, s.digestFunction); err == nil {
for _, path := range paths {
s.actionResult.OutputDirectories = append(
s.actionResult.OutputDirectories,
&remoteexecution.OutputDirectory{
Path: path,
TreeDigest: treeDigest.GetProto(),
})
digestGenerator := s.digestFunction.NewGenerator()
if _, err := digestGenerator.Write(treeData); err != nil {
panic(err)
}
treeDigest := digestGenerator.Sum()

if err := s.contentAddressableStorage.Put(s.context, treeDigest, buffer.NewValidatedBufferFromByteSlice(treeData)); err == nil {
for _, path := range paths {
s.actionResult.OutputDirectories = append(
s.actionResult.OutputDirectories,
&remoteexecution.OutputDirectory{
Path: path,
TreeDigest: treeDigest.GetProto(),
})
}
} else {
s.saveError(util.StatusWrapf(err, "Failed to store output directory %#v", dPath.String()))
}
} else {
s.saveError(util.StatusWrapf(err, "Failed to store output directory %#v", dPath.String()))
s.saveError(err)
}
}
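
The manual serialization above emits the standard wire encoding of
remoteexecution.Tree, whose root and children fields are declared as fields 1
and 2. A minimal sketch (not part of this diff) expressing the same
construction with protowire helpers:

// marshalTree appends the pre-serialized root and child Directory
// messages as length-delimited fields 1 (root) and 2 (children),
// matching the wire format of remoteexecution.Tree.
func marshalTree(root []byte, children [][]byte) []byte {
	var treeData []byte
	treeData = protowire.AppendTag(treeData, 1, protowire.BytesType)
	treeData = protowire.AppendBytes(treeData, root)
	for _, child := range children {
		treeData = protowire.AppendTag(treeData, 2, protowire.BytesType)
		treeData = protowire.AppendBytes(treeData, child)
	}
	return treeData
}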

@@ -341,6 +303,91 @@ func (s *uploadOutputsState) uploadOutputSymlink(d UploadableDirectory, name pat
}
}

// UploadOutputDirectoryState is used by OutputHierarchy.UploadOutputs()
// to track state specific to uploading a single output directory.
type uploadOutputDirectoryState struct {
*uploadOutputsState

directories [][]byte
directoriesSeen map[digest.Digest]struct{}
}

// UploadDirectory is called to upload a single directory. Elements in
// the directory are stored in a remoteexecution.Directory, so that they
// can be placed in a remoteexecution.Tree.
func (s *uploadOutputDirectoryState) uploadDirectory(d UploadableDirectory, dPath *path.Trace) ([]byte, error) {
files, err := d.ReadDir()
if err != nil {
return nil, util.StatusWrapf(err, "Failed to read output directory %#v", dPath.String())
}

var directory remoteexecution.Directory
for _, file := range files {
name := file.Name()
childPath := dPath.Append(name)
switch fileType := file.Type(); fileType {
case filesystem.FileTypeRegularFile:
if childDigest, err := d.UploadFile(s.context, name, s.digestFunction); err == nil {
directory.Files = append(directory.Files, &remoteexecution.FileNode{
Name: name.String(),
Digest: childDigest.GetProto(),
IsExecutable: file.IsExecutable(),
})
} else {
s.saveError(util.StatusWrapf(err, "Failed to store output file %#v", childPath.String()))
}
case filesystem.FileTypeDirectory:
if childDirectory, err := d.EnterUploadableDirectory(name); err == nil {
childData, err := s.uploadDirectory(childDirectory, dPath)
childDirectory.Close()
if err == nil {
// Compute the digest of the child
// directory, so that it may be
// referenced by the parent.
digestGenerator := s.digestFunction.NewGenerator()
if _, err := digestGenerator.Write(childData); err != nil {
panic(err)
}
childDigest := digestGenerator.Sum()

// There is no need to make the
// directory part of the Tree if we
// have seen an identical directory
// previously.
if _, ok := s.directoriesSeen[childDigest]; !ok {
s.directories = append(s.directories, childData)
s.directoriesSeen[childDigest] = struct{}{}
}

directory.Directories = append(directory.Directories, &remoteexecution.DirectoryNode{
Name: name.String(),
Digest: childDigest.GetProto(),
})
} else {
s.saveError(err)
}
} else {
s.saveError(util.StatusWrapf(err, "Failed to enter output directory %#v", childPath.String()))
}
case filesystem.FileTypeSymlink:
if target, err := d.Readlink(name); err == nil {
directory.Symlinks = append(directory.Symlinks, &remoteexecution.SymlinkNode{
Name: name.String(),
Target: target,
})
} else {
s.saveError(util.StatusWrapf(err, "Failed to read output symlink %#v", childPath.String()))
}
}
}

data, err := proto.Marshal(&directory)
if err != nil {
return nil, util.StatusWrapf(err, "Failed to marshal output directory %#v", dPath.String())
}
return data, nil
}

// outputNodePath is an implementation of path.ComponentWalker that is
// used by NewOutputHierarchy() to compute normalized paths of outputs
// of a build action.
