WIP: Refactor running processes metric #56

Open · wants to merge 1 commit into main
Refactor running processes metric
Aleksey Mikhaylov committed Dec 18, 2024
commit bc86aa8688470f92b1b3f4f2778fba275fff738e
19 changes: 11 additions & 8 deletions aqueduct/flow.py
@@ -25,7 +25,6 @@
 from .metrics.collect import Collector, TasksStats
 from .metrics.export import Exporter
 from .metrics.manager import get_metrics_manager
-from .metrics.processes import ProcessesStats
 from .metrics.queue import TaskMetricsQueue
 from .metrics.timer import timeit
 from .multiprocessing import (
@@ -255,7 +254,7 @@ async def _check_memory_usage(self, sleep_sec: float = 1.):
             nprocs_memory_sum = 0
             for process in processes:
                 memory = process.memory_info().rss
-                nprocs_memory_sum += memory
+                nprocs_memory_sum += memory
                 metrics.add(flow_step_name, memory)
             all_memory_usage += nprocs_memory_sum
             if len(processes) != 1:
@@ -264,6 +263,14 @@ async def _check_memory_usage(self, sleep_sec: float = 1.):
             self._metrics_manager.collector.add_memory_usage(metrics)
             await asyncio.sleep(sleep_sec)
 
+    async def _count_processes(self, sleep_sec: float = 1.):
+        pod_name = os.environ.get('K8S_POD', 'local')
+        while self.state != FlowState.STOPPED:
+            metrics = MetricsItems()
+            metrics.add(f'running.{pod_name}', len(mp.active_children()))
+            self._metrics_manager.collector.add_processes_count(metrics)
+            await asyncio.sleep(sleep_sec)
+
     def _run_steps(self, timeout: Optional[int]):
         if len(self._steps) == 0:
             log.info('Flow has zero steps -> do nothing')
@@ -331,7 +338,9 @@ def _run_tasks(self):
         self._tasks.append(asyncio.ensure_future(self._check_is_alive()))
 
         self._metrics_manager.start(queues_info=self._get_queues_info())
+
         self._tasks.append(asyncio.ensure_future(self._check_memory_usage()))
+        self._tasks.append(asyncio.ensure_future(self._count_processes()))
 
     def _get_queues_info(self) -> Dict[mp.Queue, str]:
         """Returns queues between Step handlers and its names.
@@ -412,20 +421,14 @@ async def _check_is_alive(self, sleep_sec: float = 1.):
         If at least one process is not alive, it stops Flow.
         """
         while self.state != FlowState.STOPPED:
-            processes_stats = ProcessesStats()
             for handler, context in self._contexts.items():
                 for proc in context.processes:
                     if not proc.is_alive():
                         if self.is_running:
                             handler_name = handler.__class__.__name__
                             log.error('The process %s for %s handler is dead',
                                       proc.pid, handler_name)
-                            processes_stats.add_dead_process()
-                            self._metrics_manager.collector.add_processes_stats(processes_stats)
                             await self.stop(graceful=False)
-                    else:
-                        processes_stats.add_running_process()
-            self._metrics_manager.collector.add_processes_stats(processes_stats)
             await asyncio.sleep(sleep_sec)
 
     @staticmethod
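Viewed on its own, the new `_count_processes` coroutine replaces the per-process alive/dead bookkeeping with a single gauge: every `sleep_sec` it counts `multiprocessing.active_children()` and labels the value with the pod name from `K8S_POD`. A minimal standalone sketch of that sampling loop; the `count_running_processes` helper and the `report` callback are illustrative stand-ins for the Flow method and the metrics collector, not part of the diff:

```python
import asyncio
import multiprocessing as mp
import os
import time


async def count_running_processes(report, sleep_sec: float = 1.0, cycles: int = 3):
    # Same pattern as the new Flow._count_processes: sample the number of live
    # child processes and tag the metric with the pod name from K8S_POD.
    pod_name = os.environ.get('K8S_POD', 'local')
    for _ in range(cycles):  # the real coroutine loops until FlowState.STOPPED
        report(f'running.{pod_name}', len(mp.active_children()))
        await asyncio.sleep(sleep_sec)


if __name__ == '__main__':
    # Spawn a couple of workers so active_children() has something to count.
    workers = [mp.Process(target=time.sleep, args=(1,)) for _ in range(2)]
    for w in workers:
        w.start()
    asyncio.run(count_running_processes(print, sleep_sec=0.2))
    for w in workers:
        w.join()
```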
11 changes: 6 additions & 5 deletions aqueduct/metrics/collect.py
@@ -5,7 +5,6 @@
 
 from . import IMetricsItems
 from .base import MetricsItems, MetricsTypes
-from .processes import ProcessesStats
 from .task import TasksMetricsStorage
 
 
@@ -31,18 +30,20 @@ def __init__(self):
         self.queue_sizes = MetricsItems()
         self.tasks_stats = TasksStats()
         self.memory_usage = MetricsItems()
-        self.processes_stats = ProcessesStats()
+        self.processes_count = MetricsItems()
 
     def extend(self, storage: TasksMetricsStorage):
         super().extend(storage)
         if isinstance(storage, AqueductMetricsStorage):
             self.queue_sizes.extend(storage.queue_sizes)
             self.tasks_stats.extend(storage.tasks_stats)
-            self.processes_stats.extend(storage.processes_stats)
 
     def extend_memory_usage(self, metrics: MetricsItems):
         self.memory_usage.extend(metrics)
 
+    def extend_processes_count(self, metrics: MetricsItems):
+        self.processes_count.extend(metrics)
+
 
 class Collector:
     def __init__(self, collectible_metrics: Iterable[MetricsTypes] = None,
@@ -73,8 +74,8 @@ def add_tasks_stats(self, stats: TasksStats):
     def add_memory_usage(self, metrics: MetricsItems):
         self._metrics.extend_memory_usage(metrics)
 
-    def add_processes_stats(self, stats: ProcessesStats):
-        self._metrics.processes_stats.extend(stats)
+    def add_processes_count(self, metrics: MetricsItems):
+        self._metrics.extend_processes_count(metrics)
 
     def extract_metrics(self) -> AqueductMetricsStorage:
         metrics = self._metrics
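In the collector, the dedicated `ProcessesStats` container goes away and the count travels through the same `MetricsItems` add/extend path that memory usage already uses. A rough sketch of that data flow with stand-in classes; the real `MetricsItems` lives in `aqueduct/metrics/base.py`, and only the `add`/`extend`/`items` behaviour visible in this diff is assumed here:

```python
from typing import List, Tuple


class MetricsItemsStub:
    # Stand-in for aqueduct.metrics.base.MetricsItems; only the add/extend/items
    # behaviour relied on by this diff is modelled (an assumption, not the real API).
    def __init__(self):
        self.items: List[Tuple[str, int]] = []

    def add(self, name: str, value: int):
        self.items.append((name, value))

    def extend(self, other: 'MetricsItemsStub'):
        self.items.extend(other.items)


class StorageStub:
    # Mirrors the new processes_count field and extend_processes_count method
    # added to AqueductMetricsStorage.
    def __init__(self):
        self.processes_count = MetricsItemsStub()

    def extend_processes_count(self, metrics: MetricsItemsStub):
        self.processes_count.extend(metrics)


# One sampling cycle: the flow builds a MetricsItems-like batch, the collector
# folds it into storage via extend_processes_count.
batch = MetricsItemsStub()
batch.add('running.local', 4)

storage = StorageStub()
storage.extend_processes_count(batch)
print(storage.processes_count.items)  # [('running.local', 4)]
```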
5 changes: 2 additions & 3 deletions aqueduct/metrics/export.py
@@ -77,9 +77,8 @@ def export(self, metrics: AqueductMetricsStorage):
         for name, memory_usage in metrics.memory_usage.items:
             self.target.timing(f'{self.prefix}.{MEMORY_USAGE_PREFIX}.{name}', memory_usage)
 
-        for name, cnt in metrics.processes_stats.items:
-            if cnt > 0:
-                self.target.count(f'{self.prefix}.{PROCESSES_PREFIX}.{name}', cnt)
+        for name, processes_count in metrics.processes_count.items:
+            self.target.timing(f'{self.prefix}.{PROCESSES_PREFIX}.{name}', processes_count)
 
 
 class DummyExporter(Exporter):
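On the export side, the conditional `count` call is replaced by an unconditional `timing` sample, so each pod ends up with a metric path of roughly `<prefix>.<PROCESSES_PREFIX>.running.<pod>`. A hedged sketch of that emit loop against a fake target; the prefix values below are made up for illustration, and only the metric-path shape comes from the diff:

```python
class FakeStatsdTarget:
    # Stand-in for the exporter's StatsD-like target; it only records timing calls.
    def timing(self, path: str, value: int):
        print(f'timing {path} = {value}')


# Both values below are illustrative, not taken from the repo.
PREFIX = 'myservice.aqueduct'
PROCESSES_PREFIX = 'processes'

target = FakeStatsdTarget()
processes_count = [('running.local', 4)]  # shape of metrics.processes_count.items

for name, count in processes_count:
    target.timing(f'{PREFIX}.{PROCESSES_PREFIX}.{name}', count)
# -> timing myservice.aqueduct.processes.running.local = 4
```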
24 changes: 0 additions & 24 deletions aqueduct/metrics/processes.py

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
@@ -23,7 +23,7 @@
 setup(
     name='aqueduct',
     packages=find_packages(),
-    version='1.11.7',
+    version='1.11.8',
     license='MIT',
     license_files='LICENSE.txt',
     author='Data Science SWAT',