Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix abp_pcap_detection example #792

Merged
20 changes: 8 additions & 12 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,15 @@ def supports_cpp_node(self):

@staticmethod
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str]) -> MultiInferenceFILMessage:
flags_bin_series = cudf.Series(x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b")))
# Converts the int flags field into a binary string
flags_bin_series = x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b"))

df = flags_bin_series.str.findall("[0-1]")

rename_cols_dct = {0: "ack", 1: "psh", 2: "rst", 3: "syn", 4: "fin"}
# Expand binary string into an array
df = cudf.DataFrame(np.vstack(flags_bin_series.str.findall("[0-1]")).astype("int8"), index=x.get_meta().index)

# adding [ack, psh, rst, syn, fin] details from the binary flag
for col in df.columns:
rename_col = rename_cols_dct[col]
df[rename_col] = df[col].astype("int8")

df = df.drop([0, 1, 2, 3, 4], axis=1)
rename_cols_dct = {0: "ack", 1: "psh", 2: "rst", 3: "syn", 4: "fin"}
df = df.rename(columns=rename_cols_dct)

df["flags_bin"] = flags_bin_series
df["timestamp"] = x.get_meta("timestamp").astype("int64")
Expand Down Expand Up @@ -173,13 +170,12 @@ def round_time_kernel(timestamp, rollup_time, secs):
req_cols = ["flow_id", "rollup_time"]

for col in req_cols:
# TODO: temporary work-around for Issue #286
x.meta.df[col] = merged_df[col].copy(True)
x.set_meta(col, merged_df[col])

del merged_df

seg_ids = cp.zeros((count, 3), dtype=cp.uint32)
seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
seg_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seg_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind that count here could be greater than the input count
Expand Down
2 changes: 1 addition & 1 deletion morpheus/messages/multi_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def set_meta(self, columns: typing.Union[None, str, typing.List[str]], value):
try:
# Now update the slice
df.iloc[row_indexer, column_indexer] = value
except ValueError:
except (ValueError, TypeError):
# Try this as a fallback. Works better for strings. See issue #286
df[columns].iloc[row_indexer] = value

Expand Down
9 changes: 8 additions & 1 deletion tests/test_multi_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,20 @@ def test_set_meta_new_column_dup_index(filter_probs_df: cudf.DataFrame, df_type:
test_set_meta_new_column(df, df_type)


def test_set_meta_issue_286(filter_probs_df: cudf.DataFrame):
@pytest.mark.use_cudf
@pytest.mark.parametrize('use_series', [True, False])
def test_set_meta_issue_286(filter_probs_df: cudf.DataFrame, use_series: bool):
"""
Explicitly calling set_meta on two different non-overlapping slices.
"""

meta = MessageMeta(filter_probs_df)
mm1 = MultiMessage(meta=meta, mess_offset=0, mess_count=5)
mm2 = MultiMessage(meta=meta, mess_offset=5, mess_count=5)

values = list(string.ascii_letters)
if use_series:
values = cudf.Series(values)

mm1.set_meta('letters', values[0:5])
mm2.set_meta('letters', values[5:10])
Expand Down