16 changes: 16 additions & 0 deletions parallel-orch/node.py
@@ -279,6 +279,7 @@ def kill(self):
self.exec_ctxt.process.kill()

def reset_to_ready(self):
util.perf_log_start("Node", "Reset to Ready", self.cnid)
assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING,
NodeState.SPECULATED]

@@ -289,6 +290,7 @@ def reset_to_ready(self):

# TODO: make this more sophisticated
if self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]:
util.perf_log("Node", "EXE", self.cnid, optional_message="Killed")
self.kill()

# Probably delete them from tmpfs too
@@ -300,23 +302,31 @@ def reset_to_ready(self):
self.exec_ctxt = None
self.exec_result = None
self.state = NodeState.READY

util.perf_log("Node", "Reset to Ready", self.cnid)


def start_executing(self, env_file):
assert self.state == NodeState.READY
util.perf_log_start("Node", "EXE", self.cnid)
self.start_command(env_file)
self.state = NodeState.EXECUTING

def start_spec_executing(self, env_file):
assert self.state == NodeState.READY
util.perf_log_start("Node", "EXE", self.cnid)
self.start_command(env_file, speculate=True)
self.state = NodeState.SPEC_EXECUTING

def commit_frontier_execution(self):
assert self.state == NodeState.EXECUTING
self.exec_result = ExecResult(self.exec_ctxt.process.pid, self.exec_ctxt.process.returncode)
self.gather_fs_actions()

util.perf_log_start("Node", "Commit", self.cnid)
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
util.perf_log("Node", "Commit", self.cnid)

self.state = NodeState.COMMITTED

def finish_spec_execution(self):
@@ -328,7 +338,11 @@ def finish_spec_execution(self):

def commit_speculated(self):
assert self.state == NodeState.SPECULATED

util.perf_log_start("Node", "Commit", self.cnid)
executor.commit_workspace(self.exec_ctxt.sandbox_dir)
util.perf_log("Node", "Commit", self.cnid)

self.state = NodeState.COMMITTED

def transition_from_stopped_to_executing(self, env_file=None):
@@ -370,6 +384,7 @@ def get_rw_set(self):
return self.rwset

def has_env_conflict_with(self, other_env) -> bool:
util.perf_log_start("Node", "Env Dependency Resolution", self.cnid)
# Early return if paths are the same
if self.exec_ctxt.pre_env_file == other_env:
return False
@@ -416,6 +431,7 @@ def parse_env(content):
logging.critical(f"Variable {key} differs: node environment has {node_env_vars[key]}, other has {other_env_vars[key]}")
conflict_exists = True

util.perf_log("Node", "Env Dependency Resolution", self.cnid)
return conflict_exists


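Throughout node.py the instrumentation follows one pattern: open a named timer with `util.perf_log_start` immediately before a measured phase and close it with `util.perf_log` afterwards, keyed by module, action, and concrete node id. A minimal sketch of the pairing, with `time.sleep` standing in for the real work such as `executor.commit_workspace`:

```python
import time

import util  # parallel-orch/util.py, as changed in this PR


def commit_with_timing(cnid):
    # Opens the named timer under the key "Node|Commit|<cnid>" and logs
    # the elapsed time since config.START_TIME.
    util.perf_log_start("Node", "Commit", cnid)
    time.sleep(0.01)  # stand-in for executor.commit_workspace(sandbox_dir)
    # Closes the timer: logs time-from-start plus the step time since the
    # matching perf_log_start call.
    util.perf_log("Node", "Commit", cnid)
```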
11 changes: 11 additions & 0 deletions parallel-orch/partial_program_order.py
@@ -157,15 +157,19 @@ def valid(self):
return True

def fetch_fs_actions(self):
util.perf_log_start("PPO", "Fetch FS Actions")
for node in self.get_executing_normal_and_spec_nodes():
node.gather_fs_actions()
util.perf_log("PPO", "Fetch FS Actions")

def _has_fs_deps(self, concrete_node_id: ConcreteNodeId):
util.perf_log_start("PPO", "FS Dep Checking", concrete_node_id)
node_of_interest : ConcreteNode = self.get_concrete_node(concrete_node_id)
for nid in self.to_be_resolved[concrete_node_id]:
node: ConcreteNode = self.get_concrete_node(nid)
if node.get_rw_set().has_conflict(node_of_interest.get_rw_set()):
return True
util.perf_log("PPO", "FS Dep Checking", concrete_node_id)
return False

# TODO: It's currently designed this way to avoid reading trace file all the time
@@ -188,6 +192,7 @@ def schedule_spec_work(self, concrete_node_id: ConcreteNodeId, env_file: str):
def handle_complete(self, concrete_node_id: ConcreteNodeId, has_pending_wait: bool,
current_env: str):
event_log(f"handle_complete {concrete_node_id}")
util.perf_log("Node", "EXE", concrete_node_id)
node = self.get_concrete_node(concrete_node_id)
# TODO: complete the state matching
if node.is_executing():
@@ -217,6 +222,9 @@ def reset_succeeding_nodes(self, node_id: NodeId, env_file: str):
# # uncommitted_node.start_spec_executing(env_file)

def adding_new_basic_block(self, concrete_node_id: ConcreteNodeId):

util.perf_log_start("PPO", "Adding New Basic Block", concrete_node_id)

basic_block = self.hsprog.find_basic_block(concrete_node_id.node_id)
if len(self.concrete_nodes) != 0:
prev_concrete_node_id = next(reversed(self.concrete_nodes))
@@ -237,6 +245,9 @@ def adding_new_basic_block(self, concrete_node_id: ConcreteNodeId):
self.prev_concrete_node[new_concrete_node_id] = []
prev_concrete_node_id = new_concrete_node_id
assert concrete_node_id in self.concrete_nodes

util.perf_log("PPO", "Adding New Basic Block", concrete_node_id)


def finish_wait_unsafe(self, concrete_node_id: ConcreteNodeId):
node = self.concrete_nodes[concrete_node_id]
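Note that a pair does not have to open and close in the same file: `Node.start_executing` opens the `"Node|EXE|<cnid>"` timer in node.py, and `handle_complete` above closes it. Because the key is derived purely from the `(module, action, node)` arguments, any call site can close a timer opened elsewhere. A sketch of that handoff, with a made-up node id:

```python
import util  # parallel-orch/util.py, as changed in this PR

cnid = "42"  # hypothetical concrete node id

# In node.py, when execution starts:
util.perf_log_start("Node", "EXE", cnid)

# ... the child process runs while the scheduler services other nodes ...

# In partial_program_order.handle_complete, when the node finishes:
util.perf_log("Node", "EXE", cnid)  # step time covers the whole execution
```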
4 changes: 4 additions & 0 deletions parallel-orch/scheduler_server.py
@@ -76,11 +76,15 @@ def __init__(self, socket_file):
self.partial_program_order = None

def handle_init(self, input_cmd: str):
util.perf_log_start("Scheduler", "Init")

assert(input_cmd.startswith("Init"))
partial_order_file = input_cmd.split(":")[1].rstrip()
logging.debug(f'Scheduler: Received partial_order_file: {partial_order_file}')
self.partial_program_order = util.parse_partial_program_order_from_file(partial_order_file)
util.debug_log(str(self.partial_program_order.hsprog))

util.perf_log("Scheduler", "Init")

def handle_wait(self, input_cmd: str, connection):
concrete_node_id, env_file = self.__parse_wait(input_cmd)
58 changes: 29 additions & 29 deletions parallel-orch/util.py
@@ -2,7 +2,6 @@
import logging
import os
import socket
import subprocess
import tempfile
import time
import re
@@ -13,6 +12,7 @@
from partial_program_order import PartialProgramOrder

DEBUG_LOG = '[DEBUG_LOG] '
PERFORMANCE_LOG = '[PERFORMANCE_LOG] '

def debug_log(s):
logging.debug(DEBUG_LOG + s)
@@ -112,40 +112,40 @@ def compare_env_strings(file1_content, file2_content):
dict2 = parse_env_string_to_dict(file2_content)
return compare_dicts(dict1, dict2)

def log_time_delta_from_start(module: str, action: str, node=None):
logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time From start:{to_milliseconds_str(time.time() - config.START_TIME)}")
# Generate the key for a named timestamp by joining all non-None
# arguments with '|'.
def get_timestamp_key(module: str, action: str, node=None):
parts = [module, action] + ([str(node)] if node is not None else [])
return '|'.join(parts)

def set_named_timestamp(action: str, node=None, key=None):
if key is None:
key = f"{action}{',' + str(node) if node is not None else ''}"
config.NAMED_TIMESTAMPS[key] = time.time()
# Convert seconds to a formatted milliseconds string.
def to_milliseconds_str(seconds: float) -> str:
return f"{seconds * 1000:.3f}ms"

def invalidate_named_timestamp(action: str, node=None, key=None):
if key is None:
key = f"{action}{',' + str(node) if node is not None else ''}"
del config.NAMED_TIMESTAMPS[key]
# Log the time delta from the start for a given module and action.
def perf_log_start_only(module: str, action: str, node=None, optional_message=None):
key = get_timestamp_key(module, action, node)
logging.info("%s %s||Time From start:%s||%s", PERFORMANCE_LOG, key, to_milliseconds_str(time.time() - config.START_TIME), optional_message if optional_message is not None else "")

def log_time_delta_from_start_and_set_named_timestamp(module: str, action: str, node=None, key=None):
try:
set_named_timestamp(action, node, key)
logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time from start:{to_milliseconds_str(time.time() - config.START_TIME)}")
except KeyError:
logging.error(f"Named timestamp {key} already exists")
# Set a named timestamp.
def set_named_timestamp(module: str, action: str, node=None):
key = get_timestamp_key(module, action, node)
config.NAMED_TIMESTAMPS[key] = time.time()

def log_time_delta_from_named_timestamp(module: str, action: str, node=None, key=None, invalidate=True):
# Log the time delta from the start and set a named timestamp.
def perf_log_start(module: str, action: str, node=None, optional_message=None):
key = get_timestamp_key(module, action, node)
set_named_timestamp(module, action, node)
logging.info("%s %s||Time from start:%s||%s", PERFORMANCE_LOG, key, to_milliseconds_str(time.time() - config.START_TIME), optional_message if optional_message is not None else "")

# Log the time delta from a named timestamp.
def perf_log(module: str, action: str, node=None, optional_message=None):
key = get_timestamp_key(module, action, node)
try:
if key is None:
key = f"{action}{',' + str(node) if node is not None else ''}"
logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time from start:{to_milliseconds_str(time.time() - config.START_TIME)}|Step time:{to_milliseconds_str(time.time() - config.NAMED_TIMESTAMPS[key])}")
if invalidate:
invalidate_named_timestamp(action, node, key)
step_time = time.time() - config.NAMED_TIMESTAMPS[key]
logging.info("%s %s||Time from start:%s||Step time:%s||%s", PERFORMANCE_LOG, key, to_milliseconds_str(time.time() - config.START_TIME), to_milliseconds_str(step_time), optional_message if optional_message is not None else "")
except KeyError:
logging.error(f"Named timestamp {key} does not exist")

def to_milliseconds_str(seconds: float) -> str:
return f"{seconds * 1000:.3f}ms"


logging.error("Named timestamp %s does not exist", key)

def get_all_child_processes(pid):
try:
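Taken together, the new helpers emit `||`-delimited lines that `report/result_analyzer.py` parses back out. A self-contained sketch of the emitted shape, assuming `config` exposes `START_TIME` and `NAMED_TIMESTAMPS` (as the code above requires) and that logging is configured with the `INFO|<timestamp>|` prefix the analyzer expects:

```python
import logging
import time

import config  # assumed to define START_TIME and NAMED_TIMESTAMPS
import util

logging.basicConfig(level=logging.INFO,
                    format="%(levelname)s|%(asctime)s|%(message)s")
config.START_TIME = time.time()
config.NAMED_TIMESTAMPS = {}

util.perf_log_start("Node", "Commit", 7)
time.sleep(0.005)  # the measured step
util.perf_log("Node", "Commit", 7, optional_message="Done")

# Emits lines roughly like:
# INFO|2024-01-01 12:00:00,100|[PERFORMANCE_LOG] Node|Commit|7||Time from start:5.010ms||
# INFO|2024-01-01 12:00:00,105|[PERFORMANCE_LOG] Node|Commit|7||Time from start:10.140ms||Step time:5.130ms||Done
```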
98 changes: 93 additions & 5 deletions report/result_analyzer.py
@@ -1,14 +1,18 @@
from datetime import datetime
from pprint import pprint
import hashlib
import sys

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np

class ResultAnalyzer:
@staticmethod
def process_results(orch_log):
log_lines = orch_log.split("\n")
prog_blocks = []
performance_data = {}
current_block = []
block_start_time = None

@@ -18,20 +22,92 @@ def process_results(orch_log):
time_str = parts[1]
log_content = parts[2].strip()
if log_content == "[PROG_LOG]":
# Start of a new block
if current_block:
prog_blocks.append((block_start_time, current_block))
current_block = []
block_start_time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S,%f")
else:
# Continuing the current block
state, node_id, command = log_content.replace("[PROG_LOG] ", "").split(",", 2)
current_block.append((node_id.strip(), state.strip()))
elif line.startswith("INFO|") and "[PERFORMANCE_LOG]" in line:
parts = line.split("]")[1].strip().split("||")
unprocessed_time = line.split("]")[0].split("|")[1]
log_time = datetime.strptime(unprocessed_time, "%Y-%m-%d %H:%M:%S,%f")
key = parts[0].strip()
optional_parts = parts[1:]
time_from_start = ""
step_time = ""
message = ""

for part in optional_parts:
if part.startswith("Time from start:"):
time_from_start = part.replace("Time from start:", "").strip()
elif part.startswith("Step time:"):
step_time = part.replace("Step time:", "").strip()
else:
message = part.strip()

if key not in performance_data:
performance_data[key] = []
performance_data[key].append({
"log_time": log_time,
"time_from_start": time_from_start,
"step_time": step_time,
"message": message
})

# Pair up start/stop entries into a per-key performance summary
performance_summary = {}
for key, entries in performance_data.items():
entries.sort(key=lambda x: x['log_time']) # Ensure chronological order
last_initial_entry = None
if key not in performance_summary:
performance_summary[key] = []

for entry in entries:
if entry['step_time']: # This entry is a second term of a pair
if last_initial_entry:
start_time = last_initial_entry['log_time']
end_time = entry['log_time']
step_time = entry['step_time']
performance_summary[key].append({
"start_time": start_time,
"end_time": end_time,
"step_time": float(step_time.strip("ms")),
"time_from_start": float(entry['time_from_start'].strip("ms")),
"message": entry['message']
})
last_initial_entry = None # Reset for the next pair
else:
# This entry is a potential first term of a pair, keep it and wait for its pair
if last_initial_entry:
# Keep the last of consecutive initial terms
performance_summary[key].append({
"start_time": last_initial_entry['log_time'],
"end_time": None,
"step_time": "",
"time_from_start": last_initial_entry['time_from_start'],
"message": last_initial_entry['message']
})
last_initial_entry = entry

# Handle the last unpaired initial entry, if any
if last_initial_entry:
performance_summary[key].append({
"start_time": last_initial_entry['log_time'],
"end_time": None,
"step_time": "",
"time_from_start": last_initial_entry['time_from_start'],
"message": last_initial_entry['message']
})

# Append the last block if not empty
if current_block:
prog_blocks.append((block_start_time, current_block))

return prog_blocks
return prog_blocks, performance_summary



@staticmethod
def compare_results(bash_output, orch_output, max_lines=1000):
@@ -51,4 +127,16 @@ def compare_results(bash_output, orch_output, max_lines=1000):
if hash_value not in bash_hashes:
diffs.append(f'+ {line}')

return diffs
return diffs


def main():
    # Example usage: parse an orchestrator log file and print its
    # performance summary, plus the number of entries per key.
    with open(sys.argv[1], "r") as file:
        orch_log = file.read()
    performance_summary = ResultAnalyzer.process_results(orch_log)[1]
    pprint(performance_summary)
    pprint([len(entries) for entries in performance_summary.values()])

if __name__ == "__main__":
    main()
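As a quick check of the pairing logic, a sketch that feeds `process_results` two synthetic lines in the format above (all values made up; this assumes the context lines elided from the hunk route such lines to the `[PERFORMANCE_LOG]` branch):

```python
from pprint import pprint

from result_analyzer import ResultAnalyzer

synthetic_log = (
    "INFO|2024-01-01 12:00:00,100|[PERFORMANCE_LOG] Node|Commit|7"
    "||Time from start:5.000ms||\n"
    "INFO|2024-01-01 12:00:00,105|[PERFORMANCE_LOG] Node|Commit|7"
    "||Time from start:10.333ms||Step time:5.333ms||Done\n"
)

_, performance_summary = ResultAnalyzer.process_results(synthetic_log)
pprint(performance_summary)
# Expected: one entry under "Node|Commit|7" pairing the two lines, with
# step_time == 5.333 and message == "Done".
```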