Skip to content

Commit a898c34

Browse files
sethmlarsonbasepi
andauthored
Add support for AsyncElasticsearch (#843)
* Add support for AsyncElasticsearch * Add to changelog Co-authored-by: Colton Myers <colton.myers@gmail.com>
1 parent fc7d4cb commit a898c34

File tree

6 files changed

+225
-7
lines changed

6 files changed

+225
-7
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ endif::[]
2626
===== Features
2727
2828
* capture number of affected rows for INSERT/UPDATE/DELETE SQL queries {pull}614[#614]
29+
* Added instrumentation for AsyncElasticsearch {pull}843[#843]
2930
3031
3132
[[release-notes-5.x]]

elasticapm/instrumentation/packages/asyncio/elasticsearch.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@
3030

3131
import elasticapm
3232
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
33-
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin
33+
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin, ElasticsearchInstrumentation
3434

3535

3636
class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule):
3737
name = "elasticsearch_connection"
3838

39-
instrument_list = [("elasticsearch_async.connection", "AIOHttpConnection.perform_request")]
39+
instrument_list = [
40+
("elasticsearch_async.connection", "AIOHttpConnection.perform_request"),
41+
("elasticsearch._async.http_aiohttp", "AIOHttpConnection.perform_request"),
42+
]
4043

4144
async def call(self, module, method, wrapped, instance, args, kwargs):
4245
signature = self.get_signature(args, kwargs)
@@ -52,3 +55,18 @@ async def call(self, module, method, wrapped, instance, args, kwargs):
5255
leaf=True,
5356
):
5457
return await wrapped(*args, **kwargs)
58+
59+
60+
class AsyncElasticsearchInstrumentation(ElasticsearchInstrumentation, AsyncAbstractInstrumentedModule):
61+
name = "elasticsearch"
62+
63+
instrument_list = [
64+
("elasticsearch._async.client", "AsyncElasticsearch.delete_by_query"),
65+
("elasticsearch._async.client", "AsyncElasticsearch.search"),
66+
("elasticsearch._async.client", "AsyncElasticsearch.count"),
67+
("elasticsearch._async.client", "AsyncElasticsearch.update"),
68+
]
69+
70+
async def call(self, module, method, wrapped, instance, args, kwargs):
71+
kwargs = self.inject_apm_params(method, kwargs)
72+
return await wrapped(*args, **kwargs)

elasticapm/instrumentation/packages/elasticsearch.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545

4646

4747
class ElasticSearchConnectionMixin(object):
48-
query_methods = ("Elasticsearch.search", "Elasticsearch.count", "Elasticsearch.delete_by_query")
48+
query_methods = ("search", "count", "delete_by_query")
4949

5050
def get_signature(self, args, kwargs):
5151
args_len = len(args)
@@ -74,7 +74,7 @@ def get_context(self, instance, args, kwargs):
7474
query.append(json.dumps(body["query"], default=compat.text_type))
7575
if query:
7676
context["db"]["statement"] = "\n\n".join(query)
77-
elif api_method == "Elasticsearch.update":
77+
elif api_method == "update":
7878
if isinstance(body, dict) and "script" in body:
7979
# only get the `script` field from the body
8080
context["db"]["statement"] = json.dumps({"script": body["script"]})
@@ -135,17 +135,21 @@ def instrument(self):
135135
super(ElasticsearchInstrumentation, self).instrument()
136136

137137
def call(self, module, method, wrapped, instance, args, kwargs):
138+
kwargs = self.inject_apm_params(method, kwargs)
139+
return wrapped(*args, **kwargs)
140+
141+
def inject_apm_params(self, method, kwargs):
138142
params = kwargs.pop("params", {})
139143

140144
# make a copy of params in case the caller reuses them for some reason
141145
params = params.copy() if params is not None else {}
142146

143-
cls_name, method_name = method.split(".", 1)
147+
method_name = method.partition(".")[-1]
144148

145149
# store a reference to the non-serialized body so we can use it in the connection layer
146150
body = kwargs.get("body")
147151
params[BODY_REF_NAME] = body
148-
params[API_METHOD_KEY_NAME] = method
152+
params[API_METHOD_KEY_NAME] = method_name
149153

150154
kwargs["params"] = params
151-
return wrapped(*args, **kwargs)
155+
return kwargs

elasticapm/instrumentation/register.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
"elasticapm.instrumentation.packages.asyncio.sleep.AsyncIOSleepInstrumentation",
6969
"elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation",
7070
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection",
71+
"elasticapm.instrumentation.packages.asyncio.elasticsearch.AsyncElasticsearchInstrumentation",
7172
"elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation",
7273
"elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation",
7374
"elasticapm.instrumentation.packages.tornado.TornadoHandleRequestExceptionInstrumentation",
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
import pytest # isort:skip
32+
33+
pytest.importorskip("elasticsearch._async") # isort:skip
34+
35+
import os
36+
37+
from elasticsearch import VERSION as ES_VERSION
38+
from elasticsearch import AsyncElasticsearch
39+
40+
from elasticapm.conf.constants import TRANSACTION
41+
42+
pytestmark = [pytest.mark.elasticsearch, pytest.mark.asyncio, pytest.mark.integrationtest]
43+
44+
if "ES_URL" not in os.environ:
45+
pytestmark.append(pytest.mark.skip("Skipping elasticsearch test, no ES_URL environment variable"))
46+
47+
48+
document_type = "_doc" if ES_VERSION[0] >= 6 else "doc"
49+
50+
51+
@pytest.fixture
52+
async def async_elasticsearch(request):
53+
"""AsyncElasticsearch client fixture."""
54+
client = AsyncElasticsearch(hosts=os.environ["ES_URL"])
55+
try:
56+
yield client
57+
finally:
58+
await client.indices.delete(index="*")
59+
await client.close()
60+
61+
62+
async def test_ping(instrument, elasticapm_client, async_elasticsearch):
63+
elasticapm_client.begin_transaction("test")
64+
result = await async_elasticsearch.ping()
65+
elasticapm_client.end_transaction("test", "OK")
66+
67+
transaction = elasticapm_client.events[TRANSACTION][0]
68+
spans = elasticapm_client.spans_for_transaction(transaction)
69+
assert len(spans) == 1
70+
span = spans[0]
71+
assert span["name"] == "ES HEAD /"
72+
assert span["type"] == "db"
73+
assert span["subtype"] == "elasticsearch"
74+
assert span["action"] == "query"
75+
assert span["sync"] is False
76+
77+
78+
async def test_info(instrument, elasticapm_client, async_elasticsearch):
79+
elasticapm_client.begin_transaction("test")
80+
result = await async_elasticsearch.info()
81+
elasticapm_client.end_transaction("test", "OK")
82+
83+
transaction = elasticapm_client.events[TRANSACTION][0]
84+
85+
spans = elasticapm_client.spans_for_transaction(transaction)
86+
assert len(spans) == 1
87+
span = spans[0]
88+
assert span["name"] == "ES GET /"
89+
assert span["type"] == "db"
90+
assert span["subtype"] == "elasticsearch"
91+
assert span["action"] == "query"
92+
assert span["sync"] is False
93+
94+
95+
async def test_create(instrument, elasticapm_client, async_elasticsearch):
96+
elasticapm_client.begin_transaction("test")
97+
if ES_VERSION[0] < 5:
98+
r1 = await async_elasticsearch.create("tweets", document_type, {"user": "kimchy", "text": "hola"}, 1)
99+
elif ES_VERSION[0] < 7:
100+
r1 = await async_elasticsearch.create("tweets", document_type, 1, body={"user": "kimchy", "text": "hola"})
101+
else:
102+
r1 = await async_elasticsearch.create("tweets", 1, body={"user": "kimchy", "text": "hola"})
103+
r2 = await async_elasticsearch.create(
104+
index="tweets", doc_type=document_type, id=2, body={"user": "kimchy", "text": "hola"}, refresh=True
105+
)
106+
elasticapm_client.end_transaction("test", "OK")
107+
108+
transaction = elasticapm_client.events[TRANSACTION][0]
109+
110+
spans = elasticapm_client.spans_for_transaction(transaction)
111+
assert len(spans) == 2
112+
113+
for i, span in enumerate(spans):
114+
if ES_VERSION[0] >= 5:
115+
assert span["name"] in (
116+
"ES PUT /tweets/%s/%d/_create" % (document_type, i + 1),
117+
"ES PUT /tweets/_create/%d" % (i + 1),
118+
)
119+
else:
120+
assert span["name"] == "ES PUT /tweets/%s/%d" % (document_type, i + 1)
121+
assert span["type"] == "db"
122+
assert span["subtype"] == "elasticsearch"
123+
assert span["action"] == "query"
124+
assert span["context"]["db"]["type"] == "elasticsearch"
125+
assert "statement" not in span["context"]["db"]
126+
127+
128+
async def test_search_body(instrument, elasticapm_client, async_elasticsearch):
129+
await async_elasticsearch.create(
130+
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
131+
)
132+
elasticapm_client.begin_transaction("test")
133+
search_query = {"query": {"term": {"user": "kimchy"}}}
134+
result = await async_elasticsearch.search(body=search_query, params=None)
135+
elasticapm_client.end_transaction("test", "OK")
136+
137+
transaction = elasticapm_client.events[TRANSACTION][0]
138+
assert result["hits"]["hits"][0]["_source"] == {"user": "kimchy", "text": "hola"}
139+
spans = elasticapm_client.spans_for_transaction(transaction)
140+
assert len(spans) == 1
141+
span = spans[0]
142+
# Depending on ES_VERSION, could be /_all/_search or /_search, and GET or POST
143+
assert span["name"] in ("ES GET /_search", "ES GET /_all/_search", "ES POST /_search")
144+
assert span["type"] == "db"
145+
assert span["subtype"] == "elasticsearch"
146+
assert span["action"] == "query"
147+
assert span["context"]["db"]["type"] == "elasticsearch"
148+
assert span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}'
149+
assert span["sync"] is False
150+
151+
152+
async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
153+
await async_elasticsearch.create(
154+
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
155+
)
156+
elasticapm_client.begin_transaction("test")
157+
search_query = {"query": {"term": {"user": "kimchy"}}}
158+
result = await async_elasticsearch.count(body=search_query)
159+
elasticapm_client.end_transaction("test", "OK")
160+
161+
transaction = elasticapm_client.events[TRANSACTION][0]
162+
assert result["count"] == 1
163+
spans = elasticapm_client.spans_for_transaction(transaction)
164+
assert len(spans) == 1
165+
span = spans[0]
166+
assert span["name"] in ("ES GET /_count", "ES POST /_count", "ES GET /_all/_count")
167+
assert span["type"] == "db"
168+
assert span["subtype"] == "elasticsearch"
169+
assert span["action"] == "query"
170+
assert span["context"]["db"]["type"] == "elasticsearch"
171+
assert span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}'
172+
assert span["sync"] is False
173+
174+
175+
async def test_delete_by_query_body(instrument, elasticapm_client, async_elasticsearch):
176+
await async_elasticsearch.create(
177+
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
178+
)
179+
elasticapm_client.begin_transaction("test")
180+
result = await async_elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}})
181+
elasticapm_client.end_transaction("test", "OK")
182+
183+
transaction = elasticapm_client.events[TRANSACTION][0]
184+
spans = elasticapm_client.spans_for_transaction(transaction)
185+
186+
span = spans[0]
187+
assert span["name"] == "ES POST /tweets/_delete_by_query"
188+
assert span["type"] == "db"
189+
assert span["subtype"] == "elasticsearch"
190+
assert span["action"] == "query"
191+
assert span["context"]["db"]["type"] == "elasticsearch"
192+
assert span["context"]["db"]["statement"] == '{"term": {"user": "kimchy"}}'
193+
assert span["sync"] is False
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
elasticsearch>=7.0,<8.0
22
elasticsearch-async ; python_version >= '3.7'
3+
aiohttp ; python_version >= '3.6'
34
-r reqs-base.txt

0 commit comments

Comments
 (0)