Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
([#936](https://github.com/census-instrumentation/opencensus-python/pull/936))
- Fix attach rate metrics for VM to only ping data service on retry
([#946](https://github.com/census-instrumentation/opencensus-python/pull/946))
- Added queue capacity configuration for exporters
([#949](https://github.com/census-instrumentation/opencensus-python/pull/949))

## 1.0.4
Released 2020-06-29
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def process_options(options):
TEMPDIR_PREFIX + TEMPDIR_SUFFIX
)

# proxies
if options.proxies is None:
options.proxies = '{}'

Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(self, *args, **kwargs):
max_batch_size=100,
minimum_retry_interval=60, # minimum retry interval in seconds
proxies=None, # string maps url schemes to the url of the proxies
queue_capacity=8192,
storage_maintenance_period=60,
storage_max_size=50*1024*1024, # 50MiB
storage_path=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, **options):
self.max_batch_size = options.max_batch_size
# TODO: queue should be moved to tracer
# too much refactor work, leave to the next PR
self._queue = Queue(capacity=8192) # TODO: make this configurable
self._queue = Queue(capacity=options.queue_capacity)
# TODO: worker should not be created in the base exporter
self._worker = Worker(self._queue, self)
self._worker.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ def gets(self):
if path.endswith('.tmp'):
if name < timeout_deadline:
try:
os.remove(path) # TODO: log data loss
os.remove(path)
logger.warning(
'File write exceeded timeout. Dropping telemetry')
except Exception:
pass # keep silent
if path.endswith('.lock'):
Expand All @@ -148,7 +150,10 @@ def gets(self):
if path.endswith('.blob'):
if name < retention_deadline:
try:
os.remove(path) # TODO: log data loss
os.remove(path)
logger.warning(
'File write exceeded retention.' +
'Dropping telemetry')
except Exception:
pass # keep silent
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def _transmit(self, envelopes):
logger.info('Transmission succeeded: %s.', text)
return 0
if response.status_code == 206: # Partial Content
# TODO: store the unsent data
if data:
try:
resend_envelopes = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(self, **options):
)
self._telemetry_processors = []
self.addFilter(SamplingFilter(self.options.logging_sampling_rate))
self._queue = Queue(capacity=8192) # TODO: make this configurable
self._queue = Queue(capacity=self.options.queue_capacity)
self._worker = Worker(self._queue, self)
self._worker.start()
heartbeat_metrics.enable_heartbeat_metrics(
Expand Down
32 changes: 32 additions & 0 deletions contrib/opencensus-ext-azure/tests/test_azure_log_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ def test_init_handler_with_proxies(self):
'{"https":"https://test-proxy.com"}',
)

def test_init_handler_with_queue_capacity(self):
handler = log_exporter.AzureLogHandler(
instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
queue_capacity=500,
)

self.assertEqual(
handler.options.queue_capacity,
500
)

self.assertEqual(
handler._worker._src._queue.maxsize,
500
)

@mock.patch('requests.post', return_value=mock.Mock())
def test_exception(self, requests_mock):
logger = logging.getLogger(self.id())
Expand Down Expand Up @@ -289,6 +305,22 @@ def test_init_handler_with_proxies(self):
'{"https":"https://test-proxy.com"}',
)

def test_init_handler_with_queue_capacity(self):
handler = log_exporter.AzureEventHandler(
instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
queue_capacity=500,
)

self.assertEqual(
handler.options.queue_capacity,
500
)
# pylint: disable=protected-access
self.assertEqual(
handler._worker._src._queue.maxsize,
500
)

@mock.patch('requests.post', return_value=mock.Mock())
def test_exception(self, requests_mock):
logger = logging.getLogger(self.id())
Expand Down
16 changes: 16 additions & 0 deletions contrib/opencensus-ext-azure/tests/test_azure_trace_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,22 @@ def test_init_exporter_with_proxies(self):
'{"https":"https://test-proxy.com"}',
)

def test_init_exporter_with_queue_capacity(self):
exporter = trace_exporter.AzureExporter(
instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
queue_capacity=500,
)

self.assertEqual(
exporter.options.queue_capacity,
500
)
# pylint: disable=protected-access
self.assertEqual(
exporter._worker.src._queue.maxsize,
500
)

@mock.patch('requests.post', return_value=mock.Mock())
def test_emit_empty(self, request_mock):
exporter = trace_exporter.AzureExporter(
Expand Down
5 changes: 4 additions & 1 deletion opencensus/common/schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

from six.moves import queue

import logging
import threading
import time

logger = logging.getLogger(__name__)


class PeriodicTask(threading.Thread):
"""Thread that periodically calls a given function.
Expand Down Expand Up @@ -128,7 +131,7 @@ def put(self, item, block=True, timeout=None):
try:
self._queue.put(item, block, timeout)
except queue.Full:
pass # TODO: log data loss
logger.warning('Queue is full. Dropping telemetry.')

def puts(self, items, block=True, timeout=None):
if block and timeout is not None:
Expand Down