Skip to content

Commit

Permalink
Implement telemetry upgrade (#761)
Browse files Browse the repository at this point in the history
* implement telemetry upgrade

* .

* more telemetry updates

* dep warnings

* fix

* .
  • Loading branch information
vangheem authored Apr 6, 2023
1 parent f11b06c commit 872375e
Show file tree
Hide file tree
Showing 34 changed files with 180 additions and 365 deletions.
7 changes: 3 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,21 @@ proto-clean-py:
rm -rf nucliadb_protos/nucliadb_protos/*_pb2_grpc.pyi

python-code-lint:
isort --profile black nucliadb_telemetry
isort --profile black nucliadb_dataset
isort --profile black nucliadb_client
isort --profile black nucliadb_sdk
isort --profile black nucliadb_models

black nucliadb_telemetry
black nucliadb_dataset
black nucliadb_client
black nucliadb_sdk
black nucliadb_models

flake8 --config nucliadb_telemetry/setup.cfg nucliadb_telemetry/nucliadb_telemetry
flake8 --config nucliadb_dataset/setup.cfg nucliadb_dataset/nucliadb_dataset
flake8 --config nucliadb_client/setup.cfg nucliadb_client/nucliadb_client
flake8 --config nucliadb_sdk/setup.cfg nucliadb_sdk/nucliadb_sdk
flake8 --config nucliadb_models/setup.cfg nucliadb_models/nucliadb_models

MYPYPATH=./mypy_stubs mypy nucliadb_telemetry
MYPYPATH=./mypy_stubs mypy nucliadb_dataset
MYPYPATH=./mypy_stubs mypy nucliadb_client
MYPYPATH=./mypy_stubs mypy nucliadb_models
Expand All @@ -93,6 +89,9 @@ python-code-lint:
make -C nucliadb/ format
make -C nucliadb/ lint

make -C nucliadb_telemetry/ format
make -C nucliadb_telemetry/ lint

rust-code-lint:
cargo +nightly fmt -p nucliadb_node
cargo clippy -p nucliadb_node --tests
Expand Down
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ ignore_missing_imports = True
ignore_missing_imports = True

[mypy-starlette_prometheus.*]
ignore_missing_imports = True
ignore_missing_imports = True
13 changes: 2 additions & 11 deletions nucliadb/nucliadb/ingest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@
from typing import Awaitable, Callable, Optional, Union

import pkg_resources
from opentelemetry.instrumentation.aiohttp_client import ( # type: ignore
AioHttpClientInstrumentor,
)
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3MultiFormat

from nucliadb.ingest import SERVICE_NAME, logger, logger_activity
from nucliadb.ingest.chitchat import start_chitchat
Expand All @@ -36,7 +31,7 @@
from nucliadb.ingest.service import start_grpc
from nucliadb.ingest.settings import settings
from nucliadb_telemetry import errors
from nucliadb_telemetry.utils import get_telemetry, init_telemetry
from nucliadb_telemetry.utils import setup_telemetry
from nucliadb_utils.fastapi.run import serve_metrics
from nucliadb_utils.indexing import IndexingUtility
from nucliadb_utils.run import run_until_exit
Expand Down Expand Up @@ -106,11 +101,7 @@ async def stop_indexing_utility():


async def initialize() -> list[Callable[[], Awaitable[None]]]:
tracer_provider = get_telemetry(SERVICE_NAME)
if tracer_provider is not None: # pragma: no cover
set_global_textmap(B3MultiFormat())
await init_telemetry(tracer_provider) # To start asyncio task
AioHttpClientInstrumentor().instrument(tracer_provider=tracer_provider)
await setup_telemetry(SERVICE_NAME)

chitchat = await start_chitchat(SERVICE_NAME)

Expand Down
76 changes: 23 additions & 53 deletions nucliadb/nucliadb/ingest/orm/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@
from nucliadb.ingest.orm.shard import Shard
from nucliadb.ingest.settings import settings
from nucliadb_telemetry import errors
from nucliadb_telemetry.grpc import OpenTelemetryGRPC
from nucliadb_telemetry.utils import get_telemetry
from nucliadb_utils.grpc import get_traced_grpc_channel
from nucliadb_utils.keys import KB_SHARDS

READ_CONNECTIONS = LRU(50)
Expand Down Expand Up @@ -392,30 +391,27 @@ async def list_members(cls) -> List[Member]:
members.append(member)
return members

def _get_service_address(self, port_map: Dict[int, int]) -> str:
hostname = self.address.split(":")[0]
if settings.node_sidecar_port is None:
# For testing proposes we need to be able to have a writing port
sidecar_port = port_map[hostname]
grpc_address = f"localhost:{sidecar_port}"
else:
grpc_address = f"{hostname}:{settings.node_sidecar_port}"
return grpc_address

@property
def sidecar(self) -> NodeSidecarStub:
if (
self._sidecar is None
and self.address not in SIDECAR_CONNECTIONS
and self.dummy is False
):
hostname = self.address.split(":")[0]
if settings.node_sidecar_port is None:
# For testing proposes we need to be able to have a writing port
sidecar_port = settings.sidecar_port_map[hostname]
grpc_address = f"localhost:{sidecar_port}"
else:
grpc_address = f"{hostname}:{settings.node_sidecar_port}"

tracer_provider = get_telemetry(SERVICE_NAME)
if tracer_provider is not None: # pragma: no cover
telemetry_grpc = OpenTelemetryGRPC(
f"{SERVICE_NAME}_grpc_sidecar", tracer_provider
)
channel = telemetry_grpc.init_client(grpc_address)
else:
channel = aio.insecure_channel(grpc_address)

grpc_address = self._get_service_address(settings.sidecar_port_map)
channel = get_traced_grpc_channel(
grpc_address, SERVICE_NAME, variant="_sidecar"
)
SIDECAR_CONNECTIONS[self.address] = NodeSidecarStub(channel)
if (
self._sidecar is None
Expand All @@ -434,23 +430,10 @@ def writer(self) -> NodeWriterStub:
and self.address not in WRITE_CONNECTIONS
and self.dummy is False
):
hostname = self.address.split(":")[0]
if settings.node_writer_port is None:
# For testing proposes we need to be able to have a writing port
writer_port = settings.writer_port_map[hostname]
grpc_address = f"localhost:{writer_port}"
else:
grpc_address = f"{hostname}:{settings.node_writer_port}"

tracer_provider = get_telemetry(SERVICE_NAME)
if tracer_provider is not None: # pragma: no cover
telemetry_grpc = OpenTelemetryGRPC(
f"{SERVICE_NAME}_grpc_writer", tracer_provider
)
channel = telemetry_grpc.init_client(grpc_address)
else:
channel = aio.insecure_channel(grpc_address)

grpc_address = self._get_service_address(settings.writer_port_map)
channel = get_traced_grpc_channel(
grpc_address, SERVICE_NAME, variant="_writer"
)
WRITE_CONNECTIONS[self.address] = NodeWriterStub(channel)
if (
self._writer is None
Expand All @@ -469,23 +452,10 @@ def reader(self) -> NodeReaderStub:
and self.address not in READ_CONNECTIONS
and self.dummy is False
):
hostname = self.address.split(":")[0]
if settings.node_reader_port is None:
# For testing proposes we need to be able to have a writing port
reader_port = settings.reader_port_map[hostname]
grpc_address = f"localhost:{reader_port}"
else:
grpc_address = f"{hostname}:{settings.node_reader_port}"

tracer_provider = get_telemetry(SERVICE_NAME)
if tracer_provider is not None: # pragma: no cover
telemetry_grpc = OpenTelemetryGRPC(
f"{SERVICE_NAME}_grpc_reader", tracer_provider
)
channel = telemetry_grpc.init_client(grpc_address)
else:
channel = aio.insecure_channel(grpc_address)

grpc_address = self._get_service_address(settings.reader_port_map)
channel = get_traced_grpc_channel(
grpc_address, SERVICE_NAME, variant="_reader"
)
READ_CONNECTIONS[self.address] = NodeReaderStub(channel)
if (
self._reader is None
Expand Down
22 changes: 7 additions & 15 deletions nucliadb/nucliadb/ingest/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from nucliadb.ingest.service.writer import WriterServicer
from nucliadb.ingest.settings import DriverConfig, settings
from nucliadb_protos import writer_pb2_grpc
from nucliadb_telemetry.grpc import OpenTelemetryGRPC
from nucliadb_telemetry.utils import get_telemetry, init_telemetry
from nucliadb_telemetry.utils import setup_telemetry
from nucliadb_utils.grpc import get_traced_grpc_server


async def health_check(health_servicer):
Expand All @@ -51,19 +51,11 @@ async def health_check(health_servicer):
async def start_grpc(service_name: Optional[str] = None):
aio.init_grpc_aio()

tracer_provider = get_telemetry(service_name)
if tracer_provider is not None: # pragma: no cover
await init_telemetry(tracer_provider)
otgrpc = OpenTelemetryGRPC(f"{service_name}_grpc", tracer_provider)
server = otgrpc.init_server()
else:
options = [
(
"grpc.max_receive_message_length",
settings.max_receive_message_length * 1024 * 1024,
),
]
server = aio.server(options=options)
await setup_telemetry(service_name or "ingest")
server = get_traced_grpc_server(
service_name or "ingest",
max_receive_message=settings.max_receive_message_length,
)

servicer = WriterServicer()
await servicer.initialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from nucliadb.ingest.tests.fixtures import IngestFixture
from nucliadb_protos import knowledgebox_pb2, utils_pb2, writer_pb2, writer_pb2_grpc
from nucliadb_telemetry.settings import telemetry_settings
from nucliadb_telemetry.utils import get_telemetry, init_telemetry
from nucliadb_telemetry.utils import get_telemetry
from nucliadb_utils.keys import KB_SHARDS


Expand All @@ -37,7 +37,6 @@ async def test_create_knowledgebox(
):
tracer_provider = get_telemetry("GCS_SERVICE")
assert tracer_provider is not None
await init_telemetry(tracer_provider)

stub = writer_pb2_grpc.WriterStub(grpc_servicer.channel)
pb_prefix = knowledgebox_pb2.KnowledgeBoxPrefix(prefix="")
Expand Down Expand Up @@ -88,6 +87,7 @@ async def test_create_knowledgebox(
await asyncio.sleep(2)
else:
break

assert len(resp.json()["data"]) == expected_spans


Expand Down
15 changes: 4 additions & 11 deletions nucliadb/nucliadb/ingest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
#
from typing import Optional

from grpc import aio
from nucliadb_protos.writer_pb2_grpc import WriterStub

from nucliadb.ingest.chitchat import ChitchatNucliaDB # type: ignore
from nucliadb.ingest.maindb.driver import Driver
from nucliadb.ingest.settings import settings
from nucliadb_telemetry.grpc import OpenTelemetryGRPC
from nucliadb_telemetry.utils import get_telemetry, init_telemetry
from nucliadb_utils.grpc import get_traced_grpc_channel
from nucliadb_utils.settings import nucliadb_settings
from nucliadb_utils.store import MAIN
from nucliadb_utils.utilities import Utility, clean_utility, get_utility, set_utility
Expand Down Expand Up @@ -110,14 +108,9 @@ async def start_ingest(service_name: Optional[str] = None):
if nucliadb_settings.nucliadb_ingest is not None:
# Its distributed lets create a GRPC client
# We want Jaeger telemetry enabled
provider = get_telemetry(service_name)

if provider is not None:
await init_telemetry(provider)
otgrpc = OpenTelemetryGRPC(f"{service_name}_ingest", provider)
channel = otgrpc.init_client(nucliadb_settings.nucliadb_ingest)
else:
channel = aio.insecure_channel(nucliadb_settings.nucliadb_ingest)
channel = get_traced_grpc_channel(
nucliadb_settings.nucliadb_ingest, service_name or "ingest"
)
set_utility(Utility.CHANNEL, channel)
set_utility(Utility.INGEST, WriterStub(channel))
else:
Expand Down
12 changes: 2 additions & 10 deletions nucliadb/nucliadb/one/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@
#
import pkg_resources
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3MultiFormat
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import HTMLResponse
from starlette.routing import Mount
from starlette_prometheus import PrometheusMiddleware

from nucliadb.one.lifecycle import finalize, initialize
from nucliadb.reader import API_PREFIX
Expand All @@ -36,6 +32,7 @@
from nucliadb.train.api.v1.router import api as api_train_v1
from nucliadb.writer.api.v1.router import api as api_writer_v1
from nucliadb_telemetry import errors
from nucliadb_telemetry.fastapi import instrument_app
from nucliadb_utils.authentication import STFAuthenticationBackend
from nucliadb_utils.fastapi import metrics
from nucliadb_utils.fastapi.openapi import extend_openapi
Expand All @@ -49,7 +46,6 @@
allow_methods=["*"],
allow_headers=["*"],
),
Middleware(PrometheusMiddleware),
Middleware(
AuthenticationMiddleware,
backend=STFAuthenticationBackend(),
Expand Down Expand Up @@ -101,8 +97,4 @@ async def homepage(request):
application.add_route("/", homepage)
application.add_route("/metrics", metrics.metrics)


# Enable forwarding of B3 headers to responses and external requests
# to both inner applications
set_global_textmap(B3MultiFormat())
FastAPIInstrumentor.instrument_app(application)
instrument_app(application, excluded_urls=["/"], metrics=True)
19 changes: 6 additions & 13 deletions nucliadb/nucliadb/reader/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@
import pkg_resources
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from opentelemetry.instrumentation.aiohttp_client import ( # type: ignore
AioHttpClientInstrumentor,
)
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3MultiFormat
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import ClientDisconnect, Request
from starlette.responses import HTMLResponse
from starlette_prometheus import PrometheusMiddleware

from nucliadb.reader import API_PREFIX, SERVICE_NAME
from nucliadb.reader.api.v1.router import api as api_v1
Expand All @@ -50,7 +44,6 @@
allow_methods=["*"],
allow_headers=["*"],
),
Middleware(PrometheusMiddleware),
Middleware(
AuthenticationMiddleware,
backend=STFAuthenticationBackend(),
Expand Down Expand Up @@ -113,9 +106,9 @@ async def homepage(request: Request) -> HTMLResponse:
# Use raw starlette routes to avoid unnecessary overhead
application.add_route("/", homepage)

# Enable forwarding of B3 headers to responses and external requests
# to both inner applications
tracer_provider = get_telemetry(SERVICE_NAME)
set_global_textmap(B3MultiFormat())
instrument_app(application, tracer_provider=tracer_provider, excluded_urls=["/"])
AioHttpClientInstrumentor().instrument(tracer_provider=tracer_provider)
instrument_app(
application,
tracer_provider=get_telemetry(SERVICE_NAME),
excluded_urls=["/"],
metrics=True,
)
6 changes: 2 additions & 4 deletions nucliadb/nucliadb/reader/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@

from nucliadb.ingest.utils import start_ingest, stop_ingest
from nucliadb.reader import SERVICE_NAME, logger
from nucliadb_telemetry.utils import clean_telemetry, get_telemetry, init_telemetry
from nucliadb_telemetry.utils import clean_telemetry, setup_telemetry
from nucliadb_utils.settings import running_settings
from nucliadb_utils.utilities import start_audit_utility, stop_audit_utility


async def initialize() -> None:
tracer_provider = get_telemetry(SERVICE_NAME)
if tracer_provider:
await init_telemetry(tracer_provider)
await setup_telemetry(SERVICE_NAME)

await start_ingest(SERVICE_NAME)
await start_audit_utility(SERVICE_NAME)
Expand Down
Loading

2 comments on commit 872375e

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 872375e Previous: d19d131 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 4844.367773822075 iter/sec (stddev: 0.00002209759649720485)

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 872375e Previous: d19d131 Ratio
nucliadb/tests/benchmarks/test_search.py::test_search_returns_labels 53.928137029770184 iter/sec (stddev: 0.0008170196084620727) 68.48176450054224 iter/sec (stddev: 0.00011698806646091496) 1.27
nucliadb/tests/benchmarks/test_search.py::test_search_relations 119.56984116402526 iter/sec (stddev: 0.00011617448647345154) 128.13602179672588 iter/sec (stddev: 0.000027569798386079616) 1.07

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.