diff --git a/morpheus/stages/postprocess/generate_viz_frames_stage.py b/morpheus/stages/postprocess/generate_viz_frames_stage.py
index e96c6de853..d80ae5dd13 100644
--- a/morpheus/stages/postprocess/generate_viz_frames_stage.py
+++ b/morpheus/stages/postprocess/generate_viz_frames_stage.py
@@ -69,6 +69,11 @@ def __init__(self, c: Config, server_url: str = "0.0.0.0", server_port: int = 87
 
         self._replay_buffer = []
 
+        # Properties set on start
+        self._loop: asyncio.AbstractEventLoop = None
+        self._server_task: asyncio.Task = None
+        self._server_close_event: asyncio.Event = None
+
     @property
     def name(self) -> str:
         return "gen_viz"
@@ -89,13 +94,13 @@ def supports_cpp_node(self):
         return False
 
     @staticmethod
-    def round_to_sec(x):
+    def round_to_sec(x: int | float):
         """
         Round to even seconds second
 
         Parameters
         ----------
-        x : int/float
+        x : int | float
             Rounding up the value
 
         Returns
@@ -126,7 +131,7 @@ def _to_vis_df(self, x: MultiResponseMessage):
         def indent_data(y: str):
             try:
                 return json.dumps(json.loads(y), indent=3)
-            except:  # noqa: E722
+            except Exception:
                 return y
 
         df["data"] = df["data"].apply(indent_data)
@@ -146,7 +151,7 @@ def indent_data(y: str):
         df["ts_round_sec"] = (df["timestamp"] / 1000.0).astype(int) * 1000
 
         # Return a list of tuples of (ts_round_sec, dataframe)
-        return [(key, group) for key, group in df.groupby(df.ts_round_sec)]
+        return list(df.groupby(df.ts_round_sec))
 
     def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):
 
@@ -159,28 +164,27 @@ def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):
 
         offset = (curr_timestamp - self._first_timestamp) / 1000
 
-        fn = os.path.join(self._out_dir, "{}.csv".format(offset))
+        out_file = os.path.join(self._out_dir, f"{offset}.csv")
 
-        assert not os.path.exists(fn)
+        assert not os.path.exists(out_file)
 
-        in_df.to_csv(fn, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"])
+        in_df.to_csv(out_file, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"])
 
     async def start_async(self):
         """
         Launch the Websocket server and asynchronously send messages via Websocket.
         """
 
-        loop = asyncio.get_event_loop()
-        self._loop = loop
+        self._loop = asyncio.get_event_loop()
 
-        self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2, loop=loop)
+        self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2)
 
         async def client_connected(websocket: websockets.legacy.server.WebSocketServerProtocol):
             """
             Establishes a connection with the WebSocket server.
             """
 
-            logger.info("Got connection from: {}:{}".format(*websocket.remote_address))
+            logger.info("Got connection from: %s:%s", *websocket.remote_address)
 
             while True:
                 try:
@@ -191,7 +195,7 @@ async def client_connected(websocket: websockets.legacy.server.WebSocketServerPr
                 except Exception as ex:
                     logger.exception("Error occurred trying to send message over socket", exc_info=ex)
 
-            logger.info("Disconnected from: {}:{}".format(*websocket.remote_address))
+            logger.info("Disconnected from: %s:%s", *websocket.remote_address)
 
         async def run_server():
             """
@@ -205,20 +209,20 @@ async def run_server():
                     listening_on = [":".join([str(y) for y in x.getsockname()]) for x in server.sockets]
                     listening_on_str = [f"'{x}'" for x in listening_on]
 
-                    logger.info("Websocket server listening at: {}".format(", ".join(listening_on_str)))
+                    logger.info("Websocket server listening at: %s", ", ".join(listening_on_str))
 
                     await self._server_close_event.wait()
 
                     logger.info("Server shut down")
 
-                logger.info("Server shut down. Is queue empty: {}".format(self._buffer_queue.empty()))
+                logger.info("Server shut down. Is queue empty: %s", self._buffer_queue.empty())
             except Exception as e:
                 logger.error("Error during serve", exc_info=e)
                 raise
 
-        self._server_task = loop.create_task(run_server())
+        self._server_task = self._loop.create_task(run_server())
 
-        self._server_close_event = asyncio.Event(loop=loop)
+        self._server_close_event = asyncio.Event()
 
         await asyncio.sleep(1.0)
 
@@ -235,24 +239,24 @@ async def _stop_server(self):
         # Wait for it to
         await self._server_task
 
-    def _build_single(self, seg: mrc.Builder, input_stream: StreamPair) -> StreamPair:
+    def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
 
         stream = input_stream[0]
 
-        def node_fn(input, output):
+        def node_fn(input_obs, output_obs):
 
             def write_batch(x: MultiResponseMessage):
 
                 sink = pa.BufferOutputStream()
 
                 # This is the timestamp of the earliest message
-                t0 = x.get_meta("timestamp").min()
+                time0 = x.get_meta("timestamp").min()
 
                 df = x.get_meta(["timestamp", "src_ip", "dest_ip", "secret_keys", "data"])
 
                 out_df = cudf.DataFrame()
 
-                out_df["dt"] = (df["timestamp"] - t0).astype(np.int32)
+                out_df["dt"] = (df["timestamp"] - time0).astype(np.int32)
                 out_df["src"] = df["src_ip"].str.ip_to_int().astype(np.int32)
                 out_df["dst"] = df["dest_ip"].str.ip_to_int().astype(np.int32)
                 out_df["lvl"] = df["secret_keys"].astype(np.int32)
@@ -268,7 +272,7 @@ def write_batch(x: MultiResponseMessage):
                 # Enqueue the buffer and block until that completes
                 asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()
 
-            input.pipe(ops.map(write_batch)).subscribe(output)
+            input_obs.pipe(ops.map(write_batch)).subscribe(output_obs)
 
             logger.info("Gen-viz stage completed. Waiting for shutdown")
 
@@ -280,8 +284,8 @@ def write_batch(x: MultiResponseMessage):
             logger.info("Gen-viz shutdown complete")
 
         # Sink to file
-        to_file = seg.make_node(self.unique_name, ops.build(node_fn))
-        seg.make_edge(stream, to_file)
+        to_file = builder.make_node(self.unique_name, ops.build(node_fn))
+        builder.make_edge(stream, to_file)
         stream = to_file
 
         # Return input unchanged to allow passthrough
diff --git a/morpheus/utils/producer_consumer_queue.py b/morpheus/utils/producer_consumer_queue.py
index 7363f4648b..3516d8420d 100644
--- a/morpheus/utils/producer_consumer_queue.py
+++ b/morpheus/utils/producer_consumer_queue.py
@@ -138,10 +138,10 @@ class AsyncIOProducerConsumerQueue(asyncio.Queue, typing.Generic[_T]):
     Custom queue.Queue implementation which supports closing and uses recursive locks
     """
 
-    def __init__(self, maxsize=0, *, loop=None) -> None:
-        super().__init__(maxsize=maxsize, loop=loop)
+    def __init__(self, maxsize=0) -> None:
+        super().__init__(maxsize=maxsize)
 
-        self._closed = asyncio.Event(loop=loop)
+        self._closed = asyncio.Event()
         self._is_closed = False
 
     async def join(self):
@@ -166,7 +166,7 @@ async def put(self, item):
         slot is available before adding item.
        """
         while self.full() and not self._is_closed:
-            putter = self._loop.create_future()
+            putter = self._get_loop().create_future()
             self._putters.append(putter)
             try:
                 await putter
@@ -196,7 +196,7 @@ async def get(self) -> _T:
         If queue is empty, wait until an item is available.
         """
         while self.empty() and not self._is_closed:
-            getter = self._loop.create_future()
+            getter = self._get_loop().create_future()
             self._getters.append(getter)
             try:
                 await getter
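
For context, a minimal, self-contained sketch (not taken from Morpheus) of the pattern these changes rely on: since Python 3.10, asyncio.Event() and asyncio.Queue() no longer accept a loop argument, so the stage captures the running loop once in start_async() and worker threads hand buffers to it via asyncio.run_coroutine_threadsafe(). The worker and queue names below are illustrative only.

# Standalone sketch (not Morpheus code) of the hand-off pattern on Python 3.10+:
# asyncio objects are created without the removed `loop=` argument, the running
# loop is captured once inside a coroutine, and a plain thread enqueues items
# with asyncio.run_coroutine_threadsafe(), blocking on .result() until done.
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()   # captured once, like self._loop in start_async()
    queue = asyncio.Queue(maxsize=2)    # no loop= parameter; binds to the running loop

    def worker():
        # Runs on a plain thread; .result() blocks until the put() coroutine finishes.
        asyncio.run_coroutine_threadsafe(queue.put("buffer-0"), loop=loop).result()

    thread = threading.Thread(target=worker)
    thread.start()

    print(await queue.get())            # -> buffer-0
    thread.join()


asyncio.run(main())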