Skip to content

Commit

Permalink
Remove loop from several asyncio API calls (nv-morpheus#1033)
Browse files Browse the repository at this point in the history
Due to changes in `asyncio` in Python 3.10, we needed to remove the `loop` parameter from several API calls. See [Changes to async event loops in Python 3.10](https://blog.teclado.com/changes-to-async-event-loops-in-python-3-10/) for more info.

Fixes nv-morpheus#1031

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Eli Fajardo (https://github.com/efajardo-nv)

URL: nv-morpheus#1033
  • Loading branch information
mdemoret-nv authored Jul 11, 2023
1 parent 1d1b330 commit 25a9afc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
50 changes: 27 additions & 23 deletions morpheus/stages/postprocess/generate_viz_frames_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def __init__(self, c: Config, server_url: str = "0.0.0.0", server_port: int = 87

self._replay_buffer = []

# Properties set on start
self._loop: asyncio.AbstractEventLoop = None
self._server_task: asyncio.Task = None
self._server_close_event: asyncio.Event = None

@property
def name(self) -> str:
return "gen_viz"
Expand All @@ -89,13 +94,13 @@ def supports_cpp_node(self):
return False

@staticmethod
def round_to_sec(x):
def round_to_sec(x: int | float):
"""
Round to even seconds
Parameters
----------
x : int/float
x : int | float
Rounding up the value
Returns
Expand Down Expand Up @@ -126,7 +131,7 @@ def _to_vis_df(self, x: MultiResponseMessage):
def indent_data(y: str):
try:
return json.dumps(json.loads(y), indent=3)
except: # noqa: E722
except Exception:
return y

df["data"] = df["data"].apply(indent_data)
Expand All @@ -146,7 +151,7 @@ def indent_data(y: str):
df["ts_round_sec"] = (df["timestamp"] / 1000.0).astype(int) * 1000

# Return a list of tuples of (ts_round_sec, dataframe)
return [(key, group) for key, group in df.groupby(df.ts_round_sec)]
return list(df.groupby(df.ts_round_sec))

def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):

Expand All @@ -159,28 +164,27 @@ def _write_viz_file(self, x: typing.List[typing.Tuple[int, pd.DataFrame]]):

offset = (curr_timestamp - self._first_timestamp) / 1000

fn = os.path.join(self._out_dir, "{}.csv".format(offset))
out_file = os.path.join(self._out_dir, f"{offset}.csv")

assert not os.path.exists(fn)
assert not os.path.exists(out_file)

in_df.to_csv(fn, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"])
in_df.to_csv(out_file, columns=["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "si", "data"])

async def start_async(self):
"""
Launch the Websocket server and asynchronously send messages via Websocket.
"""

loop = asyncio.get_event_loop()
self._loop = loop
self._loop = asyncio.get_event_loop()

self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2, loop=loop)
self._buffer_queue = AsyncIOProducerConsumerQueue(maxsize=2)

async def client_connected(websocket: websockets.legacy.server.WebSocketServerProtocol):
"""
Establishes a connection with the WebSocket server.
"""

logger.info("Got connection from: {}:{}".format(*websocket.remote_address))
logger.info("Got connection from: %s:%s", *websocket.remote_address)

while True:
try:
Expand All @@ -191,7 +195,7 @@ async def client_connected(websocket: websockets.legacy.server.WebSocketServerPr
except Exception as ex:
logger.exception("Error occurred trying to send message over socket", exc_info=ex)

logger.info("Disconnected from: {}:{}".format(*websocket.remote_address))
logger.info("Disconnected from: %s:%s", *websocket.remote_address)

async def run_server():
"""
Expand All @@ -205,20 +209,20 @@ async def run_server():
listening_on = [":".join([str(y) for y in x.getsockname()]) for x in server.sockets]
listening_on_str = [f"'{x}'" for x in listening_on]

logger.info("Websocket server listening at: {}".format(", ".join(listening_on_str)))
logger.info("Websocket server listening at: %s", ", ".join(listening_on_str))

await self._server_close_event.wait()

logger.info("Server shut down")

logger.info("Server shut down. Is queue empty: {}".format(self._buffer_queue.empty()))
logger.info("Server shut down. Is queue empty: %s", self._buffer_queue.empty())
except Exception as e:
logger.error("Error during serve", exc_info=e)
raise

self._server_task = loop.create_task(run_server())
self._server_task = self._loop.create_task(run_server())

self._server_close_event = asyncio.Event(loop=loop)
self._server_close_event = asyncio.Event()

await asyncio.sleep(1.0)

Expand All @@ -235,24 +239,24 @@ async def _stop_server(self):
# Wait for the server task to finish
await self._server_task

def _build_single(self, seg: mrc.Builder, input_stream: StreamPair) -> StreamPair:
def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:

stream = input_stream[0]

def node_fn(input, output):
def node_fn(input_obs, output_obs):

def write_batch(x: MultiResponseMessage):

sink = pa.BufferOutputStream()

# This is the timestamp of the earliest message
t0 = x.get_meta("timestamp").min()
time0 = x.get_meta("timestamp").min()

df = x.get_meta(["timestamp", "src_ip", "dest_ip", "secret_keys", "data"])

out_df = cudf.DataFrame()

out_df["dt"] = (df["timestamp"] - t0).astype(np.int32)
out_df["dt"] = (df["timestamp"] - time0).astype(np.int32)
out_df["src"] = df["src_ip"].str.ip_to_int().astype(np.int32)
out_df["dst"] = df["dest_ip"].str.ip_to_int().astype(np.int32)
out_df["lvl"] = df["secret_keys"].astype(np.int32)
Expand All @@ -268,7 +272,7 @@ def write_batch(x: MultiResponseMessage):
# Enqueue the buffer and block until that completes
asyncio.run_coroutine_threadsafe(self._buffer_queue.put(out_buf), loop=self._loop).result()

input.pipe(ops.map(write_batch)).subscribe(output)
input_obs.pipe(ops.map(write_batch)).subscribe(output_obs)

logger.info("Gen-viz stage completed. Waiting for shutdown")

Expand All @@ -280,8 +284,8 @@ def write_batch(x: MultiResponseMessage):
logger.info("Gen-viz shutdown complete")

# Sink to file
to_file = seg.make_node(self.unique_name, ops.build(node_fn))
seg.make_edge(stream, to_file)
to_file = builder.make_node(self.unique_name, ops.build(node_fn))
builder.make_edge(stream, to_file)
stream = to_file

# Return input unchanged to allow passthrough
Expand Down
10 changes: 5 additions & 5 deletions morpheus/utils/producer_consumer_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ class AsyncIOProducerConsumerQueue(asyncio.Queue, typing.Generic[_T]):
Custom asyncio.Queue implementation which supports closing and uses recursive locks
"""

def __init__(self, maxsize=0, *, loop=None) -> None:
super().__init__(maxsize=maxsize, loop=loop)
def __init__(self, maxsize=0) -> None:
super().__init__(maxsize=maxsize)

self._closed = asyncio.Event(loop=loop)
self._closed = asyncio.Event()
self._is_closed = False

async def join(self):
Expand All @@ -166,7 +166,7 @@ async def put(self, item):
slot is available before adding item.
"""
while self.full() and not self._is_closed:
putter = self._loop.create_future()
putter = self._get_loop().create_future()
self._putters.append(putter)
try:
await putter
Expand Down Expand Up @@ -196,7 +196,7 @@ async def get(self) -> _T:
If queue is empty, wait until an item is available.
"""
while self.empty() and not self._is_closed:
getter = self._loop.create_future()
getter = self._get_loop().create_future()
self._getters.append(getter)
try:
await getter
Expand Down

0 comments on commit 25a9afc

Please sign in to comment.