Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 0 additions & 127 deletions prometheus_client/multiprocess.py

This file was deleted.

46 changes: 46 additions & 0 deletions prometheus_client/multiprocess/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from __future__ import unicode_literals

import os

# True when multi-process mode is configured via the environment.
multiprocess_enabled = bool(os.environ.get('prometheus_multiproc_dir'))
# Storage backend for cross-process metric values; 'mmap' unless overridden.
multiprocess_backend = (os.environ.get('prometheus_multiproc_backend') or 'mmap')


def MultiProcessCollector(registry, path=None):
    """Build the multi-process collector for the configured backend.

    Falls back to the ``prometheus_multiproc_dir`` environment variable
    when *path* is not given; the path must name an existing directory.
    Raises ValueError for a missing/invalid directory and
    NotImplementedError for an unrecognised backend.
    """
    if path is None:
        path = os.environ.get('prometheus_multiproc_dir')
    # Validate the directory up front so backends can assume it exists.
    path_ok = bool(path) and os.path.isdir(path)
    if not path_ok:
        raise ValueError('env prometheus_multiproc_dir is not set or %s is not a directory' % path)
    if multiprocess_backend == 'mmap':
        from prometheus_client.multiprocess.mmap_collector import MmapMultiProcessCollector
        return MmapMultiProcessCollector(registry=registry, path=path)
    if multiprocess_backend == 'sqlite':
        from prometheus_client.multiprocess.sqlite import SqliteMultiProcessCollector
        return SqliteMultiProcessCollector(registry=registry, path=path)
    raise NotImplementedError('unknown multiprocess backend %s' % multiprocess_backend)


def get_multiprocess_value_class():
    """Return the value class for the configured multiprocess backend.

    Raises NotImplementedError when ``multiprocess_backend`` names an
    unknown backend.
    """
    if multiprocess_backend == 'mmap':
        from prometheus_client.multiprocess.mmaped_value import MmapedValue
        return MmapedValue()
    if multiprocess_backend == 'sqlite':
        from prometheus_client.multiprocess.sqlite import SqliteValue
        return SqliteValue()
    raise NotImplementedError('unknown multiprocess backend %s' % multiprocess_backend)


def mark_process_dead(pid, path=None):
    """Do bookkeeping for when one process dies in a multi-process setup.

    *path* defaults to the ``prometheus_multiproc_dir`` environment
    variable. Dispatches to the cleanup routine of the active backend.
    """
    if path is None:
        path = os.environ.get('prometheus_multiproc_dir')
    # Resolve the backend-specific cleanup callable, then invoke it once.
    if multiprocess_backend == 'mmap':
        from prometheus_client.multiprocess.mmap_utils import mmap_cleanup
        cleanup = mmap_cleanup
    elif multiprocess_backend == 'sqlite':
        from prometheus_client.multiprocess.sqlite import sqlite_cleanup
        cleanup = sqlite_cleanup
    else:
        raise NotImplementedError('unknown multiprocess backend %s' % multiprocess_backend)
    cleanup(path, pid)
79 changes: 79 additions & 0 deletions prometheus_client/multiprocess/collector_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from collections import defaultdict

from prometheus_client.metrics_core import Metric
from prometheus_client.samples import Sample
from prometheus_client.utils import floatToGoString


def populate_metrics(metrics, pid, metric_name, name, labels, multiprocess_mode, type, value):
    """Record one raw sample into the *metrics* accumulator dict.

    Gauge samples get an extra ``pid`` label and carry the multiprocess
    mode on the metric; other types may produce duplicate samples, which
    postprocessing later merges.
    """
    sorted_labels = tuple(sorted(labels.items()))
    if metric_name not in metrics:
        metrics[metric_name] = Metric(metric_name, 'Multiprocess metric', type)
    metric = metrics[metric_name]
    if type == 'gauge':
        metric._multiprocess_mode = multiprocess_mode
        metric.add_sample(name, sorted_labels + (('pid', pid),), value)
    else:
        # Duplicates and label handling are fixed up in postprocessing.
        metric.add_sample(name, sorted_labels, value)


def postprocess_metrics(metrics, accumulate=True):
    """Replace each metric's raw samples with their merged form in place."""
    for m in metrics.values():
        m.samples = _postprocess_metric(m, accumulate=accumulate)


def _postprocess_metric(metric, accumulate=True):
    """Merge a metric's raw per-process samples into final Sample objects.

    Gauges are combined per their multiprocess mode: min/max/livesum keep
    one sample per pid-less label set, all/liveall keep one sample per pid.
    Histogram buckets are summed across processes; when *accumulate* is
    true they become cumulative and a ``_count`` sample is synthesized.
    Counters and summaries are simply summed.
    """
    # (name, labels-tuple) -> merged value; float default supports '+='.
    samples = defaultdict(float)
    # labels-without-'le' -> {bucket upper bound: summed count}
    buckets = {}
    for s in metric.samples:
        name, labels, value = s.name, s.labels, s.value
        if metric.type == 'gauge':
            without_pid = tuple(l for l in labels if l[0] != 'pid')
            if metric._multiprocess_mode == 'min':
                # setdefault seeds the first value; later values only win
                # if strictly smaller.
                current = samples.setdefault((name, without_pid), value)
                if value < current:
                    samples[(s.name, without_pid)] = value
            elif metric._multiprocess_mode == 'max':
                current = samples.setdefault((name, without_pid), value)
                if value > current:
                    samples[(s.name, without_pid)] = value
            elif metric._multiprocess_mode == 'livesum':
                samples[(name, without_pid)] += value
            else:  # all/liveall: keep the pid label, one sample per process
                samples[(name, labels)] = value

        elif metric.type == 'histogram':
            bucket = tuple(float(l[1]) for l in labels if l[0] == 'le')
            if bucket:
                # _bucket sample: sum counts per (labels, upper bound)
                without_le = tuple(l for l in labels if l[0] != 'le')
                buckets.setdefault(without_le, {})
                buckets[without_le].setdefault(bucket[0], 0.0)
                buckets[without_le][bucket[0]] += value
            else:
                # _sum/_count
                samples[(s.name, labels)] += value

        else:
            # Counter and Summary.
            samples[(s.name, labels)] += value
    # Accumulate bucket values.
    if metric.type == 'histogram':
        for labels, values in buckets.items():
            acc = 0.0
            # Buckets must be processed in ascending bound order so the
            # running total is cumulative.
            for bucket, value in sorted(values.items()):
                sample_key = (
                    metric.name + '_bucket',
                    labels + (('le', floatToGoString(bucket)),),
                )
                if accumulate:
                    acc += value
                    samples[sample_key] = acc
                else:
                    samples[sample_key] = value
            if accumulate:
                samples[(metric.name + '_count', labels)] = acc
    # Convert to correct sample format.
    return [Sample(name, dict(labels), value) for (name, labels), value in samples.items()]
51 changes: 51 additions & 0 deletions prometheus_client/multiprocess/mmap_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import glob
import json
import os

from prometheus_client.multiprocess.collector_utils import populate_metrics, postprocess_metrics
from prometheus_client.multiprocess.mmap_dict import MmapedDict


class MmapMultiProcessCollector(object):
    """Collector for files for mmap multi-process mode.

    Reads every ``*.db`` mmap file under *path* and merges the
    per-process samples into combined metrics.
    """

    def __init__(self, registry, path):
        # Directory holding the per-process '*.db' mmap files.
        self._path = path
        if registry:
            registry.register(self)

    def collect(self):
        files = glob.glob(os.path.join(self._path, '*.db'))
        return self.merge(files, accumulate=True)

    def merge(self, files, accumulate=True):
        """Merge metrics from given mmap files.

        By default, histograms are accumulated, as per prometheus wire format.
        But if writing the merged data back to mmap files, use
        accumulate=False to avoid compound accumulation.
        """
        metrics = {}
        for f in files:
            parts = os.path.basename(f).split('_')  # e.g. gauge_liveall_1234.db, counter_1234.db, histogram_1234.db
            typ = parts.pop(0)  # grab type (remaining e.g. ['liveall', '1234.db'] or ['1234.db'])
            pid = parts.pop(-1)[:-3]  # grab pid off end, stripping the '.db' suffix
            multiprocess_mode = (parts.pop(0) if parts else None)  # must be the remaining multiprocess mode bit
            assert not parts  # make sure nothing was unread
            d = MmapedDict(f, read_mode=True)
            try:
                # Each key encodes (metric_name, sample_name, labels) as JSON.
                for key, value in d.read_all_values():
                    metric_name, name, labels = json.loads(key)
                    populate_metrics(
                        metrics,
                        pid=pid,
                        metric_name=metric_name,
                        name=name,
                        labels=labels,
                        multiprocess_mode=multiprocess_mode,
                        type=typ,
                        value=value,
                    )
            finally:
                # Close even if a key fails to parse, so the mmap/file
                # handle is not leaked for this and remaining files.
                d.close()

        postprocess_metrics(metrics, accumulate)
        return metrics.values()
9 changes: 9 additions & 0 deletions prometheus_client/multiprocess/mmap_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import glob
import os


def mmap_cleanup(path, pid):
    """Remove the live-gauge mmap files a dead process left under *path*.

    Only 'livesum' and 'liveall' gauge files are removed: those modes
    track live processes only; other files keep contributing after death.
    """
    for prefix in ('gauge_livesum', 'gauge_liveall'):
        pattern = os.path.join(path, '{0}_{1}.db'.format(prefix, pid))
        for filename in glob.glob(pattern):
            os.remove(filename)
72 changes: 72 additions & 0 deletions prometheus_client/multiprocess/mmaped_value.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os
from threading import Lock

from prometheus_client.multiprocess.mmap_dict import MmapedDict, mmap_key


def MmapedValue(_pidFunc=os.getpid):
    """Build and return the mmap-backed value class.

    The returned class shares, via this closure, one dict of open mmap
    files, one list of live value instances, and one lock across every
    instance created in the process.
    """
    # file_prefix -> open MmapedDict, shared by all values of that type.
    files = {}
    # Every constructed value; needed to reset them all after a fork.
    values = []
    # Current pid, boxed in a dict so nested methods can rebind it.
    pid = {'value': _pidFunc()}
    # Use a single global lock when in multi-processing mode
    # as we presume this means there is no threading going on.
    # This avoids the need to also have mutexes in __MmapDict.
    lock = Lock()

    class MmapedValue(object):
        '''A float protected by a mutex backed by a per-process mmaped file.'''

        _multiprocess = True

        def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
            # Saved so __reset can recompute the file and key after a fork.
            self._params = typ, metric_name, name, labelnames, labelvalues, multiprocess_mode
            with lock:
                self.__check_for_pid_change()
                self.__reset()
                values.append(self)

        def __reset(self):
            # (Re)open this value's backing file and reload its current value.
            typ, metric_name, name, labelnames, labelvalues, multiprocess_mode = self._params
            if typ == 'gauge':
                # Gauges encode their multiprocess mode in the file name so
                # the collector knows how to merge them.
                file_prefix = typ + '_' + multiprocess_mode
            else:
                file_prefix = typ
            if file_prefix not in files:
                filename = os.path.join(
                    os.environ['prometheus_multiproc_dir'],
                    '{0}_{1}.db'.format(file_prefix, pid['value']))

                files[file_prefix] = MmapedDict(filename)
            self._file = files[file_prefix]
            self._key = mmap_key(metric_name, name, labelnames, labelvalues)
            self._value = self._file.read_value(self._key)

        def __check_for_pid_change(self):
            # Detect fork(): a child must not write to the parent's files,
            # so close everything and reopen under the new pid.
            actual_pid = _pidFunc()
            if pid['value'] != actual_pid:
                pid['value'] = actual_pid
                # There has been a fork(), reset all the values.
                for f in files.values():
                    f.close()
                files.clear()
                for value in values:
                    value.__reset()

        def inc(self, amount):
            with lock:
                self.__check_for_pid_change()
                self._value += amount
                self._file.write_value(self._key, self._value)

        def set(self, value):
            with lock:
                self.__check_for_pid_change()
                self._value = value
                self._file.write_value(self._key, self._value)

        def get(self):
            with lock:
                self.__check_for_pid_change()
                return self._value

    return MmapedValue
Loading