diff --git a/morpheus/utils/column_info.py b/morpheus/utils/column_info.py index 8cc369e704..1182932b46 100644 --- a/morpheus/utils/column_info.py +++ b/morpheus/utils/column_info.py @@ -22,12 +22,14 @@ import cudf -logger = logging.getLogger("morpheus.{}".format(__name__)) +logger = logging.getLogger(f"morpheus.{__name__}") + +DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00' # TODO(Devin): Proxying this for backwards compatibility. Had to move the primary definition to avoid circular imports. def process_dataframe(df_in: typing.Union[pd.DataFrame, cudf.DataFrame], input_schema) -> pd.DataFrame: - import morpheus.utils.schema_transforms as schema_transforms + from morpheus.utils import schema_transforms return schema_transforms.process_dataframe(df_in, input_schema) @@ -37,7 +39,6 @@ def create_increment_col(df, column_name: str, groupby_column="username", timest timestamp values in `timestamp_column` and then grouping by `groupby_column` returning incrementing values starting at `1`. """ - DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00' # Ensure we are pandas for this if (isinstance(df, cudf.DataFrame)): @@ -63,8 +64,8 @@ def column_listjoin(df, col_name: str) -> pd.Series: """ if col_name in df: return df[col_name].transform(lambda x: ",".join(x)).astype('string') - else: - return pd.Series(None, dtype='string') + + return pd.Series(None, dtype='string') @dataclasses.dataclass @@ -78,11 +79,11 @@ def get_pandas_dtype(self) -> str: if ((isinstance(self.dtype, str) and self.dtype.startswith("datetime")) or (isinstance(self.dtype, type) and issubclass(self.dtype, datetime))): return "datetime64[ns]" - else: - if (isinstance(self.dtype, str)): - return self.dtype - else: - return self.dtype.__name__ + + if (isinstance(self.dtype, str)): + return self.dtype + + return self.dtype.__name__ def _process_column(self, df: pd.DataFrame) -> pd.Series: """ @@ -226,7 +227,7 @@ def __post_init__(self): # Compile the regex if (input_preserve_columns is not None and len(input_preserve_columns) > 0): - input_preserve_columns = re.compile("({})".format("|".join(input_preserve_columns))) + input_preserve_columns = re.compile(f"({'|'.join(input_preserve_columns)})") else: input_preserve_columns = None diff --git a/morpheus/utils/nvt/mutate.py b/morpheus/utils/nvt/mutate.py index b36f806fea..bf3e96ab2e 100644 --- a/morpheus/utils/nvt/mutate.py +++ b/morpheus/utils/nvt/mutate.py @@ -48,25 +48,26 @@ def label(self): name = self._func.__name__.split(".")[-1] if name != "": return f"MutateOp: {name}" - else: - try: - # otherwise get the lambda source code from the inspect module if possible - source = getsourcelines(self.f)[0][0] - lambdas = [op.strip() for op in source.split(">>") if "lambda " in op] - if len(lambdas) == 1 and lambdas[0].count("lambda") == 1: - return lambdas[0] - except Exception: # pylint: disable=broad-except - # we can fail to load the source in distributed environments. Since the - # label is mainly used for diagnostics, don't worry about the error here and - # fallback to the default labelling - pass + + try: + # otherwise get the lambda source code from the inspect module if possible + source = getsourcelines(self.f)[0][0] + lambdas = [op.strip() for op in source.split(">>") if "lambda " in op] + if len(lambdas) == 1 and lambdas[0].count("lambda") == 1: + return lambdas[0] + except Exception: # pylint: disable=broad-except + # we can fail to load the source in distributed environments. Since the + # label is mainly used for diagnostics, don't worry about the error here and + # fallback to the default labelling + pass # Failed to figure out the source return "MutateOp" + # pylint: disable=arguments-renamed @annotate("MutateOp", color="darkgreen", domain="nvt_python") - def transform(self, column_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: - return self._func(column_selector, df) + def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: + return self._func(col_selector, df) def column_mapping( self, diff --git a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py index 7ee7cbdb5d..54031fac8a 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py +++ b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py @@ -18,11 +18,12 @@ import os from unittest import mock -import cudf import fsspec import pandas as pd import pytest +import cudf + from morpheus.common import FileTypes from morpheus.config import Config from morpheus.pipeline.preallocator_mixin import PreallocatorMixin @@ -47,8 +48,9 @@ def single_file_obj(): def test_single_object_to_dataframe(single_file_obj: fsspec.core.OpenFile): from dfp.stages.dfp_file_to_df import _single_object_to_dataframe - schema = DataFrameInputSchema( - column_info=[CustomColumn(name='data', dtype=str, process_column_fn=lambda df: df['data'].to_arrow().to_pylist()[0])]) + schema = DataFrameInputSchema(column_info=[ + CustomColumn(name='data', dtype=str, process_column_fn=lambda df: df['data'].to_arrow().to_pylist()[0]) + ]) df = _single_object_to_dataframe(single_file_obj, schema, FileTypes.Auto, False, {}) assert df.columns == ['data'] @@ -56,10 +58,9 @@ def test_single_object_to_dataframe(single_file_obj: fsspec.core.OpenFile): d = json.load(fh) expected_data = d['data'] - aslist = [x.tolist() for x in df['data'].to_list()] # to_list returns a list of numpy arrays - - assert(aslist == expected_data) + aslist = [x.tolist() for x in df['data'].to_list()] # to_list returns a list of numpy arrays + assert (aslist == expected_data) def test_single_object_to_dataframe_timeout():