Skip to content

Commit 515df75

Browse files
authored
Asgi middleware (elastic#1528)
* initial version of the ASGI middleware exception tracking is still missing, as I haven't figured out yet why the exception doesn't bubble up to our middleware * added docs * mark the ASGI middleware as experimental and link to GH issue
1 parent b5e44db commit 515df75

File tree

10 files changed

+530
-0
lines changed

10 files changed

+530
-0
lines changed

docs/asgi-middleware.asciidoc

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
[[asgi-middleware]]
2+
=== ASGI Middleware
3+
4+
experimental::[]
5+
6+
Incorporating Elastic APM into your ASGI-based project only requires a few easy
7+
steps.
8+
9+
NOTE: Several ASGI frameworks are supported natively.
10+
Please check <<supported-technologies,Supported Technologies>> for more information
11+
12+
[float]
13+
[[asgi-installation]]
14+
==== Installation
15+
16+
Install the Elastic APM agent using pip:
17+
18+
[source,bash]
19+
----
20+
$ pip install elastic-apm
21+
----
22+
23+
or add `elastic-apm` to your project's `requirements.txt` file.
24+
25+
26+
[float]
27+
[[asgi-setup]]
28+
==== Setup
29+
30+
To set up the agent, you need to initialize it with appropriate settings.
31+
32+
The settings are configured either via environment variables, or as
33+
initialization arguments.
34+
35+
You can find a list of all available settings in the
36+
<<configuration, Configuration>> page.
37+
38+
To set up the APM agent, wrap your ASGI app with the `ASGITracingMiddleware`:
39+
40+
[source,python]
41+
----
42+
from elasticapm.contrib.asgi import ASGITracingMiddleware
43+
44+
app = MyGenericASGIApp() # depending on framework
45+
46+
app = ASGITracingMiddleware(app)
47+
48+
----
49+
50+
Make sure to call <<api-set-transaction-name, `elasticapm.set_transaction_name()`>> with an appropriate transaction name in all your routes.
51+
52+
NOTE: Currently, the agent doesn't support automatic capturing of exceptions.
53+
You can follow progress on this issue on https://github.com/elastic/apm-agent-python/issues/1548[Github].
54+
55+
[float]
56+
[[supported-python-versions]]
57+
==== Supported Python versions
58+
59+
A list of supported <<supported-python,Python>> versions can be found on our <<supported-technologies,Supported Technologies>> page.
60+
61+
NOTE: Elastic APM only supports `asyncio` when using Python 3.7+

elasticapm/contrib/asgi.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2022, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
import functools
32+
import urllib.parse
33+
from typing import TYPE_CHECKING, Optional, Tuple, Union
34+
35+
if TYPE_CHECKING:
36+
from asgiref.typing import ASGIApplication, ASGIReceiveCallable, ASGISendCallable, Scope, ASGISendEvent
37+
38+
import elasticapm
39+
from elasticapm import Client, get_client, instrument
40+
from elasticapm.conf import constants
41+
from elasticapm.contrib.asyncio.traces import set_context
42+
from elasticapm.utils import default_ports, encoding
43+
from elasticapm.utils.disttracing import TraceParent
44+
45+
46+
def wrap_send(send, middleware):
47+
@functools.wraps(send)
48+
async def wrapped_send(message):
49+
if message.get("type") == "http.response.start":
50+
await set_context(lambda: middleware.get_data_from_response(message, constants.TRANSACTION), "response")
51+
result = "HTTP {}xx".format(message["status"] // 100)
52+
elasticapm.set_transaction_result(result, override=False)
53+
await send(message)
54+
55+
return wrapped_send
56+
57+
58+
class ASGITracingMiddleware:
59+
__slots__ = ("_app", "client")
60+
61+
def __init__(self, app: "ASGIApplication", **options) -> None:
62+
self._app = app
63+
client = get_client()
64+
if not client:
65+
client = Client(**options)
66+
self.client = client
67+
if self.client.config.instrument and self.client.config.enabled:
68+
instrument()
69+
70+
async def __call__(self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable") -> None:
71+
if scope["type"] != "http":
72+
await self._app(scope, receive, send)
73+
return
74+
send = wrap_send(send, self)
75+
wrapped_receive = receive
76+
url, url_dict = self.get_url(scope)
77+
body = None
78+
if not self.client.should_ignore_url(url):
79+
self.client.begin_transaction(
80+
transaction_type="request", trace_parent=TraceParent.from_headers(scope["headers"])
81+
)
82+
self.set_transaction_name(scope["method"], url)
83+
if scope["method"] in constants.HTTP_WITH_BODY and self.client.config.capture_body != "off":
84+
messages = []
85+
more_body = True
86+
while more_body:
87+
message = await receive()
88+
messages.append(message)
89+
more_body = message.get("more_body", False)
90+
91+
body_raw = b"".join([message.get("body", b"") for message in messages])
92+
body = str(body_raw, errors="ignore")
93+
94+
# Dispatch to the ASGI callable
95+
async def wrapped_receive():
96+
if messages:
97+
return messages.pop(0)
98+
99+
# Once that's done we can just await any other messages.
100+
return await receive()
101+
102+
await set_context(lambda: self.get_data_from_request(scope, constants.TRANSACTION, body), "request")
103+
104+
try:
105+
await self._app(scope, wrapped_receive, send)
106+
elasticapm.set_transaction_outcome(constants.OUTCOME.SUCCESS, override=False)
107+
return
108+
except Exception as exc:
109+
self.client.capture_exception()
110+
elasticapm.set_transaction_result("HTTP 5xx", override=False)
111+
elasticapm.set_transaction_outcome(constants.OUTCOME.FAILURE, override=True)
112+
elasticapm.set_context({"status_code": 500}, "response")
113+
raise exc from None
114+
finally:
115+
self.client.end_transaction()
116+
117+
def get_headers(self, scope_or_message: Union["Scope", "ASGISendEvent"]) -> dict[str, str]:
118+
headers = {}
119+
for k, v in scope_or_message.get("headers", {}):
120+
key = k.decode("latin1")
121+
val = v.decode("latin1")
122+
if key in headers:
123+
headers[key] = f"{headers[key]}, {val}"
124+
else:
125+
headers[key] = val
126+
return headers
127+
128+
def get_url(self, scope: "Scope", host: Optional[str] = None) -> Tuple[str, dict[str, str]]:
129+
url_dict = {}
130+
scheme = scope.get("scheme", "http")
131+
server = scope.get("server", None)
132+
path = scope.get("root_path", "") + scope.get("path", "")
133+
134+
url_dict["protocol"] = scheme + ":"
135+
136+
if host:
137+
url = f"{scheme}://{host}{path}"
138+
url_dict["hostname"] = host
139+
elif server is not None:
140+
host, port = server
141+
url_dict["hostname"] = host
142+
if port:
143+
url_dict["port"] = port
144+
default_port = default_ports.get(scheme, None)
145+
if port != default_port:
146+
url = f"{scheme}://{host}:{port}{path}"
147+
else:
148+
url = f"{scheme}://{host}{path}"
149+
else:
150+
url = path
151+
qs = scope.get("query_string")
152+
if qs:
153+
query = "?" + urllib.parse.unquote(qs.decode("latin-1"))
154+
url += query
155+
url_dict["search"] = encoding.keyword_field(query)
156+
url_dict["full"] = encoding.keyword_field(url)
157+
return url, url_dict
158+
159+
def get_ip(self, scope: "Scope", headers: dict) -> Optional[str]:
160+
x_forwarded_for = headers.get("x-forwarded-for")
161+
remote_addr = headers.get("remote-addr")
162+
ip: Optional[str] = None
163+
if x_forwarded_for:
164+
ip = x_forwarded_for.split(",")[0]
165+
elif remote_addr:
166+
ip = remote_addr
167+
elif scope.get("client"):
168+
ip = scope.get("client")[0]
169+
return ip
170+
171+
async def get_data_from_request(self, scope: "Scope", event_type: str, body: Optional[str]) -> dict:
172+
"""Loads data from incoming request for APM capturing.
173+
174+
Args:
175+
request (Request)
176+
config (Config)
177+
event_type (str)
178+
body (str)
179+
180+
Returns:
181+
dict
182+
"""
183+
headers = self.get_headers(scope)
184+
result = {
185+
"method": scope["method"],
186+
"socket": {"remote_address": self.get_ip(scope, headers)},
187+
"cookies": headers.pop("cookies", {}),
188+
}
189+
if self.client.config.capture_headers:
190+
result["headers"] = headers
191+
if body and self.client.config.capture_body in ("all", event_type):
192+
result["body"] = body
193+
url, url_dict = self.get_url(scope)
194+
result["url"] = url_dict
195+
196+
return result
197+
198+
async def get_data_from_response(self, message: dict, event_type: str) -> dict:
199+
"""Loads data from response for APM capturing.
200+
201+
Args:
202+
message (dict)
203+
config (Config)
204+
event_type (str)
205+
206+
Returns:
207+
dict
208+
"""
209+
result = {}
210+
211+
if "status" in message:
212+
result["status_code"] = message["status"]
213+
214+
if self.client.config.capture_headers and "headers" in message:
215+
headers = self.get_headers(message)
216+
if headers:
217+
result["headers"] = headers
218+
219+
return result
220+
221+
def set_transaction_name(self, method: str, url: str):
222+
"""
223+
Default implementation sets transaction name to "METHOD unknown route".
224+
Subclasses may add framework specific naming.
225+
"""
226+
elasticapm.set_transaction_name(f"{method.upper()} unknown route")

elasticapm/utils/disttracing.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ def merge_duplicate_headers(cls, headers, key):
188188
# this works for all known WSGI implementations
189189
if isinstance(headers, list):
190190
return ",".join([item[1] for item in headers if item[0] == key])
191+
elif not hasattr(headers, "get") and hasattr(headers, "__iter__"):
192+
return ",".join([item[1] for item in headers if item[0] == key])
191193
return headers.get(key)
192194

193195
def _parse_tracestate(self, tracestate) -> Dict[str, str]:

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ markers =
160160
httpx
161161
prometheus_client
162162
sanic
163+
asgi
163164
jinja2
164165
aiobotocore
165166
kafka

tests/contrib/asgi/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2022, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

tests/contrib/asgi/app.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2022, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
import asyncio
32+
33+
from quart import Quart, Response, jsonify
34+
35+
from elasticapm import async_capture_span
36+
37+
app = Quart(__name__)
38+
39+
40+
@app.route("/", methods=["GET", "POST"])
41+
async def root():
42+
async with async_capture_span("sleep"):
43+
await asyncio.sleep(0.001)
44+
return "OK"
45+
46+
47+
@app.route("/foo")
48+
async def foo():
49+
resp = Response("foo")
50+
resp.headers["foo"] = "bar"
51+
return resp
52+
53+
54+
@app.route("/boom")
55+
async def boom():
56+
assert False
57+
58+
59+
@app.route("/body")
60+
async def json():
61+
return jsonify({"hello": "world"})

0 commit comments

Comments
 (0)