Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,19 @@ For example, with msgspec_:

Complete table of extras below:

+-------------------------------------+----------------------------------+
| example | description |
+=====================================+==================================+
| ``pip install "asyncly[msgspec]"`` | For using msgspec_ structs |
+-------------------------------------+----------------------------------+
| ``pip install "asyncly[orjson]"`` | For fast parsing json by orjson_ |
+-------------------------------------+----------------------------------+
| ``pip install "asyncly[pydantic]"`` | For using pydantic_ models |
+-------------------------------------+----------------------------------+
+------------------------------------------+-----------------------------------+
| example | description |
+==========================================+===================================+
| ``pip install "asyncly[msgspec]"`` | For using msgspec_ structs |
+------------------------------------------+-----------------------------------+
| ``pip install "asyncly[orjson]"`` | For fast parsing json by orjson_ |
+------------------------------------------+-----------------------------------+
| ``pip install "asyncly[pydantic]"`` | For using pydantic_ models |
+------------------------------------------+-----------------------------------+
| ``pip install "asyncly[prometheus]"`` | To collect Prometheus_ metrics |
+------------------------------------------+-----------------------------------+
| ``pip install "asyncly[opentelemetry]"`` | To collect OpenTelemetry_ metrics |
+------------------------------------------+-----------------------------------+

Quick start guide
-----------------
Expand Down Expand Up @@ -179,6 +183,8 @@ Useful responses and serializers
.. _msgspec: https://github.com/jcrist/msgspec
.. _orjson: https://github.com/ijl/orjson
.. _pydantic: https://github.com/pydantic/pydantic
.. _Prometheus: https://prometheus.io
.. _OpenTelemetry: https://opentelemetry.io

.. _examples/catfact_client.py: https://github.com/andy-takker/asyncly/blob/master/examples/catfact_client.py
.. _examples/test_catfact_client.py: https://github.com/andy-takker/asyncly/blob/master/examples/test_catfact_client.py
Expand Down
11 changes: 2 additions & 9 deletions asyncly/client/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import sys
from typing import Any, Literal
from typing import Any

from aiohttp import ClientSession
from aiohttp.client import DEFAULT_TIMEOUT
Expand All @@ -10,13 +9,7 @@
apply_handler,
)
from asyncly.client.timeout import TimeoutType, get_timeout

if sys.version_info >= (3, 11):
from http import HTTPMethod

MethodType = HTTPMethod | Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"]
else:
MethodType = Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"]
from asyncly.client.typing import MethodType


class BaseHttpClient:
Expand Down
6 changes: 2 additions & 4 deletions asyncly/client/handlers/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from collections.abc import Callable, Mapping
from http import HTTPStatus
from collections.abc import Callable
from typing import Any

from aiohttp import ClientResponse

from asyncly.client.handlers.exceptions import UnhandledStatusException

ResponseHandlersType = Mapping[HTTPStatus | int | str, Callable]
from asyncly.client.typing import ResponseHandlersType


async def apply_handler(
Expand Down
Empty file.
148 changes: 148 additions & 0 deletions asyncly/client/metrics/instrumentable_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from http import HTTPStatus
from time import perf_counter
from types import TracebackType
from typing import Any

from aiohttp import ClientResponse, ClientSession
from aiohttp.client import DEFAULT_TIMEOUT
from yarl import URL

from asyncly.client.base import BaseHttpClient, MethodType
from asyncly.client.metrics.route_resolver import default_route_resolver
from asyncly.client.metrics.sinks.base import MetricsSink
from asyncly.client.metrics.sinks.noop import NoopSink
from asyncly.client.timeout import TimeoutType
from asyncly.client.typing import ResponseHandler, ResponseHandlersType, RouteResolver


class InstrumentableHttpClient(BaseHttpClient):
__slots__ = ("_metrics_sink", "_resolve_route") + BaseHttpClient.__slots__

def __init__(
self,
url: URL | str,
session: ClientSession,
client_name: str,
) -> None:
super().__init__(url=url, session=session, client_name=client_name)
self._metrics_sink: MetricsSink = NoopSink()
self._resolve_route: RouteResolver = default_route_resolver

def enable_metrics(
self, sink: MetricsSink, *, route_resolver: RouteResolver | None = None
) -> None:
self._metrics_sink = sink
if route_resolver is not None:
self._resolve_route = route_resolver

def disable_metrics(self) -> None:
self._metrics_sink = NoopSink()
self._resolve_route = default_route_resolver

def instrument( # type: ignore[no-untyped-def]
self, sink: MetricsSink, *, route_resolver: RouteResolver | None = None
):
client = self

class _Ctx:
def __enter__(self) -> "InstrumentableHttpClient":
self._prev_sink = client._metrics_sink
self._prev_resolver = client._resolve_route
client.enable_metrics(sink, route_resolver=route_resolver)
return client

def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
client._metrics_sink = self._prev_sink
client._resolve_route = self._prev_resolver

return _Ctx()

async def _make_req(
self,
/,
method: MethodType,
url: URL,
handlers: ResponseHandlersType,
timeout: TimeoutType = DEFAULT_TIMEOUT,
**kwargs: Any,
) -> Any:
# Быстрый путь: метрики Noop → почти нулевая накладная
sink = self._metrics_sink
if isinstance(sink, NoopSink):
return await super()._make_req(
method=method,
url=url,
handlers=handlers,
timeout=timeout,
**kwargs,
)

route_label = self._resolve_route(url)
start = perf_counter()
chosen_status: dict[str, int | HTTPStatus | str | None] = {"value": None}

# Заворачиваем хэндлеры, чтобы знать какой статус сработал
wrapped_handlers = _wrap_handlers_with_status_mark(handlers, chosen_status)

error_type: str | None = None
status_for_metrics: int | str = "unknown"
try:
result = await super()._make_req(
method=method, url=url, handlers=wrapped_handlers, timeout=timeout
)
v = chosen_status["value"]
if isinstance(v, HTTPStatus):
status_for_metrics = int(v)
elif isinstance(v, int):
status_for_metrics = v
else:
status_for_metrics = "ok"
return result
except Exception as e:
status = (
chosen_status["value"]
or getattr(e, "status", None)
or getattr(e, "status_code", None)
)
status_for_metrics = int(status) if isinstance(status, int) else "exception"
error_type = type(e).__name__
raise
finally:
duration = perf_counter() - start
sink.observe_request(
client=self._client_name,
method=method,
route=route_label,
status=status_for_metrics,
duration_seconds=duration,
error_type=error_type,
)


def _wrap_handlers_with_status_mark(
handlers: ResponseHandlersType,
chosen_status: dict[str, int | HTTPStatus | str | None],
) -> ResponseHandlersType:
try:
wrapped: dict[int | HTTPStatus | str, ResponseHandler] = {}
for k, handler in handlers.items():
wrapped[k] = _wrap_one(handler, chosen_status)
return wrapped
except AttributeError:
return handlers


def _wrap_one(
handler: ResponseHandler,
chosen_status: dict[str, int | HTTPStatus | str | None],
) -> ResponseHandler:
async def _wrapped(response: ClientResponse) -> Any:
chosen_status["value"] = response.status
return await handler(response)

return _wrapped
13 changes: 13 additions & 0 deletions asyncly/client/metrics/route_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from yarl import URL


def default_route_resolver(url: URL) -> str:
parts: list[str] = []
for p in url.path.split("/"):
if not p:
continue
if p.isdigit() or (len(p) in (8, 16, 32, 36) and any(ch.isalpha() for ch in p)):
parts.append(":id")
else:
parts.append(p)
return "/" + "/".join(parts) if parts else "/"
Empty file.
14 changes: 14 additions & 0 deletions asyncly/client/metrics/sinks/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from typing import Protocol


class MetricsSink(Protocol):
def observe_request(
self,
*,
client: str,
method: str,
route: str,
status: int | str,
duration_seconds: float,
error_type: str | None = None,
) -> None: ...
14 changes: 14 additions & 0 deletions asyncly/client/metrics/sinks/noop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class NoopSink:
"""Синк по умолчанию: ничего не делает."""

def observe_request(
self,
*,
client: str,
method: str,
route: str,
status: int | str,
duration_seconds: float,
error_type: str | None = None,
) -> None:
return
52 changes: 52 additions & 0 deletions asyncly/client/metrics/sinks/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from opentelemetry.metrics import Meter


class OpenTelemetrySink:
def __init__(self, meter: Meter) -> None:
# Counter: количество
self._req_counter = meter.create_counter(
name="http_client_requests_total",
unit="1",
description="Total HTTP client requests",
)
# Histogram: длительность
self._req_hist = meter.create_histogram(
name="http_client_request_seconds",
unit="s",
description="HTTP client request duration including handler",
)
# Counter: ошибки
self._err_counter = meter.create_counter(
name="http_client_errors_total",
unit="1",
description="Total HTTP client errors",
)

def observe_request(
self,
*,
client: str,
method: str,
route: str,
status: int | str,
duration_seconds: float,
error_type: str | None = None,
) -> None:
attrs = {
"client": client,
"method": method,
"route": route,
"status": str(status),
}
self._req_counter.add(1, attributes=attrs)
self._req_hist.record(duration_seconds, attributes=attrs)
if error_type:
self._err_counter.add(
1,
attributes={
"client": client,
"method": method,
"route": route,
"error_type": error_type,
},
)
64 changes: 64 additions & 0 deletions asyncly/client/metrics/sinks/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from collections.abc import Iterable

from prometheus_client import Counter, Histogram
from prometheus_client.registry import REGISTRY, CollectorRegistry


class PrometheusSink:
def __init__(
self,
namespace: str = "asyncly",
subsystem: str = "client",
buckets: Iterable[float] = (
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
10.0,
),
registry: CollectorRegistry = REGISTRY,
) -> None:
metric_prefix = f"{namespace}_{subsystem}"
self._latency = Histogram(
f"{metric_prefix}_request_seconds",
"HTTP client request duration including handler",
("client", "method", "route", "status"),
buckets=tuple(buckets),
registry=registry,
)
self._total = Counter(
f"{metric_prefix}_requests_total",
"Total HTTP client requests",
("client", "method", "route", "status"),
registry=registry,
)
self._errors = Counter(
f"{metric_prefix}_errors_total",
"Total HTTP client errors",
("client", "method", "route", "error_type"),
registry=registry,
)

def observe_request(
self,
*,
client: str,
method: str,
route: str,
status: int | str,
duration_seconds: float,
error_type: str | None = None,
) -> None:
status_label = str(status)
self._total.labels(client, method, route, status_label).inc()
self._latency.labels(client, method, route, status_label).observe(
duration_seconds
)
if error_type:
self._errors.labels(client, method, route, error_type).inc()
2 changes: 1 addition & 1 deletion asyncly/client/timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from aiohttp import ClientTimeout

TimeoutType = ClientTimeout | timedelta | int | float
from asyncly.client.typing import TimeoutType


@singledispatch
Expand Down
Loading