Skip to content

Commit bca1912

Browse files
buchgrCopybara-Service
authored and
Copybara-Service
committed
remote: fix race on download error. Fixes bazelbuild#5047
For downloading output files / directories we trigger all downloads concurrently and asynchronously in the background and after that wait for all downloads to finish. However, if a download failed we did not wait for the remaining downloads to finish but immediately started deleting partial downloads and continued with local execution of the action. That leads to two interesting bugs: * The cleanup procedure races with the downloads that are still in progress. As it tries to delete files and directories, new files and directories are created and that will often lead to "Directory not empty" errors as seen in bazelbuild#5047. * The cleanup procedure does not detect the race, succeeds and subsequent local execution fails because not all files have been deleted. The solution is to always wait for all downloads to complete before entering the cleanup routine. Ideally we would also cancel all outstanding downloads, however, that's not as straightforward as it seems. That is, the j.u.c.Future API does not provide a way to cancel a computation and also wait for that computation to have actually terminated. So we'd need to introduce a separate mechanism to cancel downloads. RELNOTES: None PiperOrigin-RevId: 205980446
1 parent 04eeca4 commit bca1912

File tree

3 files changed

+224
-52
lines changed

3 files changed

+224
-52
lines changed

src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java

+71-50
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.remote;
1515

16-
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
17-
16+
import com.google.common.annotations.VisibleForTesting;
1817
import com.google.common.base.Preconditions;
1918
import com.google.common.util.concurrent.FutureCallback;
2019
import com.google.common.util.concurrent.Futures;
@@ -27,6 +26,7 @@
2726
import com.google.devtools.build.lib.concurrent.ThreadSafety;
2827
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
2928
import com.google.devtools.build.lib.remote.util.DigestUtil;
29+
import com.google.devtools.build.lib.remote.util.Utils;
3030
import com.google.devtools.build.lib.util.io.FileOutErr;
3131
import com.google.devtools.build.lib.vfs.Dirent;
3232
import com.google.devtools.build.lib.vfs.FileStatus;
@@ -170,68 +170,84 @@ public void onFailure(Throwable t) {
170170
// TODO(olaola): will need to amend to include the TreeNodeRepository for updating.
171171
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
172172
throws ExecException, IOException, InterruptedException {
173-
try {
174-
Context ctx = Context.current();
175-
List<FuturePathBooleanTuple> fileDownloads =
176-
Collections.synchronizedList(
177-
new ArrayList<>(result.getOutputFilesCount() + result.getOutputDirectoriesCount()));
178-
for (OutputFile file : result.getOutputFilesList()) {
179-
Path path = execRoot.getRelative(file.getPath());
180-
ListenableFuture<Void> download =
181-
retrier.executeAsync(
182-
() -> ctx.call(() -> downloadFile(path, file.getDigest(), file.getContent())));
183-
fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable()));
184-
}
173+
Context ctx = Context.current();
174+
List<FuturePathBooleanTuple> fileDownloads =
175+
Collections.synchronizedList(
176+
new ArrayList<>(result.getOutputFilesCount() + result.getOutputDirectoriesCount()));
177+
for (OutputFile file : result.getOutputFilesList()) {
178+
Path path = execRoot.getRelative(file.getPath());
179+
ListenableFuture<Void> download =
180+
retrier.executeAsync(
181+
() -> ctx.call(() -> downloadFile(path, file.getDigest(), file.getContent())));
182+
fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable()));
183+
}
185184

186-
List<ListenableFuture<Void>> dirDownloads =
187-
new ArrayList<>(result.getOutputDirectoriesCount());
188-
for (OutputDirectory dir : result.getOutputDirectoriesList()) {
189-
SettableFuture<Void> dirDownload = SettableFuture.create();
190-
ListenableFuture<byte[]> protoDownload =
191-
retrier.executeAsync(() -> ctx.call(() -> downloadBlob(dir.getTreeDigest())));
192-
Futures.addCallback(
193-
protoDownload,
194-
new FutureCallback<byte[]>() {
195-
@Override
196-
public void onSuccess(byte[] b) {
197-
try {
198-
Tree tree = Tree.parseFrom(b);
199-
Map<Digest, Directory> childrenMap = new HashMap<>();
200-
for (Directory child : tree.getChildrenList()) {
201-
childrenMap.put(digestUtil.compute(child), child);
202-
}
203-
Path path = execRoot.getRelative(dir.getPath());
204-
fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap, ctx));
205-
dirDownload.set(null);
206-
} catch (IOException e) {
207-
dirDownload.setException(e);
185+
List<ListenableFuture<Void>> dirDownloads = new ArrayList<>(result.getOutputDirectoriesCount());
186+
for (OutputDirectory dir : result.getOutputDirectoriesList()) {
187+
SettableFuture<Void> dirDownload = SettableFuture.create();
188+
ListenableFuture<byte[]> protoDownload =
189+
retrier.executeAsync(() -> ctx.call(() -> downloadBlob(dir.getTreeDigest())));
190+
Futures.addCallback(
191+
protoDownload,
192+
new FutureCallback<byte[]>() {
193+
@Override
194+
public void onSuccess(byte[] b) {
195+
try {
196+
Tree tree = Tree.parseFrom(b);
197+
Map<Digest, Directory> childrenMap = new HashMap<>();
198+
for (Directory child : tree.getChildrenList()) {
199+
childrenMap.put(digestUtil.compute(child), child);
208200
}
201+
Path path = execRoot.getRelative(dir.getPath());
202+
fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap, ctx));
203+
dirDownload.set(null);
204+
} catch (IOException e) {
205+
dirDownload.setException(e);
209206
}
207+
}
210208

211-
@Override
212-
public void onFailure(Throwable t) {
213-
dirDownload.setException(t);
214-
}
215-
},
216-
MoreExecutors.directExecutor());
217-
dirDownloads.add(dirDownload);
218-
}
209+
@Override
210+
public void onFailure(Throwable t) {
211+
dirDownload.setException(t);
212+
}
213+
},
214+
MoreExecutors.directExecutor());
215+
dirDownloads.add(dirDownload);
216+
}
219217

220-
fileDownloads.addAll(downloadOutErr(result, outErr, ctx));
218+
// Subsequently we need to wait for *every* download to finish, even if we already know that
219+
// one failed. That's so that when exiting this method we can be sure that all downloads have
220+
// finished and don't race with the cleanup routine.
221+
// TODO(buchgr): Look into cancellation.
221222

222-
for (ListenableFuture<Void> dirDownload : dirDownloads) {
223-
// Block on all directory download futures, so that we can be sure that we have discovered
224-
// all file downloads and can subsequently safely iterate over the list of file downloads.
223+
IOException downloadException = null;
224+
try {
225+
fileDownloads.addAll(downloadOutErr(result, outErr, ctx));
226+
} catch (IOException e) {
227+
downloadException = e;
228+
}
229+
for (ListenableFuture<Void> dirDownload : dirDownloads) {
230+
// Block on all directory download futures, so that we can be sure that we have discovered
231+
// all file downloads and can subsequently safely iterate over the list of file downloads.
232+
try {
225233
getFromFuture(dirDownload);
234+
} catch (IOException e) {
235+
downloadException = downloadException == null ? e : downloadException;
226236
}
237+
}
227238

228-
for (FuturePathBooleanTuple download : fileDownloads) {
239+
for (FuturePathBooleanTuple download : fileDownloads) {
240+
try {
229241
getFromFuture(download.getFuture());
230242
if (download.getPath() != null) {
231243
download.getPath().setExecutable(download.isExecutable());
232244
}
245+
} catch (IOException e) {
246+
downloadException = downloadException == null ? e : downloadException;
233247
}
234-
} catch (IOException downloadException) {
248+
}
249+
250+
if (downloadException != null) {
235251
try {
236252
// Delete any (partially) downloaded output files, since any subsequent local execution
237253
// of this action may expect none of the output files to exist.
@@ -261,6 +277,11 @@ public void onFailure(Throwable t) {
261277
}
262278
}
263279

280+
/**
 * Blocks on {@code f} by delegating to {@link Utils#getFromFuture}.
 *
 * <p>This is an overridable instance method rather than a direct static call so that a subclass
 * can intercept every future that {@code download} waits on; the test added in this commit
 * overrides it to record those futures.
 */
@VisibleForTesting
281+
protected <T> T getFromFuture(ListenableFuture<T> f) throws IOException, InterruptedException {
282+
return Utils.getFromFuture(f);
283+
}
284+
264285
/** Tuple of {@code ListenableFuture, Path, boolean}. */
265286
private static class FuturePathBooleanTuple {
266287
private final ListenableFuture<?> future;

src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java

-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
package com.google.devtools.build.lib.remote;
1616

17-
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
18-
1917
import com.google.common.util.concurrent.FutureCallback;
2018
import com.google.common.util.concurrent.Futures;
2119
import com.google.common.util.concurrent.ListenableFuture;

src/test/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCacheTests.java

+153
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,46 @@
1515

1616
import static com.google.common.truth.Truth.assertThat;
1717
import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows;
18+
import static org.junit.Assert.fail;
1819

20+
import com.google.common.base.Throwables;
1921
import com.google.common.collect.ImmutableList;
22+
import com.google.common.util.concurrent.FutureCallback;
23+
import com.google.common.util.concurrent.Futures;
24+
import com.google.common.util.concurrent.ListenableFuture;
25+
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
26+
import com.google.common.util.concurrent.MoreExecutors;
27+
import com.google.common.util.concurrent.SettableFuture;
2028
import com.google.devtools.build.lib.actions.ExecException;
2129
import com.google.devtools.build.lib.clock.JavaClock;
2230
import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.UploadManifest;
31+
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
2332
import com.google.devtools.build.lib.remote.util.DigestUtil;
33+
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
34+
import com.google.devtools.build.lib.remote.util.Utils;
35+
import com.google.devtools.build.lib.util.io.FileOutErr;
2436
import com.google.devtools.build.lib.vfs.DigestHashFunction;
2537
import com.google.devtools.build.lib.vfs.FileSystem;
2638
import com.google.devtools.build.lib.vfs.FileSystemUtils;
2739
import com.google.devtools.build.lib.vfs.Path;
2840
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
2941
import com.google.devtools.remoteexecution.v1test.ActionResult;
42+
import com.google.devtools.remoteexecution.v1test.Command;
43+
import com.google.devtools.remoteexecution.v1test.Digest;
44+
import com.google.devtools.remoteexecution.v1test.OutputFile;
45+
import java.io.IOException;
46+
import java.io.OutputStream;
47+
import java.util.ArrayList;
48+
import java.util.Collection;
49+
import java.util.HashMap;
50+
import java.util.List;
51+
import java.util.Map;
52+
import java.util.concurrent.Executors;
53+
import java.util.concurrent.atomic.AtomicInteger;
54+
import javax.annotation.Nullable;
55+
import org.junit.AfterClass;
3056
import org.junit.Before;
57+
import org.junit.BeforeClass;
3158
import org.junit.Test;
3259
import org.junit.runner.RunWith;
3360
import org.junit.runners.JUnit4;
@@ -40,13 +67,25 @@ public class AbstractRemoteActionCacheTests {
4067
private Path execRoot;
4168
private final DigestUtil digestUtil = new DigestUtil(DigestHashFunction.SHA256);
4269

70+
private static ListeningScheduledExecutorService retryService;
71+
72+
// Creates the single-threaded scheduled executor shared by all tests in this class; it is
// passed to RemoteRetrier as its retry service.
@BeforeClass
73+
public static void beforeEverything() {
74+
retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
75+
}
76+
4377
@Before
4478
public void setUp() throws Exception {
4579
fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
4680
execRoot = fs.getPath("/execroot");
4781
execRoot.createDirectory();
4882
}
4983

84+
// Shuts down the shared retry executor once all tests in this class have run.
@AfterClass
85+
public static void afterEverything() {
86+
retryService.shutdownNow();
87+
}
88+
5089
@Test
5190
public void uploadSymlinkAsFile() throws Exception {
5291
ActionResult.Builder result = ActionResult.newBuilder();
@@ -92,4 +131,118 @@ public void uploadSymlinkInDirectory() throws Exception {
92131
.hasMessageThat()
93132
.contains("Only regular files and directories may be uploaded to a remote cache.");
94133
}
134+
135+
@Test
136+
public void onErrorWaitForRemainingDownloadsToComplete() throws Exception {
137+
// If one or more downloads of output files / directories fail then the code should
138+
// wait for all downloads to have been completed before it tries to clean up partially
139+
// downloaded files.
140+
141+
Path stdout = fs.getPath("/execroot/stdout");
142+
Path stderr = fs.getPath("/execroot/stderr");
143+
144+
// Canned download results keyed by digest: file1 and file3 succeed, file2 fails.
Map<Digest, ListenableFuture<byte[]>> downloadResults = new HashMap<>();
145+
Path file1 = fs.getPath("/execroot/file1");
146+
Digest digest1 = digestUtil.compute("file1".getBytes("UTF-8"));
147+
downloadResults.put(digest1, Futures.immediateFuture("file1".getBytes("UTF-8")));
148+
Path file2 = fs.getPath("/execroot/file2");
149+
Digest digest2 = digestUtil.compute("file2".getBytes("UTF-8"));
150+
downloadResults.put(digest2, Futures.immediateFailedFuture(new IOException("download failed")));
151+
Path file3 = fs.getPath("/execroot/file3");
152+
Digest digest3 = digestUtil.compute("file3".getBytes("UTF-8"));
153+
downloadResults.put(digest3, Futures.immediateFuture("file3".getBytes("UTF-8")));
154+
155+
RemoteOptions options = new RemoteOptions();
156+
// NOTE(review): `(e) -> false` is presumably the "should retry" predicate, i.e. nothing is
// retried here - confirm against the RemoteRetrier constructor.
RemoteRetrier retrier = new RemoteRetrier(options, (e) -> false, retryService,
157+
Retrier.ALLOW_ALL_CALLS);
158+
// Futures that download() blocked on, recorded by the getFromFuture override below.
List<ListenableFuture<?>> blockingDownloads = new ArrayList<>();
159+
AtomicInteger numSuccess = new AtomicInteger();
160+
AtomicInteger numFailures = new AtomicInteger();
161+
AbstractRemoteActionCache cache = new DefaultRemoteActionCache(options, digestUtil, retrier) {
162+
// Serves the canned future for the requested digest and counts each completion as a
// success or a failure before propagating it to the returned future.
@Override
163+
public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
164+
SettableFuture<Void> result = SettableFuture.create();
165+
Futures.addCallback(downloadResults.get(digest), new FutureCallback<byte[]>() {
166+
@Override
167+
public void onSuccess(byte[] bytes) {
168+
numSuccess.incrementAndGet();
169+
try {
170+
out.write(bytes);
171+
out.close();
172+
result.set(null);
173+
} catch (IOException e) {
174+
result.setException(e);
175+
}
176+
}
177+
178+
@Override
179+
public void onFailure(Throwable throwable) {
180+
numFailures.incrementAndGet();
181+
result.setException(throwable);
182+
}
183+
}, MoreExecutors.directExecutor());
184+
return result;
185+
}
186+
187+
// Records every future the cache waits on, then waits on it exactly as the base class does.
@Override
188+
protected <T> T getFromFuture(ListenableFuture<T> f)
189+
throws IOException, InterruptedException {
190+
blockingDownloads.add(f);
191+
return Utils.getFromFuture(f);
192+
}
193+
};
194+
195+
ActionResult result = ActionResult.newBuilder()
196+
.setExitCode(0)
197+
.addOutputFiles(OutputFile.newBuilder().setPath(file1.getPathString()).setDigest(digest1))
198+
.addOutputFiles(OutputFile.newBuilder().setPath(file2.getPathString()).setDigest(digest2))
199+
.addOutputFiles(OutputFile.newBuilder().setPath(file3.getPathString()).setDigest(digest3))
200+
.build();
201+
try {
202+
cache.download(result, execRoot, new FileOutErr(stdout, stderr));
203+
fail("Expected IOException");
204+
} catch (IOException e) {
205+
// Both successful downloads and the failing one completed, and three futures were waited
// on, before the failure was reported to the caller.
assertThat(numSuccess.get()).isEqualTo(2);
206+
assertThat(numFailures.get()).isEqualTo(1);
207+
assertThat(blockingDownloads).hasSize(3);
208+
assertThat(Throwables.getRootCause(e)).hasMessageThat().isEqualTo("download failed");
209+
}
210+
}
211+
212+
/**
 * Minimal concrete {@link AbstractRemoteActionCache} for tests. Every method overridden here
 * throws {@link UnsupportedOperationException}, so a test overrides only the methods it actually
 * exercises (for example {@code downloadBlob}).
 */
private static class DefaultRemoteActionCache extends AbstractRemoteActionCache {
213+
214+
public DefaultRemoteActionCache(RemoteOptions options,
215+
DigestUtil digestUtil, Retrier retrier) {
216+
super(options, digestUtil, retrier);
217+
}
218+
219+
@Override
220+
public void ensureInputsPresent(TreeNodeRepository repository, Path execRoot, TreeNode root,
221+
Command command) throws IOException, InterruptedException {
222+
throw new UnsupportedOperationException();
223+
}
224+
225+
@Nullable
226+
@Override
227+
ActionResult getCachedActionResult(ActionKey actionKey)
228+
throws IOException, InterruptedException {
229+
throw new UnsupportedOperationException();
230+
}
231+
232+
@Override
233+
void upload(ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr,
234+
boolean uploadAction) throws ExecException, IOException, InterruptedException {
235+
throw new UnsupportedOperationException();
236+
}
237+
238+
@Override
239+
protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
240+
throw new UnsupportedOperationException();
241+
}
242+
243+
@Override
244+
public void close() {
245+
throw new UnsupportedOperationException();
246+
}
247+
}
95248
}

0 commit comments

Comments
 (0)