Break circular reference issue causing a memory leak #1115

Merged
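
This change breaks the circular references between the Python `Pipeline` object and the MRC executor, pipeline and options objects so that stage destructors actually run when a pipeline is discarded (issue #1114): `mrc.Options` and `mrc.Pipeline` become locals inside `build()`, and a new `_on_stop()` hook drops `self._mrc_executor` once the pipeline has stopped or joined. A minimal sketch (illustrative only, not Morpheus code) of why dropping the back-reference matters: objects caught in a reference cycle are reclaimed only when the cycle collector runs, and the collector generally cannot break cycles that pass through extension objects lacking GC support, which is presumably the motivation for the "Adopt patched pybind11" commit.

```python
import gc


class Stage:

    def __init__(self, pipeline):
        self.pipeline = pipeline  # back-reference completes the cycle

    def __del__(self):
        print("Stage destructor ran")


class Pipeline:

    def __init__(self):
        self.stages = [Stage(self)]  # Pipeline -> Stage -> Pipeline

    def stop(self):
        # Breaking the cycle by hand (analogous to _on_stop() clearing
        # self._mrc_executor) lets plain reference counting reclaim the stage.
        self.stages = None


leaky = Pipeline()
del leaky      # nothing printed: the cycle keeps both objects alive
gc.collect()   # the destructor only runs once the cycle collector steps in

fixed = Pipeline()
fixed.stop()   # "Stage destructor ran" is printed immediately
del fixed
```
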
Changes from 9 commits (28 commits in total):
0d1aeb9
wip
dagardner-nv Aug 1, 2023
2a6e65e
Don't hold a reference to the pipeline outside of the build method, e…
dagardner-nv Aug 1, 2023
07b2069
Test for issue #1114
dagardner-nv Aug 1, 2023
5b5e3f4
Rename log parsing test to not conflict with top-level test_pipe
dagardner-nv Aug 2, 2023
7745d98
Pylint fixes
dagardner-nv Aug 2, 2023
73318c7
Don't hold a ref to executor options:
dagardner-nv Aug 2, 2023
364da5a
Skip test_multi_segment_bad_data_type due to MRC issue https://github…
dagardner-nv Aug 2, 2023
f85dea5
lint fixes
dagardner-nv Aug 2, 2023
698b3e4
Merge branch 'branch-23.11' into david-destructors-1114
dagardner-nv Aug 4, 2023
de11367
Ensure that both _storage_dir and _storage_type are defined prior to…
dagardner-nv Aug 15, 2023
506d95a
wip
dagardner-nv Aug 14, 2023
da1929c
Revert "wip"
dagardner-nv Aug 15, 2023
2909695
Add comment explaining
dagardner-nv Aug 15, 2023
7394f23
Call PyGILState_Check first to see if we need to gil, borrowed from M…
dagardner-nv Aug 15, 2023
0f7500c
Add comment explaining the call to PyGILState_Check
dagardner-nv Aug 15, 2023
34c038f
Test for MRC 362
dagardner-nv Aug 17, 2023
8984754
Insert CR header, and add fixture for disabling garbage collection
dagardner-nv Aug 17, 2023
7f2a553
Revert "Add comment explaining the call to PyGILState_Check"
dagardner-nv Aug 17, 2023
f061115
Revert "Call PyGILState_Check first to see if we need to gil, borrowe…
dagardner-nv Aug 17, 2023
33cf054
Adopt patched pybind11
dagardner-nv Aug 18, 2023
5163bc9
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Aug 18, 2023
60d20e3
Lint fixes
dagardner-nv Aug 18, 2023
123d730
Merge branch 'branch-23.11' into david-destructors-1114
dagardner-nv Aug 22, 2023
5bd3ade
Merge branch 'branch-23.11' into david-destructors-1114
dagardner-nv Aug 24, 2023
07b284a
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Aug 24, 2023
8b95b65
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Aug 30, 2023
9bbc515
Cleanups per pr feedback
dagardner-nv Aug 30, 2023
e4f24ca
yapf formatting fixes
dagardner-nv Aug 30, 2023
121 changes: 63 additions & 58 deletions morpheus/pipeline/pipeline.py
@@ -51,15 +51,16 @@ class Pipeline():

Parameters
----------
c : `morpheus.config.Config`
config : `morpheus.config.Config`
Pipeline configuration instance.

"""

def __init__(self, c: Config):
def __init__(self, config: Config):
self._source_count: int = None # Maximum number of iterations for progress reporting. None = Unknown/Unlimited

self._id_counter = 0
self._num_threads = config.num_threads

# Complete set of nodes across segments in this pipeline
self._stages: typing.Set[Stage] = set()
@@ -70,14 +71,10 @@ def __init__(self, c: Config):
# Dictionary containing segment information for this pipeline
self._segments: typing.Dict = defaultdict(lambda: {"nodes": set(), "ingress_ports": [], "egress_ports": []})

self._exec_options = mrc.Options()
self._exec_options.topology.user_cpuset = "0-{}".format(c.num_threads - 1)
self._exec_options.engine_factories.default_engine_type = mrc.core.options.EngineType.Thread

# Set the default channel size
mrc.Config.default_channel_size = c.edge_buffer_size
mrc.Config.default_channel_size = config.edge_buffer_size

self.batch_size = c.pipeline_batch_size
self.batch_size = config.pipeline_batch_size

self._segment_graphs = defaultdict(lambda: networkx.DiGraph())

@@ -86,7 +83,6 @@ def __init__(self, c: Config):
self._is_started = False

self._mrc_executor: mrc.Executor = None
self._mrc_pipeline: mrc.Pipeline = None

@property
def is_built(self) -> bool:
@@ -126,7 +122,7 @@ def add_stage(self, stage: StageT, segment_id: str = "main") -> StageT:
segment_nodes.add(stage)
self._sources.add(stage)
else:
raise NotImplementedError("add_stage() failed. Unknown node type: {}".format(type(stage)))
raise NotImplementedError(f"add_stage() failed. Unknown node type: {type(stage)}")

stage._pipeline = self

@@ -232,12 +228,16 @@ def build(self):

logger.info("====Registering Pipeline====")

self._mrc_executor = mrc.Executor(self._exec_options)
exec_options = mrc.Options()
exec_options.topology.user_cpuset = f"0-{self._num_threads - 1}"
exec_options.engine_factories.default_engine_type = mrc.core.options.EngineType.Thread

self._mrc_executor = mrc.Executor(exec_options)

self._mrc_pipeline = mrc.Pipeline()
mrc_pipeline = mrc.Pipeline()

def inner_build(builder: mrc.Builder, segment_id: str):
logger.info(f"====Building Segment: {segment_id}====")
logger.info("====Building Segment: %s ====", segment_id)
segment_graph = self._segment_graphs[segment_id]

# Check if preallocated columns are requested, this needs to happen before the source stages are built
@@ -256,7 +256,7 @@ def inner_build(builder: mrc.Builder, segment_id: str):
if (stage.can_build()):
stage.build(builder)

if (not all([x.is_built for x in segment_graph.nodes()])):
if (not all(x.is_built for x in segment_graph.nodes())):
logger.warning("Cyclic pipeline graph detected! Building with reduced constraints")

for stage in segment_graph.nodes():
@@ -275,22 +275,22 @@ def inner_build(builder: mrc.Builder, segment_id: str):
logger.info("====Building Segment Complete!====")

logger.info("====Building Pipeline====")
for segment_id in self._segments.keys():
segment_ingress_ports = self._segments[segment_id]["ingress_ports"]
segment_egress_ports = self._segments[segment_id]["egress_ports"]
for (segment_id, segment) in self._segments.items():
segment_ingress_ports = segment["ingress_ports"]
segment_egress_ports = segment["egress_ports"]
segment_inner_build = partial(inner_build, segment_id=segment_id)

self._mrc_pipeline.make_segment(segment_id, [port_info["port_pair"] for port_info in segment_ingress_ports],
[port_info["port_pair"] for port_info in segment_egress_ports],
segment_inner_build)
mrc_pipeline.make_segment(segment_id, [port_info["port_pair"] for port_info in segment_ingress_ports],
[port_info["port_pair"] for port_info in segment_egress_ports],
segment_inner_build)

logger.info("====Building Pipeline Complete!====")
self._is_build_complete = True

# Finally call _on_start
self._on_start()

self._mrc_executor.register_pipeline(self._mrc_pipeline)
self._mrc_executor.register_pipeline(mrc_pipeline)

self._is_built = True

@@ -311,12 +311,13 @@ def stop(self):
"""

logger.info("====Stopping Pipeline====")
for s in list(self._sources) + list(self._stages):
s.stop()
for src in list(self._sources) + list(self._stages):
src.stop()

self._mrc_executor.stop()

logger.info("====Pipeline Stopped====")
self._on_stop()

async def join(self):
"""
@@ -330,21 +331,26 @@ async def join(self):
raise
finally:
# Make sure these are always shut down even if there was an error
for s in list(self._sources):
s.stop()
for src in list(self._sources):
src.stop()

# First wait for all sources to stop. This only occurs after all messages have been processed fully
for s in list(self._sources):
await s.join()
for src in list(self._sources):
await src.join()

# Now that there is no more data, call stop on all stages to ensure shutdown (i.e., for stages that have
# their own worker loop thread)
for s in list(self._stages):
s.stop()
for stage in list(self._stages):
stage.stop()

# Now call join on all stages
for s in list(self._stages):
await s.join()
for stage in list(self._stages):
await stage.join()

self._on_stop()

def _on_stop(self):
self._mrc_executor = None

async def _build_and_start(self):

@@ -362,8 +368,8 @@ async def _build_and_start(self):
async def _async_start(self):

# Loop over all stages and call on_start if it exists
for s in self._stages:
await s.start_async()
for stage in self._stages:
await stage.start_async()

def _on_start(self):

@@ -374,11 +380,11 @@ def _on_start(self):
# Stop from running this twice
self._is_started = True

logger.debug("Starting! Time: {}".format(time.time()))
logger.debug("Starting! Time: %s", time.time())

# Loop over all stages and call on_start if it exists
for s in self._stages:
s.on_start()
for stage in self._stages:
stage.on_start()

def visualize(self, filename: str = None, **graph_kwargs):
"""
@@ -414,11 +420,11 @@ def visualize(self, filename: str = None, **graph_kwargs):
start_def_port = ":e" if is_lr else ":s"
end_def_port = ":w" if is_lr else ":n"

def has_ports(n: StreamWrapper, is_input):
def has_ports(node: StreamWrapper, is_input):
if (is_input):
return len(n.input_ports) > 0
else:
return len(n.output_ports) > 0
return len(node.input_ports) > 0

return len(node.output_ports) > 0

if not self._is_build_complete:
raise RuntimeError("Pipeline.visualize() requires that the Pipeline has been started before generating "
@@ -427,31 +433,31 @@ def has_ports(n: StreamWrapper, is_input):
"be fixed in a future release.")

# Now build up the nodes
for idx, segment_id in enumerate(self._segments):
for segment_id in self._segments:
gv_subgraphs[segment_id] = graphviz.Digraph(f"cluster_{segment_id}")
gv_subgraph = gv_subgraphs[segment_id]
gv_subgraph.attr(label=segment_id)
for n, attrs in typing.cast(typing.Mapping[StreamWrapper, dict],
self._segment_graphs[segment_id].nodes).items():
for node, attrs in typing.cast(typing.Mapping[StreamWrapper, dict],
self._segment_graphs[segment_id].nodes).items():
node_attrs = attrs.copy()

label = ""

show_in_ports = has_ports(n, is_input=True)
show_out_ports = has_ports(n, is_input=False)
show_in_ports = has_ports(node, is_input=True)
show_out_ports = has_ports(node, is_input=False)

# Build the ports for the node. Only show ports if there are any
# (Would like to have this not show for one port, but the lines get all messed up)
if (show_in_ports):
in_port_label = " {{ {} }} | ".format(" | ".join(
[f"<u{x.port_number}> input_port: {x.port_number}" for x in n.input_ports]))
in_port_label = " {{ {} }} | ".format(" | ".join( # pylint: disable=consider-using-f-string
[f"<u{x.port_number}> input_port: {x.port_number}" for x in node.input_ports]))
label += in_port_label

label += n.unique_name
label += node.unique_name

if (show_out_ports):
out_port_label = " | {{ {} }}".format(" | ".join(
[f"<d{x.port_number}> output_port: {x.port_number}" for x in n.output_ports]))
out_port_label = " | {{ {} }}".format(" | ".join( # pylint: disable=consider-using-f-string
[f"<d{x.port_number}> output_port: {x.port_number}" for x in node.output_ports]))
label += out_port_label

if (show_in_ports or show_out_ports):
@@ -462,9 +468,8 @@ def has_ports(n: StreamWrapper, is_input):
"shape": "record",
"fillcolor": "white",
})
# TODO: Eventually allow nodes to have different attributes based on type
# node_attrs.update(n.get_graphviz_attrs())
gv_subgraph.node(n.unique_name, **node_attrs)

gv_subgraph.node(node.unique_name, **node_attrs)

# Build up edges
for segment_id in self._segments:
@@ -522,7 +527,7 @@ def has_ports(n: StreamWrapper, is_input):
style="dashed",
label=f"Segment Port: {egress_port['port_pair'][0]}")

for key, gv_subgraph in gv_subgraphs.items():
for gv_subgraph in gv_subgraphs.values():
gv_graph.subgraph(gv_subgraph)

file_format = os.path.splitext(filename)[-1].replace(".", "")
@@ -544,7 +549,7 @@ async def run_async(self):

def error_handler(_, context: dict):

msg = "Unhandled exception in async loop! Exception: \n{}".format(context["message"])
msg = f"Unhandled exception in async loop! Exception: \n{context['message']}"
exception = context.get("exception", Exception())

logger.critical(msg, exc_info=exception)
@@ -564,10 +569,10 @@ def term_signal():
self.stop()
else:
tqdm.write("Killing")
exit(1)
exit(1) # pylint: disable=consider-using-sys-exit

for s in [signal.SIGINT, signal.SIGTERM]:
loop.add_signal_handler(s, term_signal)
for sig in [signal.SIGINT, signal.SIGTERM]:
loop.add_signal_handler(sig, term_signal)

try:
await self._build_and_start()
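
Taken together, the pipeline.py changes above bound the lifetime of the MRC objects to the build/run cycle instead of to the Python `Pipeline` object: the options and the `mrc.Pipeline` are locals of `build()`, and the executor reference is dropped in `_on_stop()`. A small self-contained sketch of that pattern (the class and attribute names are illustrative, not Morpheus or MRC APIs):

```python
class Resource:
    """Stand-in for a natively backed object such as mrc.Options."""

    def __del__(self):
        print("Resource released")


class ScopedPipeline:

    def __init__(self):
        self._executor = None  # nothing native is held before build()

    def build(self):
        options = Resource()       # released as soon as build() returns
        self._executor = object()  # only the executor outlives build()

    def _on_stop(self):
        self._executor = None      # dropped once the pipeline has stopped


pipe = ScopedPipeline()
pipe.build()     # prints "Resource released" when build() returns
pipe._on_stop()  # the executor reference is gone as well
```
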
@@ -121,7 +121,7 @@ def _run_mocked_pipeline(config: Config, dataset_cudf: DatasetManager, import_mo
mock_infer_result = mock.MagicMock()
mock_infer_result.as_numpy.side_effect = inf_results

def async_infer(callback=None, **k):
def async_infer(callback=None, **_):
callback(mock_infer_result, None)

mock_triton_client.async_infer.side_effect = async_infer
1 change: 1 addition & 0 deletions tests/test_multi_segment.py
@@ -36,6 +36,7 @@ def test_linear_boundary_stages(config, filter_probs_df):
assert_results(comp_stage.get_results())


@pytest.mark.skip(reason="Skipping due to MRC issue #360")
@pytest.mark.use_cudf
def test_multi_segment_bad_data_type(config, filter_probs_df):
with pytest.raises(RuntimeError):
83 changes: 83 additions & 0 deletions tests/test_pipe.py
@@ -0,0 +1,83 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import typing

import pytest

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.utils.type_aliases import DataFrameType


class SourceTestStage(InMemorySourceStage):

def __init__(self,
config,
dataframes: typing.List[DataFrameType],
state_dict: dict,
repeat: int = 1,
state_key: str = "source"):
super().__init__(config, dataframes, repeat)
self._state_dict = state_dict
self._state_key = state_key

@property
def name(self) -> str:
return "test-source"

def __del__(self):
self._state_dict[self._state_key] = True
self._state_dict = None


class SinkTestStage(InMemorySinkStage):

def __init__(self, config, state_dict: dict, state_key: str = "sink"):
super().__init__(config)
self._state_dict = state_dict
self._state_key = state_key

@property
def name(self) -> str:
return "test-sink"

def __del__(self):
self._state_dict[self._state_key] = True
self._state_dict = None


def _run_pipeline(config: Config, filter_probs_df: DataFrameType, state_dict: dict):
pipe = LinearPipeline(config)
pipe.set_source(SourceTestStage(config, [filter_probs_df], state_dict=state_dict))
pipe.add_stage(SinkTestStage(config, state_dict=state_dict))
pipe.run()


@pytest.mark.use_cudf
def test_destructors_called(config: Config, filter_probs_df: DataFrameType):
"""
Test to ensure that the destructors of stages are called (issue #1114).
"""
state_dict = {"source": False, "sink": False}
_run_pipeline(config, filter_probs_df, state_dict)

gc.collect()
assert state_dict["source"]
assert state_dict["sink"]
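
The commit history also mentions a fixture for disabling garbage collection ("Insert CR header, and add fixture for disabling garbage collection"); that fixture lives outside this diff, presumably in a conftest.py. A minimal pytest sketch of the idea, with the fixture name and placement being assumptions rather than the actual Morpheus fixture:

```python
import gc

import pytest


@pytest.fixture
def disable_gc():
    """Disable the cyclic garbage collector for the duration of a test.

    With the collector off, objects kept alive only by reference cycles are
    not silently reclaimed mid-test, which makes destructor and leak checks
    deterministic.
    """
    gc.disable()
    try:
        yield
    finally:
        gc.enable()
```
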