Skip to content

Commit ee8fda1

Browse files
committed
Make input root population for hardlinking workers run in parallel
Even though I think that people should not be using native build directories and use FUSE/NFSv4 instead, you see that people still use them in practice. Reasons include lack of permissions to use FUSE, or some workloads being incompatible with it. This change generalizes the existing 'download concurrency' option we have for prefetching for FUSE/NFSv4, and extends it to also apply to calls into FileFetcher.GetFile().
1 parent 9dbd838 commit ee8fda1

File tree

5 files changed

+292
-246
lines changed

5 files changed

+292
-246
lines changed

cmd/bb_worker/main.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ func main() {
9292
}
9393
globalContentAddressableStorage = re_blobstore.NewExistencePreconditionBlobAccess(globalContentAddressableStorage)
9494

95-
var prefetchingDownloadConcurrency *semaphore.Weighted
9695
var fileSystemAccessCache blobstore.BlobAccess
9796
prefetchingConfiguration := configuration.Prefetching
9897
if prefetchingConfiguration != nil {
@@ -106,7 +105,6 @@ func main() {
106105
return util.StatusWrap(err, "Failed to create File System Access Cache")
107106
}
108107
fileSystemAccessCache = info.BlobAccess
109-
prefetchingDownloadConcurrency = semaphore.NewWeighted(prefetchingConfiguration.DownloadConcurrency)
110108
}
111109

112110
// Cached read access for Directory objects stored in the
@@ -161,6 +159,12 @@ func main() {
161159
}()
162160
}
163161

162+
inputDownloadConcurrency := configuration.InputDownloadConcurrency
163+
if inputDownloadConcurrency <= 0 {
164+
return status.Errorf(codes.InvalidArgument, "Nonpositive input download concurrency: ", inputDownloadConcurrency)
165+
}
166+
inputDownloadConcurrencySemaphore := semaphore.NewWeighted(inputDownloadConcurrency)
167+
164168
outputUploadConcurrency := configuration.OutputUploadConcurrency
165169
if outputUploadConcurrency <= 0 {
166170
return status.Errorf(codes.InvalidArgument, "Nonpositive output upload concurrency: ", outputUploadConcurrency)
@@ -361,6 +365,7 @@ func main() {
361365
naiveBuildDirectory,
362366
directoryFetcher,
363367
fileFetcher,
368+
inputDownloadConcurrencySemaphore,
364369
contentAddressableStorageWriter)
365370
}
366371

@@ -406,7 +411,7 @@ func main() {
406411
buildExecutor,
407412
globalContentAddressableStorage,
408413
directoryFetcher,
409-
prefetchingDownloadConcurrency,
414+
inputDownloadConcurrencySemaphore,
410415
fileSystemAccessCache,
411416
int(configuration.MaximumMessageSizeBytes),
412417
int(prefetchingConfiguration.BloomFilterBitsPerPath),

pkg/builder/naive_build_directory.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@ import (
1515
"github.com/buildbarn/bb-storage/pkg/filesystem/path"
1616
"github.com/buildbarn/bb-storage/pkg/util"
1717

18+
"golang.org/x/sync/errgroup"
19+
"golang.org/x/sync/semaphore"
1820
"google.golang.org/grpc/codes"
1921
"google.golang.org/grpc/status"
2022
)
2123

2224
type naiveBuildDirectoryOptions struct {
2325
directoryFetcher cas.DirectoryFetcher
2426
fileFetcher cas.FileFetcher
27+
fileFetcherSemaphore *semaphore.Weighted
2528
contentAddressableStorage blobstore.BlobAccess
2629
}
2730

@@ -40,12 +43,13 @@ type naiveBuildDirectory struct {
4043
// regular local file systems. The downside of such file systems is that
4144
// we cannot populate them on demand. All of the input files must be
4245
// present before invoking the build action.
43-
func NewNaiveBuildDirectory(directory filesystem.DirectoryCloser, directoryFetcher cas.DirectoryFetcher, fileFetcher cas.FileFetcher, contentAddressableStorage blobstore.BlobAccess) BuildDirectory {
46+
func NewNaiveBuildDirectory(directory filesystem.DirectoryCloser, directoryFetcher cas.DirectoryFetcher, fileFetcher cas.FileFetcher, fileFetcherSemaphore *semaphore.Weighted, contentAddressableStorage blobstore.BlobAccess) BuildDirectory {
4447
return &naiveBuildDirectory{
4548
DirectoryCloser: directory,
4649
options: &naiveBuildDirectoryOptions{
4750
directoryFetcher: directoryFetcher,
4851
fileFetcher: fileFetcher,
52+
fileFetcherSemaphore: fileFetcherSemaphore,
4953
contentAddressableStorage: contentAddressableStorage,
5054
},
5155
}
@@ -76,7 +80,7 @@ func (d *naiveBuildDirectory) InstallHooks(filePool re_filesystem.FilePool, erro
7680
// of I/O errors is performed.
7781
}
7882

79-
func (d *naiveBuildDirectory) mergeDirectoryContents(ctx context.Context, digest digest.Digest, inputDirectory filesystem.Directory, pathTrace *path.Trace) error {
83+
func (d *naiveBuildDirectory) mergeDirectoryContents(ctx context.Context, group *errgroup.Group, digest digest.Digest, inputDirectory *filesystem.ReferenceCountedDirectoryCloser, pathTrace *path.Trace) error {
8084
// Obtain directory.
8185
options := d.options
8286
directory, err := options.directoryFetcher.GetDirectory(ctx, digest)
@@ -96,9 +100,24 @@ func (d *naiveBuildDirectory) mergeDirectoryContents(ctx context.Context, digest
96100
if err != nil {
97101
return util.StatusWrapf(err, "Failed to extract digest for input file %#v", childPathTrace.GetUNIXString())
98102
}
99-
if err := options.fileFetcher.GetFile(ctx, childDigest, inputDirectory, component, file.IsExecutable); err != nil {
100-
return util.StatusWrapf(err, "Failed to obtain input file %#v", childPathTrace.GetUNIXString())
103+
104+
// Download individual input files in parallel.
105+
if err := util.AcquireSemaphore(ctx, options.fileFetcherSemaphore, 1); err != nil {
106+
return err
101107
}
108+
downloadDirectory := inputDirectory.Duplicate()
109+
group.Go(func() error {
110+
errGetFile := options.fileFetcher.GetFile(ctx, childDigest, downloadDirectory, component, file.IsExecutable)
111+
errClose := downloadDirectory.Close()
112+
options.fileFetcherSemaphore.Release(1)
113+
if errGetFile != nil {
114+
return util.StatusWrapf(errGetFile, "Failed to obtain input file %#v", childPathTrace.GetUNIXString())
115+
}
116+
if errClose != nil {
117+
return util.StatusWrapf(err, "Failed to close input directory %#v", pathTrace.GetUNIXString())
118+
}
119+
return nil
120+
})
102121
}
103122
for _, directory := range directory.Directories {
104123
component, ok := path.NewComponent(directory.Name)
@@ -117,10 +136,14 @@ func (d *naiveBuildDirectory) mergeDirectoryContents(ctx context.Context, digest
117136
if err != nil {
118137
return util.StatusWrapf(err, "Failed to enter input directory %#v", childPathTrace.GetUNIXString())
119138
}
120-
err = d.mergeDirectoryContents(ctx, childDigest, childDirectory, childPathTrace)
121-
childDirectory.Close()
122-
if err != nil {
123-
return err
139+
refcountedDirectory := filesystem.NewReferenceCountedDirectoryCloser(childDirectory)
140+
errMerge := d.mergeDirectoryContents(ctx, group, childDigest, refcountedDirectory, childPathTrace)
141+
errClose := refcountedDirectory.Close()
142+
if errMerge != nil {
143+
return errMerge
144+
}
145+
if errClose != nil {
146+
return util.StatusWrapf(err, "Failed to close input directory %#v", childPathTrace.GetUNIXString())
124147
}
125148
}
126149
for _, symlink := range directory.Symlinks {
@@ -137,7 +160,11 @@ func (d *naiveBuildDirectory) mergeDirectoryContents(ctx context.Context, digest
137160
}
138161

139162
func (d *naiveBuildDirectory) MergeDirectoryContents(ctx context.Context, errorLogger util.ErrorLogger, digest digest.Digest, monitor access.UnreadDirectoryMonitor) error {
140-
return d.mergeDirectoryContents(ctx, digest, d.DirectoryCloser, nil)
163+
group, groupCtx := errgroup.WithContext(ctx)
164+
group.Go(func() error {
165+
return d.mergeDirectoryContents(groupCtx, group, digest, filesystem.NewReferenceCountedDirectoryCloser(d.DirectoryCloser), nil)
166+
})
167+
return group.Wait()
141168
}
142169

143170
func (d *naiveBuildDirectory) UploadFile(ctx context.Context, name path.Component, digestFunction digest.Function, writableFileUploadDelay <-chan struct{}) (digest.Digest, error) {

pkg/builder/naive_build_directory_test.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/golang/mock/gomock"
1818
"github.com/stretchr/testify/require"
1919

20+
"golang.org/x/sync/semaphore"
2021
"google.golang.org/grpc/codes"
2122
"google.golang.org/grpc/status"
2223
)
@@ -89,13 +90,13 @@ func TestNaiveBuildDirectorySuccess(t *testing.T) {
8990
fileFetcher.EXPECT().GetFile(
9091
gomock.Any(),
9192
digest.MustNewDigest("netbsd", remoteexecution.DigestFunction_SHA256, "9999999999999999999999999999999999999999999999999999999999999999", 512),
92-
buildDirectory,
93+
gomock.Any(),
9394
path.MustNewComponent("non-executable"),
9495
false).Return(nil)
9596
fileFetcher.EXPECT().GetFile(
9697
gomock.Any(),
9798
digest.MustNewDigest("netbsd", remoteexecution.DigestFunction_SHA256, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", 512),
98-
buildDirectory,
99+
gomock.Any(),
99100
path.MustNewComponent("executable"),
100101
true).Return(nil)
101102
buildDirectory.EXPECT().Symlink(gomock.Any(), path.MustNewComponent("link-to-executable")).
@@ -105,7 +106,7 @@ func TestNaiveBuildDirectorySuccess(t *testing.T) {
105106
require.Equal(t, "executable", targetPath.GetUNIXString())
106107
})
107108
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
108-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
109+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
109110

110111
err := inputRootPopulator.MergeDirectoryContents(
111112
ctx,
@@ -127,7 +128,7 @@ func TestNaiveBuildDirectoryInputRootNotInStorage(t *testing.T) {
127128
buildDirectory := mock.NewMockDirectoryCloser(ctrl)
128129
fileFetcher := mock.NewMockFileFetcher(ctrl)
129130
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
130-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
131+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
131132

132133
err := inputRootPopulator.MergeDirectoryContents(
133134
ctx,
@@ -173,7 +174,7 @@ func TestNaiveBuildDirectoryMissingInputDirectoryDigest(t *testing.T) {
173174
helloDirectory.EXPECT().Close()
174175
fileFetcher := mock.NewMockFileFetcher(ctrl)
175176
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
176-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
177+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
177178

178179
err := inputRootPopulator.MergeDirectoryContents(
179180
ctx,
@@ -224,7 +225,7 @@ func TestNaiveBuildDirectoryDirectoryCreationFailure(t *testing.T) {
224225
helloDirectory.EXPECT().Close()
225226
fileFetcher := mock.NewMockFileFetcher(ctrl)
226227
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
227-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
228+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
228229

229230
err := inputRootPopulator.MergeDirectoryContents(
230231
ctx,
@@ -276,7 +277,7 @@ func TestNaiveBuildDirectoryDirectoryEnterDirectoryFailure(t *testing.T) {
276277
helloDirectory.EXPECT().Close()
277278
fileFetcher := mock.NewMockFileFetcher(ctrl)
278279
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
279-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
280+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
280281

281282
err := inputRootPopulator.MergeDirectoryContents(
282283
ctx,
@@ -322,7 +323,7 @@ func TestNaiveBuildDirectoryMissingInputFileDigest(t *testing.T) {
322323
helloDirectory.EXPECT().Close()
323324
fileFetcher := mock.NewMockFileFetcher(ctrl)
324325
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
325-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
326+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
326327

327328
err := inputRootPopulator.MergeDirectoryContents(
328329
ctx,
@@ -373,12 +374,12 @@ func TestNaiveBuildDirectoryFileCreationFailure(t *testing.T) {
373374
fileFetcher.EXPECT().GetFile(
374375
gomock.Any(),
375376
digest.MustNewDigest("netbsd", remoteexecution.DigestFunction_SHA256, "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 87),
376-
helloDirectory,
377+
gomock.Any(),
377378
path.MustNewComponent("World"),
378379
false).Return(status.Error(codes.DataLoss, "Disk on fire"))
379380
helloDirectory.EXPECT().Close()
380381
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
381-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
382+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
382383

383384
err := inputRootPopulator.MergeDirectoryContents(
384385
ctx,
@@ -432,7 +433,7 @@ func TestNaiveBuildDirectorySymlinkCreationFailure(t *testing.T) {
432433
helloDirectory.EXPECT().Close()
433434
fileFetcher := mock.NewMockFileFetcher(ctrl)
434435
contentAddressableStorage := mock.NewMockBlobAccess(ctrl)
435-
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, contentAddressableStorage)
436+
inputRootPopulator := builder.NewNaiveBuildDirectory(buildDirectory, directoryFetcher, fileFetcher, semaphore.NewWeighted(1), contentAddressableStorage)
436437

437438
err := inputRootPopulator.MergeDirectoryContents(
438439
ctx,
@@ -453,7 +454,9 @@ func TestNaiveBuildDirectoryUploadFile(t *testing.T) {
453454
buildDirectory,
454455
directoryFetcher,
455456
fileFetcher,
456-
contentAddressableStorage)
457+
semaphore.NewWeighted(1),
458+
contentAddressableStorage,
459+
)
457460

458461
helloWorldDigest := digest.MustNewDigest("default-scheduler", remoteexecution.DigestFunction_MD5, "3e25960a79dbc69b674cd4ec67a72c62", 11)
459462
digestFunction := helloWorldDigest.GetDigestFunction()

0 commit comments

Comments
 (0)