Skip to content

Commit b46ac10

Browse files
committed
Avoid capturing SnapshotsInProgress$Entry in queue (elastic#88707)
Today each time there's shards to snapshot we enqueue a lambda which captures the current `SnapshotsInProgress$Entry`. This is a pretty heavyweight object, possibly several MB in size, most of which is not necessary to capture, and with concurrent snapshots across thousands of shards we may enqueue many hundreds of slightly different such objects. With this commit we compute a more efficient representation of the work to be done by each task in the queue instead. Relates elastic#77466
1 parent c7f4454 commit b46ac10

File tree

2 files changed

+64
-47
lines changed

2 files changed

+64
-47
lines changed

docs/changelog/88707.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 88707
2+
summary: Avoid capturing `SnapshotsInProgress$Entry` in queue
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.transport.TransportService;
5454

5555
import java.io.IOException;
56+
import java.util.ArrayList;
5657
import java.util.HashMap;
5758
import java.util.Iterator;
5859
import java.util.List;
@@ -226,7 +227,24 @@ private void startNewSnapshots(List<SnapshotsInProgress.Entry> snapshotsInProgre
226227
}
227228
if (startedShards != null && startedShards.isEmpty() == false) {
228229
shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards);
229-
startNewShards(entry, startedShards);
230+
231+
final List<Runnable> shardSnapshotTasks = new ArrayList<>(startedShards.size());
232+
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : startedShards.entrySet()) {
233+
final ShardId shardId = shardEntry.getKey();
234+
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
235+
final IndexId indexId = entry.indices().get(shardId.getIndexName());
236+
assert indexId != null;
237+
assert SnapshotsService.useShardGenerations(entry.version())
238+
|| ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null
239+
: "Found non-null, non-numeric shard generation ["
240+
+ snapshotStatus.generation()
241+
+ "] for snapshot with old-format compatibility";
242+
shardSnapshotTasks.add(
243+
newShardSnapshotTask(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version())
244+
);
245+
}
246+
247+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> shardSnapshotTasks.forEach(Runnable::run));
230248
}
231249
} else if (entryState == State.ABORTED) {
232250
// Abort all running shards for this snapshot
@@ -249,53 +267,47 @@ private void startNewSnapshots(List<SnapshotsInProgress.Entry> snapshotsInProgre
249267
}
250268
}
251269

252-
private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexShardSnapshotStatus> startedShards) {
253-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
254-
final Snapshot snapshot = entry.snapshot();
255-
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : startedShards.entrySet()) {
256-
final ShardId shardId = shardEntry.getKey();
257-
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
258-
final IndexId indexId = entry.indices().get(shardId.getIndexName());
259-
assert indexId != null;
260-
assert SnapshotsService.useShardGenerations(entry.version())
261-
|| ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null
262-
: "Found non-null, non-numeric shard generation ["
263-
+ snapshotStatus.generation()
264-
+ "] for snapshot with old-format compatibility";
265-
snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(), new ActionListener<>() {
266-
@Override
267-
public void onResponse(ShardSnapshotResult shardSnapshotResult) {
268-
final ShardGeneration newGeneration = shardSnapshotResult.getGeneration();
269-
assert newGeneration != null;
270-
assert newGeneration.equals(snapshotStatus.generation());
271-
if (logger.isDebugEnabled()) {
272-
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
273-
logger.debug(
274-
"[{}][{}] completed snapshot to [{}] with status [{}] at generation [{}]",
275-
shardId,
276-
snapshot,
277-
snapshot.getRepository(),
278-
lastSnapshotStatus,
279-
snapshotStatus.generation()
280-
);
281-
}
282-
notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult);
283-
}
270+
private Runnable newShardSnapshotTask(
271+
final ShardId shardId,
272+
final Snapshot snapshot,
273+
final IndexId indexId,
274+
final Map<String, Object> userMetadata,
275+
final IndexShardSnapshotStatus snapshotStatus,
276+
final Version entryVersion
277+
) {
278+
// separate method to make sure this lambda doesn't capture any heavy local objects like a SnapshotsInProgress.Entry
279+
return () -> snapshot(shardId, snapshot, indexId, userMetadata, snapshotStatus, entryVersion, new ActionListener<>() {
280+
@Override
281+
public void onResponse(ShardSnapshotResult shardSnapshotResult) {
282+
final ShardGeneration newGeneration = shardSnapshotResult.getGeneration();
283+
assert newGeneration != null;
284+
assert newGeneration.equals(snapshotStatus.generation());
285+
if (logger.isDebugEnabled()) {
286+
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
287+
logger.debug(
288+
"[{}][{}] completed snapshot to [{}] with status [{}] at generation [{}]",
289+
shardId,
290+
snapshot,
291+
snapshot.getRepository(),
292+
lastSnapshotStatus,
293+
snapshotStatus.generation()
294+
);
295+
}
296+
notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult);
297+
}
284298

285-
@Override
286-
public void onFailure(Exception e) {
287-
final String failure;
288-
if (e instanceof AbortedSnapshotException) {
289-
failure = "aborted";
290-
logger.debug(() -> format("[%s][%s] aborted shard snapshot", shardId, snapshot), e);
291-
} else {
292-
failure = summarizeFailure(e);
293-
logger.warn(() -> format("[%s][%s] failed to snapshot shard", shardId, snapshot), e);
294-
}
295-
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
296-
notifyFailedSnapshotShard(snapshot, shardId, failure, snapshotStatus.generation());
297-
}
298-
});
299+
@Override
300+
public void onFailure(Exception e) {
301+
final String failure;
302+
if (e instanceof AbortedSnapshotException) {
303+
failure = "aborted";
304+
logger.debug(() -> format("[%s][%s] aborted shard snapshot", shardId, snapshot), e);
305+
} else {
306+
failure = summarizeFailure(e);
307+
logger.warn(() -> format("[%s][%s] failed to snapshot shard", shardId, snapshot), e);
308+
}
309+
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
310+
notifyFailedSnapshotShard(snapshot, shardId, failure, snapshotStatus.generation());
299311
}
300312
});
301313
}

0 commit comments

Comments
 (0)