Skip to content

Commit

Permalink
Do not add exchange locations for finished tasks
Browse files Browse the repository at this point in the history
This prevents query hangs when using writer scaling, because writer scaling
can create new tasks with exchange locations that reference tasks that have
already expired (such tasks would then be auto-created as new tasks that never complete).
  • Loading branch information
electrum committed Dec 29, 2017
1 parent 886cdf9 commit 2ec80be
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public final class SqlStageExecution
private final Set<TaskId> finishedTasks = newConcurrentHashSet();
private final AtomicBoolean splitsScheduled = new AtomicBoolean();

private final Multimap<PlanNodeId, URI> exchangeLocations = HashMultimap.create();
private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create();
private final Set<PlanNodeId> completeSources = newConcurrentHashSet();
private final Set<PlanFragmentId> completeSourceFragments = newConcurrentHashSet();

Expand Down Expand Up @@ -231,19 +231,20 @@ public StageInfo getStageInfo()
ImmutableList::of);
}

public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<URI> exchangeLocations, boolean noMoreExchangeLocations)
public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> sourceTasks, boolean noMoreExchangeLocations)
{
requireNonNull(fragmentId, "fragmentId is null");
requireNonNull(exchangeLocations, "exchangeLocations is null");
requireNonNull(sourceTasks, "sourceTasks is null");

RemoteSourceNode remoteSource = exchangeSources.get(fragmentId);
checkArgument(remoteSource != null, "Unknown remote source %s. Known sources are %s", fragmentId, exchangeSources.keySet());

this.exchangeLocations.putAll(remoteSource.getId(), exchangeLocations);
this.sourceTasks.putAll(remoteSource.getId(), sourceTasks);

for (RemoteTask task : getAllTasks()) {
ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder();
for (URI exchangeLocation : exchangeLocations) {
for (RemoteTask sourceTask : sourceTasks) {
URI exchangeLocation = sourceTask.getTaskStatus().getSelf();
newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), exchangeLocation));
}
task.addSplits(newSplits.build());
Expand Down Expand Up @@ -349,9 +350,13 @@ private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<

ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
initialSplits.putAll(sourceSplits);
for (Entry<PlanNodeId, URI> entry : exchangeLocations.entries()) {
initialSplits.put(entry.getKey(), createRemoteSplitFor(taskId, entry.getValue()));
}

sourceTasks.forEach((planNodeId, task) -> {
TaskStatus status = task.getTaskStatus();
if (status.getState() != TaskState.FINISHED) {
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
}
});

OutputBuffers outputBuffers = this.outputBuffers.get();
checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public SqlQueryScheduler(QueryStateMachine queryStateMachine,

OutputBufferId rootBufferId = Iterables.getOnlyElement(rootOutputBuffers.getBuffers().keySet());
List<SqlStageExecution> stages = createStages(
(fragmentId, exchangeLocations, noMoreExchangeLocations) -> updateQueryOutputLocations(queryStateMachine, rootBufferId, exchangeLocations, noMoreExchangeLocations),
(fragmentId, tasks, noMoreExchangeLocations) -> updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations),
new AtomicInteger(),
locationFactory,
plan.withBucketToPartition(Optional.of(new int[1])),
Expand Down Expand Up @@ -217,9 +217,10 @@ else if (queryStateMachine.getQueryState() == QueryState.STARTING) {
}
}

private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBufferId rootBufferId, Set<URI> exchangeLocations, boolean noMoreExchangeLocations)
private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBufferId rootBufferId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations)
{
Set<URI> bufferLocations = exchangeLocations.stream()
Set<URI> bufferLocations = tasks.stream()
.map(task -> task.getTaskStatus().getSelf())
.map(location -> uriBuilderFrom(location).appendPath("results").appendPath(rootBufferId.toString()).build())
.collect(toImmutableSet());
queryStateMachine.updateOutputLocations(bufferLocations, noMoreExchangeLocations);
Expand Down Expand Up @@ -573,7 +574,7 @@ private static ListenableFuture<?> whenAllStages(Collection<SqlStageExecution> s

private interface ExchangeLocationsConsumer
{
void addExchangeLocations(PlanFragmentId fragmentId, Set<URI> exchangeLocations, boolean noMoreExchangeLocations);
void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations);
}

private static class StageLinkage
Expand Down Expand Up @@ -637,10 +638,7 @@ public void processScheduleResults(StageState newState, Set<RemoteTask> newTasks
}

// Add an exchange location to the parent stage for each new task
Set<URI> newExchangeLocations = newTasks.stream()
.map(task -> task.getTaskStatus().getSelf())
.collect(toImmutableSet());
parent.addExchangeLocations(currentStageFragmentId, newExchangeLocations, noMoreTasks);
parent.addExchangeLocations(currentStageFragmentId, newTasks, noMoreTasks);

if (!childOutputBufferManagers.isEmpty()) {
// Add an output buffer to the child stages for each new task
Expand Down

0 comments on commit 2ec80be

Please sign in to comment.