Fix abp_pcap_detection example (#792)
* Work-around for a change in cuDF introduced by rapidsai/cudf#10226
* Fix handling of indexes and offsets in `examples/abp_pcap_detection/abp_pcap_preprocessing.py` (a small sketch of the offset handling follows below)
* cuDF throws a different exception for a series of strings than for a list of strings

fixes #790
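
Below is a minimal sketch of the offset handling referenced in the second bullet. It substitutes numpy for cupy so it runs without a GPU; mess_offset, count and fea_len are illustrative values, which in the example stage come from the MultiMessage being processed.

import numpy as np

mess_offset = 5   # this message is a slice starting at row 5 of the shared DataFrame
count = 5         # number of rows in the slice
fea_len = 29      # illustrative feature length

seg_ids = np.zeros((count, 3), dtype=np.uint32)
# The IDs now start at the slice's offset rather than at 0 (see the second hunk
# of abp_pcap_preprocessing.py below), so inference rows can be mapped back to
# the correct rows of the original DataFrame.
seg_ids[:, 0] = np.arange(mess_offset, mess_offset + count, dtype=np.uint32)
seg_ids[:, 2] = fea_len - 1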

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #792
dagardner-nv authored Mar 24, 2023
1 parent 9e4065f commit 0c91c91
Showing 3 changed files with 17 additions and 14 deletions.
20 changes: 8 additions & 12 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
@@ -76,18 +76,15 @@ def supports_cpp_node(self):

@staticmethod
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str]) -> MultiInferenceFILMessage:
flags_bin_series = cudf.Series(x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b")))
# Converts the int flags field into a binary string
flags_bin_series = x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b"))

df = flags_bin_series.str.findall("[0-1]")

rename_cols_dct = {0: "ack", 1: "psh", 2: "rst", 3: "syn", 4: "fin"}
# Expand binary string into an array
df = cudf.DataFrame(np.vstack(flags_bin_series.str.findall("[0-1]")).astype("int8"), index=x.get_meta().index)

# adding [ack, psh, rst, syn, fin] details from the binary flag
for col in df.columns:
rename_col = rename_cols_dct[col]
df[rename_col] = df[col].astype("int8")

df = df.drop([0, 1, 2, 3, 4], axis=1)
rename_cols_dct = {0: "ack", 1: "psh", 2: "rst", 3: "syn", 4: "fin"}
df = df.rename(columns=rename_cols_dct)

df["flags_bin"] = flags_bin_series
df["timestamp"] = x.get_meta("timestamp").astype("int64")
@@ -173,13 +170,12 @@ def round_time_kernel(timestamp, rollup_time, secs):
req_cols = ["flow_id", "rollup_time"]

for col in req_cols:
# TODO: temporary work-around for Issue #286
x.meta.df[col] = merged_df[col].copy(True)
x.set_meta(col, merged_df[col])

del merged_df

seg_ids = cp.zeros((count, 3), dtype=cp.uint32)
seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
seg_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seg_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind count here could be > than input count
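The flag-expansion logic from the first hunk can be exercised on its own. The following sketch uses plain pandas/numpy instead of cudf so it runs anywhere; the sample flag values are made up, while the format/findall/vstack/rename steps mirror the updated code above.

import numpy as np
import pandas as pd

flags = pd.Series([2, 16, 18, 24])  # made-up TCP flag values

# Convert each integer flag value into a fixed-width 5-bit binary string
flags_bin_series = flags.apply(lambda v: format(int(v), "05b"))

# Expand each binary string into one int8 column per bit, keeping the original index
df = pd.DataFrame(np.vstack(flags_bin_series.str.findall("[0-1]")).astype("int8"), index=flags.index)
df = df.rename(columns={0: "ack", 1: "psh", 2: "rst", 3: "syn", 4: "fin"})

print(df)

Vectorising the expansion this way replaces the old per-column loop and drop, and passing the original index keeps the new columns aligned with the rest of the message's DataFrame.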
2 changes: 1 addition & 1 deletion morpheus/messages/multi_message.py
@@ -245,7 +245,7 @@ def set_meta(self, columns: typing.Union[None, str, typing.List[str]], value):
try:
# Now update the slice
df.iloc[row_indexer, column_indexer] = value
except ValueError:
except (ValueError, TypeError):
# Try this as a fallback. Works better for strings. See issue #286
df[columns].iloc[row_indexer] = value

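The set_meta change above only widens the caught exceptions: per the commit message, cuDF raises a different exception type when the value being written is a series of strings rather than a list of strings, so the resulting TypeError presumably bypassed the existing string fallback. A rough pandas-only sketch of the pattern (variable and column names are illustrative; with pandas the first path typically succeeds, the point here is the broadened except clause):

import pandas as pd

df = pd.DataFrame({"letters": [None] * 10})
row_indexer = slice(5, 10)                      # rows owned by this message slice
column_indexer = df.columns.get_loc("letters")
value = pd.Series(list("abcde"), index=range(5, 10))

try:
    # Preferred path: positional update of the slice
    df.iloc[row_indexer, column_indexer] = value
except (ValueError, TypeError):
    # Fallback that works better for strings (issue #286); previously only
    # ValueError was caught here.
    df["letters"].iloc[row_indexer] = value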
9 changes: 8 additions & 1 deletion tests/test_multi_message.py
@@ -240,13 +240,20 @@ def test_set_meta_new_column_dup_index(filter_probs_df: cudf.DataFrame, df_type:
test_set_meta_new_column(df, df_type)


def test_set_meta_issue_286(filter_probs_df: cudf.DataFrame):
@pytest.mark.use_cudf
@pytest.mark.parametrize('use_series', [True, False])
def test_set_meta_issue_286(filter_probs_df: cudf.DataFrame, use_series: bool):
"""
Explicitly calling set_meta on two different non-overlapping slices.
"""

meta = MessageMeta(filter_probs_df)
mm1 = MultiMessage(meta=meta, mess_offset=0, mess_count=5)
mm2 = MultiMessage(meta=meta, mess_offset=5, mess_count=5)

values = list(string.ascii_letters)
if use_series:
values = cudf.Series(values)

mm1.set_meta('letters', values[0:5])
mm2.set_meta('letters', values[5:10])