Skip to content

Commit

Permalink
Remove duplicate log message in output when using pre-allocation (#981)
Browse files Browse the repository at this point in the history
* Fixes #963 where source stages are printed twice in log output
* Removes a post-build method in `FileSourceStage` that was no longer being used (unrelated, but I noticed it).
* Misc pylint/flake8 fixes

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #981
  • Loading branch information
dagardner-nv authored Jun 21, 2023
1 parent e5af353 commit 43939dc
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 21 deletions.
5 changes: 2 additions & 3 deletions morpheus/pipeline/preallocator_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ def _preallocate_multi(self, msg: MultiMessage) -> MultiMessage:
def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair:
(out_stream, out_type) = out_pair
pretty_type = pretty_print_type_name(out_type)
logger.info("Added source: {}\n └─> {}".format(str(self), pretty_type))

if len(self._needed_columns) > 0:
node_name = f"{self.unique_name}-preallocate"
Expand All @@ -111,8 +110,8 @@ def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> Stre
elif issubclass(out_type, (cudf.DataFrame, pd.DataFrame)):
stream = builder.make_node(node_name, ops.map(self._preallocate_df))
else:
msg = ("Additional columns were requested to be inserted into the Dataframe, but the output type {}"
" isn't a supported type".format(pretty_type))
msg = ("Additional columns were requested to be inserted into the Dataframe, but the output type "
f"{pretty_type} isn't a supported type")
raise RuntimeError(msg)

builder.make_edge(out_stream, stream)
Expand Down
23 changes: 5 additions & 18 deletions morpheus/stages/input/file_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""File source stage."""

import logging
import pathlib
import typing

import mrc
import typing_utils
from mrc.core import operators as ops

from morpheus.cli import register_stage
from morpheus.common import FileTypes
Expand Down Expand Up @@ -86,14 +85,16 @@ def __init__(self,

@property
def name(self) -> str:
"""Return the name of the stage"""
return "from-file"

@property
def input_count(self) -> int:
"""Return None for no max input count"""
return self._input_count

def supports_cpp_node(self):
def supports_cpp_node(self) -> bool:
"""Indicates whether or not this stage supports a C++ node"""
return True

def _build_source(self, builder: mrc.Builder) -> StreamPair:
Expand All @@ -108,21 +109,7 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair:

return out_stream, out_type

def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair:

out_stream = out_pair[0]
out_type = out_pair[1]

# Convert our list of dataframes into the desired type. Flatten if necessary
if (typing_utils.issubtype(out_type, typing.List)):
flattened = builder.make_node(self.unique_name + "-post", ops.flatten())
builder.make_edge(out_stream, flattened)
out_stream = flattened
out_type = typing.get_args(out_type)[0]

return super()._post_build_single(builder, (out_stream, out_type))

def _generate_frames(self):
def _generate_frames(self) -> typing.Iterable[MessageMeta]:

df = read_file_to_df(
self._filename,
Expand Down

0 comments on commit 43939dc

Please sign in to comment.