Skip to content

Commit

Permalink
Fix for output rate stragglers
Browse files Browse the repository at this point in the history
  • Loading branch information
kayousterhout committed Sep 23, 2014
1 parent 6dee69c commit 4e8d09a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
1 change: 1 addition & 0 deletions parse_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def write_straggler_info(self, job_name, prefix):
scheduler_and_read_straggler_count = sum([x[0] for x in scheduler_and_read_stragger_info])
scheduler_and_read_straggler_time = sum([x[1] for x in scheduler_and_read_stragger_info])

# Important that JIT stragglers be classified last!
jit_straggler_info = [s.jit_stragglers() for s in self.stages.values()]
jit_straggler_count = sum([x[0] for x in jit_straggler_info])
jit_straggler_time = sum([x[1] for x in jit_straggler_info])
Expand Down
28 changes: 26 additions & 2 deletions stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ def output_progress_rate_stragglers(self):
"Returns stats about stragglers that can be attributed to output data size. """
def progress_rate_based_on_output(task):
return (task.runtime() / (task.shuffle_mb_written + task.output_mb))
non_zero_output_tasks = [t for t in self.get_tasks_with_non_zero_input() \
if t.shuffle_mb_written + t.output_mb > 0]
new_progress_rates = [progress_rate_based_on_output(t) \
for t in non_zero_output_tasks]
median_new_progress_rate = numpy.median(new_progress_rates)
attributable_stragglers = []
for t in self.get_progress_rate_stragglers():
if (t.shuffle_mb_written + t.output_mb > 0 and
progress_rate_based_on_output(t) < 1.5 * median_new_progress_rate):
attributable_stragglers.append(t)
t.straggler_behavior_explained = True
return len(attributable_stragglers), sum([t.runtime() for t in attributable_stragglers])

attributable_stragglers = self.get_attributable_stragglers(progress_rate_based_on_output)
straggler_time = sum([t.runtime() for t in attributable_stragglers])
return len(attributable_stragglers), straggler_time
Expand Down Expand Up @@ -196,6 +209,11 @@ def progress_rate_wo_shuffle_write(task):
return self.get_attributable_stragglers_stats(progress_rate_wo_shuffle_write)

def jit_stragglers(self):
""" THIS SHOULD BE CALLED AFTER ALL OTHER STRAGGLER FUNCTIONS.
Tasks won't be classified as JIT stragglers if they have already
been classified as another kind of straggler.
"""
executor_to_task_finish_times = collections.defaultdict(list)
for task in self.tasks:
executor_to_task_finish_times[task.executor].append(task.finish_time)
Expand All @@ -211,15 +229,21 @@ def jit_stragglers(self):
def progress_rate(task):
return task.runtime() * 1.0 / task.input_size_mb()

# For JIT effects, only look at compute time (other times aren't affected by JIT).
def compute_progress_rate(task):
return task.compute_time() * 1.0 / task.input_size_mb()

median_task_progress_rate = numpy.median([progress_rate(t)
for t in self.get_tasks_with_non_zero_input()])
median_virgin_task_progress_rate = numpy.median([progress_rate(t) for t in virgin_tasks])
median_virgin_task_progress_rate = numpy.median([compute_progress_rate(t) for t in virgin_tasks])
jit_stragglers = 0
total_time = 0
print "Median virgin rate", median_virgin_task_progress_rate
for task in virgin_tasks:
print compute_progress_rate(task)
task_progress_rate = progress_rate(task)
if task_progress_rate >= 1.5*median_task_progress_rate:
if task_progress_rate < 1.5*median_virgin_task_progress_rate:
if not task.straggler_behavior_explained and compute_progress_rate(task) < 1.5*median_virgin_task_progress_rate:
jit_stragglers += 1
total_time += task.runtime()
task.straggler_behavior_explained = True
Expand Down

0 comments on commit 4e8d09a

Please sign in to comment.