1212from .samples import Sample
1313from .utils import floatToGoString
1414
15+ MP_METRIC_HELP = 'Multiprocess metric'
16+
1517
1618class MultiProcessCollector (object ):
1719 """Collector for files for multi-process mode."""
@@ -33,18 +35,31 @@ def merge(files, accumulate=True):
3335 But if writing the merged data back to mmap files, use
3436 accumulate=False to avoid compound accumulation.
3537 """
38+ metrics = MultiProcessCollector ._read_metrics (files )
39+ return MultiProcessCollector ._accumulate_metrics (metrics , accumulate )
40+
41+ @staticmethod
42+ def _read_metrics (files ):
3643 metrics = {}
44+ key_cache = {}
45+
46+ def _parse_key (key ):
47+ val = key_cache .get (key )
48+ if not val :
49+ metric_name , name , labels = json .loads (key )
50+ labels_key = tuple (sorted (labels .items ()))
51+ val = key_cache [key ] = (metric_name , name , labels , labels_key )
52+ return val
53+
3754 for f in files :
3855 parts = os .path .basename (f ).split ('_' )
3956 typ = parts [0 ]
40- d = MmapedDict (f , read_mode = True )
41- for key , value in d .read_all_values ():
42- metric_name , name , labels = json .loads (key )
43- labels_key = tuple (sorted (labels .items ()))
57+ for key , value , pos in MmapedDict .read_all_values_from_file (f ):
58+ metric_name , name , labels , labels_key = _parse_key (key )
4459
4560 metric = metrics .get (metric_name )
4661 if metric is None :
47- metric = Metric (metric_name , 'Multiprocess metric' , typ )
62+ metric = Metric (metric_name , MP_METRIC_HELP , typ )
4863 metrics [metric_name ] = metric
4964
5065 if typ == 'gauge' :
@@ -54,43 +69,47 @@ def merge(files, accumulate=True):
5469 else :
5570 # The duplicates and labels are fixed in the next for.
5671 metric .add_sample (name , labels_key , value )
57- d . close ()
72+ return metrics
5873
74+ @staticmethod
75+ def _accumulate_metrics (metrics , accumulate ):
5976 for metric in metrics .values ():
6077 samples = defaultdict (float )
61- buckets = {}
78+ buckets = defaultdict (lambda : defaultdict (float ))
79+ samples_setdefault = samples .setdefault
6280 for s in metric .samples :
63- name , labels , value = s . name , s . labels , s . value
81+ name , labels , value , timestamp , exemplar = s
6482 if metric .type == 'gauge' :
65- without_pid = tuple (l for l in labels if l [0 ] != 'pid' )
83+ without_pid_key = ( name , tuple ([ l for l in labels if l [0 ] != 'pid' ]) )
6684 if metric ._multiprocess_mode == 'min' :
67- current = samples . setdefault (( name , without_pid ) , value )
85+ current = samples_setdefault ( without_pid_key , value )
6886 if value < current :
69- samples [( s . name , without_pid ) ] = value
87+ samples [without_pid_key ] = value
7088 elif metric ._multiprocess_mode == 'max' :
71- current = samples . setdefault (( name , without_pid ) , value )
89+ current = samples_setdefault ( without_pid_key , value )
7290 if value > current :
73- samples [( s . name , without_pid ) ] = value
91+ samples [without_pid_key ] = value
7492 elif metric ._multiprocess_mode == 'livesum' :
75- samples [( name , without_pid ) ] += value
93+ samples [without_pid_key ] += value
7694 else : # all/liveall
7795 samples [(name , labels )] = value
7896
7997 elif metric .type == 'histogram' :
80- bucket = tuple (float (l [1 ]) for l in labels if l [0 ] == 'le' )
81- if bucket :
82- # _bucket
83- without_le = tuple (l for l in labels if l [0 ] != 'le' )
84- buckets .setdefault (without_le , {})
85- buckets [without_le ].setdefault (bucket [0 ], 0.0 )
86- buckets [without_le ][bucket [0 ]] += value
87- else :
98+ # A for loop with early exit is faster than a genexpr
99+ # or a listcomp that ends up building unnecessary things
100+ for l in labels :
101+ if l [0 ] == 'le' :
102+ bucket_value = float (l [1 ])
103+ # _bucket
104+ without_le = tuple (l for l in labels if l [0 ] != 'le' )
105+ buckets [without_le ][bucket_value ] += value
106+ break
107+ else : # did not find the `le` key
88108 # _sum/_count
89- samples [(s .name , labels )] += value
90-
109+ samples [(name , labels )] += value
91110 else :
92111 # Counter and Summary.
93- samples [(s . name , labels )] += value
112+ samples [(name , labels )] += value
94113
95114 # Accumulate bucket values.
96115 if metric .type == 'histogram' :
0 commit comments