Properly calculate the offset of the column_view & apply offsets in copy_meta_ranges #423

Merged Oct 31, 2022 · 19 commits

Changes from 1 commit
Concatenate the input rather than repeating it; this forces the
deserialize stage to slice up the input and trigger the bug
dagardner-nv committed Oct 31, 2022
commit ac3fda4a4c341c48c31465ea29e69f76ae177948
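
A minimal sketch in plain pandas (not the Morpheus API) of why concatenation matters here: `repeat` at the source emits whole copies of the frame that each start at offset zero, while a single concatenated input forces downstream batching to slice one large DataFrame at non-zero offsets, which is the condition the PR's column_view offset fix needs in order to be exercised.

import pandas as pd

# One oversized input built by concatenation, as _extend_data does below.
df = pd.concat([pd.DataFrame({"v": range(8)})] * 4, ignore_index=True)

batch_size = 10
for start in range(0, len(df), batch_size):
    batch = df.iloc[start:start + batch_size]
    # Every batch after the first views the original frame starting at a
    # non-zero row offset.
    print(start, len(batch))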
38 changes: 32 additions & 6 deletions tests/test_add_scores_stage_pipe.py
@@ -15,11 +15,15 @@
# limitations under the License.

import os
import time

import numpy as np
import pandas as pd
import pytest

from morpheus._lib.file_types import FileTypes
from morpheus.io.deserializers import read_file_to_df
from morpheus.io.serializers import df_to_csv
from morpheus.messages import MessageMeta
from morpheus.messages import MultiMessage
from morpheus.messages import MultiResponseProbsMessage
@@ -31,31 +35,53 @@
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from utils import TEST_DIRS
from utils import ConvMsg
from utils import get_column_names_from_file


def _extend_data(input_file, output_file, repeat_count):
    df = read_file_to_df(input_file, FileTypes.Auto, df_type='pandas')
    data = pd.concat([df for _ in range(repeat_count)])
    with open(output_file, 'w') as fh:
        output_strs = df_to_csv(data, include_header=True, include_index_col=False)
        # Remove any trailing whitespace
        if (len(output_strs[-1].strip()) == 0):
            output_strs = output_strs[:-1]
        fh.writelines(output_strs)


@pytest.mark.slow
@pytest.mark.parametrize('order', ['F', 'C'])
@pytest.mark.parametrize('pipeline_batch_size', [32, 256, 1024])
@pytest.mark.parametrize('pipeline_batch_size', [256, 1024, 2048])
@pytest.mark.parametrize('repeat', [1, 10, 100])
def test_add_scores_stage_pipe(config, tmp_path, order, pipeline_batch_size, repeat):
    config.class_labels = ['frogs', 'lizards', 'toads', 'turtles']
    config.pipeline_batch_size = pipeline_batch_size
    config.num_threads = 1

    input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
    src_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")
    out_file = os.path.join(tmp_path, 'results.csv')

    input_cols = get_column_names_from_file(src_file)
    if repeat > 1:
        input_file = os.path.join(tmp_path, 'input.csv')
        _extend_data(src_file, input_file, repeat)
    else:
        input_file = src_file

    pipe = LinearPipeline(config)
    pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False, repeat=repeat))
    pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
    pipe.add_stage(DeserializeStage(config))
    pipe.add_stage(ConvMsg(config, order=order))
    pipe.add_stage(ConvMsg(config, order=order, columns=input_cols))
    pipe.add_stage(AddScoresStage(config))
    pipe.add_stage(SerializeStage(config, include=["^{}$".format(c) for c in config.class_labels]))
    pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
    pipe.run()

    # There seems to be some sort of race between the sync to the output file when cpp=True and repeat=100
    time.sleep(1)
    assert os.path.exists(out_file)

    expected = np.concatenate([np.loadtxt(input_file, delimiter=",", skiprows=1) for _ in range(repeat)])
    expected = np.loadtxt(input_file, delimiter=",", skiprows=1)

    # The output data will contain an additional id column that we will need to slice off
    # also somehow 0.7 ends up being 0.7000000000000001
@@ -79,7 +105,7 @@ def test_add_scores_stage_multi_segment_pipe(config, tmp_path):
    pipe.add_segment_boundary(MessageMeta)
    pipe.add_stage(DeserializeStage(config))
    pipe.add_segment_boundary(MultiMessage)
    pipe.add_stage(ConvMsg(config))
    pipe.add_stage(ConvMsg(config, columns=get_column_names_from_file(input_file)))
    pipe.add_segment_boundary(MultiResponseProbsMessage)
    pipe.add_stage(AddScoresStage(config))
    pipe.add_segment_boundary(MultiResponseProbsMessage)
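An aside on the `0.7000000000000001` comment in the test above: this is ordinary binary floating-point representation error rather than a pipeline bug, since 0.7 has no exact binary representation and a serialization round trip can perturb the last digit. A minimal sketch of the tolerant comparison this implies, with illustrative values rather than the test's actual data:

import numpy as np

# Exact equality fails after a lossy round trip; compare with a tolerance.
expected = np.array([0.7, 0.2, 0.1])
actual = np.array([0.7000000000000001, 0.2, 0.1])
assert not np.array_equal(actual, expected)
assert np.allclose(actual, expected)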
11 changes: 9 additions & 2 deletions tests/utils.py
@@ -17,6 +17,7 @@
import json
import os
import time
import traceback

import cupy as cp
import srf
@@ -61,9 +62,10 @@ class ConvMsg(SinglePortStage):
    Setting `order` specifies probs to be in either column or row major
    """

    def __init__(self, c: Config, expected_data_file: str = None, order: str = 'K'):
    def __init__(self, c: Config, expected_data_file: str = None, columns=None, order: str = 'K'):
        super().__init__(c)
        self._expected_data_file = expected_data_file
        self._columns = columns
        self._order = order

    @property
@@ -80,7 +82,7 @@ def _conv_message(self, m):
        if self._expected_data_file is not None:
            df = read_file_to_df(self._expected_data_file, FileTypes.CSV, df_type="cudf")
        else:
            df = m.get_meta()
            df = m.get_meta(self._columns)

        probs = cp.array(df.values, copy=True, order=self._order)
        memory = ResponseMemoryProbs(count=len(probs), probs=probs)
@@ -155,3 +157,8 @@ def compare_class_to_scores(file_name, field_names, class_prefix, score_prefix,
        above_thresh.to_csv(f"/tmp/score_field_{field_name}.csv")

        assert all(above_thresh == df[class_field]), f"Mismatch on {field_name}"


def get_column_names_from_file(file_name):
    df = read_file_to_df(file_name, file_type=FileTypes.Auto, df_type='pandas')
    return list(df.columns)
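
A minimal usage sketch of the new helper, using a plain-pandas stand-in (the hypothetical get_column_names_from_csv below) for environments without morpheus installed; the real helper goes through read_file_to_df, so it also handles the other FileTypes:

import pandas as pd

def get_column_names_from_csv(file_name):
    # Read only the header row; nrows=0 yields an empty frame with columns.
    return list(pd.read_csv(file_name, nrows=0).columns)

# Example: column_names = get_column_names_from_csv("input.csv")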