Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove duplicate log message in output when using pre-allocation #981

Merged
merged 10 commits into from
Jun 21, 2023
5 changes: 2 additions & 3 deletions morpheus/pipeline/preallocator_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ def _preallocate_multi(self, msg: MultiMessage) -> MultiMessage:
def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair:
(out_stream, out_type) = out_pair
pretty_type = pretty_print_type_name(out_type)
logger.info("Added source: {}\n └─> {}".format(str(self), pretty_type))

if len(self._needed_columns) > 0:
node_name = f"{self.unique_name}-preallocate"
Expand All @@ -111,8 +110,8 @@ def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> Stre
elif issubclass(out_type, (cudf.DataFrame, pd.DataFrame)):
stream = builder.make_node(node_name, ops.map(self._preallocate_df))
else:
msg = ("Additional columns were requested to be inserted into the Dataframe, but the output type {}"
" isn't a supported type".format(pretty_type))
msg = ("Additional columns were requested to be inserted into the Dataframe, but the output type "
f"{pretty_type} isn't a supported type")
raise RuntimeError(msg)

builder.make_edge(out_stream, stream)
Expand Down
23 changes: 5 additions & 18 deletions morpheus/stages/input/file_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""File source stage."""

import logging
import pathlib
import typing

import mrc
import typing_utils
from mrc.core import operators as ops

from morpheus.cli import register_stage
from morpheus.common import FileTypes
Expand Down Expand Up @@ -86,14 +85,16 @@ def __init__(self,

@property
def name(self) -> str:
"""Return the name of the stage"""
return "from-file"

@property
def input_count(self) -> int:
"""Return None for no max input count"""
return self._input_count

def supports_cpp_node(self):
def supports_cpp_node(self) -> bool:
"""Indicates whether or not this stage supports a C++ node"""
return True

def _build_source(self, builder: mrc.Builder) -> StreamPair:
Expand All @@ -108,21 +109,7 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair:

return out_stream, out_type

def _post_build_single(self, builder: mrc.Builder, out_pair: StreamPair) -> StreamPair:

out_stream = out_pair[0]
out_type = out_pair[1]

# Convert our list of dataframes into the desired type. Flatten if necessary
if (typing_utils.issubtype(out_type, typing.List)):
flattened = builder.make_node(self.unique_name + "-post", ops.flatten())
builder.make_edge(out_stream, flattened)
out_stream = flattened
out_type = typing.get_args(out_type)[0]

return super()._post_build_single(builder, (out_stream, out_type))

def _generate_frames(self):
def _generate_frames(self) -> typing.Iterable[MessageMeta]:

df = read_file_to_df(
self._filename,
Expand Down