426 bug msg keyerror data #428

Merged (14 commits) on Nov 9, 2022
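
The change below fixes the KeyError raised when the preprocessor looked up a hard-coded "data" column (issue #426): the column name is now a constructor argument on both the Python and C++ stages. The core PreprocessNLPStage keeps "data" as its default, while the log-parsing example stage defaults to "raw", preserving each stage's previous behavior. A minimal usage sketch (the config values, vocab path, and column name are illustrative, not taken from the PR):

    from morpheus.config import Config
    from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage

    config = Config()
    config.feature_length = 128  # illustrative sequence length

    # The payload column can now be named explicitly instead of being
    # hard-coded to "data".
    stage = PreprocessNLPStage(config,
                               vocab_hash_file="data/bert-base-uncased-hash.txt",  # assumed path
                               truncation=True,
                               column="raw")
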
19 changes: 14 additions & 5 deletions examples/log_parsing/preprocessing.py
@@ -58,6 +58,8 @@ class PreprocessLogParsingStage(PreprocessBaseStage):
overflowing token-ids can contain duplicated token-ids from the main sequence. If max_length is equal to stride
there are no duplicated-id tokens. If stride is 80% of max_length, 20% of the first sequence will be repeated on
the second sequence and so on until the entire sentence is encoded.
+ column : str, default = "raw"
+     Name of the column containing the data that needs to be preprocessed.

"""

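The stride semantics above are easiest to see with concrete numbers. A small illustrative sketch in plain Python, independent of the actual tokenizer:

    # Illustrative only: how stride controls overlap between output sequences.
    max_length = 10
    stride = 8  # 80% of max_length

    token_ids = list(range(25))  # stand-in for one tokenized sentence
    windows = [token_ids[i:i + max_length] for i in range(0, len(token_ids), stride)]

    # Each window repeats the final 20% of the previous one:
    assert windows[1][:2] == windows[0][-2:]
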
@@ -67,7 +69,8 @@ def __init__(self,
truncation: bool = False,
do_lower_case: bool = False,
add_special_tokens: bool = False,
- stride: int = -1):
+ stride: int = -1,
+ column: str = "raw"):
super().__init__(c)

self._seq_length = c.feature_length
@@ -81,6 +84,7 @@ def __init__(self,
# Use the given value
self._stride = stride

+ self._column = column
self._truncation = truncation
self._do_lower_case = do_lower_case
self._add_special_tokens = add_special_tokens
@@ -99,7 +103,8 @@ def pre_process_batch(x: MultiMessage,
seq_len: int,
stride: int,
truncation: bool,
- add_special_tokens: bool) -> MultiInferenceNLPMessage:
+ add_special_tokens: bool,
+ column: str) -> MultiInferenceNLPMessage:
"""
For NLP use cases, this function performs pre-processing.

@@ -125,6 +130,8 @@ def pre_process_batch(x: MultiMessage,
If set to true, strings will be truncated and padded to max_length. Each input string will result in exactly
one output sequence. If set to false, there may be multiple output sequences when the max_length is smaller
than generated tokens.
+ column : str
+     Name of the column containing the data that needs to be preprocessed.

Returns
-------
Expand All @@ -133,7 +140,7 @@ def pre_process_batch(x: MultiMessage,

"""

- text_ser = cudf.Series(x.get_meta("raw"))
+ text_ser = cudf.Series(x.get_meta(column))

for symbol in string.punctuation:
text_ser = text_ser.str.replace(symbol, ' ' + symbol + ' ')
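
For context, the loop above pads every punctuation character with spaces so that each one tokenizes as a separate word. A standalone illustration with a plain Python string (the cudf version above applies the same idea to every row of the Series):

    import string

    text = "a.b,c"
    for symbol in string.punctuation:
        text = text.replace(symbol, ' ' + symbol + ' ')
    assert text == "a . b , c"
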
@@ -170,7 +177,8 @@ def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMe
stride=self._stride,
seq_len=self._seq_length,
truncation=self._truncation,
- add_special_tokens=self._add_special_tokens)
+ add_special_tokens=self._add_special_tokens,
+ column=self._column)

def _get_preprocess_node(self, builder: srf.Builder):
return _stages.PreprocessNLPStage(builder,
@@ -180,4 +188,5 @@ def _get_preprocess_node(self, builder: srf.Builder):
self._truncation,
self._do_lower_case,
self._add_special_tokens,
- self._stride)
+ self._stride,
+ self._column)
7 changes: 5 additions & 2 deletions morpheus/_lib/include/morpheus/stages/preprocess_nlp.hpp
@@ -54,7 +54,8 @@ class PreprocessNLPStage
bool truncation,
bool do_lower_case,
bool add_special_token,
- int stride = -1);
+ int stride = -1,
+ std::string column = "data");

private:
/**
@@ -63,6 +64,7 @@
subscribe_fn_t build_operator();

std::string m_vocab_hash_file;
+ std::string m_column;
uint32_t m_sequence_length;
bool m_truncation;
bool m_do_lower_case;
@@ -86,7 +88,8 @@ struct PreprocessNLPStageInterfaceProxy
bool truncation,
bool do_lower_case,
bool add_special_token,
- int stride = -1);
+ int stride = -1,
+ std::string column = "data");
};
#pragma GCC visibility pop
} // namespace morpheus
3 changes: 2 additions & 1 deletion morpheus/_lib/src/python_modules/stages.cpp
@@ -155,7 +155,8 @@ PYBIND11_MODULE(stages, m)
py::arg("truncation"),
py::arg("do_lower_case"),
py::arg("add_special_token"),
- py::arg("stride"));
+ py::arg("stride"),
+ py::arg("column"));
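
Note: pybind11 checks that the number of py::arg annotations matches the bound function's arity, so the new column parameter added to the C++ constructor must also be registered here for the binding to keep working.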

py::class_<srf::segment::Object<SerializeStage>,
srf::segment::ObjectProperties,
13 changes: 8 additions & 5 deletions morpheus/_lib/src/stages/preprocess_nlp.cpp
@@ -49,14 +49,16 @@ PreprocessNLPStage::PreprocessNLPStage(std::string vocab_hash_file,
bool truncation,
bool do_lower_case,
bool add_special_token,
- int stride) :
+ int stride,
+ std::string column) :
PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
m_vocab_hash_file(std::move(vocab_hash_file)),
m_sequence_length(sequence_length),
m_truncation(truncation),
m_do_lower_case(do_lower_case),
m_add_special_token(add_special_token),
- m_stride(stride)
+ m_stride(stride),
+ m_column(std::move(column))
{}

PreprocessNLPStage::subscribe_fn_t PreprocessNLPStage::build_operator()
@@ -74,7 +76,7 @@ PreprocessNLPStage::subscribe_fn_t PreprocessNLPStage::build_operator()
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output, stride](sink_type_t x) {
// Convert to string view
- auto string_col = cudf::strings_column_view{x->get_meta("data").get_column(0)};
+ auto string_col = cudf::strings_column_view{x->get_meta(this->m_column).get_column(0)};

// Create the hashed vocab
thread_local std::unique_ptr<nvtext::hashed_vocabulary> vocab =
@@ -144,10 +146,11 @@ std::shared_ptr<srf::segment::Object<PreprocessNLPStage>> PreprocessNLPStageInte
bool truncation,
bool do_lower_case,
bool add_special_token,
- int stride)
+ int stride,
+ std::string column)
{
auto stage = builder.construct_object<PreprocessNLPStage>(
- name, vocab_hash_file, sequence_length, truncation, do_lower_case, add_special_token, stride);
+ name, vocab_hash_file, sequence_length, truncation, do_lower_case, add_special_token, stride, column);

return stage;
}
19 changes: 14 additions & 5 deletions morpheus/stages/preprocess/preprocess_nlp_stage.py
@@ -63,6 +63,8 @@ class PreprocessNLPStage(PreprocessBaseStage):
overflowing token-ids can contain duplicated token-ids from the main sequence. If max_length is equal to stride
there are no duplicated-id tokens. If stride is 80% of max_length, 20% of the first sequence will be repeated on
the second sequence and so on until the entire sentence is encoded.
+ column : str
+     Name of the column containing the data that needs to be preprocessed.

"""

@@ -72,9 +74,11 @@ def __init__(self,
truncation: bool = False,
do_lower_case: bool = False,
add_special_tokens: bool = False,
- stride: int = -1):
+ stride: int = -1,
+ column: str = "data"):
super().__init__(c)

+ self._column = column
self._seq_length = c.feature_length
self._vocab_hash_file = vocab_hash_file

@@ -106,7 +110,8 @@ def pre_process_batch(x: MultiMessage,
seq_len: int,
stride: int,
truncation: bool,
- add_special_tokens: bool) -> MultiInferenceNLPMessage:
+ add_special_tokens: bool,
+ column: str) -> MultiInferenceNLPMessage:
"""
For NLP use cases, this function performs pre-processing.

@@ -134,14 +139,16 @@ def pre_process_batch(x: MultiMessage,
than generated tokens.
add_special_tokens : bool
Whether or not to encode the sequences with the special tokens of the BERT classification model.
+ column : str
+     Name of the column containing the data that needs to be preprocessed.

Returns
-------
`morpheus.pipeline.messages.MultiInferenceNLPMessage`
NLP inference message.

"""
- text_ser = cudf.Series(x.get_meta("data"))
+ text_ser = cudf.Series(x.get_meta(column))

tokenized = tokenize_text_series(vocab_hash_file=vocab_hash_file,
do_lower_case=do_lower_case,
@@ -175,7 +182,8 @@ def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMe
stride=self._stride,
seq_len=self._seq_length,
truncation=self._truncation,
- add_special_tokens=self._add_special_tokens)
+ add_special_tokens=self._add_special_tokens,
+ column=self._column)

def _get_preprocess_node(self, builder: srf.Builder):
return _stages.PreprocessNLPStage(builder,
@@ -185,4 +193,5 @@ def _get_preprocess_node(self, builder: srf.Builder):
self._truncation,
self._do_lower_case,
self._add_special_tokens,
- self._stride)
+ self._stride,
+ self._column)
36 changes: 32 additions & 4 deletions morpheus/utils/logger.py
@@ -98,12 +98,11 @@ def _configure_from_log_level(log_level: int):
# Default config with level
logging.captureWarnings(True)

- # Set the SRF logging level to match
- srf.logging.set_level(log_level)

# Get the root Morpheus logger
morpheus_logger = logging.getLogger("morpheus")
- morpheus_logger.setLevel(log_level)

+ # Set the level here
+ set_log_level(log_level=log_level)

# Don't propagate upstream
morpheus_logger.propagate = False
@@ -175,6 +174,35 @@ def configure_logging(log_level: int, log_config_file: str = None):
_configure_from_log_level(log_level=log_level)


def set_log_level(log_level: int):
"""
Set the Morpheus logging level. Also propagates the value to SRF's logging system to keep the logging levels in sync.

Parameters
----------
log_level : int
One of the levels from the `logging` module, e.g. `logging.DEBUG`, `logging.INFO`, `logging.WARN`,
`logging.ERROR`, etc.

Returns
-------
int
The previously set logging level
"""

# Get the old level and return it in case the user wants that
old_level = srf.logging.get_level()

# Set the SRF logging level to match
srf.logging.set_level(log_level)

# Get the root Morpheus logger
morpheus_logger = logging.getLogger("morpheus")
morpheus_logger.setLevel(log_level)

return old_level
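
Because set_log_level returns the previous level, temporary overrides are easy to scope. A minimal sketch of the intended pattern (the try/finally wrapper is illustrative, not part of this change):

    import logging

    from morpheus.utils.logger import set_log_level

    # Temporarily raise verbosity, then restore whatever was set before.
    old_level = set_log_level(logging.DEBUG)
    try:
        ...  # code that needs DEBUG-level logs
    finally:
        set_log_level(old_level)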


def deprecated_stage_warning(logger, cls, name):
logger.warning(("The '%s' stage ('%s') is no longer required to manage backpressure and has been deprecated. "
"It has no effect and acts as a pass through stage."),
75 changes: 75 additions & 0 deletions tests/conftest.py
@@ -346,3 +346,78 @@ def launch_mock_triton():

else:
yield


@pytest.fixture(scope="session", autouse=True)
def configure_tests_logging():
"""
Sets the base logging settings for the entire test suite to ensure logs are generated. Automatically detects if a
debugger is attached and lowers the logging level to DEBUG.
"""
import sys

from morpheus.utils.logger import configure_logging

log_level = logging.WARN

# Check if a debugger is attached. If so, choose DEBUG for the logging level. Otherwise, only WARN
trace_func = sys.gettrace()

if (trace_func is not None):
trace_module = getattr(trace_func, "__module__", None)

if (trace_module is not None and trace_module.find("pydevd") != -1):
log_level = logging.DEBUG

configure_logging(log_level=log_level)


def _wrap_set_log_level(log_level: int):
from morpheus.utils.logger import set_log_level

# Save the previous logging level
old_level = set_log_level(log_level)

yield

set_log_level(old_level)


@pytest.fixture(scope="function")
def loglevel_debug():
"""
Sets the logging level to `logging.DEBUG` for this function only.
"""
# "yield from" is required here: _wrap_set_log_level is a generator, so the
# fixture must drive it for the level to be set and later restored.
yield from _wrap_set_log_level(logging.DEBUG)


@pytest.fixture(scope="function")
def loglevel_info():
"""
Sets the logging level to `logging.INFO` for this function only.
"""
yield from _wrap_set_log_level(logging.INFO)


@pytest.fixture(scope="function")
def loglevel_warn():
"""
Sets the logging level to `logging.WARN` for this function only.
"""
yield from _wrap_set_log_level(logging.WARN)


@pytest.fixture(scope="function")
def loglevel_error():
"""
Sets the logging level to `logging.ERROR` for this function only.
"""
yield from _wrap_set_log_level(logging.ERROR)


@pytest.fixture(scope="function")
def loglevel_fatal():
"""
Sets the logging level to `logging.FATAL` for this function only.
"""
yield from _wrap_set_log_level(logging.FATAL)
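
A test opts into one of these fixtures simply by naming it as a parameter. A hypothetical example (the test name and body are illustrative):

    def test_noisy_pipeline(loglevel_debug):
        # Runs with the Morpheus and SRF log levels set to DEBUG; the
        # previous level is restored when the test completes.
        ...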