Fix or silence warnings emitted during tests (#1501)
* Reduce the number of warnings emitted while running tests
* Replace usage of deprecated methods/attributes where possible
* When no other option is available, silence known warnings (a general sketch of this pattern appears below).
* Remove out-of-date warning filters
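
Where a warning cannot be fixed at its source, the changes below wrap the offending call in a narrowly scoped filter so that unrelated warnings still surface. A minimal sketch of that pattern, using a made-up stand-in function rather than Morpheus code:

```python
import warnings


def third_party_call() -> int:
    # Stand-in for library code that emits a known, unavoidable warning.
    warnings.warn("this API is deprecated", DeprecationWarning)
    return 42


with warnings.catch_warnings():
    # Match the specific message and category so only the known warning is silenced.
    warnings.filterwarnings("ignore", message="this API is deprecated", category=DeprecationWarning)
    result = third_party_call()

assert result == 42
```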

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1501
dagardner-nv authored Feb 12, 2024
1 parent 1f9a7bc commit ddd10d1
Showing 27 changed files with 188 additions and 88 deletions.
4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/module/content_extractor_module.py
@@ -29,7 +29,7 @@
import pandas as pd
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pydantic import BaseModel
from pydantic import BaseModel # pylint: disable=no-name-in-module
from pydantic import Field
from pydantic import ValidationError
from pydantic import validator
@@ -197,7 +197,7 @@ def _csv_to_text_converter(input_info: ConverterInputInfo) -> list[str]:
raise ValueError("The CSV file must either include a 'content' column or have a "
"columns specified in the meta configuration with key 'text_column_names'.")
df.fillna(value='', inplace=True)
text_arr = df[text_column_names].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist()
text_arr = df[sorted(text_column_names)].apply(lambda x: ' '.join(map(str, x)), axis=1).tolist()
return text_arr


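
A hedged note on the change above: `sorted(text_column_names)` produces a plain list with a stable order before indexing, which makes the joined text deterministic and sidesteps the pandas deprecation warning emitted when a `DataFrame` is indexed with a set. An illustration with invented column names and data:

```python
import pandas as pd

df = pd.DataFrame({"title": ["a", "b"], "body": ["c", "d"]})
text_column_names = {"title", "body"}  # a set: iteration order is not guaranteed

# Indexing with a sorted list yields a deterministic column order and avoids the
# pandas warning raised when a set is passed directly as an indexer.
text_arr = df[sorted(text_column_names)].apply(lambda x: " ".join(map(str, x)), axis=1).tolist()
print(text_arr)  # ['c a', 'd b']
```
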
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/run.py
@@ -67,7 +67,7 @@ def run():
)
@click.option(
"--model_max_batch_size",
default=64,
default=256,
type=click.IntRange(min=1),
help="Max batch size to use for the model",
)
4 changes: 2 additions & 2 deletions morpheus/_lib/src/objects/table_info.cpp
@@ -29,7 +29,6 @@
#include <pybind11/stl.h> // IWYU pragma: keep

#include <algorithm> // for find, transform
#include <array> // needed for pybind11::make_tuple
#include <cstddef> // for size_t
#include <iterator> // for back_insert_iterator, back_inserter
#include <memory>
@@ -306,7 +305,8 @@ std::optional<std::string> MutableTableInfo::ensure_sliceable_index()
auto df_index = py_df.attr("index");

// Check to see if we actually need the change
if (df_index.attr("is_unique").cast<bool>() && df_index.attr("is_monotonic").cast<bool>())
if (df_index.attr("is_unique").cast<bool>() && (df_index.attr("is_monotonic_increasing").cast<bool>() ||
df_index.attr("is_monotonic_decreasing").cast<bool>()))
{
// Set the outputname to nullopt
old_index_col_name = std::nullopt;
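
pandas deprecated (and later removed) the bare `is_monotonic` property, which was an alias for `is_monotonic_increasing`; the pybind11 lookup above now queries the explicit attributes in both directions. The equivalent check in plain pandas, shown as a sketch rather than Morpheus code:

```python
import pandas as pd

index = pd.RangeIndex(5)

# Query the explicit attributes instead of the deprecated/removed `is_monotonic` alias.
is_sliceable = index.is_unique and (index.is_monotonic_increasing or index.is_monotonic_decreasing)
print(is_sliceable)  # True for a default RangeIndex
```
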
2 changes: 1 addition & 1 deletion morpheus/controllers/rss_controller.py
@@ -157,7 +157,7 @@ def _read_file_content(self, file_path: str) -> str:

def _try_parse_feed_with_beautiful_soup(self, feed_input: str) -> "feedparser.FeedParserDict":

soup = BeautifulSoup(feed_input, 'lxml')
soup = BeautifulSoup(feed_input, features='xml')

# Verify whether the given feed has 'item' or 'entry' tags.
if soup.find('item'):
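
Passing `features='xml'` selects an XML parser (backed by `lxml`) instead of coercing the feed through an HTML parser, which avoids BeautifulSoup's warning about an XML document being parsed as HTML. A small sketch with an invented feed snippet:

```python
from bs4 import BeautifulSoup

feed_input = "<rss><channel><item><title>example</title></item></channel></rss>"

# features="xml" uses lxml's XML parser rather than treating the document as HTML.
soup = BeautifulSoup(feed_input, features="xml")
print(soup.find("item").title.text)  # example
```
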
2 changes: 1 addition & 1 deletion morpheus/service/vdb/milvus_vector_db_service.py
@@ -698,7 +698,7 @@ def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) ->
df = df.to_pandas()

# Loop over all of the columns of the first row and build the schema
for col_name, col_val in df.iloc[0].iteritems():
for col_name, col_val in df.iloc[0].items():

field_dict = {
"name": col_name,
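
`Series.iteritems()` was deprecated and later removed in pandas 2.x; `Series.items()` is the drop-in replacement used here. A tiny sketch with invented data:

```python
import pandas as pd

first_row = pd.DataFrame({"name": ["alice"], "score": [0.9]}).iloc[0]

# items() replaces the deprecated iteritems() and yields the same (name, value) pairs.
for col_name, col_val in first_row.items():
    print(col_name, col_val)
```
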
2 changes: 1 addition & 1 deletion morpheus/stages/general/monitor_stage.py
@@ -99,7 +99,7 @@ def accepted_types(self) -> typing.Tuple:
def supports_cpp_node(self):
return False

def on_start(self):
async def start_async(self):
"""
Starts the pipeline stage's progress bar.
"""
2 changes: 1 addition & 1 deletion morpheus/stages/output/http_server_sink_stage.py
@@ -142,7 +142,7 @@ def supports_cpp_node(self):
"""Indicates whether or not this stage supports a C++ node."""
return False

def on_start(self):
async def start_async(self):
"""Starts the HTTP server."""
from morpheus.common import HttpServer
self._server = HttpServer(parse_fn=self._request_handler,
9 changes: 8 additions & 1 deletion morpheus/utils/column_info.py
@@ -387,7 +387,14 @@ def _process_column(self, df: pd.DataFrame) -> pd.Series:
The processed column as a datetime Series.
"""

return pd.to_datetime(df[self.input_name], infer_datetime_format=True, utc=True).astype(self.get_pandas_dtype())
dt_series = pd.to_datetime(df[self.input_name], infer_datetime_format=True, utc=True)

dtype = self.get_pandas_dtype()
if dtype == 'datetime64[ns]':
# avoid deprecation warning about using .astype to convert from a tz-aware type to a tz-naive type
return dt_series.dt.tz_localize(None)

return dt_series.astype(dtype)


@dataclasses.dataclass
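
Converting a timezone-aware series to a naive `datetime64[ns]` dtype via `.astype()` triggers a pandas deprecation warning, so the new code drops the timezone explicitly with `tz_localize(None)` first. A standalone sketch of the same idea, using invented timestamps:

```python
import pandas as pd

series = pd.Series(["2024-02-12T10:00:00Z", "2024-02-12T11:30:00Z"])
dt_series = pd.to_datetime(series, utc=True)  # tz-aware (UTC)

# Dropping the timezone explicitly avoids the deprecation warning emitted when
# .astype() is used to convert a tz-aware series to a tz-naive dtype.
naive = dt_series.dt.tz_localize(None)
print(naive.dtype)  # datetime64[ns]
```
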
9 changes: 8 additions & 1 deletion morpheus/utils/downloader.py
@@ -21,6 +21,7 @@
import os
import threading
import typing
import warnings
from enum import Enum

import fsspec
@@ -131,7 +132,13 @@ def get_dask_client(self):
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})

if (self._merlin_distributed is None):
self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster()))
with warnings.catch_warnings():
# Merlin.Distributed will warn if a client already exists; the client in question is the one we created
# and are explicitly passing to it in the constructor.
warnings.filterwarnings("ignore",
message="Existing Dask-client object detected in the current context.*",
category=UserWarning)
self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster()))

return self._merlin_distributed

12 changes: 7 additions & 5 deletions pyproject.toml
@@ -25,16 +25,18 @@ markers = [
]

filterwarnings = [
# Warning coming from mlflow's usage of numpy
'ignore:`np.object` is a deprecated alias for the builtin `object`. To silence this warning, use `object` by itself. Doing this will not modify any behavior and is safe',
# Ignore our own warning about the df property since we still have to test it
'ignore:Warning the df property returns a copy, please use the copy_dataframe method or the mutable_dataframe context manager to modify the DataFrame in-place instead.',
'ignore:`np.MachAr` is deprecated \(NumPy 1.22\):DeprecationWarning',
'ignore:Please use `spmatrix` from the `scipy.sparse` namespace, the `scipy.sparse.base` namespace is deprecated:DeprecationWarning',

# Deprecation warning from any project using distutils, currently known sources of this are:
# GPUtils https://github.com/anderskm/gputil/issues/48
# PySpark https://issues.apache.org/jira/browse/SPARK-45390
# PySpark https://issues.apache.org/jira/browse/SPARK-45390 & https://issues.apache.org/jira/browse/SPARK-38660
'ignore:The distutils package is deprecated and slated for removal in Python 3.12. Use setuptools or check PEP 632 for potential alternatives',
'ignore:distutils Version classes are deprecated. Use packaging.version instead.',

# Ignore cudf warnings about Pandas being used under the hood for processing json
'ignore:Using CPU via Pandas to write JSON dataset',
'ignore:Using CPU via Pandas to read JSON dataset',
]

testpaths = ["tests"]
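
These project-wide filters apply to every test; when a warning only matters for a single test, the same filter string can be attached with pytest's `filterwarnings` mark instead. A hedged sketch (the warning text is invented):

```python
import warnings

import pytest


@pytest.mark.filterwarnings("ignore:some known deprecation message:DeprecationWarning")
def test_with_known_warning():
    # The mark suppresses only warnings whose message starts with the given pattern.
    warnings.warn("some known deprecation message", DeprecationWarning)
    assert True
```
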
6 changes: 5 additions & 1 deletion tests/_utils/dataset_manager.py
@@ -18,6 +18,7 @@
import os
import random
import typing
import warnings

import cupy as cp
import pandas as pd
@@ -235,7 +236,10 @@ def compare_df(cls,
dfb: typing.Union[pd.DataFrame, cdf.DataFrame],
**compare_args):
"""Wrapper for `morpheus.utils.compare_df.compare_df`."""
return compare_df.compare_df(cls._value_as_pandas(dfa), cls._value_as_pandas(dfb), **compare_args)
with warnings.catch_warnings():
# Ignore performance warnings from pandas triggered by the comparison
warnings.filterwarnings("ignore", category=pd.errors.PerformanceWarning)
return compare_df.compare_df(cls._value_as_pandas(dfa), cls._value_as_pandas(dfb), **compare_args)

@classmethod
def assert_compare_df(cls,
14 changes: 10 additions & 4 deletions tests/_utils/kafka.py
@@ -18,6 +18,7 @@
import subprocess
import time
import typing
import warnings
from collections import namedtuple
from functools import partial

@@ -73,10 +74,15 @@ def seek_to_beginning(kafka_consumer: "KafkaConsumer", timeout: int = PARTITION_

@pytest.fixture(name='kafka_consumer', scope='function')
def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
_kafka_consumer.subscribe([kafka_topics.output_topic])
seek_to_beginning(_kafka_consumer)

yield _kafka_consumer
with warnings.catch_warnings():
# Ignore warnings specific to the test fixture and not the actual morpheus code
warnings.filterwarnings("ignore",
message=r"Exception ignored in:.*ConsumerCoordinator\.__del__",
category=pytest.PytestUnraisableExceptionWarning)
_kafka_consumer.subscribe([kafka_topics.output_topic])
seek_to_beginning(_kafka_consumer)

yield _kafka_consumer


def _init_pytest_kafka() -> (bool, Exception):
6 changes: 5 additions & 1 deletion tests/dfencoder/test_autoencoder.py
@@ -16,6 +16,7 @@

import os
import typing
import warnings
from unittest.mock import patch

import numpy as np
@@ -493,7 +494,10 @@ def test_auto_encoder_num_only_convergence(train_ae: autoencoder.AutoEncoder):
'num_feat_2': [3.5, 3.0, 3.2, 3.1, 3.6, 3.9, 3.4, 3.4, 2.9, 3.1],
})

train_ae.fit(num_df, epochs=50)
with warnings.catch_warnings():
# Ignore warning regarding tensorflow not being installed
warnings.filterwarnings("ignore", message="Initializing zero-element tensors is a no-op", category=UserWarning)
train_ae.fit(num_df, epochs=50)

avg_loss = np.sum([np.array(loss[1])
for loss in train_ae.logger.train_fts.values()], axis=0) / len(train_ae.logger.train_fts)
38 changes: 22 additions & 16 deletions tests/dfencoder/test_scalers.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings

import numpy as np
import pytest
import torch
@@ -25,35 +27,41 @@
# pylint: disable=redefined-outer-name


@pytest.fixture(scope="function")
def fit_tensor():
@pytest.fixture(name="fit_tensor", scope="function")
def fit_tensor_fixture():
yield torch.tensor([4.4, 5.3, 6.5], dtype=torch.float32)


@pytest.fixture(scope="function")
def tensor():
@pytest.fixture(name="tensor", scope="function")
def tensor_fixture():
yield torch.tensor([7.4, 8.3, 9.5], dtype=torch.float32)


@pytest.fixture(scope="function")
def standard_scaler(fit_tensor):
@pytest.fixture(name="standard_scaler", scope="function")
def standard_scaler_fixture(fit_tensor):
scaler = scalers.StandardScaler()
scaler.fit(fit_tensor)
yield scaler


@pytest.fixture(scope="function")
def modified_scaler(fit_tensor):
@pytest.fixture(name="modified_scaler", scope="function")
def modified_scaler_fixture(fit_tensor):
scaler = scalers.ModifiedScaler()
scaler.fit(fit_tensor)
yield scaler


@pytest.fixture(scope="function")
def gauss_rank_scaler(fit_tensor):
@pytest.fixture(name="gauss_rank_scaler", scope="function")
def gauss_rank_scaler_fixture(fit_tensor):
scaler = scalers.GaussRankScaler()
scaler.fit(fit_tensor)
yield scaler

with warnings.catch_warnings():
# This warning is triggered by the abnormally small tensor size used in this test
warnings.filterwarnings("ignore",
message=r"n_quantiles \(1000\) is greater than the total number of samples \(3\).*",
category=UserWarning)
scaler.fit(fit_tensor)
yield scaler


def test_ensure_float_type():
@@ -111,8 +119,7 @@ def test_modified_scaler_transform(modified_scaler, tensor):
assert torch.equal(torch.round(results, decimals=2), expected), f"{results} != {expected}"

# Test alternate path where median absolute deviation is 1
test = torch.tensor([3.0, 4.0, 4.0, 5.0])
modified_scaler.fit(test)
modified_scaler.fit(torch.tensor([3.0, 4.0, 4.0, 5.0]))
results = modified_scaler.transform(tensor)
expected = torch.tensor([5.43, 6.86, 8.78])
assert torch.equal(torch.round(results, decimals=2), expected), f"{results} != {expected}"
@@ -124,8 +131,7 @@ def test_modified_scaler_inverse_transform(modified_scaler, tensor):
assert torch.equal(torch.round(results, decimals=2), expected), f"{results} != {expected}"

# Test alternate path where median absolute deviation is 1
test = torch.tensor([3.0, 4.0, 4.0, 5.0])
modified_scaler.fit(test)
modified_scaler.fit(torch.tensor([3.0, 4.0, 4.0, 5.0]))
results = modified_scaler.inverse_transform(tensor)
expected = torch.tensor([8.64, 9.2, 9.95])
assert torch.equal(torch.round(results, decimals=2), expected), f"{results} != {expected}"
@@ -51,7 +51,7 @@ def test_process_events_on_data(mock_datetime: mock.MagicMock,

# post-process should replace nans, lets add a nan to the DF
with dfp_multi_ae_message.meta.mutable_dataframe() as df:
df['v2'][10] = np.nan
df.loc[10, 'v2'] = np.nan
df['event_time'] = ''

set_log_level(morpheus_log_level)
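
`df['v2'][10] = np.nan` is chained assignment, which pandas may apply to a temporary copy and flags with `SettingWithCopyWarning`; `df.loc[10, 'v2'] = np.nan` writes through a single indexing call. A minimal sketch with invented data:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"v2": range(20)}, dtype="float64")

# A single .loc assignment avoids pandas' chained-assignment warning path.
df.loc[10, "v2"] = np.nan
print(df["v2"].isna().sum())  # 1
```
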
@@ -16,6 +16,7 @@
import json
import os
import typing
import warnings

import pytest

@@ -99,7 +100,12 @@ def test_extract_users(config: Config,
skip_users=skip_users,
only_users=only_users)

results = stage.extract_users(df)
with warnings.catch_warnings():
# Ignore warning about the log message not being set. This happens whenever there aren't any output_messages
warnings.filterwarnings("ignore",
message="Must set log msg before end of context! Skipping log",
category=UserWarning)
results = stage.extract_users(df)

if not include_generic and not include_individual:
# Extra check for weird combination
6 changes: 3 additions & 3 deletions tests/examples/llm/common/conftest.py
@@ -52,10 +52,10 @@ def import_content_extractor_module(restore_sys_path): # pylint: disable=unused
return content_extractor_module


@pytest.fixture(name="nemollm", autouse=True, scope='session')
def nemollm_fixture(fail_missing: bool):
@pytest.fixture(name="langchain", autouse=True, scope='session')
def langchain_fixture(fail_missing: bool):
"""
All the tests in this subdir require nemollm
All the tests in this subdir require langchain
"""

skip_reason = ("Tests for the WebScraperStage require the langchain package to be installed, to install this run:\n"
6 changes: 3 additions & 3 deletions tests/examples/ransomware_detection/test_preprocessing.py
@@ -139,10 +139,10 @@ def test_merge_curr_and_prev_snapshots(self, config: Config, rwd_conf: dict, dat
}

expected_df = dataset_pandas['examples/ransomware_detection/dask_results.csv'].fillna('')
expected_df['pid_process'][1] = 'test_val1'
expected_df['pid_process'][3] = 'test_val2'
expected_df.loc[1, 'pid_process'] = 'test_val1'
expected_df.loc[3, 'pid_process'] = 'test_val2'

expected_df['snapshot_id'] = snapshot_ids
expected_df.loc[:, 'snapshot_id'] = snapshot_ids
expected_df.index = expected_df.snapshot_id

stage._merge_curr_and_prev_snapshots(df, source_pid_process)
23 changes: 23 additions & 0 deletions tests/llm/conftest.py
@@ -13,6 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import typing
from unittest import mock

import pytest

from _utils import require_env_variable
@@ -94,3 +98,22 @@ def serpapi_api_key_fixture():
yield require_env_variable(
varname="SERPAPI_API_KEY",
reason="serpapi integration tests require the `SERPAPI_API_KEY` environment variable to be defined.")


@pytest.mark.usefixtures("nemollm")
@pytest.fixture(name="mock_nemollm")
def mock_nemollm_fixture(mock_nemollm: mock.MagicMock):

# The generate function is a blocking call that returns a future when return_type="async"
async def mock_task(fut: asyncio.Future, value: typing.Any = mock.DEFAULT):
fut.set_result(value)

def create_future(*args, **kwargs) -> asyncio.Future: # pylint: disable=unused-argument
event_loop = asyncio.get_event_loop()
fut = event_loop.create_future()
event_loop.create_task(mock_task(fut, mock.DEFAULT))
return fut

mock_nemollm.generate.side_effect = create_future

yield mock_nemollm
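
The fixture reproduces the behaviour described in the comment above, where a synchronous `generate()` call hands back a future when `return_type="async"`, by scheduling a small task that resolves that future. A standalone sketch of the same mocking pattern, independent of Morpheus and nemollm (all names here are illustrative):

```python
import asyncio
from unittest import mock


def make_future_side_effect(value="mocked-response"):
    """Build a side_effect that returns an already-scheduled future resolved on the running loop."""

    async def resolve(fut: asyncio.Future):
        fut.set_result(value)

    def side_effect(*_args, **_kwargs) -> asyncio.Future:
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        loop.create_task(resolve(fut))
        return fut

    return side_effect


async def main():
    client = mock.MagicMock()
    client.generate.side_effect = make_future_side_effect()
    result = await client.generate("prompt", return_type="async")
    print(result)  # mocked-response


asyncio.run(main())
```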