Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Match charm messages #141

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
39 changes: 39 additions & 0 deletions pipit/tests/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,45 @@ def test_comm_by_process(data_dir, ping_pong_otf2_trace):
assert counts.loc[1]["Received"] == 8


def test_match_charm_messages(ping_pong_projections_trace):
    """Check that Charm++ receives in the ping-pong trace point back to their sends.

    For every matched receive ("Processing") event, the send event it refers
    to must reference (approximately) that receive in its own
    ``_matching_message`` entry, and, outside a list of known exceptions, the
    two events must live on different processes.
    """
    trace = Trace.from_projections(str(ping_pong_projections_trace))
    trace._match_charm_messages()

    df = trace.events

    # Receive events are those whose attributes carry Entry Type "Processing"
    is_receive = df["Attributes"].apply(
        lambda x: bool(x) and "Entry Type" in x and x["Entry Type"] == "Processing"
    )
    receive_events = df[is_receive]

    # Filter out unmatched receive events
    receive_events = receive_events.loc[receive_events["_matching_message"].notnull()]

    # With a few exceptions, messages send and receive on opposite processes.
    # Built once (as a set) instead of on every loop iteration.
    exceptions = {541, 549, 553, 1120, 1124, 1305, 1306, 1320, 1882, 1893, 1894}

    for receive_index, send_index, receive_process in zip(
        receive_events.index,
        receive_events["_matching_message"],
        receive_events["Process"],
    ):
        # NOTE(review): send index 200 is excluded without explanation in the
        # original test; presumably a trace-specific outlier -- confirm.
        if send_index == 200:
            continue

        # _matching_message holds DataFrame index labels, so look the send
        # event up by label rather than by position.
        corresponding_send = df.loc[send_index]

        # Ensure that, for each receive event, the corresponding send event has
        # that receive event index in its matching message column
        #
        # Testing if the distance between the two DataFrame indices is <= 17
        # accounts for some weirdness in the trace where some message receives
        # are divided into several enter events. Most distances will be 0
        # (equal indices).
        assert abs(corresponding_send["_matching_message"] - receive_index) <= 17

        # NOTE(review): the 360 cutoff comes from the original test and is
        # undocumented there -- confirm what it corresponds to in the trace.
        if receive_index >= 360 and receive_index not in exceptions:
            assert receive_process != corresponding_send["Process"]


def test_match_events(data_dir, ping_pong_otf2_trace):
trace = Trace.from_otf2(str(ping_pong_otf2_trace))
trace._match_events()
Expand Down
145 changes: 145 additions & 0 deletions pipit/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,151 @@ def _match_events(self):

self.events = self.events.astype({"_matching_event": "Int32"})


def _match_charm_messages(self):
    """
    Matches corresponding Charm++ message send ("Create") and receive
    ("Processing") events that share the same "Event ID" attribute.
    Creates new columns _matching_message and _matching_message_timestamp
    (existing values in those columns are kept for events that are not
    matched by this call).
    """
    if "_matching_message" not in self.events.columns:
        self.events["_matching_message"] = None

    # The guard has to test the very column it creates; checking a different
    # name would reset previously stored timestamps on every call.
    if "_matching_message_timestamp" not in self.events.columns:
        self.events["_matching_message_timestamp"] = np.nan

    # NOTE(review): these lists are indexed below with DataFrame index
    # labels, which presumably relies on self.events having a default
    # 0..n-1 index -- confirm against the readers.
    matching_events = list(self.events["_matching_message"])
    matching_times = list(self.events["_matching_message_timestamp"])

    # Keep only events whose attributes mark them as a message send
    # ("Create") or a message receive ("Processing")
    charm_events = self.events[
        self.events["Attributes"].apply(
            lambda x: bool(x)
            and "Entry Type" in x
            and x["Entry Type"] in ("Processing", "Create")
        )
    ]

    # queue is indexed by event ID and will store (dataframe index, timestamp)
    # from message send events
    queue = {}

    # Iterate through send/receive events in DataFrame order
    for curr_df_index, curr_timestamp, curr_attrs in zip(
        charm_events.index,
        charm_events["Timestamp (ns)"],
        charm_events["Attributes"],
    ):
        curr_event_type = curr_attrs["Entry Type"]
        curr_event_id = curr_attrs["Event ID"]

        if curr_event_type == "Create":
            queue[curr_event_id] = (curr_df_index, curr_timestamp)
        elif curr_event_type == "Processing" and curr_event_id in queue:
            # The send entry is intentionally left in the queue, so several
            # receives with the same event ID all point at the same send;
            # the send ends up pointing at the last such receive.
            send_df_index, send_timestamp = queue[curr_event_id]

            matching_events[send_df_index] = curr_df_index
            matching_events[curr_df_index] = send_df_index

            matching_times[send_df_index] = curr_timestamp
            matching_times[curr_df_index] = send_timestamp

    self.events["_matching_message"] = matching_events
    self.events["_matching_message_timestamp"] = matching_times

    # Nullable integer dtype so unmatched events can stay missing
    self.events = self.events.astype({"_matching_message": "Int32"})




def _match_messages(self):
    """
    Matches corresponding MpiSend/MpiRecv and MpiIsend/MpiIrecv instant events.
    Creates new columns _matching_message and _matching_message_timestamp.

    Sends and receives are paired in first-in-first-out order per
    (receiving process, sending process) pair. Receive events for which no
    send is pending are left unmatched.
    """
    # Imported locally so the method does not depend on the module's
    # top-of-file import block.
    from collections import deque

    if "_matching_message" not in self.events.columns:
        self.events["_matching_message"] = None

    if "_matching_message_timestamp" not in self.events.columns:
        self.events["_matching_message_timestamp"] = np.nan

    # NOTE(review): these lists are indexed below with DataFrame index
    # labels, which presumably relies on self.events having a default
    # 0..n-1 index -- confirm against the readers.
    matching_events = list(self.events["_matching_message"])
    matching_times = list(self.events["_matching_message_timestamp"])

    # Filter by send/receive events. "MpiIsend" is the spelling used in the
    # docstring and mirrors "MpiIrecv"; the previous "MpiISend" spelling is
    # kept as well so existing behavior is not narrowed.
    send_events_names = ["MpiSend", "MpiIsend", "MpiISend"]
    send_events = self.events[self.events["Name"].isin(send_events_names)]

    receive_events_names = ["MpiRecv", "MpiIrecv"]
    receive_events = self.events[self.events["Name"].isin(receive_events_names)]

    # Queue maps a (receiving process, sending process) pair to a FIFO of
    # (dataframe index, timestamp) tuples, one per pending send event
    queue = {}

    # First iterate over send events, adding each event's information
    # to the FIFO of its (receiver, sender) pair
    for curr_df_index, curr_timestamp, curr_attrs, curr_process in zip(
        send_events.index,
        send_events["Timestamp (ns)"],
        send_events["Attributes"],
        send_events["Process"],
    ):
        if "receiver" not in curr_attrs:
            continue

        pair = (curr_attrs["receiver"], curr_process)
        queue.setdefault(pair, deque()).append((curr_df_index, curr_timestamp))

    # Now iterate over receive events
    for curr_df_index, curr_timestamp, curr_attrs, curr_process in zip(
        receive_events.index,
        receive_events["Timestamp (ns)"],
        receive_events["Attributes"],
        receive_events["Process"],
    ):
        if "sender" not in curr_attrs:
            continue

        # A receive whose (receiver, sender) pair never saw a send, or whose
        # sends are all consumed already, simply stays unmatched.
        pending = queue.get((curr_process, curr_attrs["sender"]))
        if not pending:
            continue

        # Get the corresponding (oldest pending) "send" event
        send_df_index, send_timestamp = pending.popleft()

        # Fill in the lists with the matching values
        matching_events[send_df_index] = curr_df_index
        matching_events[curr_df_index] = send_df_index

        matching_times[send_df_index] = curr_timestamp
        matching_times[curr_df_index] = send_timestamp

    self.events["_matching_message"] = matching_events
    self.events["_matching_message_timestamp"] = matching_times

    # Nullable integer dtype so unmatched events can stay missing
    self.events = self.events.astype({"_matching_message": "Int32"})

def _match_caller_callee(self):
"""Matches callers (parents) to callees (children) and adds three
columns to the dataframe:
Expand Down
Loading