Add request logging (#71)
* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* Fix

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* Fix

* Handle 402 responses from hub

* Fix

* Fix

* Fix

* Add exclude_callback
itssimon authored Dec 2, 2024
1 parent d8aacbc commit 3086202
Showing 22 changed files with 1,468 additions and 212 deletions.
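
For orientation before the diffs, here is a minimal usage sketch of the new option (not part of this commit): the client constructors now take a request_logging_config argument, and the commit adds an exclude_callback. The callback signature and the dict-style request object below are assumptions for illustration only.

# Hypothetical usage sketch. RequestLoggingConfig and exclude_callback are introduced
# by this commit, but the field names and callback signature here are assumptions.
from apitally.client.client_asyncio import ApitallyClient
from apitally.client.request_logging import RequestLoggingConfig

def skip_health_checks(request, response) -> bool:
    # Assumed contract: return True to exclude a request/response pair from logging.
    return request.get("path") == "/healthz"

client = ApitallyClient(
    client_id="00000000-0000-4000-8000-000000000000",  # placeholder client ID
    env="dev",
    request_logging_config=RequestLoggingConfig(
        exclude_callback=skip_health_checks,  # option added in this commit
    ),
)

After construction, client.start_sync_loop() starts the background loop changed in this commit: it now wakes every second, writes buffered request logs to file, and ships them to the hub alongside the regular sync payload.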
69 changes: 54 additions & 15 deletions apitally/client/client_asyncio.py
@@ -4,14 +4,17 @@
import logging
import random
import time
from contextlib import suppress
from functools import partial
from typing import Any, Dict, Optional, Tuple
from typing import Any, AsyncIterator, Dict, Optional, Tuple
from uuid import UUID

import backoff
import httpx

from apitally.client.client_base import MAX_QUEUE_TIME, REQUEST_TIMEOUT, ApitallyClientBase
from apitally.client.logging import get_logger
from apitally.client.request_logging import RequestLoggingConfig


logger = get_logger(__name__)
@@ -26,8 +29,8 @@


class ApitallyClient(ApitallyClientBase):
def __init__(self, client_id: str, env: str) -> None:
super().__init__(client_id=client_id, env=env)
def __init__(self, client_id: str, env: str, request_logging_config: Optional[RequestLoggingConfig] = None) -> None:
super().__init__(client_id=client_id, env=env, request_logging_config=request_logging_config)
self._stop_sync_loop = False
self._sync_loop_task: Optional[asyncio.Task] = None
self._sync_data_queue: asyncio.Queue[Tuple[float, Dict[str, Any]]] = asyncio.Queue()
@@ -41,20 +44,27 @@ def start_sync_loop(self) -> None:
self._sync_loop_task = asyncio.create_task(self._run_sync_loop())

async def _run_sync_loop(self) -> None:
first_iteration = True
last_sync_time = 0.0
while not self._stop_sync_loop:
try:
time_start = time.perf_counter()
async with self.get_http_client() as client:
tasks = [self.send_sync_data(client)]
if not self._startup_data_sent and not first_iteration:
tasks.append(self.send_startup_data(client))
await asyncio.gather(*tasks)
time_elapsed = time.perf_counter() - time_start
await asyncio.sleep(self.sync_interval - time_elapsed)
self.request_logger.write_to_file()
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")
first_iteration = False
logger.exception("An error occurred while writing request logs")

now = time.time()
if (now - last_sync_time) >= self.sync_interval:
try:
async with self.get_http_client() as client:
tasks = [self.send_sync_data(client), self.send_log_data(client)]
if not self._startup_data_sent and last_sync_time > 0: # not on first sync
tasks.append(self.send_startup_data(client))
await asyncio.gather(*tasks)
last_sync_time = now
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")

self.request_logger.maintain()
await asyncio.sleep(1)

def stop_sync_loop(self) -> None:
self._stop_sync_loop = True
@@ -65,6 +75,7 @@ async def handle_shutdown(self) -> None:
# Send any remaining data before exiting
async with self.get_http_client() as client:
await self.send_sync_data(client)
await self.send_log_data(client)

def set_startup_data(self, data: Dict[str, Any]) -> None:
self._startup_data_sent = False
@@ -99,10 +110,27 @@ async def send_sync_data(self, client: httpx.AsyncClient) -> None:
finally:
self._sync_data_queue.task_done()

async def send_log_data(self, client: httpx.AsyncClient) -> None:
self.request_logger.rotate_file()
i = 0
while log_file := self.request_logger.get_file():
if i > 0:
time.sleep(random.uniform(0.1, 0.3))
try:
stream = log_file.stream_lines_compressed()
await self._send_log_data(client, log_file.uuid, stream)
log_file.delete()
except httpx.HTTPError:
self.request_logger.retry_file_later(log_file)
break
i += 1
if i >= 10:
break

@retry(raise_on_giveup=False)
async def _send_startup_data(self, client: httpx.AsyncClient, data: Dict[str, Any]) -> None:
logger.debug("Sending startup data to Apitally hub")
response = await client.post(url="/startup", json=data, timeout=REQUEST_TIMEOUT)
response = await client.post(url="/startup", json=data)
self._handle_hub_response(response)
self._startup_data_sent = True
self._startup_data = None
@@ -113,6 +141,17 @@ async def _send_sync_data(self, client: httpx.AsyncClient, data: Dict[str, Any])
response = await client.post(url="/sync", json=data)
self._handle_hub_response(response)

async def _send_log_data(self, client: httpx.AsyncClient, uuid: UUID, stream: AsyncIterator[bytes]) -> None:
logger.debug("Streaming request log data to Apitally hub")
response = await client.post(url=f"{self.hub_url}/log?uuid={uuid}", content=stream)
if response.status_code == 402 and "Retry-After" in response.headers:
with suppress(ValueError):
retry_after = int(response.headers["Retry-After"])
self.request_logger.suspend_until = time.time() + retry_after
self.request_logger.clear()
return
self._handle_hub_response(response)

def _handle_hub_response(self, response: httpx.Response) -> None:
if response.status_code == 404:
self.stop_sync_loop()
4 changes: 3 additions & 1 deletion apitally/client/client_base.py
@@ -10,6 +10,7 @@

from apitally.client.consumers import ConsumerRegistry
from apitally.client.logging import get_logger
from apitally.client.request_logging import RequestLogger, RequestLoggingConfig
from apitally.client.requests import RequestCounter
from apitally.client.server_errors import ServerErrorCounter
from apitally.client.validation_errors import ValidationErrorCounter
@@ -39,7 +40,7 @@ def __new__(cls: Type[TApitallyClient], *args, **kwargs) -> TApitallyClient:
cls._instance = super().__new__(cls)
return cast(TApitallyClient, cls._instance)

def __init__(self, client_id: str, env: str) -> None:
def __init__(self, client_id: str, env: str, request_logging_config: Optional[RequestLoggingConfig] = None) -> None:
if hasattr(self, "client_id"):
raise RuntimeError("Apitally client is already initialized") # pragma: no cover
try:
@@ -56,6 +57,7 @@ def __init__(self, client_id: str, env: str) -> None:
self.validation_error_counter = ValidationErrorCounter()
self.server_error_counter = ServerErrorCounter()
self.consumer_registry = ConsumerRegistry()
self.request_logger = RequestLogger(request_logging_config)

self._startup_data: Optional[Dict[str, Any]] = None
self._startup_data_sent = False
59 changes: 50 additions & 9 deletions apitally/client/client_threading.py
@@ -1,18 +1,22 @@
from __future__ import annotations

import logging
import queue
import random
import time
from contextlib import suppress
from functools import partial
from io import BufferedReader
from queue import Queue
from threading import Event, Thread
from typing import Any, Callable, Dict, Optional, Tuple
from uuid import UUID

import backoff
import requests

from apitally.client.client_base import MAX_QUEUE_TIME, REQUEST_TIMEOUT, ApitallyClientBase
from apitally.client.logging import get_logger
from apitally.client.request_logging import RequestLoggingConfig


logger = get_logger(__name__)
@@ -43,11 +47,11 @@ def callback():


class ApitallyClient(ApitallyClientBase):
def __init__(self, client_id: str, env: str) -> None:
super().__init__(client_id=client_id, env=env)
def __init__(self, client_id: str, env: str, request_logging_config: Optional[RequestLoggingConfig] = None) -> None:
super().__init__(client_id=client_id, env=env, request_logging_config=request_logging_config)
self._thread: Optional[Thread] = None
self._stop_sync_loop = Event()
self._sync_data_queue: queue.Queue[Tuple[float, Dict[str, Any]]] = queue.Queue()
self._sync_data_queue: Queue[Tuple[float, Dict[str, Any]]] = Queue()

def start_sync_loop(self) -> None:
self._stop_sync_loop.clear()
@@ -61,20 +65,29 @@ def _run_sync_loop(self) -> None:
last_sync_time = 0.0
while not self._stop_sync_loop.is_set():
try:
now = time.time()
if (now - last_sync_time) >= self.sync_interval:
self.request_logger.write_to_file()
except Exception: # pragma: no cover
logger.exception("An error occurred while writing request logs")

now = time.time()
if (now - last_sync_time) >= self.sync_interval:
try:
with requests.Session() as session:
if not self._startup_data_sent and last_sync_time > 0: # not on first sync
self.send_startup_data(session)
self.send_sync_data(session)
self.send_log_data(session)
last_sync_time = now
time.sleep(1)
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")
except Exception: # pragma: no cover
logger.exception("An error occurred during sync with Apitally hub")

self.request_logger.maintain()
time.sleep(1)
finally:
# Send any remaining data before exiting
with requests.Session() as session:
self.send_sync_data(session)
self.send_log_data(session)

def stop_sync_loop(self) -> None:
self._stop_sync_loop.set()
@@ -112,6 +125,23 @@ def send_sync_data(self, session: requests.Session) -> None:
finally:
self._sync_data_queue.task_done()

def send_log_data(self, session: requests.Session) -> None:
self.request_logger.rotate_file()
i = 0
while log_file := self.request_logger.get_file():
if i > 0:
time.sleep(random.uniform(0.1, 0.3))
try:
with log_file.open_compressed() as fp:
self._send_log_data(session, log_file.uuid, fp)
log_file.delete()
except requests.RequestException:
self.request_logger.retry_file_later(log_file)
break
i += 1
if i >= 10:
break

@retry(raise_on_giveup=False)
def _send_startup_data(self, session: requests.Session, data: Dict[str, Any]) -> None:
logger.debug("Sending startup data to Apitally hub")
@@ -126,6 +156,17 @@ def _send_sync_data(self, session: requests.Session, data: Dict[str, Any]) -> None:
response = session.post(url=f"{self.hub_url}/sync", json=data, timeout=REQUEST_TIMEOUT)
self._handle_hub_response(response)

def _send_log_data(self, session: requests.Session, uuid: UUID, fp: BufferedReader) -> None:
logger.debug("Streaming request log data to Apitally hub")
response = session.post(url=f"{self.hub_url}/log?uuid={uuid}", data=fp, timeout=REQUEST_TIMEOUT)
if response.status_code == 402 and "Retry-After" in response.headers:
with suppress(ValueError):
retry_after = int(response.headers["Retry-After"])
self.request_logger.suspend_until = time.time() + retry_after
self.request_logger.clear()
return
self._handle_hub_response(response)

def _handle_hub_response(self, response: requests.Response) -> None:
if response.status_code == 404:
self.stop_sync_loop()
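
A detail shared by both _send_log_data implementations above: a 402 response from the hub suspends request logging for the number of seconds given in the Retry-After header and clears the buffered log data instead of retrying it. A minimal sketch of that arithmetic, with illustrative values:

import time

# Illustrative values only; the Retry-After header is parsed as whole seconds.
retry_after = int("3600")                  # e.g. the hub asks for a one-hour pause
suspend_until = time.time() + retry_after  # logging stays suspended until this timestamp
# Per the diff, the currently buffered log data is cleared immediately rather than retried.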