
Feature/queue metrics #860

Merged
merged 18 commits on Jan 11, 2023
Changes from 8 commits
12 changes: 11 additions & 1 deletion mlserver/batching/adaptive.py
@@ -18,6 +18,8 @@

from .requests import BatchedRequests

from prometheus_client import Histogram


class AdaptiveBatcher:
def __init__(self, model: MLModel):
@@ -31,6 +33,9 @@ def __init__(self, model: MLModel):
self.__requests: Optional[Queue[Tuple[str, InferenceRequest]]] = None
self._async_responses: Dict[str, Future[InferenceResponse]] = {}
self._batching_task = None
self.batch_request_queue_size = Histogram(
"batch_request_queue", "counter of request queue batch size"
)

async def predict(self, req: InferenceRequest) -> InferenceResponse:
internal_id, _ = await self._queue_request(req)
@@ -51,7 +56,7 @@ async def _queue_request(
req: InferenceRequest,
) -> Tuple[str, Awaitable[InferenceResponse]]:
internal_id = generate_uuid()

self._batch_queue_monitor()
await self._requests.put((internal_id, req))

loop = asyncio.get_running_loop()
@@ -60,6 +65,11 @@

return internal_id, async_response

def _batch_queue_monitor(self):
"""Monitorize batch queue size"""
batch_queue_size = self._requests.qsize()
self.batch_request_queue_size.observe(batch_queue_size)

async def _wait_response(self, internal_id: str) -> InferenceResponse:
async_response = self._async_responses[internal_id]

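As a side note, the pattern introduced above — observing the queue depth at enqueue time — can be sketched in isolation as follows. This is a minimal sketch, assuming the default prometheus_client REGISTRY; the metric name batch_request_queue_example and the explicit buckets are hypothetical, picked for small integer queue depths rather than the latency-oriented defaults:

import asyncio

from prometheus_client import Histogram, generate_latest

# Hypothetical bucket boundaries for queue depth; the PR itself relies on the defaults.
batch_request_queue_size = Histogram(
    "batch_request_queue_example",
    "histogram of batch request queue size",
    buckets=(0, 1, 2, 5, 10, 25, 50, 100),
)

async def main() -> None:
    requests: asyncio.Queue = asyncio.Queue()

    # Mirror _batch_queue_monitor(): record the depth right before enqueueing.
    batch_request_queue_size.observe(requests.qsize())
    await requests.put(("internal-id", {"inputs": []}))

    # The scrape output now contains batch_request_queue_example_bucket{le="..."} series.
    print(generate_latest().decode())

asyncio.run(main())

A Gauge set to the current depth would arguably be the more natural fit for an instantaneous queue size; the Histogram used here additionally captures how the depth is distributed across requests.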
23 changes: 19 additions & 4 deletions mlserver/parallel/dispatcher.py
@@ -1,6 +1,6 @@
import asyncio

from typing import Dict, List
from typing import Dict, List, Tuple
from itertools import cycle
from multiprocessing import Queue
from concurrent.futures import ThreadPoolExecutor
@@ -17,6 +17,7 @@
ModelRequestMessage,
ModelResponseMessage,
)
from prometheus_client import Histogram


class Dispatcher:
@@ -28,6 +29,11 @@ def __init__(self, workers: Dict[int, Worker], responses: Queue):
self._process_responses_task = None
self._executor = ThreadPoolExecutor()
self._async_responses: Dict[str, Future[ModelResponseMessage]] = {}
self.parallel_request_queue_size = Histogram(
"parallel_request_queue",
"counter of request queue size for workers",
["workerpid"],
)

def start(self):
self._active = True
@@ -79,18 +85,27 @@ async def _process_response(self, response: ModelResponseMessage):
async def dispatch_request(
self, request_message: ModelRequestMessage
) -> ModelResponseMessage:
worker = self._get_worker()
worker, wpid = self._get_worker()
self._workers_queue_monitor(worker, wpid)
worker.send_request(request_message)

return await self._dispatch(request_message)

def _get_worker(self) -> Worker:
def _get_worker(self) -> Tuple[Worker, int]:
"""
Get next available worker.
By default, this is just a round-robin through all the workers.
"""
worker_pid = next(self._workers_round_robin)
return self._workers[worker_pid]
return self._workers[worker_pid], worker_pid

def _workers_queue_monitor(self, worker: Worker, worker_pid: int):
"""Get metrics from every worker request queue"""
queue_size = worker._requests.qsize()

self.parallel_request_queue_size.labels(workerpid=str(worker_pid)).observe(
float(queue_size)
)

async def dispatch_update(
self, model_update: ModelUpdateMessage
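For reference, the round-robin selection plus per-worker queue metric above can be sketched standalone like this. The worker PIDs and the plain queue.Queue objects are stand-ins for MLServer's Worker instances, and the metric name parallel_request_queue_example is made up for illustration:

from itertools import cycle
from queue import Queue
from typing import Dict, Tuple

from prometheus_client import Histogram

parallel_request_queue_size = Histogram(
    "parallel_request_queue_example",
    "histogram of request queue size for workers",
    ["workerpid"],
)

# Stand-in worker pool, keyed by (fake) PID.
workers: Dict[int, Queue] = {101: Queue(), 102: Queue()}
workers_round_robin = cycle(workers.keys())

def get_worker() -> Tuple[Queue, int]:
    """Round-robin through all the workers, as in Dispatcher._get_worker."""
    worker_pid = next(workers_round_robin)
    return workers[worker_pid], worker_pid

for i in range(4):
    worker, wpid = get_worker()
    # Mirror _workers_queue_monitor: record this worker's queue depth
    # under its own label before dispatching to it.
    parallel_request_queue_size.labels(workerpid=str(wpid)).observe(
        float(worker.qsize())
    )
    worker.put(f"request-{i}")

One caveat worth noting with per-PID labels: the label set grows with every new worker PID over the life of the process, since Prometheus keeps a child series per label value.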
3 changes: 3 additions & 0 deletions tests/batching/test_adaptive.py
@@ -11,6 +11,9 @@

from .conftest import TestRequestSender

# from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry


async def test_batch_requests(
adaptive_batcher: AdaptiveBatcher,
1 change: 1 addition & 0 deletions tests/batching/test_hooks.py
@@ -3,6 +3,7 @@
from mlserver.model import MLModel
from mlserver.types import InferenceRequest, InferenceResponse
from mlserver.batching.hooks import load_batching
from prometheus_client.registry import CollectorRegistry


async def test_batching_predict(
27 changes: 27 additions & 0 deletions tests/conftest.py
@@ -8,6 +8,8 @@
from mlserver.handlers import DataPlane, ModelRepositoryHandlers
from mlserver.registry import MultiModelRegistry
from mlserver.repository import ModelRepository, DEFAULT_MODEL_SETTINGS_FILENAME
from prometheus_client.registry import REGISTRY, CollectorRegistry
from starlette_exporter import PrometheusMiddleware
from mlserver.parallel import InferencePool
from mlserver.utils import install_uvloop_event_loop
from mlserver.logging import get_logger
@@ -43,6 +45,31 @@ def logger():
return logger


@pytest.fixture(autouse=True)
def prometheus_registry() -> CollectorRegistry:
"""
Fixture used to ensure the registry is cleaned on each run.
Otherwise, `py-grpc-prometheus` will complain that metrics already exist.

TODO: Open issue in `py-grpc-prometheus` to check whether a metric exists
before creating it.
For an example on how to do this, see `starlette_exporter`'s implementation

https://github.com/stephenhillier/starlette_exporter/blob/947d4d631dd9a6a8c1071b45573c5562acba4834/starlette_exporter/middleware.py#L67
"""
# NOTE: Since the `REGISTRY` object is global, this fixture is NOT
# thread-safe!!
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)

# Clean metrics from `starlette_exporter` as well, as otherwise they won't
# get re-created
PrometheusMiddleware._metrics.clear()

yield REGISTRY


@pytest.fixture
def event_loop():
# By default use uvloop for tests
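To make the failure mode concrete, here is a small sketch of what the fixture above guards against; the metric name my_requests is made up for illustration:

from prometheus_client import Counter
from prometheus_client.registry import REGISTRY

Counter("my_requests", "first registration")

try:
    # A second registration under the same name — e.g. on the next test
    # run — is rejected by the global registry.
    Counter("my_requests", "second registration")
except ValueError as exc:
    print(exc)  # Duplicated timeseries in CollectorRegistry: {...}

# The fixture's cleanup loop, in isolation: unregister every collector
# so the next test can re-create its metrics from scratch.
for collector in list(REGISTRY._collector_to_names.keys()):
    REGISTRY.unregister(collector)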
27 changes: 27 additions & 0 deletions tests/grpc/conftest.py
@@ -14,6 +14,8 @@
from mlserver.grpc import dataplane_pb2 as pb
from mlserver.grpc.dataplane_pb2_grpc import GRPCInferenceServiceStub
from mlserver.grpc import GRPCServer
from prometheus_client.registry import REGISTRY, CollectorRegistry
from starlette_exporter import PrometheusMiddleware

from ..conftest import TESTDATA_PATH
from ..fixtures import SumModel
@@ -30,6 +32,31 @@ def _read_testdata_pb(payload_path: str, pb_klass):
return model_infer_request


@pytest.fixture()
def delete_registry() -> CollectorRegistry:
"""
Fixture used to ensure the registry is cleaned on each run.
Otherwise, `py-grpc-prometheus` will complain that metrics already exist.

TODO: Open issue in `py-grpc-prometheus` to check whether a metric exists
before creating it.
For an example on how to do this, see `starlette_exporter`'s implementation

https://github.com/stephenhillier/starlette_exporter/blob/947d4d631dd9a6a8c1071b45573c5562acba4834/starlette_exporter/middleware.py#L67
"""
# NOTE: Since the `REGISTRY` object is global, this fixture is NOT
# thread-safe!!
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
REGISTRY.unregister(collector)

# Clean metrics from `starlette_exporter` as well, as otherwise they won't
# get re-created
PrometheusMiddleware._metrics.clear()

yield REGISTRY


@pytest.fixture
async def model_registry(
sum_model_settings: ModelSettings, inference_pool: InferencePool
13 changes: 11 additions & 2 deletions tests/grpc/test_model_repository.py
@@ -17,6 +17,8 @@
from mlserver.grpc.model_repository_pb2_grpc import ModelRepositoryServiceStub
from mlserver.grpc import dataplane_pb2 as pb
from mlserver.grpc import model_repository_pb2 as mr_pb
from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry


@pytest.fixture
@@ -83,7 +85,10 @@ async def test_model_repository_unload(


async def test_model_repository_load(
inference_service_stub, model_repository_service_stub, sum_model_settings
inference_service_stub,
model_repository_service_stub,
sum_model_settings,
prometheus_registry: CollectorRegistry,
):
await model_repository_service_stub.RepositoryModelUnload(
mr_pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
@@ -100,8 +105,12 @@


async def test_model_repository_load_error(
inference_service_stub, model_repository_service_stub, sum_model_settings
prometheus_registry: CollectorRegistry,
inference_service_stub,
model_repository_service_stub,
sum_model_settings,
):

with pytest.raises(grpc.RpcError) as err:
load_request = mr_pb.RepositoryModelLoadRequest(model_name="my-model")
await model_repository_service_stub.RepositoryModelLoad(load_request)
63 changes: 49 additions & 14 deletions tests/grpc/test_servicers.py
@@ -1,5 +1,9 @@
import pytest
import grpc
from .conftest import delete_registry
from ..metrics.conftest import prometheus_registry
from prometheus_client.registry import CollectorRegistry


from mlserver.cloudevents import (
CLOUDEVENTS_HEADER_SPECVERSION_DEFAULT,
@@ -15,28 +19,36 @@
from mlserver import __version__


async def test_server_live(inference_service_stub):
async def test_server_live(
prometheus_registry: CollectorRegistry, inference_service_stub
):
req = pb.ServerLiveRequest()
response = await inference_service_stub.ServerLive(req)

assert response.live


async def test_server_ready(inference_service_stub):
async def test_server_ready(
prometheus_registry: CollectorRegistry, inference_service_stub
):
req = pb.ServerReadyRequest()
response = await inference_service_stub.ServerReady(req)

assert response.ready


async def test_model_ready(inference_service_stub, sum_model):
async def test_model_ready(
prometheus_registry: CollectorRegistry, inference_service_stub, sum_model
):
req = pb.ModelReadyRequest(name=sum_model.name, version=sum_model.version)
response = await inference_service_stub.ModelReady(req)

assert response.ready


async def test_server_metadata(inference_service_stub):
async def test_server_metadata(
prometheus_registry: CollectorRegistry, inference_service_stub
):
req = pb.ServerMetadataRequest()
response = await inference_service_stub.ServerMetadata(req)

@@ -45,7 +57,9 @@ async def test_server_metadata(inference_service_stub):
assert response.extensions == []


async def test_model_metadata(inference_service_stub, sum_model_settings):
async def test_model_metadata(
prometheus_registry: CollectorRegistry, inference_service_stub, sum_model_settings
):
req = pb.ModelMetadataRequest(
name=sum_model_settings.name, version=sum_model_settings.parameters.version
)
@@ -60,7 +74,11 @@ async def test_model_metadata(inference_service_stub, sum_model_settings):
"model_name,model_version", [("sum-model", "v1.2.3"), ("sum-model", None)]
)
async def test_model_infer(
inference_service_stub, model_infer_request, model_name, model_version
prometheus_registry: CollectorRegistry,
inference_service_stub,
model_infer_request,
model_name,
model_version,
):
model_infer_request.model_name = model_name
if model_version is not None:
@@ -76,7 +94,9 @@ async def test_model_infer(
assert prediction.outputs[0].contents == expected


async def test_model_infer_raw_contents(inference_service_stub, model_infer_request):
async def test_model_infer_raw_contents(
prometheus_registry: CollectorRegistry, inference_service_stub, model_infer_request
):
# Prepare request with raw contents
for input_tensor in model_infer_request.inputs:
request_input = InferInputTensorConverter.to_types(input_tensor)
@@ -104,7 +124,10 @@


async def test_model_infer_headers(
inference_service_stub, model_infer_request, sum_model_settings
prometheus_registry: CollectorRegistry,
inference_service_stub,
model_infer_request,
sum_model_settings,
):
model_infer_request.model_name = sum_model_settings.name
model_infer_request.ClearField("model_version")
@@ -126,7 +149,9 @@
assert trailing_metadata[key] == value


async def test_model_infer_error(inference_service_stub, model_infer_request):
async def test_model_infer_error(
prometheus_registry: CollectorRegistry, inference_service_stub, model_infer_request
):
with pytest.raises(grpc.RpcError) as err:
model_infer_request.model_name = "my-model"
await inference_service_stub.ModelInfer(model_infer_request)
@@ -136,14 +161,18 @@ async def test_model_infer_error(inference_service_stub, model_infer_request):


async def test_model_repository_index(
inference_service_stub, grpc_repository_index_request
prometheus_registry: CollectorRegistry,
inference_service_stub,
grpc_repository_index_request,
):
index = await inference_service_stub.RepositoryIndex(grpc_repository_index_request)

assert len(index.models) == 1


async def test_model_repository_unload(inference_service_stub, sum_model_settings):
async def test_model_repository_unload(
prometheus_registry: CollectorRegistry, inference_service_stub, sum_model_settings
):
unload_request = pb.RepositoryModelUnloadRequest(model_name=sum_model_settings.name)
await inference_service_stub.RepositoryModelUnload(unload_request)

@@ -153,11 +182,15 @@ async def test_model_repository_unload(inference_service_stub, sum_model_settings):
)


async def test_model_repository_load(inference_service_stub, sum_model_settings):
async def test_model_repository_load(
prometheus_registry: CollectorRegistry,
inference_service_stub,
delete_registry: CollectorRegistry,
sum_model_settings,
):
await inference_service_stub.RepositoryModelUnload(
pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
)

load_request = pb.RepositoryModelLoadRequest(model_name=sum_model_settings.name)
await inference_service_stub.RepositoryModelLoad(load_request)

@@ -168,7 +201,9 @@
assert response.name == sum_model_settings.name


async def test_model_repository_load_error(inference_service_stub, sum_model_settings):
async def test_model_repository_load_error(
prometheus_registry: CollectorRegistry, inference_service_stub, sum_model_settings
):
with pytest.raises(grpc.RpcError) as err:
load_request = pb.RepositoryModelLoadRequest(model_name="my-model")
await inference_service_stub.RepositoryModelLoad(load_request)