Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Dockerfile.consumer
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ RUN apt-get update \
&& apt-get -y autoremove \
&& apt-get -y clean

RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http
# Above, `--break-system-packages` flag overrides the
# "This environment is externally managed" error that calling pip
# would otherwise incur here.

WORKDIR /app
COPY middleware/* .

Expand Down
5 changes: 5 additions & 0 deletions Dockerfile.exporter
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ RUN apt-get update \
&& apt-get -y autoremove \
&& apt-get -y clean

RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http
# Above, `--break-system-packages` flag overrides the
# "This environment is externally managed" error that calling pip
# would otherwise incur here.

WORKDIR /app
COPY middleware/* .

Expand Down
5 changes: 5 additions & 0 deletions Dockerfile.redoer
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ RUN apt-get update \
&& apt-get -y autoremove \
&& apt-get -y clean

RUN pip3 install --break-system-packages opentelemetry-distro opentelemetry-exporter-otlp-proto-http
# Above, `--break-system-packages` flag overrides the
# "This environment is externally managed" error that calling pip
# would otherwise incur here.

WORKDIR /app
COPY middleware/* .

Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ _Environment variables (see docker-compose.yaml):_
initially retrieved from SQS
- Sets the maximum amount of time the Consumer will wait for a Senzing
`add_record` to complete before bailing and moving on.
- `RUNTIME_ENV` -- the runtime environment (e.g., "Dev", "Prod", etc.).
- Optional; defaults to "unknown".
- `OTEL_USE_OTLP_EXPORTER` -- 'true' or 'false' (default is false)
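- `OTEL_EXPORTER_OTLP_ENDPOINT` -- endpoint URL the OTLP exporter sends metrics to; only relevant when `OTEL_USE_OTLP_EXPORTER` is 'true'.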

_Mounts in docker-compose.yaml:_

Expand Down Expand Up @@ -243,6 +247,10 @@ _Environment variables:_
- When either (a) Senzing's internal redo queue is empty or (b) a
`SzRetryableError` is encountered, this sets how long to wait before
attempting the next Senzing op.
- `RUNTIME_ENV` -- the runtime environment (e.g., "Dev", "Prod", etc.).
- Optional; defaults to "unknown".
- `OTEL_USE_OTLP_EXPORTER` -- 'true' or 'false' (default is false)
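- `OTEL_EXPORTER_OTLP_ENDPOINT` -- endpoint URL the OTLP exporter sends metrics to; only relevant when `OTEL_USE_OTLP_EXPORTER` is 'true'.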

### Exporter

Expand All @@ -261,6 +269,10 @@ docker compose run --env AWS_PROFILE=localstack --env S3_BUCKET_NAME=sqs-senzing
- `FOLDER_NAME` -- optional (defaults to `exporter-outputs`); folder inside S3
where the file will be placed.
- `LOG_LEVEL` -- optional; defaults to `INFO`.
- `RUNTIME_ENV` -- the runtime environment (e.g., "Dev", "Prod", etc.).
- Optional; defaults to "unknown".
- `OTEL_USE_OTLP_EXPORTER` -- 'true' or 'false' (default is false)
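- `OTEL_EXPORTER_OTLP_ENDPOINT` -- endpoint URL the OTLP exporter sends metrics to; only relevant when `OTEL_USE_OTLP_EXPORTER` is 'true'.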

_Mounts in docker-compose.yaml:_

Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ services:
init-db:
image: senzing/init-database:latest
depends_on:
- db
db:
condition: service_healthy
environment:
SENZING_TOOLS_DATASOURCES: PEOPLE
SENZING_TOOLS_ENGINE_CONFIGURATION_JSON: >-
Expand Down
25 changes: 25 additions & 0 deletions middleware/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from timeout_handling import *

import otel

try:
log.info('Importing senzing_core library . . .')
import senzing_core as sz_core
Expand All @@ -22,6 +24,7 @@
Q_URL = os.environ['Q_URL']
SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420))
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
RUNTIME_ENV = os.environ.get('RUNTIME_ENV', 'unknown') # For OTel

POLL_SECONDS = 20 # 20 seconds is SQS max

Expand Down Expand Up @@ -167,6 +170,14 @@ def clean_up(signum, frm):
except Exception as e:
log.error(fmterr(e))

# OTel setup #
log.info('Starting OTel setup.')
meter = otel.init('consumer')
otel_msgs_counter = meter.create_counter('consumer.messages.count')
otel_durations = meter.create_histogram('consumer.messages.duration')
log.info('Finished OTel setup.')
# end OTel setup #

while 1:
try:
# Get next message.
Expand All @@ -176,11 +187,15 @@ def clean_up(signum, frm):
+ receipt_handle)
rcd = json.loads(body)

start = time.perf_counter()
success_status = otel.FAILURE # initial default value

try:
# Process and send to Senzing.
start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS)
resp = sz_eng.add_record(rcd['DATA_SOURCE'], rcd['RECORD_ID'], body)
cancel_alarm_timer()
success_status = otel.SUCCESS
log.debug(SZ_TAG + 'Successful add_record having ReceiptHandle: '
+ receipt_handle)
except KeyError as ke:
Expand Down Expand Up @@ -208,6 +223,16 @@ def clean_up(signum, frm):
else:
del_msg(sqs, Q_URL, receipt_handle)

finish = time.perf_counter()
otel_msgs_counter.add(1,
{'status': success_status,
'service': 'consumer',
'environment': RUNTIME_ENV})
otel_durations.record(finish - start,
{'status': success_status,
'service': 'consumer',
'environment': RUNTIME_ENV})

except Exception as e:
log.error(fmterr(e))

Expand Down
27 changes: 26 additions & 1 deletion middleware/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from loglib import *
log = retrieve_logger()

import otel

try:
log.info('Importing senzing_core library . . .')
import senzing_core as sz_core
Expand All @@ -28,6 +30,7 @@
sys.exit(1)
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
FOLDER_NAME = os.environ.get('FOLDER_NAME', 'exporter-outputs')
RUNTIME_ENV = os.environ.get('RUNTIME_ENV', 'unknown') # For OTel

EXPORT_FLAGS = sz.SzEngineFlags.SZ_EXPORT_DEFAULT_FLAGS

Expand Down Expand Up @@ -82,13 +85,25 @@ def go():
except Exception as e:
log.error(fmterr(e))

# OTel setup #
log.info('Starting OTel setup.')
meter = otel.init('exporter')
otel_exp_counter = meter.create_counter('exporter.export.count')
otel_duration = meter.create_histogram('exporter.export.duration')
log.info('Finished OTel setup.')
# end OTel setup #

# init buffer
buff = io.BytesIO()

# Retrieve output from sz into buff
# sz will export JSONL lines; we add the chars necessary to make
# the output as a whole be a single JSON blob.
log.info(SZ_TAG + 'Starting export from Senzing.')

start = time.perf_counter()
success_status = otel.FAILURE # initial default state

try:
export_handle = sz_eng.export_json_entity_report(EXPORT_FLAGS)
log.info(SZ_TAG + 'Obtained export_json_entity_report handle.')
Expand Down Expand Up @@ -121,9 +136,20 @@ def go():
try:
s3.upload_fileobj(buff, S3_BUCKET_NAME, full_path)
log.info(AWS_TAG + 'Successfully uploaded file.')
success_status = otel.SUCCESS
except Exception as e:
log.error(AWS_TAG + fmterr(e))

finish = time.perf_counter()
otel_exp_counter.add(1,
{'status': success_status,
'service': 'exporter',
'environment': RUNTIME_ENV})
otel_duration.record(finish - start,
{'status': success_status,
'service': 'exporter',
'environment': RUNTIME_ENV})

#-------------------------------------------------------------------------------

def main():
Expand All @@ -134,4 +160,3 @@ def main():
go()

if __name__ == '__main__': main()

35 changes: 35 additions & 0 deletions middleware/otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# References:
# https://opentelemetry.io/docs/languages/python/instrumentation/#metrics
# https://opentelemetry.io/docs/languages/python/exporters/#console
# https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/

import os

from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader)
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

# Export interval, in milliseconds, handed to the periodic metric reader.
INTERVAL_MS = 5000

def init(service_name):
    '''Set up OTel metrics for one service and hand back its meter.

    The exporter is chosen by the OTEL_USE_OTLP_EXPORTER environment
    variable: the case-insensitive value 'true' selects the OTLP metric
    exporter; anything else (or unset) selects the console exporter.
    Either way, metrics are exported periodically, every INTERVAL_MS
    milliseconds.

    Args:
        service_name: used as the SERVICE_NAME resource attribute and,
            suffixed with '.meter', as the name of the returned meter.

    Returns:
        A meter obtained from the (newly set) global meter provider.
    '''
    resource = Resource.create(attributes={SERVICE_NAME: service_name})

    # Pick the exporter first so the reader is only constructed in one place.
    use_otlp = os.getenv('OTEL_USE_OTLP_EXPORTER', 'false').lower() == 'true'
    exporter = OTLPMetricExporter() if use_otlp else ConsoleMetricExporter()
    metric_reader = PeriodicExportingMetricReader(
        exporter, export_interval_millis=INTERVAL_MS)

    # Install our provider as the global default, then build the meter
    # from that global provider.
    metrics.set_meter_provider(
        MeterProvider(resource=resource, metric_readers=[metric_reader]))
    return metrics.get_meter(service_name+'.meter')

# Values callers pass as the 'status' attribute on the metrics they record.
SUCCESS = 'success'
FAILURE = 'failure'
46 changes: 45 additions & 1 deletion middleware/redoer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

from timeout_handling import *

import otel
from opentelemetry import metrics

try:
log.info('Importing senzing_core library . . .')
import senzing_core as sz_core
Expand All @@ -20,6 +23,7 @@

SZ_CALL_TIMEOUT_SECONDS = int(os.environ.get('SZ_CALL_TIMEOUT_SECONDS', 420))
SZ_CONFIG = json.loads(os.environ['SENZING_ENGINE_CONFIGURATION_JSON'])
RUNTIME_ENV = os.environ.get('RUNTIME_ENV', 'unknown') # For OTel

# How long to wait before attempting next Senzing op.
WAIT_SECONDS = int(os.environ.get('WAIT_SECONDS', 10))
Expand Down Expand Up @@ -47,6 +51,33 @@ def go():
except Exception as e:
log.error(fmterr(e))

# OTel setup #
log.info('Starting OTel setup.')
meter = otel.init('redoer')
otel_msgs_counter = meter.create_counter('redoer.messages.count')
otel_durations = meter.create_histogram('redoer.messages.duration')

def _queue_count_steward(tally):
    '''Coroutine that holds the most recent redo-queue tally for the gauge.

    Written as a coroutine because this lets us both:
      1) easily pass in updated tally values via `send`, and
      2) accommodate OTel's spec of "a generator that yields
         iterables of Observation".
    Ref: https://opentelemetry-python.readthedocs.io/en/latest/api/metrics.html#opentelemetry.metrics.Meter.create_observable_gauge

    Args:
        tally: the value to report until an updated one is sent in.

    Yields:
        A one-element list holding a `metrics.Observation` of the
        current tally.
    '''
    while 1:
        newtally = yield [metrics.Observation(tally)]
        # We check the type b/c OTel internals will send in a
        # CallbackOptions object that we'll want to ignore (and a plain
        # `next()` sends None); type `int` means we sent in an updated
        # tally value ourselves.
        # Deliberately no truthiness test here: 0 is a legitimate tally
        # (the redo queue is empty) and must replace the previous value,
        # otherwise the gauge keeps reporting a stale non-zero count.
        if type(newtally) is int:
            tally = newtally
queue_count_steward = _queue_count_steward(-1)
next(queue_count_steward) # prime it.
meter.create_observable_gauge('redoer.queue.count', [queue_count_steward])

log.info('Finished OTel setup.')
# end OTel setup #

log.info('Starting primary loop.')

# Approach:
Expand All @@ -64,12 +95,14 @@ def go():
attempts_left = MAX_REDO_ATTEMPTS
while 1:
try:

if have_rcd:
start = time.perf_counter()
success_status = otel.FAILURE # initial default value
try:
start_alarm_timer(SZ_CALL_TIMEOUT_SECONDS)
sz_eng.process_redo_record(rcd)
cancel_alarm_timer()
success_status = otel.SUCCESS
have_rcd = 0
log.debug(SZ_TAG + 'Successfully redid one record via process_redo_record().')
continue
Expand All @@ -95,9 +128,20 @@ def go():
except sz.SzError as sz_err:
log.error(SZ_TAG + fmterr(sz_err))

finish = time.perf_counter()
otel_msgs_counter.add(1,
{'status': success_status,
'service': 'redoer',
'environment': RUNTIME_ENV})
otel_durations.record(finish - start,
{'status': success_status,
'service': 'redoer',
'environment': RUNTIME_ENV})

else:
try:
tally = sz_eng.count_redo_records()
queue_count_steward.send(tally)
log.debug(SZ_TAG + 'Current redo count: ' + str(tally))
except sz.SzRetryableError as sz_ret_err:
log.error(SZ_TAG + fmterr(sz_ret_err))
Expand Down
Loading