Skip to content

ref(transport): Add shared sync/async transport superclass and create a sync http subclass #4572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: potel-base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 89 additions & 60 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,8 @@ def _update_rate_limits(
seconds=retry_after
)

def _send_request(
self: Self,
body: bytes,
headers: Dict[str, str],
endpoint_type: EndpointType = EndpointType.ENVELOPE,
envelope: Optional[Envelope] = None,
def _handle_request_error(
self: Self, envelope: Optional[Envelope], loss_reason: str = "network"
) -> None:
def record_loss(reason: str) -> None:
if envelope is None:
Expand All @@ -306,45 +302,45 @@ def record_loss(reason: str) -> None:
for item in envelope.items:
self.record_lost_event(reason, item=item)

self.on_dropped_event(loss_reason)
record_loss("network_error")

def _handle_response(
self: Self,
response: Union[urllib3.BaseHTTPResponse, httpcore.Response],
envelope: Optional[Envelope],
) -> None:
self._update_rate_limits(response)

if response.status == 429:
# if we hit a 429. Something was rate limited but we already
# acted on this in `self._update_rate_limits`. Note that we
# do not want to record event loss here as we will have recorded
# an outcome in relay already.
self.on_dropped_event("status_429")
pass

elif response.status >= 300 or response.status < 200:
logger.error(
"Unexpected status code: %s (body: %s)",
response.status,
getattr(response, "data", getattr(response, "content", None)),
)
self._handle_request_error(
envelope=envelope, loss_reason="status_{}".format(response.status)
)

def _update_headers(
self: Self,
headers: Dict[str, str],
) -> None:

headers.update(
{
"User-Agent": str(self._auth.client),
"X-Sentry-Auth": str(self._auth.to_header()),
}
)
try:
response = self._request(
"POST",
endpoint_type,
body,
headers,
)
except Exception:
self.on_dropped_event("network")
record_loss("network_error")
raise

try:
self._update_rate_limits(response)

if response.status == 429:
# if we hit a 429. Something was rate limited but we already
# acted on this in `self._update_rate_limits`. Note that we
# do not want to record event loss here as we will have recorded
# an outcome in relay already.
self.on_dropped_event("status_429")
pass

elif response.status >= 300 or response.status < 200:
logger.error(
"Unexpected status code: %s (body: %s)",
response.status,
getattr(response, "data", getattr(response, "content", None)),
)
self.on_dropped_event("status_{}".format(response.status))
record_loss("network_error")
finally:
response.close()

def on_dropped_event(self: Self, _reason: str) -> None:
    """Hook called whenever an event is dropped; no-op in the base class."""
    return None
Expand Down Expand Up @@ -381,11 +377,6 @@ def _fetch_pending_client_report(
type="client_report",
)

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def _check_disabled(self: Self, category: EventDataCategory) -> bool:
def _disabled(bucket: Optional[EventDataCategory]) -> bool:
ts = self._disabled_until.get(bucket)
Expand All @@ -404,9 +395,9 @@ def _is_worker_full(self: Self) -> bool:
def is_healthy(self: Self) -> bool:
    """True when the worker has capacity and we are not rate limited."""
    if self._is_worker_full():
        return False
    return not self._is_rate_limited()

def _send_envelope(self: Self, envelope: Envelope) -> None:

# remove all items from the envelope which are over quota
def _prepare_envelope(
self: Self, envelope: Envelope
) -> Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]:
new_items = []
for item in envelope.items:
if self._check_disabled(item.data_category):
Expand Down Expand Up @@ -448,13 +439,7 @@ def _send_envelope(self: Self, envelope: Envelope) -> None:
if content_encoding:
headers["Content-Encoding"] = content_encoding

self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None
return envelope, body, headers

def _serialize_envelope(
self: Self, envelope: Envelope
Expand Down Expand Up @@ -512,6 +497,54 @@ def _request(
) -> Union[urllib3.BaseHTTPResponse, httpcore.Response]:
raise NotImplementedError()

def kill(self: Self) -> None:
    """Shut the transport down by killing its worker (no flush is attempted)."""
    logger.debug("Killing HTTP transport")
    self._worker.kill()


class BaseSyncHttpTransport(BaseHttpTransport):

def _send_envelope(self: Self, envelope: Envelope) -> None:
_prepared_envelope = self._prepare_envelope(envelope)
if _prepared_envelope is None:
return None
envelope, body, headers = _prepared_envelope
self._send_request(
body.getvalue(),
headers=headers,
endpoint_type=EndpointType.ENVELOPE,
envelope=envelope,
)
return None

def _send_request(
self: Self,
body: bytes,
headers: Dict[str, str],
endpoint_type: EndpointType,
envelope: Optional[Envelope],
) -> None:
self._update_headers(headers)
try:
response = self._request(
"POST",
endpoint_type,
body,
headers,
)
except Exception:
self._handle_request_error(envelope=envelope, loss_reason="network")
raise
try:
self._handle_response(response=response, envelope=envelope)
finally:
response.close()

def _flush_client_reports(self: Self, force: bool = False) -> None:
client_report = self._fetch_pending_client_report(force=force, interval=60)
if client_report is not None:
self.capture_envelope(Envelope(items=[client_report]))

def capture_envelope(self: Self, envelope: Envelope) -> None:
def send_envelope_wrapper() -> None:
with capture_internal_exceptions():
Expand All @@ -534,12 +567,8 @@ def flush(
self._worker.submit(lambda: self._flush_client_reports(force=True))
self._worker.flush(timeout, callback)

def kill(self: Self) -> None:
    # Hard shutdown: log and kill the worker; pending work is not flushed.
    logger.debug("Killing HTTP transport")
    self._worker.kill()


class HttpTransport(BaseHttpTransport):
class HttpTransport(BaseSyncHttpTransport):
if TYPE_CHECKING:
_pool: Union[PoolManager, ProxyManager]

Expand Down Expand Up @@ -656,7 +685,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:

else:

class Http2Transport(BaseHttpTransport): # type: ignore
class Http2Transport(BaseSyncHttpTransport): # type: ignore
"""The HTTP2 transport based on httpcore."""

TIMEOUT = 15
Expand Down
35 changes: 35 additions & 0 deletions tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,3 +695,38 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_
"reason": "test",
"quantity": span_count + 1,
} in discarded_events


def test_handle_unexpected_status_invokes_handle_request_error(
    make_client, monkeypatch
):
    """A non-2xx, non-429 response must be routed to ``_handle_request_error``."""
    client = make_client()
    transport = client.transport

    # Run worker tasks inline so the request happens during capture/flush.
    monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True)

    class MockResponse:
        def __init__(self):
            self.status = 500  # Integer
            self.data = b"server error"
            self.headers = {}

        def close(self):
            pass

    def stub_request(method, endpoint, body=None, headers=None):
        return MockResponse()

    monkeypatch.setattr(transport, "_request", stub_request)

    seen = []
    monkeypatch.setattr(
        transport,
        "_handle_request_error",
        lambda envelope, loss_reason: seen.append(loss_reason),
    )

    client.capture_event({"message": "test"})
    client.flush()

    assert seen == ["status_500"]
Loading