diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index 7921fb430a..3482cc463b 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -94,7 +94,7 @@ jobs:
       # Disable conda upload for now, once we have morpheus packages in conda forge set the value to
       # !fromJSON(needs.prepare.outputs.is_pr) && (fromJSON(needs.prepare.outputs.is_main_branch) && 'main' || 'dev') || ''
       conda_upload_label: ""
-      container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-240214
-      test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-240214
+      container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-240221
+      test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-240221
     secrets:
       NGC_API_KEY: ${{ secrets.NGC_API_KEY }}
diff --git a/ci/scripts/bootstrap_local_ci.sh b/ci/scripts/bootstrap_local_ci.sh
index 2e59d9f1ec..3051b13af1 100755
--- a/ci/scripts/bootstrap_local_ci.sh
+++ b/ci/scripts/bootstrap_local_ci.sh
@@ -16,11 +16,16 @@ export WORKSPACE_TMP="$(pwd)/ws_tmp"
 mkdir -p ${WORKSPACE_TMP}
-git clone ${GIT_URL} Morpheus
-cd Morpheus/
-git checkout ${GIT_BRANCH}
-git pull
-git checkout ${GIT_COMMIT}
+
+if [[ "${USE_HOST_GIT}" == "1" ]]; then
+    cd Morpheus/
+else
+    git clone ${GIT_URL} Morpheus
+    cd Morpheus/
+    git checkout ${GIT_BRANCH}
+    git pull
+    git checkout ${GIT_COMMIT}
+fi
 
 export MORPHEUS_ROOT=$(pwd)
 export WORKSPACE=${MORPHEUS_ROOT}
diff --git a/ci/scripts/run_ci_local.sh b/ci/scripts/run_ci_local.sh
index 1575555ad3..fb29fdf139 100755
--- a/ci/scripts/run_ci_local.sh
+++ b/ci/scripts/run_ci_local.sh
@@ -41,6 +41,10 @@ function git_ssh_to_https()
 MORPHEUS_ROOT=${MORPHEUS_ROOT:-$(git rev-parse --show-toplevel)}
 
+# Specifies whether to mount the current git repo (to allow changes to be persisted) or to use a clean clone (to closely
+# match CI, the default)
+USE_HOST_GIT=${USE_HOST_GIT:-0}
+
 GIT_URL=$(git remote get-url origin)
 GIT_URL=$(git_ssh_to_https ${GIT_URL})
 
@@ -51,7 +55,7 @@ GIT_BRANCH=$(git branch --show-current)
 GIT_COMMIT=$(git log -n 1 --pretty=format:%H)
 
 LOCAL_CI_TMP=${LOCAL_CI_TMP:-${MORPHEUS_ROOT}/.tmp/local_ci_tmp}
-CONTAINER_VER=${CONTAINER_VER:-240214}
+CONTAINER_VER=${CONTAINER_VER:-240221}
 CUDA_VER=${CUDA_VER:-12.1}
 DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""}
 
@@ -66,6 +70,7 @@ ENV_LIST="${ENV_LIST} --env GIT_COMMIT=${GIT_COMMIT}"
 ENV_LIST="${ENV_LIST} --env PARALLEL_LEVEL=$(nproc)"
 ENV_LIST="${ENV_LIST} --env CUDA_VER=${CUDA_VER}"
 ENV_LIST="${ENV_LIST} --env SKIP_CONDA_ENV_UPDATE=${SKIP_CONDA_ENV_UPDATE}"
+ENV_LIST="${ENV_LIST} --env USE_HOST_GIT=${USE_HOST_GIT}"
 
 mkdir -p ${LOCAL_CI_TMP}
 cp ${MORPHEUS_ROOT}/ci/scripts/bootstrap_local_ci.sh ${LOCAL_CI_TMP}
 
@@ -82,6 +87,10 @@ for STAGE in "${STAGES[@]}"; do
         DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS} --runtime=runc"
     fi
 
+    if [[ "${USE_HOST_GIT}" == "1" ]]; then
+        DOCKER_RUN_ARGS="${DOCKER_RUN_ARGS} -v ${MORPHEUS_ROOT}:/Morpheus"
+    fi
+
     if [[ "${STAGE}" == "bash" ]]; then
         DOCKER_RUN_CMD="bash --init-file /ci_tmp/bootstrap_local_ci.sh"
     else
@@ -89,7 +98,9 @@ for STAGE in "${STAGES[@]}"; do
     fi
 
     echo "Running ${STAGE} stage in ${CONTAINER}"
+    set -x
     docker run ${DOCKER_RUN_ARGS} ${DOCKER_EXTRA_ARGS} ${CONTAINER} ${DOCKER_RUN_CMD}
+    set +x
     STATUS=$?
if [[ ${STATUS} -ne 0 ]]; then diff --git a/conda/environments/all_cuda-121_arch-x86_64.yaml b/conda/environments/all_cuda-121_arch-x86_64.yaml index 5c3329da82..28bdaa6fd9 100644 --- a/conda/environments/all_cuda-121_arch-x86_64.yaml +++ b/conda/environments/all_cuda-121_arch-x86_64.yaml @@ -54,7 +54,6 @@ dependencies: - isort - jsonpatch>=1.33 - kfp -- langchain=0.0.190 - librdkafka>=1.9.2,<1.10.0a0 - libtool - libwebp=1.3.2 @@ -67,10 +66,11 @@ dependencies: - ninja=1.11 - nlohmann_json=3.9 - nodejs=18.* +- numexpr - numpydoc=1.5 - nvtabular=23.08.00 - onnx -- openai=0.28 +- openai=1.13 - papermill=2.4.0 - pip - pkg-config=0.29 @@ -119,6 +119,7 @@ dependencies: - dgl - dglgo - google-search-results==2.4 + - langchain==0.1.9 - milvus==2.3.5 - nemollm - pymilvus==2.3.6 diff --git a/conda/environments/examples_cuda-121_arch-x86_64.yaml b/conda/environments/examples_cuda-121_arch-x86_64.yaml index 04cef6f3ac..ad2315d91c 100644 --- a/conda/environments/examples_cuda-121_arch-x86_64.yaml +++ b/conda/environments/examples_cuda-121_arch-x86_64.yaml @@ -27,16 +27,16 @@ dependencies: - huggingface_hub=0.20.2 - jsonpatch>=1.33 - kfp -- langchain=0.0.190 - libwebp=1.3.2 - mlflow=2.9.2 - networkx=2.8.8 - newspaper3k=0.2 - nodejs=18.* +- numexpr - numpydoc=1.5 - nvtabular=23.08.00 - onnx -- openai=0.28 +- openai=1.13 - papermill=2.4.0 - pip - pypdf=3.17.4 @@ -63,6 +63,7 @@ dependencies: - dgl - dglgo - google-search-results==2.4 + - langchain==0.1.9 - milvus==2.3.5 - nemollm - pymilvus==2.3.6 diff --git a/dependencies.yaml b/dependencies.yaml index bff273259b..e3dbdf55ea 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -21,24 +21,24 @@ files: cuda: ["12.1"] arch: [x86_64] includes: - - data_retrieval + - benchmark_cpp - build_cpp + - checks + - cudatoolkit + - cve-mitigation + - data_retrieval - development - - benchmark_cpp - - runtime + - doca + - docs - example-dfp-prod - example-gnn - example-llm-agents - example-llm-completion - example-llm-rag - example-llm-vdb-upload - - test_python_morpheus - - docs - - cudatoolkit - python - - checks - - cve-mitigation - - doca + - runtime + - test_python_morpheus dev: output: conda @@ -46,26 +46,16 @@ files: cuda: ["12.1"] arch: [x86_64] includes: - - data_retrieval + - benchmark_cpp - build_cpp + - checks + - cudatoolkit + - data_retrieval - development - - benchmark_cpp - - runtime - - test_python_morpheus - docs - - cudatoolkit - python - - checks - - examples: - output: conda - matrix: - cuda: ["12.1"] - arch: [x86_64] - includes: - - examples - runtime - - cudatoolkit + - test_python_morpheus build: output: none @@ -73,14 +63,14 @@ files: cuda: ["12.1"] arch: [x86_64] includes: - - data_retrieval + - benchmark_cpp - build_cpp + - cudatoolkit + - data_retrieval - development - - benchmark_cpp + - python - runtime - test_python_morpheus - - cudatoolkit - - python test: output: none @@ -88,20 +78,20 @@ files: cuda: ["12.1"] arch: [x86_64] includes: - - data_retrieval + - benchmark_cpp - build_cpp + - cudatoolkit + - data_retrieval - development - - benchmark_cpp - - runtime - - test_python_morpheus - example-dfp-prod - example-gnn - example-llm-agents - example-llm-completion - example-llm-rag - example-llm-vdb-upload - - cudatoolkit - python + - runtime + - test_python_morpheus docs: output: none @@ -109,19 +99,19 @@ files: cuda: ["12.1"] arch: [x86_64] includes: - - data_retrieval - - build_cpp - benchmark_cpp + - build_cpp - cudatoolkit - - python + - data_retrieval - docs - - runtime - example-dfp-prod - example-gnn - example-llm-agents - 
example-llm-completion - example-llm-rag - example-llm-vdb-upload + - python + - runtime runtime: output: conda @@ -138,7 +128,7 @@ files: cuda: ["12.1"] arch: [x86_64] includes: - - runtime + - cve-mitigation - example-dfp-prod - example-gnn - example-llm-agents @@ -146,7 +136,7 @@ files: - example-llm-rag - example-llm-vdb-upload - python - - cve-mitigation + - runtime model-utils: output: conda @@ -231,18 +221,18 @@ dependencies: common: - output_types: [conda] packages: + - &click click >=8 + - &numpydoc numpydoc=1.5 - breathe=4.35.0 - doxygen=1.9.2 - exhale=0.3.6 - ipython - myst-parser=0.18.1 - nbsphinx + - pluggy=1.3 - python-graphviz - sphinx - sphinx_rtd_theme - - &numpydoc numpydoc=1.5 - - pluggy=1.3 - - &click click >=8 benchmark_cpp: common: @@ -287,10 +277,10 @@ dependencies: - output_types: [conda] packages: - &nodejs nodejs=18.* - - pytest=7.4.4 - pytest-asyncio - pytest-benchmark=4.0 - pytest-cov + - pytest=7.4.4 - python-docx==1.1.0 - pip - pip: @@ -328,12 +318,13 @@ dependencies: - output_types: [conda] packages: - &grpcio-status grpcio-status==1.59 - - &langchain langchain=0.0.190 - &transformers transformers=4.36.2 # newer versions are incompatible with our pinned version of huggingface_hub - huggingface_hub=0.20.2 # work-around for https://github.com/UKPLab/sentence-transformers/issues/1762 - - pip + - numexpr - sentence-transformers + - pip - pip: + - &langchain langchain==0.1.9 - nemollm example-llm-completion: @@ -350,12 +341,12 @@ dependencies: - output_types: [conda] packages: - *grpcio-status - - *langchain - anyio>=3.7 - jsonpatch>=1.33 - - openai=0.28 + - openai=1.13 - pip - pip: + - *langchain - google-search-results==2.4 example-llm-vdb-upload: @@ -364,27 +355,27 @@ dependencies: packages: - *arxiv - *grpcio-status - - *langchain - *newspaper3k - *pypdf - onnx - pip - pip: - PyMuPDF==1.23.21 + - *langchain model-training-tuning: common: - output_types: [conda] packages: - *cuml + - *scikit-learn + - *transformers - jupyterlab - matplotlib - onnx - pandas - - *scikit-learn - seaborn - seqeval=1.2.2 - - *transformers - xgboost cve-mitigation: diff --git a/docs/source/conf.py b/docs/source/conf.py index d63f161384..053e8214a8 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -176,6 +176,7 @@ "morpheus.cli.commands", # Dont document the CLI in Sphinx "nvtabular", "pandas", + "pydantic", "pymilvus", "tensorrt", "torch", diff --git a/examples/llm/agents/kafka_pipeline.py b/examples/llm/agents/kafka_pipeline.py index 333dd37a89..91a8ee8add 100644 --- a/examples/llm/agents/kafka_pipeline.py +++ b/examples/llm/agents/kafka_pipeline.py @@ -39,7 +39,7 @@ def _build_agent_executor(model_name: str) -> AgentExecutor: - llm = OpenAIChat(model=model_name, temperature=0) + llm = OpenAIChat(model_name=model_name, model_kwargs={"temperature": 0.0}, client=None) tools = load_tools(["serpapi", "llm-math"], llm=llm) diff --git a/examples/llm/agents/simple_pipeline.py b/examples/llm/agents/simple_pipeline.py index 0e7e1f4321..9b2b95c611 100644 --- a/examples/llm/agents/simple_pipeline.py +++ b/examples/llm/agents/simple_pipeline.py @@ -15,11 +15,11 @@ import logging import time -from langchain import OpenAI from langchain.agents import AgentType from langchain.agents import initialize_agent from langchain.agents import load_tools from langchain.agents.agent import AgentExecutor +from langchain.llms.openai import OpenAI import cudf @@ -43,7 +43,7 @@ def _build_agent_executor(model_name: str) -> AgentExecutor: - llm = OpenAI(model=model_name, temperature=0) + llm = 
OpenAI(model=model_name, temperature=0.0, client=None) tools = load_tools(["serpapi", "llm-math"], llm=llm) diff --git a/examples/llm/common/utils.py b/examples/llm/common/utils.py index 6c9984ee42..1779bb6c88 100644 --- a/examples/llm/common/utils.py +++ b/examples/llm/common/utils.py @@ -15,8 +15,9 @@ import logging import pymilvus -from langchain.embeddings import HuggingFaceEmbeddings +from langchain.embeddings import HuggingFaceEmbeddings # pylint: disable=no-name-in-module +from morpheus.llm.services.llm_service import LLMService from morpheus.llm.services.nemo_llm_service import NeMoLLMService from morpheus.llm.services.openai_chat_service import OpenAIChatService from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP @@ -34,16 +35,19 @@ def build_huggingface_embeddings(model_name: str, model_kwargs: dict = None, enc def build_llm_service(model_name: str, llm_service: str, tokens_to_generate: int, **model_kwargs): lowered_llm_service = llm_service.lower() + + service: LLMService | None = None + if (lowered_llm_service == 'nemollm'): model_kwargs['tokens_to_generate'] = tokens_to_generate - llm_service = NeMoLLMService() + service = NeMoLLMService() elif (lowered_llm_service == 'openai'): model_kwargs['max_tokens'] = tokens_to_generate - llm_service = OpenAIChatService() + service = OpenAIChatService() else: raise RuntimeError(f"Unsupported LLM service name: {llm_service}") - return llm_service.get_client(model_name, **model_kwargs) + return service.get_client(model_name=model_name, **model_kwargs) def build_milvus_config(resource_schema_config: dict): diff --git a/examples/llm/completion/pipeline.py b/examples/llm/completion/pipeline.py index 00df19363a..80f5e8ea0f 100644 --- a/examples/llm/completion/pipeline.py +++ b/examples/llm/completion/pipeline.py @@ -54,8 +54,8 @@ def _build_engine(llm_service: str): else: raise ValueError(f"Invalid LLM service: {llm_service}") - llm_service = llm_service_cls() - llm_clinet = llm_service.get_client(model_name=model_name) + service = llm_service_cls() + llm_clinet = service.get_client(model_name=model_name) engine = LLMEngine() diff --git a/examples/llm/vdb_upload/module/content_extractor_module.py b/examples/llm/vdb_upload/module/content_extractor_module.py index 5b2ed2ce0f..ac5ae771e6 100755 --- a/examples/llm/vdb_upload/module/content_extractor_module.py +++ b/examples/llm/vdb_upload/module/content_extractor_module.py @@ -32,7 +32,7 @@ from pydantic import BaseModel # pylint: disable=no-name-in-module from pydantic import Field from pydantic import ValidationError -from pydantic import validator +from pydantic import field_validator from morpheus.messages import MessageMeta from morpheus.utils.module_utils import ModuleLoaderFactory @@ -55,12 +55,13 @@ class ContentExtractorSchema(BaseModel): converters_meta: Dict[str, Dict] = Field(default_factory=dict) num_threads: int = 10 - @validator('converters_meta', pre=True, allow_reuse=True) - def val_converters_meta(cls, to_validate: Dict[str, Dict]) -> Dict[str, Dict]: # pylint: disable=no-self-argument + @field_validator('converters_meta', mode="before") + @classmethod + def val_converters_meta(cls, to_validate: Dict[str, Dict]) -> Dict[str, Dict]: validated_meta = {} for key, value in to_validate.items(): if key.lower() == 'csv': - validated_meta[key] = CSVConverterSchema(**value) + validated_meta[key] = CSVConverterSchema(**value).model_dump() else: validated_meta[key] = value return validated_meta @@ -319,8 +320,10 @@ def file_content_extractor(builder: mrc.Builder): chunk_params = { 
file_type: { + # pylint: disable=no-member "chunk_size": converters_meta.get(file_type, {}).get("chunk_size", chunk_size), "chunk_overlap": converters_meta.get(file_type, {}).get("chunk_overlap", chunk_overlap) + # pylint: enable=no-member } for file_type in converters } diff --git a/morpheus/llm/services/llm_service.py b/morpheus/llm/services/llm_service.py index e1cbe3c65f..1d11481345 100644 --- a/morpheus/llm/services/llm_service.py +++ b/morpheus/llm/services/llm_service.py @@ -13,7 +13,6 @@ # limitations under the License. import logging -import typing from abc import ABC from abc import abstractmethod @@ -88,7 +87,7 @@ class LLMService(ABC): """ @abstractmethod - def get_client(self, model_name: str, **model_kwargs: dict[str, typing.Any]) -> LLMClient: + def get_client(self, *, model_name: str, **model_kwargs) -> LLMClient: """ Returns a client for interacting with a specific model. diff --git a/morpheus/llm/services/nemo_llm_service.py b/morpheus/llm/services/nemo_llm_service.py index 5dede8a240..0173354df9 100644 --- a/morpheus/llm/services/nemo_llm_service.py +++ b/morpheus/llm/services/nemo_llm_service.py @@ -50,11 +50,14 @@ class NeMoLLMClient(LLMClient): Additional keyword arguments to pass to the model when generating text. """ - def __init__(self, parent: "NeMoLLMService", model_name: str, **model_kwargs: dict[str, typing.Any]) -> None: + def __init__(self, parent: "NeMoLLMService", *, model_name: str, **model_kwargs) -> None: if IMPORT_EXCEPTION is not None: raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION super().__init__() + + assert parent is not None, "Parent service cannot be None." + self._parent = parent self._model_name = model_name self._model_kwargs = model_kwargs @@ -167,7 +170,7 @@ def __init__(self, *, api_key: str = None, org_id: str = None) -> None: org_id=org_id, ) - def get_client(self, model_name: str, **model_kwargs: dict[str, typing.Any]) -> NeMoLLMClient: + def get_client(self, *, model_name: str, **model_kwargs) -> NeMoLLMClient: """ Returns a client for interacting with a specific model. This method is the preferred way to create a client. @@ -180,4 +183,4 @@ def get_client(self, model_name: str, **model_kwargs: dict[str, typing.Any]) -> Additional keyword arguments to pass to the model when generating text. """ - return NeMoLLMClient(self, model_name, **model_kwargs) + return NeMoLLMClient(self, model_name=model_name, **model_kwargs) diff --git a/morpheus/llm/services/openai_chat_service.py b/morpheus/llm/services/openai_chat_service.py index 098625c538..76da57912f 100644 --- a/morpheus/llm/services/openai_chat_service.py +++ b/morpheus/llm/services/openai_chat_service.py @@ -11,11 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import asyncio import copy import logging +import os +import time import typing +from contextlib import contextmanager +from textwrap import dedent + +import appdirs from morpheus.llm.services.llm_service import LLMClient from morpheus.llm.services.llm_service import LLMService @@ -30,10 +35,36 @@ try: import openai + import openai.types.chat + import openai.types.chat.chat_completion except ImportError as import_exc: IMPORT_EXCEPTION = import_exc +class _ApiLogger: + """ + Simple class that allows passing back and forth the inputs and outputs of an API call via a context manager. 
+ """ + + log_template: typing.ClassVar[str] = dedent(""" + ============= MESSAGE %d START ============== + --- Input --- + %s + --- Output --- (%f ms) + %s + ============= MESSAGE %d END ============== + """).strip("\n") + + def __init__(self, *, message_id: int, inputs: typing.Any) -> None: + + self.message_id = message_id + self.inputs = inputs + self.outputs = None + + def set_output(self, output: typing.Any) -> None: + self.outputs = output + + class OpenAIChatClient(LLMClient): """ Client for interacting with a specific OpenAI chat model. This class should be constructed with the @@ -51,11 +82,24 @@ class OpenAIChatClient(LLMClient): Additional keyword arguments to pass to the model when generating text. """ - def __init__(self, model_name: str, set_assistant: bool = False, **model_kwargs: dict[str, typing.Any]) -> None: + _prompt_key: str = "prompt" + _assistant_key: str = "assistant" + + def __init__(self, + parent: "OpenAIChatService", + *, + model_name: str, + set_assistant: bool = False, + **model_kwargs) -> None: if IMPORT_EXCEPTION is not None: raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION super().__init__() + + assert parent is not None, "Parent service cannot be None." + + self._parent = parent + self._model_name = model_name self._set_assistant = set_assistant self._prompt_key = "prompt" @@ -63,7 +107,10 @@ def __init__(self, model_name: str, set_assistant: bool = False, **model_kwargs: # Preserve original configuration. self._model_kwargs = copy.deepcopy(model_kwargs) - self._model_kwargs['temperature'] = model_kwargs.get('temperature', 0) + + # Create the client objects for both sync and async + self._client = openai.OpenAI() + self._client_async = openai.AsyncOpenAI() def get_input_names(self) -> list[str]: input_names = [self._prompt_key] @@ -72,27 +119,42 @@ def get_input_names(self) -> list[str]: return input_names - def _create_messages(self, prompt: str, assistant: str = None) -> list[dict[str, str]]: - messages = [ - { - "role": "system", "content": "You are a helpful assistant." 
- }, - { - "role": "user", "content": prompt - }, - ] + @contextmanager + def _api_logger(self, inputs: typing.Any): - if (self._set_assistant): + message_id = self._parent._get_message_id() + start_time = time.time() + + api_logger = _ApiLogger(message_id=message_id, inputs=inputs) + + yield api_logger + + end_time = time.time() + duration_ms = (end_time - start_time) * 1000.0 + + self._parent._logger.info(_ApiLogger.log_template, + message_id, + api_logger.inputs, + duration_ms, + api_logger.outputs, + message_id) + + def _create_messages(self, + prompt: str, + assistant: str = None) -> list["openai.types.chat.ChatCompletionMessageParam"]: + messages: list[openai.types.chat.ChatCompletionMessageParam] = [{"role": "user", "content": prompt}] + + if (self._set_assistant and assistant is not None): messages.append({"role": "assistant", "content": assistant}) return messages - def _extract_completion(self, completion: "openai.openai_object.OpenAIObject") -> str: - choices = completion.get('choices', []) + def _extract_completion(self, completion: "openai.types.chat.chat_completion.ChatCompletion") -> str: + choices = completion.choices if len(choices) == 0: raise ValueError("No choices were returned from the model.") - content = choices[0].get('message', {}).get('content', None) + content = choices[0].message.content if content is None: raise ValueError("No content was returned from the model.") @@ -101,7 +163,8 @@ def _extract_completion(self, completion: "openai.openai_object.OpenAIObject") - def _generate(self, prompt: str, assistant: str = None) -> str: messages = self._create_messages(prompt, assistant) - output = openai.ChatCompletion.create(model=self._model_name, messages=messages, **self._model_kwargs) + output: openai.types.chat.chat_completion.ChatCompletion = self._client.chat.completions.create( + model=self._model_name, messages=messages, **self._model_kwargs) return self._extract_completion(output) @@ -117,9 +180,20 @@ def generate(self, input_dict: dict[str, str]) -> str: return self._generate(input_dict[self._prompt_key], input_dict.get(self._assistant_key)) async def _generate_async(self, prompt: str, assistant: str = None) -> str: + messages = self._create_messages(prompt, assistant) - output = await openai.ChatCompletion.acreate(model=self._model_name, messages=messages, **self._model_kwargs) + with self._api_logger(inputs=messages) as msg_logger: + + try: + output = await self._client_async.chat.completions.create(model=self._model_name, + messages=messages, + **self._model_kwargs) + except Exception as exc: + self._parent._logger.error("Error generating completion: %s", exc) + raise + + msg_logger.set_output(output) return self._extract_completion(output) @@ -186,16 +260,54 @@ class OpenAIChatService(LLMService): A service for interacting with OpenAI Chat models, this class should be used to create clients. """ - def __init__(self) -> None: + def __init__(self, *, default_model_kwargs: dict = None) -> None: + """ + Creates a service for interacting with OpenAI Chat models, this class should be used to create clients. + + Parameters + ---------- + default_model_kwargs : dict, optional + Default arguments to use when creating a client via the `get_client` function. Any argument specified here + will automatically be used when calling `get_client`. Arguments specified in the `get_client` function will + overwrite default values specified here. This is useful to set model arguments before creating multiple + clients. 
By default None + + Raises + ------ + ImportError + If the `openai` library is not found in the python environment. + """ if IMPORT_EXCEPTION is not None: raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION super().__init__() - def get_client(self, - model_name: str, - set_assistant: bool = False, - **model_kwargs: dict[str, typing.Any]) -> OpenAIChatClient: + self._default_model_kwargs = default_model_kwargs or {} + + self._logger = logging.getLogger(f"{__package__}.{OpenAIChatService.__name__}") + + # Dont propagate up to the default logger. Just log to file + self._logger.propagate = False + + log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "openai.log") + + # Add a file handler + file_handler = logging.FileHandler(log_file) + + self._logger.addHandler(file_handler) + self._logger.setLevel(logging.INFO) + + self._logger.info("OpenAI Chat Service started.") + + self._message_count = 0 + + def _get_message_id(self): + + self._message_count += 1 + + return self._message_count + + def get_client(self, *, model_name: str, set_assistant: bool = False, **model_kwargs) -> OpenAIChatClient: """ Returns a client for interacting with a specific model. This method is the preferred way to create a client. @@ -208,7 +320,10 @@ def get_client(self, When `True`, a second input field named `assistant` will be used to proide additional context to the model. model_kwargs : dict[str, typing.Any] - Additional keyword arguments to pass to the model when generating text. + Additional keyword arguments to pass to the model when generating text. Arguments specified here will + overwrite the `default_model_kwargs` set in the service constructor """ - return OpenAIChatClient(model_name=model_name, set_assistant=set_assistant, **model_kwargs) + final_model_kwargs = {**self._default_model_kwargs, **model_kwargs} + + return OpenAIChatClient(self, model_name=model_name, set_assistant=set_assistant, **final_model_kwargs) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index d601d3880d..e4111926e9 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -443,4 +443,4 @@ def _convert_one_response(output: MultiResponseMessage, inf: MultiInferenceMessa for i, idx in enumerate(mess_ids): probs[idx, :] = cp.maximum(probs[idx, :], resp_probs[i, :]) - return MultiResponseMessage.from_message(inf, memory=memory, offset=inf.offset, count=inf.mess_count) + return MultiResponseMessage.from_message(inf, memory=memory, offset=seq_offset, count=seq_count) diff --git a/morpheus/stages/input/arxiv_source.py b/morpheus/stages/input/arxiv_source.py index 1b03b299fa..c1ed77c0cb 100644 --- a/morpheus/stages/input/arxiv_source.py +++ b/morpheus/stages/input/arxiv_source.py @@ -37,7 +37,7 @@ IMPORT_ERROR_MESSAGE = ( "ArxivSource requires additional dependencies to be installed. Install them by running the following command: " "`conda env update --solver=libmamba -n morpheus" - "--file morpheus/conda/environments/dev_cuda-121_arch-x86_64.yaml --prune`") + "--file conda/environments/all_cuda-121_arch-x86_64.yaml --prune`") @register_stage("from-arxiv") @@ -47,7 +47,7 @@ class ArxivSource(PreallocatorMixin, SingleOutputSource): This stage requires several additional dependencies to be installed. 
Install them by running the following command: `conda env update --solver=libmamba -n morpheus " - "--file morpheus/conda/environments/dev_cuda-121_arch-x86_64.yaml --prune` + "--file conda/environments/all_cuda-121_arch-x86_64.yaml --prune` Parameters ---------- diff --git a/tests/_utils/llm.py b/tests/_utils/llm.py index 6925b90df5..9c48583b7e 100644 --- a/tests/_utils/llm.py +++ b/tests/_utils/llm.py @@ -14,6 +14,7 @@ import asyncio import typing +from unittest import mock from morpheus.llm import InputMap from morpheus.llm import LLMContext @@ -73,3 +74,20 @@ def execute_task_handler(task_handler: LLMTaskHandler, message = asyncio.run(task_handler.try_handle(context)) return message + + +def _mk_mock_choice(message: str) -> mock.MagicMock: + mock_choice = mock.MagicMock() + mock_choice.message.content = message + return mock_choice + + +def mk_mock_openai_response(messages: list[str]) -> mock.MagicMock: + """ + Creates a mocked openai.types.chat.chat_completion.ChatCompletion response with the given messages. + """ + response = mock.MagicMock() + mock_choices = [_mk_mock_choice(message) for message in messages] + response.choices = mock_choices + + return response diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py index f877612bb6..f051218193 100644 --- a/tests/benchmarks/conftest.py +++ b/tests/benchmarks/conftest.py @@ -128,16 +128,19 @@ def mock_serpapi_request_time_fixture(): @pytest.mark.usefixtures("openai") @pytest.fixture(name="mock_chat_completion") -def mock_chat_completion_fixture(mock_chat_completion: mock.MagicMock, mock_openai_request_time: float): +def mock_chat_completion_fixture(mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock], + mock_openai_request_time: float): + (mock_client, mock_async_client) = mock_chat_completion async def sleep_first(*args, **kwargs): # Sleep time is based on average request time await asyncio.sleep(mock_openai_request_time) return mock.DEFAULT - mock_chat_completion.acreate.side_effect = sleep_first + mock_async_client.chat.completions.create.side_effect = sleep_first + mock_client.chat.completions.create.side_effect = sleep_first - yield mock_chat_completion + yield (mock_client, mock_async_client) @pytest.mark.usefixtures("nemollm") diff --git a/tests/conftest.py b/tests/conftest.py index 6599f53ce3..c422c18ea9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1051,13 +1051,15 @@ def openai_fixture(fail_missing: bool): @pytest.mark.usefixtures("openai") @pytest.fixture(name="mock_chat_completion") def mock_chat_completion_fixture(): - with mock.patch("openai.ChatCompletion") as mock_chat_completion: - mock_chat_completion.return_value = mock_chat_completion - - response = {'choices': [{'message': {'content': 'test_output'}}]} - mock_chat_completion.create.return_value = response.copy() - mock_chat_completion.acreate = mock.AsyncMock(return_value=response.copy()) - yield mock_chat_completion + from _utils.llm import mk_mock_openai_response + with (mock.patch("openai.OpenAI") as mock_client, mock.patch("openai.AsyncOpenAI") as mock_async_client): + mock_client.return_value = mock_client + mock_async_client.return_value = mock_async_client + + mock_client.chat.completions.create.return_value = mk_mock_openai_response(['test_output']) + mock_async_client.chat.completions.create = mock.AsyncMock( + return_value=mk_mock_openai_response(['test_output'])) + yield (mock_client, mock_async_client) @pytest.mark.usefixtures("nemollm") diff --git a/tests/llm/services/test_llm_service_pipe.py 
b/tests/llm/services/test_llm_service_pipe.py index a04d09a683..fa6c1ac0c7 100644 --- a/tests/llm/services/test_llm_service_pipe.py +++ b/tests/llm/services/test_llm_service_pipe.py @@ -18,6 +18,7 @@ import cudf from _utils import assert_results +from _utils.llm import mk_mock_openai_response from morpheus.config import Config from morpheus.llm import LLMEngine from morpheus.llm.nodes.extracter_node import ExtracterNode @@ -86,15 +87,15 @@ def test_completion_pipe_nemo( def test_completion_pipe_openai(config: Config, - mock_chat_completion: mock.MagicMock, + mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock], country_prompts: list[str], capital_responses: list[str]): - mock_chat_completion.acreate.side_effect = [{ - "choices": [{ - 'message': { - 'content': response - } - }] - } for response in capital_responses] + (mock_client, mock_async_client) = mock_chat_completion + mock_async_client.chat.completions.create.side_effect = [ + mk_mock_openai_response([response]) for response in capital_responses + ] _run_pipeline(config, OpenAIChatService, country_prompts, capital_responses) + + mock_client.chat.completions.create.assert_not_called() + mock_async_client.chat.completions.create.assert_called() diff --git a/tests/llm/services/test_nemo_llm_client.py b/tests/llm/services/test_nemo_llm_client.py index b9636e722a..5a7993006c 100644 --- a/tests/llm/services/test_nemo_llm_client.py +++ b/tests/llm/services/test_nemo_llm_client.py @@ -23,19 +23,19 @@ def test_constructor(mock_nemollm: mock.MagicMock, mock_nemo_service: mock.MagicMock): - client = NeMoLLMClient(mock_nemo_service, "test_model", additional_arg="test_arg") + client = NeMoLLMClient(mock_nemo_service, model_name="test_model", additional_arg="test_arg") assert isinstance(client, LLMClient) mock_nemollm.assert_not_called() def test_get_input_names(mock_nemollm: mock.MagicMock, mock_nemo_service: mock.MagicMock): - client = NeMoLLMClient(mock_nemo_service, "test_model", additional_arg="test_arg") + client = NeMoLLMClient(mock_nemo_service, model_name="test_model", additional_arg="test_arg") assert client.get_input_names() == ["prompt"] mock_nemollm.assert_not_called() def test_generate(mock_nemollm: mock.MagicMock, mock_nemo_service: mock.MagicMock): - client = NeMoLLMClient(mock_nemo_service, "test_model", additional_arg="test_arg") + client = NeMoLLMClient(mock_nemo_service, model_name="test_model", additional_arg="test_arg") assert client.generate({'prompt': "test_prompt"}) == "test_output" mock_nemollm.generate_multiple.assert_called_once_with(model="test_model", prompts=["test_prompt"], @@ -46,7 +46,7 @@ def test_generate(mock_nemollm: mock.MagicMock, mock_nemo_service: mock.MagicMoc def test_generate_batch(mock_nemollm: mock.MagicMock, mock_nemo_service: mock.MagicMock): mock_nemollm.generate_multiple.return_value = ["output1", "output2"] - client = NeMoLLMClient(mock_nemo_service, "test_model", additional_arg="test_arg") + client = NeMoLLMClient(mock_nemo_service, model_name="test_model", additional_arg="test_arg") assert client.generate_batch({'prompt': ["prompt1", "prompt2"]}) == ["output1", "output2"] mock_nemollm.generate_multiple.assert_called_once_with(model="test_model", prompts=["prompt1", "prompt2"], @@ -63,7 +63,7 @@ def test_generate_async( mock_nemo_service: mock.MagicMock): mock_asyncio_gather.return_value = [mock.MagicMock()] - client = NeMoLLMClient(mock_nemo_service, "test_model", additional_arg="test_arg") + client = NeMoLLMClient(mock_nemo_service, model_name="test_model", additional_arg="test_arg") 
results = asyncio.run(client.generate_async({'prompt': "test_prompt"})) assert results == "test_output" mock_nemollm.generate.assert_called_once_with("test_model", @@ -82,7 +82,7 @@ def test_generate_batch_async( mock_asyncio_gather.return_value = [mock.MagicMock(), mock.MagicMock()] mock_nemollm.post_process_generate_response.side_effect = [{"text": "output1"}, {"text": "output2"}] - client = NeMoLLMClient(mock_nemo_service, "test_model", additional_arg="test_arg") + client = NeMoLLMClient(mock_nemo_service, model_name="test_model", additional_arg="test_arg") results = asyncio.run(client.generate_batch_async({'prompt': ["prompt1", "prompt2"]})) assert results == ["output1", "output2"] mock_nemollm.generate.assert_has_calls([ @@ -101,7 +101,7 @@ def test_generate_batch_async_error( mock_asyncio_gather.return_value = [mock.MagicMock(), mock.MagicMock()] mock_nemollm.post_process_generate_response.return_value = {"status": "fail", "msg": "unittest"} - client = NeMoLLMClient(mock_nemo_service, "test_model", additional_arg="test_arg") + client = NeMoLLMClient(mock_nemo_service, model_name="test_model", additional_arg="test_arg") with pytest.raises(RuntimeError, match="unittest"): asyncio.run(client.generate_batch_async({'prompt': ["prompt1", "prompt2"]})) diff --git a/tests/llm/services/test_nemo_llm_service.py b/tests/llm/services/test_nemo_llm_service.py index d9ef769fca..d91a6f7351 100644 --- a/tests/llm/services/test_nemo_llm_service.py +++ b/tests/llm/services/test_nemo_llm_service.py @@ -43,6 +43,6 @@ def test_constructor(mock_nemollm: mock.MagicMock, api_key: str, org_id: str): def test_get_client(): service = NeMoLLMService(api_key="test_api_key") - client = service.get_client("test_model") + client = service.get_client(model_name="test_model") assert isinstance(client, NeMoLLMClient) diff --git a/tests/llm/services/test_openai_chat_client.py b/tests/llm/services/test_openai_chat_client.py index e6b6a450df..21013ce463 100644 --- a/tests/llm/services/test_openai_chat_client.py +++ b/tests/llm/services/test_openai_chat_client.py @@ -18,14 +18,18 @@ import pytest +from _utils.llm import mk_mock_openai_response from morpheus.llm.services.llm_service import LLMClient from morpheus.llm.services.openai_chat_service import OpenAIChatClient +from morpheus.llm.services.openai_chat_service import OpenAIChatService -def test_constructor(mock_chat_completion: mock.MagicMock): - client = OpenAIChatClient(model_name="test_model") +def test_constructor(mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock]): + client = OpenAIChatClient(OpenAIChatService(), model_name="test_model") assert isinstance(client, LLMClient) - mock_chat_completion.assert_not_called() + + for mock_client in mock_chat_completion: + mock_client.assert_called() @pytest.mark.parametrize("use_async", [True, False]) @@ -34,87 +38,79 @@ def test_constructor(mock_chat_completion: mock.MagicMock): [({ "prompt": "test_prompt", "assistant": "assistant_response" }, - True, - [{ - "role": "system", "content": "You are a helpful assistant." - }, { + True, [{ "role": "user", "content": "test_prompt" }, { "role": "assistant", "content": "assistant_response" - }]), - ({ - "prompt": "test_prompt" - }, - False, [{ - "role": "system", "content": "You are a helpful assistant." 
- }, { + }]), ({ + "prompt": "test_prompt" + }, False, [{ "role": "user", "content": "test_prompt" }])]) @pytest.mark.parametrize("temperature", [0, 1, 2]) -def test_generate(mock_chat_completion: mock.MagicMock, +def test_generate(mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock], use_async: bool, input_dict: dict[str, str], set_assistant: bool, expected_messages: list[dict], temperature: int): - client = OpenAIChatClient(model_name="test_model", set_assistant=set_assistant, temperature=temperature) + (mock_client, mock_async_client) = mock_chat_completion + client = OpenAIChatClient(OpenAIChatService(), + model_name="test_model", + set_assistant=set_assistant, + temperature=temperature) if use_async: results = asyncio.run(client.generate_async(input_dict)) - mock_chat_completion.acreate.assert_called_once_with(model="test_model", - messages=expected_messages, - temperature=temperature) + mock_async_client.chat.completions.create.assert_called_once_with(model="test_model", + messages=expected_messages, + temperature=temperature) + mock_client.chat.completions.create.assert_not_called() else: results = client.generate(input_dict) - mock_chat_completion.create.assert_called_once_with(model="test_model", - messages=expected_messages, - temperature=temperature) + mock_client.chat.completions.create.assert_called_once_with(model="test_model", + messages=expected_messages, + temperature=temperature) + mock_async_client.chat.completions.create.assert_not_called() assert results == "test_output" @pytest.mark.parametrize("use_async", [True, False]) -@pytest.mark.parametrize( - "inputs, set_assistant, expected_messages", - [({ - "prompt": ["prompt1", "prompt2"], "assistant": ["assistant1", "assistant2"] - }, - True, - [[{ - "role": "system", "content": "You are a helpful assistant." - }, { - "role": "user", "content": "prompt1" - }, { - "role": "assistant", "content": "assistant1" - }], - [{ - "role": "system", "content": "You are a helpful assistant." - }, { - "role": "user", "content": "prompt2" - }, { - "role": "assistant", "content": "assistant2" - }]]), - ({ - "prompt": ["prompt1", "prompt2"] - }, - False, - [[{ - "role": "system", "content": "You are a helpful assistant." - }, { - "role": "user", "content": "prompt1" - }], [{ - "role": "system", "content": "You are a helpful assistant." 
- }, { - "role": "user", "content": "prompt2" - }]])]) +@pytest.mark.parametrize("inputs, set_assistant, expected_messages", + [({ + "prompt": ["prompt1", "prompt2"], "assistant": ["assistant1", "assistant2"] + }, + True, + [[{ + "role": "user", "content": "prompt1" + }, { + "role": "assistant", "content": "assistant1" + }], [{ + "role": "user", "content": "prompt2" + }, { + "role": "assistant", "content": "assistant2" + }]]), + ({ + "prompt": ["prompt1", "prompt2"] + }, + False, [[{ + "role": "user", "content": "prompt1" + }], [{ + "role": "user", "content": "prompt2" + }]])]) @pytest.mark.parametrize("temperature", [0, 1, 2]) -def test_generate_batch(mock_chat_completion: mock.MagicMock, +def test_generate_batch(mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock], use_async: bool, inputs: dict[str, list[str]], set_assistant: bool, expected_messages: list[list[dict]], temperature: int): - client = OpenAIChatClient(model_name="test_model", set_assistant=set_assistant, temperature=temperature) + (mock_client, mock_async_client) = mock_chat_completion + client = OpenAIChatClient(OpenAIChatService(), + model_name="test_model", + set_assistant=set_assistant, + temperature=temperature) expected_results = ["test_output" for _ in range(len(inputs["prompt"]))] expected_calls = [ @@ -123,28 +119,22 @@ def test_generate_batch(mock_chat_completion: mock.MagicMock, if use_async: results = asyncio.run(client.generate_batch_async(inputs)) - mock_chat_completion.acreate.assert_has_calls(expected_calls, any_order=False) + mock_async_client.chat.completions.create.assert_has_calls(expected_calls, any_order=False) + mock_client.chat.completions.create.assert_not_called() + else: results = client.generate_batch(inputs) - mock_chat_completion.create.assert_has_calls(expected_calls, any_order=False) + mock_client.chat.completions.create.assert_has_calls(expected_calls, any_order=False) + mock_async_client.chat.completions.create.assert_not_called() assert results == expected_results -@pytest.mark.parametrize("completion", [{ - "choices": [] -}, { - "choices": [{}] -}, { - "choices": [{ - "message": {} - }] -}], - ids=["no_choices", "no_message", "no_content"]) -def test_generate_invalid_completions(mock_chat_completion: mock.MagicMock, completion: dict): - mock_chat_completion.create.return_value = completion - - client = OpenAIChatClient(model_name="test_model") +@pytest.mark.parametrize("completion", [[], [None]], ids=["no_choices", "no_content"]) +@pytest.mark.usefixtures("mock_chat_completion") +def test_extract_completion_errors(completion: list): + client = OpenAIChatClient(OpenAIChatService(), model_name="test_model") + mock_completion = mk_mock_openai_response(completion) with pytest.raises(ValueError): - client.generate({"prompt": "test_prompt"}) + client._extract_completion(mock_completion) diff --git a/tests/llm/services/test_openai_chat_service.py b/tests/llm/services/test_openai_chat_service.py index 20cb454c7d..fc05d64543 100644 --- a/tests/llm/services/test_openai_chat_service.py +++ b/tests/llm/services/test_openai_chat_service.py @@ -29,7 +29,7 @@ def test_constructor(): def test_get_client(): service = OpenAIChatService() - client = service.get_client("test_model") + client = service.get_client(model_name="test_model") assert isinstance(client, OpenAIChatClient) @@ -39,10 +39,11 @@ def test_get_client(): @mock.patch("morpheus.llm.services.openai_chat_service.OpenAIChatClient") def test_get_client_passed_args(mock_client: mock.MagicMock, set_assistant: bool, temperature: int): service 
= OpenAIChatService() - service.get_client("test_model", set_assistant=set_assistant, temperature=temperature, test='this') + service.get_client(model_name="test_model", set_assistant=set_assistant, temperature=temperature, test='this') # Ensure the get_client method passed on the set_assistant and model kwargs - mock_client.assert_called_once_with(model_name="test_model", + mock_client.assert_called_once_with(service, + model_name="test_model", set_assistant=set_assistant, temperature=temperature, test='this') diff --git a/tests/llm/test_completion_pipe.py b/tests/llm/test_completion_pipe.py index db020b6ba6..39c16d7e3b 100644 --- a/tests/llm/test_completion_pipe.py +++ b/tests/llm/test_completion_pipe.py @@ -21,6 +21,7 @@ import cudf from _utils import assert_results +from _utils.llm import mk_mock_openai_response from morpheus.config import Config from morpheus.llm import LLMEngine from morpheus.llm.nodes.extracter_node import ExtracterNode @@ -106,19 +107,18 @@ def test_completion_pipe_nemo( @pytest.mark.usefixtures("openai") def test_completion_pipe_openai(config: Config, - mock_chat_completion: mock.MagicMock, + mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock], countries: list[str], capital_responses: list[str]): - mock_chat_completion.acreate.side_effect = [{ - "choices": [{ - 'message': { - 'content': response - } - }] - } for response in capital_responses] + (mock_client, mock_async_client) = mock_chat_completion + mock_async_client.chat.completions.create.side_effect = [ + mk_mock_openai_response([response]) for response in capital_responses + ] results = _run_pipeline(config, OpenAIChatService, countries=countries, capital_responses=capital_responses) assert_results(results) + mock_client.chat.completions.create.assert_not_called() + mock_async_client.chat.completions.create.assert_called() @pytest.mark.usefixtures("nemollm") diff --git a/tests/llm/test_rag_standalone_pipe.py b/tests/llm/test_rag_standalone_pipe.py index b9577a89ef..e394420845 100644 --- a/tests/llm/test_rag_standalone_pipe.py +++ b/tests/llm/test_rag_standalone_pipe.py @@ -25,6 +25,7 @@ from _utils import TEST_DIRS from _utils import assert_results from _utils.dataset_manager import DatasetManager +from _utils.llm import mk_mock_openai_response from _utils.milvus import populate_milvus from morpheus.config import Config from morpheus.config import PipelineModes @@ -160,18 +161,15 @@ def test_rag_standalone_pipe_nemo(config: Config, @pytest.mark.parametrize("repeat_count", [5]) @pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'llm/common/utils.py')) def test_rag_standalone_pipe_openai(config: Config, - mock_chat_completion: mock.MagicMock, + mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock], dataset: DatasetManager, milvus_server_uri: str, repeat_count: int, import_mod: types.ModuleType): - mock_chat_completion.acreate.side_effect = [{ - "choices": [{ - 'message': { - 'content': EXPECTED_RESPONSE - } - }] - } for _ in range(repeat_count)] + (mock_client, mock_async_client) = mock_chat_completion + mock_async_client.chat.completions.create.side_effect = [ + mk_mock_openai_response([EXPECTED_RESPONSE]) for _ in range(repeat_count) + ] collection_name = "test_rag_standalone_pipe_openai" populate_milvus(milvus_server_uri=milvus_server_uri, @@ -190,6 +188,8 @@ def test_rag_standalone_pipe_openai(config: Config, utils_mod=import_mod, ) assert_results(results) + mock_client.chat.completions.create.assert_not_called() + mock_async_client.chat.completions.create.assert_called() 
 @pytest.mark.usefixtures("nemollm")
diff --git a/tests/stages/arxiv/test_arxiv_source.py b/tests/stages/arxiv/test_arxiv_source.py
index 30d9730064..6daa0ebd71 100644
--- a/tests/stages/arxiv/test_arxiv_source.py
+++ b/tests/stages/arxiv/test_arxiv_source.py
@@ -162,11 +162,15 @@ def test_splitting_pages(config: Config,
     num_expected_chunks = len(page_content_col)
     source_col = []
     page_col = []
+    type_col = []
     for _ in range(num_expected_chunks):
         source_col.append(pdf_file)
         page_col.append(0)
+        type_col.append("Document")
 
-    expected_df = cudf.DataFrame({"page_content": page_content_col, "source": source_col, "page": page_col})
+    expected_df = cudf.DataFrame({
+        "page_content": page_content_col, "source": source_col, "page": page_col, "type": type_col
+    })
 
     loader = langchain.document_loaders.PyPDFLoader(pdf_file)
     documents = loader.load()
@@ -189,8 +193,11 @@ def test_splitting_pages_no_chunks(config: Config,
     page_content_col = [content]
     source_col = [pdf_file]
     page_col = [0]
+    type_col = ["Document"]
 
-    expected_df = cudf.DataFrame({"page_content": page_content_col, "source": source_col, "page": page_col})
+    expected_df = cudf.DataFrame({
+        "page_content": page_content_col, "source": source_col, "page": page_col, "type": type_col
+    })
 
     loader = langchain.document_loaders.PyPDFLoader(pdf_file)
     documents = loader.load()
diff --git a/tests/stages/arxiv/test_arxiv_source_pipe.py b/tests/stages/arxiv/test_arxiv_source_pipe.py
index 4d8b33bc32..bf903de717 100644
--- a/tests/stages/arxiv/test_arxiv_source_pipe.py
+++ b/tests/stages/arxiv/test_arxiv_source_pipe.py
@@ -39,8 +39,11 @@ def test_arxiv_source_pipeline(mock_arxiv_search: mock.MagicMock, config: Config
     page_content_col = [content]
     source_col = [cached_pdf]
     page_col = [0]
+    type_col = ["Document"]
 
-    expected_df = cudf.DataFrame({"page_content": page_content_col, "source": source_col, "page": page_col})
+    expected_df = cudf.DataFrame({
+        "page_content": page_content_col, "source": source_col, "page": page_col, "type": type_col
+    })
 
     # The ArxivSource sets a pe_count of 6 for the process_pages node, and we need at least that number of threads
     # in the config to run the pipeline