Skip to content

Commit

Permalink
refactor(rcm): remove code duplication in subscriber service (#7535)
Browse files Browse the repository at this point in the history
We remove some code duplication from the subscriber service class and
reuse the logic already implemented for the periodic service class.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [ ] Title is accurate.
- [ ] No unnecessary changes are introduced.
- [ ] Description motivates each change.
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [ ] Testing strategy adequately addresses listed risk(s).
- [ ] Change is maintainable (easy to change, telemetry, documentation).
- [ ] Release note makes sense to a user of the library.
- [ ] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [ ] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
- [ ] If this PR touches code that signs or publishes builds or
packages, or handles credentials of any kind, I've requested a review
from `@DataDog/security-design-and-guidance`.
- [ ] This PR doesn't touch any of that.

Co-authored-by: Munir Abdinur <munir.abdinur@datadoghq.com>
  • Loading branch information
P403n1x87 and mabdinur authored Nov 14, 2023
1 parent a45b659 commit c0b6911
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 71 deletions.
75 changes: 18 additions & 57 deletions ddtrace/internal/remoteconfig/_subscribers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import os
import time
from typing import TYPE_CHECKING

from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import PeriodicThread
from ddtrace.internal.periodic import PeriodicService
from ddtrace.internal.remoteconfig.utils import get_poll_interval_seconds


Expand All @@ -19,80 +18,42 @@
log = get_logger(__name__)


class RemoteConfigSubscriber(object):
_th_worker = None

class RemoteConfigSubscriber(PeriodicService):
def __init__(self, data_connector, callback, name):
# type: (PublisherSubscriberConnector, Callable, str) -> None
super().__init__(get_poll_interval_seconds() / 2)

self._data_connector = data_connector
self.is_running = False
self._callback = callback
self._name = name
log.debug("[%s] Subscriber %s init", os.getpid(), self._name)
self.interval = get_poll_interval_seconds() / 2

log.debug("[PID %d] %s initialized", os.getpid(), self)

def _exec_callback(self, data, test_tracer=None):
# type: (SharedDataType, Optional[Tracer]) -> None
if data:
log.debug("[%s] Subscriber %s _exec_callback: %s", os.getpid(), self._name, str(data)[:50])
log.debug("[PID %d] %s _exec_callback: %s", os.getpid(), self, str(data)[:50])
self._callback(data, test_tracer=test_tracer)

def _get_data_from_connector_and_exec(self, test_tracer=None):
# type: (Optional[Tracer]) -> None
data = self._data_connector.read()
self._exec_callback(data, test_tracer=test_tracer)

def _worker(self):
self.is_running = True
def periodic(self):
try:
log.debug("[PID %d | PPID %d] %s is getting data", os.getpid(), os.getppid(), self)
self._get_data_from_connector_and_exec()
log.debug("[PID %d | PPID %d] %s got data", os.getpid(), os.getppid(), self)
except Exception:
log.debug("[%s][P: %s] Subscriber %s get an error", os.getpid(), os.getppid(), self._name, exc_info=True)
time.sleep(self.interval)

def start(self):
if not self.is_running and self._th_worker is None:
log.debug("[%s][P: %s] Subscriber %s starts %s", os.getpid(), os.getppid(), self._name, self.is_running)
self._th_worker = PeriodicThread(
target=self._worker,
interval=self.interval,
on_shutdown=self.stop,
name="%s:%s" % (self.__class__.__module__, self.__class__.__name__),
)
self._th_worker.start()
else:
is_alive = False
if self._th_worker is not None:
is_alive = self._th_worker.is_alive()
log.debug(
"[%s][P: %s] Subscriber %s is trying to start but the subscriber status is %s "
"and subscriber thread is %s (is alive: %s)",
os.getpid(),
os.getppid(),
self._name,
self.is_running,
self._th_worker,
is_alive,
)
log.error("[PID %d | PPID %d] %s while getting data", os.getpid(), os.getppid(), self, exc_info=True)

def force_restart(self, join):
self.is_running = False
self.stop(join)
log.debug(
"[%s][P: %s] Subscriber %s worker restarts. Status: %s",
os.getpid(),
os.getppid(),
self._name,
self.is_running,
)
def force_restart(self, join=False):
self.stop()
if join:
self.join()
self.start()
log.debug("[PID %d | PPID %d] %s restarted", os.getpid(), os.getppid(), self)

def stop(self, join=False):
# type: (bool) -> None
if self._th_worker:
self.is_running = False
self._th_worker.stop()
if join:
self._th_worker.join()
log.debug("[%s][P: %s] Subscriber %s. Stopped", os.getpid(), os.getppid(), self._name)
self._th_worker = None
def __str__(self):
return f"Subscriber {self._name}"
3 changes: 2 additions & 1 deletion ddtrace/internal/remoteconfig/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig.constants import REMOTE_CONFIG_AGENT_ENDPOINT
from ddtrace.internal.runtime import container
from ddtrace.internal.service import ServiceStatus
from ddtrace.internal.utils.time import parse_isoformat

from ..utils.formats import parse_tags_str
Expand Down Expand Up @@ -254,7 +255,7 @@ def get_pubsubs(self):
def is_subscriber_running(self, pubsub_to_check):
# type: (PubSub) -> bool
for pubsub in self.get_pubsubs():
if pubsub_to_check._subscriber is pubsub._subscriber and pubsub._subscriber.is_running:
if pubsub_to_check._subscriber is pubsub._subscriber and pubsub._subscriber.status == ServiceStatus.RUNNING:
return True
return False

Expand Down
3 changes: 1 addition & 2 deletions ddtrace/internal/remoteconfig/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ def __enter__(self):

def __exit__(self, *args):
# type: (...) -> None
self.stop_subscribers(True)
self.disable()
self.disable(join=True)


remoteconfig_poller = RemoteConfigPoller()
6 changes: 5 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from ddtrace.internal.compat import parse
from ddtrace.internal.remoteconfig.client import RemoteConfigClient
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.telemetry import TelemetryWriter
from ddtrace.internal.utils.formats import parse_tags_str
from tests import utils
Expand Down Expand Up @@ -373,7 +374,10 @@ def _stop_remote_config_worker():

@pytest.fixture
def remote_config_worker():
remoteconfig_poller.disable(join=True)
try:
remoteconfig_poller.disable(join=True)
except ServiceStatusError:
pass
remoteconfig_poller._client = RemoteConfigClient()
try:
yield
Expand Down
7 changes: 5 additions & 2 deletions tests/debugging/probe/test_remoteconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ def remove_probes(self, *probe_ids):
class SyncProbeRCAdapter(ProbeRCAdapter):
def __init__(self, *args, **kwargs):
super(SyncProbeRCAdapter, self).__init__(*args, **kwargs)
# Prevent the worker thread from starting. We call methods manually.
self._subscriber.is_running = True
# Make the subscriber worker thread a no-op. We call methods manually.
self._subscriber.periodic = self.periodic

def periodic(self):
pass


def config_metadata(config_id=None):
Expand Down
4 changes: 0 additions & 4 deletions tests/internal/remoteconfig/test_remoteconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ def _reload_features(self, features, test_tracer=None):
mock_pubsub = RCMockPubSub(None, callback._reload_features)
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)

mock_pubsub.start_subscriber()
rc._online()
mock_send_request.assert_called()
sleep(0.5)
Expand Down Expand Up @@ -263,7 +262,6 @@ def _reload_features(self, features, test_tracer=None):
with RemoteConfigPoller() as rc:
mock_pubsub = RCMockPubSub(None, callback._reload_features)
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)
mock_pubsub.start_subscriber()
rc._online()
mock_send_request.assert_called_once()
sleep(0.5)
Expand Down Expand Up @@ -353,7 +351,6 @@ def _reload_features(self, features, test_tracer=None):
mock_send_request.return_value = get_mock_encoded_msg_with_signed_errors(msg, path, signed_errors)
mock_pubsub = RCMockPubSub(None, callback._reload_features)
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)
mock_pubsub.start_subscriber()
rc._online()
mock_send_request.assert_called()
sleep(0.5)
Expand All @@ -376,7 +373,6 @@ def _reload_features(self, features, test_tracer=None):
with RemoteConfigPoller() as rc:
mock_pubsub = RCMockPubSub(None, callback._reload_features)
rc.register(ASM_FEATURES_PRODUCT, mock_pubsub)
mock_pubsub.start_subscriber()
for _ in range(0, 2):
msg = b'{"asm":{"enabled":true}}'
expires_date = datetime.datetime.strftime(
Expand Down
9 changes: 5 additions & 4 deletions tests/internal/remoteconfig/test_remoteconfig_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import mock

from ddtrace.internal.remoteconfig._subscribers import RemoteConfigSubscriber
from ddtrace.internal.service import ServiceStatus
from tests.internal.remoteconfig.utils import MockConnector
from tests.utils import override_global_config

Expand All @@ -12,12 +13,12 @@ def test_subscriber_thread():
with override_global_config(dict(_remote_config_poll_interval=0.1)):
mock_callback = mock.MagicMock()
subscriber = RemoteConfigSubscriber(MockConnector({"example": "data"}), mock_callback, "TEST_DATA")
assert not subscriber.is_running
assert subscriber.status is ServiceStatus.STOPPED

subscriber.start()
sleep(0.15)
assert subscriber.is_running
assert subscriber.status is ServiceStatus.RUNNING
mock_callback.assert_called_with({"example": "data"}, test_tracer=None)

subscriber.stop()
assert not subscriber.is_running
subscriber.stop(join=True)
assert subscriber.status is ServiceStatus.STOPPED

0 comments on commit c0b6911

Please sign in to comment.