Skip to content

Commit

Permalink
Add wrappers for redis.search methods create_index, search, aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
mpozniak95 committed Jun 25, 2024
1 parent 919b2c2 commit 191ef9d
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,17 @@ def response_hook(span, instance, response):
from typing import Any, Collection

import redis
import redis.commands
from wrapt import wrap_function_wrapper

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.redis.package import _instruments
from opentelemetry.instrumentation.redis.util import (
_args_or_none,
_extract_conn_attributes,
_format_command_args,
_set_span_attribute,
)
from opentelemetry.instrumentation.redis.version import __version__
from opentelemetry.instrumentation.utils import unwrap
Expand Down Expand Up @@ -217,6 +220,67 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
response_hook(span, instance, response)
return response

def _traced_create_index(func, instance, args, kwargs):
span_name = "redis.create_index"
with tracer.start_as_current_span(span_name) as span:
_set_span_attribute(
span,
"redis.create_index.fields",
kwargs.get("fields").__str__(),
)
_set_span_attribute(
span,
"redis.create_index.definition",
kwargs.get("definition").__str__(),
)
response = func(*args, **kwargs)
return response

def _traced_search(func, instance, args, kwargs):
span_name = "redis.search"
with tracer.start_as_current_span(span_name) as span:
query = kwargs.get("query") or _args_or_none(args, 0)
_set_span_attribute(
span,
"redis.commands.search.query",
query.query_string(),
)
response = func(*args, **kwargs)
_set_span_attribute(
span,
"redis.commands.search.total",
response.total
)
_set_span_attribute(
span,
"redis.commands.search.duration",
response.duration
)
for index, doc in enumerate(response.docs):
_set_span_attribute(
span,
f"redis.commands.search.xdoc_{index}",
doc.__str__()
)
return response

def _traced_aggregate(func, instance, args, kwargs):
span_name = "redis.aggregate"
with tracer.start_as_current_span(span_name) as span:
query = kwargs.get("query") or _args_or_none(args, 0)
_set_span_attribute(
span,
"redis.commands.aggregate.query",
query.query_string(),
)
response = func(*args, **kwargs)
_set_span_attribute(
span,
"redis.commands.aggregate.results",
str(response.rows)
)
return response

pipeline_class = (
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
)
Expand All @@ -235,6 +299,21 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
f"{pipeline_class}.immediate_execute_command",
_traced_execute_command,
)
wrap_function_wrapper(
"redis.commands.search",
"Search.create_index",
_traced_create_index,
)
wrap_function_wrapper(
"redis.commands.search",
"Search.search",
_traced_search,
)
wrap_function_wrapper(
"redis.commands.search",
"Search.aggregate",
_traced_aggregate,
)
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
wrap_function_wrapper(
"redis.cluster",
Expand Down Expand Up @@ -345,6 +424,9 @@ def _instrument(self, **kwargs):
)

def _uninstrument(self, **kwargs):
unwrap(redis.commands.search.Search, "create_index")
unwrap(redis.commands.search.Search, "search")
unwrap(redis.commands.search.Search, "aggregate")
if redis.VERSION < (3, 0, 0):
unwrap(redis.StrictRedis, "execute_command")
unwrap(redis.StrictRedis, "pipeline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,17 @@ def _format_command_args(args):
out_str = ""

return out_str


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


def _args_or_none(args, n):
try:
return args[n]
except IndexError:
return None

0 comments on commit 191ef9d

Please sign in to comment.