Skip to content

Commit

Permalink
Last-minute fixes before deadline
Browse files Browse the repository at this point in the history
  • Loading branch information
kayousterhout committed Feb 10, 2015
1 parent 4e8d09a commit f3b53d9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 4 deletions.
46 changes: 46 additions & 0 deletions all_utilization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import numpy

import os
import sys

import parse_logs

def plot_cdf(values, filename):
    """Write a 100-point CDF of values to filename as tab-separated lines.

    Each output line is "<fraction>\t<percentile value>" for the 0th through
    99th percentile, matching the format written by main().

    Original version opened the file and returned without writing anything or
    closing the handle; the with-statement guarantees the handle is released.
    """
    with open(filename, "w") as f:
        for percent in range(100):
            f.write("%s\t%s\n" % (percent / 100., numpy.percentile(values, percent)))

def write_data_to_file(data, file_handle):
    """Write data to file_handle as one tab-separated line ending in a newline.

    Each element of data is stringified with str(). The original appended "\n"
    to the list with += (list += str adds the string's characters as elements),
    so the join emitted a spurious trailing tab before the newline; joining
    first and concatenating the newline avoids that.
    """
    file_handle.write("\t".join(str(x) for x in data) + "\n")

def main(argv):
    """Aggregate per-task utilization from all job logs in a directory.

    argv[0] is a directory; every file in it whose name ends in "job_log" is
    parsed with parse_logs.Analyzer. For each task we collect:
      - CPU utilization, normalized by 8 (assumes 8 cores per machine -- TODO
        confirm against cluster config),
      - network bytes sent/received per second, converted to gigabytes,
      - utilization of the "xvdb" and "xvdf" block devices only
        (block_device_numbers[0] is presumably the utilization fraction --
        verify against parse_logs).

    Writes a 100-point CDF of the three metrics to
    <dirname>/cpu_disk_utilization_cdf, one tab-separated line per percentile:
    fraction, cpu, disk, network.
    """
    disk_utilizations = []
    cpu_utilizations = []
    network_utilizations = []
    dirname = argv[0]
    for filename in os.listdir(dirname):
        full_name = os.path.join(dirname, filename)
        if os.path.isfile(full_name) and filename.endswith("job_log"):
            # print-as-function is valid in both Python 2 and 3.
            print("Reading %s" % filename)
            analyzer = parse_logs.Analyzer(full_name)
            # Iterate values directly: the original unpacked (id, stage) from
            # iteritems() and never used id, shadowing the builtin.
            for stage in analyzer.stages.values():
                for task in stage.tasks:
                    cpu_utilizations.append(task.total_cpu_utilization / 8.)
                    # Convert bytes/s to GB/s.
                    network_utilizations.append(
                        task.network_bytes_transmitted_ps / (1000 * 1000 * 1000))
                    network_utilizations.append(
                        task.network_bytes_received_ps / (1000 * 1000 * 1000))
                    for name, block_device_numbers in task.disk_utilization.items():
                        if name in ["xvdb", "xvdf"]:
                            disk_utilizations.append(block_device_numbers[0])

    output_filename = os.path.join(dirname, "cpu_disk_utilization_cdf")
    # Diagnostic: peak observed network throughput (GB/s).
    print(max(network_utilizations))
    # with-statement replaces the original's never-closed file handle.
    with open(output_filename, "w") as f:
        for percent in range(100):
            f.write("%s\t%s\t%s\t%s\n" % (
                percent / 100.,
                numpy.percentile(cpu_utilizations, percent),
                numpy.percentile(disk_utilizations, percent),
                numpy.percentile(network_utilizations, percent)))

# Entry point: first CLI argument is the directory containing *job_log files.
if __name__ == "__main__":
    main(sys.argv[1:])
4 changes: 3 additions & 1 deletion breakeven.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
class Query:
def __init__(self, filename):
analyzer = parse_logs.Analyzer(filename)
self.total_disk_input_mb = 0
self.total_input_mb = 0
self.total_shuffle_write_mb = 0
self.total_shuffle_read_mb = 0
Expand All @@ -23,6 +24,7 @@ def __init__(self, filename):
self.total_runtime = 0
self.total_cpu_time = 0
for stage in analyzer.stages.values():
self.total_disk_input_mb += sum([t.input_mb for t in stage.tasks if t.input_read_method != "Memory"])
self.total_input_mb += sum([t.input_mb for t in stage.tasks])
self.total_shuffle_write_mb += sum([t.shuffle_mb_written for t in stage.tasks])
self.total_shuffle_read_mb += sum([t.remote_mb_read + t.local_mb_read for t in stage.tasks if t.has_fetch])
Expand Down Expand Up @@ -71,7 +73,7 @@ def main(argv):
# Compute disk breakeven speed (in MB/s).
# Shuffled data has to be written to disk and later read back, so multiply by 2.
# Output data has to be written to 3 disks.
total_disk_mb = query.total_input_mb + query.total_shuffle_write_mb + query.total_shuffle_read_mb + 3 * query.total_output_mb
total_disk_mb = query.total_disk_input_mb + query.total_shuffle_write_mb + query.total_shuffle_read_mb + 3 * query.total_output_mb
# To compute the breakeven speed, need to normalize for the number of disks per machine (2) and
# number of cores (8).
disk_breakeven_speeds.append((total_disk_mb / 2.) / (query.total_cpu_time / (8 * 1000.)))
Expand Down
3 changes: 3 additions & 0 deletions parse_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ def median_progress_rate_speedup(self, prefix):
for start, finish in start_finish_times]
total_median_progress_rate_runtime += no_stragglers_runtime
all_start_finish_times.append(start_finish_times_adjusted)
print "No stragglers runtime: ", no_stragglers_runtime
print "MAx concurrency: ", concurrency.get_max_concurrency(stage.tasks)

if len(runtimes_for_combined_stages) > 0:
no_stragglers_runtime, start_finish_times = simulate.simulate(
Expand All @@ -294,6 +296,7 @@ def median_progress_rate_speedup(self, prefix):
all_start_finish_times.append(start_finish_times_adjusted)

self.write_simulated_waterfall(all_start_finish_times, "%s_sim_median_progress_rate" % prefix)
print self.get_simulated_runtime
return total_median_progress_rate_runtime * 1.0 / self.get_simulated_runtime()

def no_stragglers_perfect_parallelism_speedup(self):
Expand Down
12 changes: 9 additions & 3 deletions stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def __str__(self):
self.traditional_stragglers(), self.progress_rate_stragglers()[0],
self.scheduler_delay_stragglers()[0], self.hdfs_read_stragglers()[0],
self.hdfs_read_and_scheduler_delay_stragglers()[0], self.gc_stragglers()[0],
self.network_stragglers()[0], self.jit_stragglers()[0],
# Do not compute the JIT stragglers here! Screws up the calculation.
self.network_stragglers()[0], -1,
self.output_progress_rate_stragglers()[0]))

def verbose_str(self):
Expand Down Expand Up @@ -93,7 +94,6 @@ def traditional_stragglers_explained_by_progress_rate(self):
if task.runtime() >= 1.5*median_task_duration and task.input_size_mb() > 0:
progress_rate = task.runtime() * 1.0 / task.input_size_mb()
if progress_rate < 1.5*median_progress_rate:
task.straggler_behavior_explained = True
progress_rate_stragglers += 1
progress_rate_stragglers_total_time += task.runtime()
return progress_rate_stragglers, progress_rate_stragglers_total_time
Expand Down Expand Up @@ -259,7 +259,13 @@ def task_runtimes_with_median_progress_rate(self):
progress_rates = [t.runtime() * 1.0 / t.input_size_mb() for t in self.tasks
if t.input_size_mb() > 0]
median_progress_rate = numpy.median(progress_rates)
runtimes = [t.input_size_mb() * median_progress_rate for t in self.tasks]
def new_runtime(task):
if t.input_size_mb() > 0 and t.runtime() * 1.0 / t.input_size_mb() > median_progress_rate:
return t.input_size_mb() * median_progress_rate
return t.runtime()
runtimes = [new_runtime(t) for t in self.tasks]
print runtimes
print [t.runtime() for t in self.tasks]
return runtimes

def input_mb(self):
Expand Down

0 comments on commit f3b53d9

Please sign in to comment.