1
1
import logging
2
2
from typing import Callable , Dict , List , Optional , Tuple
3
+ from meta_memcache .metrics .base import BaseMetricsCollector , MetricDefinition
3
4
4
5
from meta_memcache_socket import (
5
6
RequestFlags ,
34
35
35
36
36
37
class DefaultExecutor :
38
+
37
39
def __init__ (
38
40
self ,
39
41
serializer : BaseSerializer ,
40
42
key_encoder_fn : Callable [[Key ], bytes ] = default_key_encoder ,
41
43
raise_on_server_error : bool = True ,
42
44
touch_ttl_to_consider_write_failure : Optional [int ] = 50 ,
45
+ metrics_collector : Optional [BaseMetricsCollector ] = None ,
43
46
) -> None :
44
47
self ._serializer = serializer
45
48
self ._key_encoder_fn = key_encoder_fn
46
49
self ._raise_on_server_error = raise_on_server_error
47
50
self ._touch_ttl_to_consider_write_failure = touch_ttl_to_consider_write_failure
48
51
self .on_write_failure = WriteFailureEvent ()
52
+ if metrics_collector :
53
+ labels = ("domain" ,)
54
+ metrics_collector .init_metrics (
55
+ namespace = "cache" ,
56
+ metrics = [
57
+ MetricDefinition ("read_hits" , "Number of hits" , labels ),
58
+ MetricDefinition ("read_misses" , "Number of misses" , labels ),
59
+ MetricDefinition ("read_bytes" , "Size of writes" , labels ),
60
+ MetricDefinition ("write_count" , "Number of writes" , labels ),
61
+ MetricDefinition ("write_bytes" , "Size of writes" , labels ),
62
+ ],
63
+ gauges = [],
64
+ )
65
+ self ._metrics = metrics_collector
49
66
50
67
def _build_cmd (
51
68
self ,
@@ -94,6 +111,36 @@ def _is_a_write_failure(
94
111
return True
95
112
return False
96
113
114
+ def _collect_metrics (
115
+ self ,
116
+ command : MetaCommand ,
117
+ key : Key ,
118
+ value_size : Optional [int ],
119
+ result : MemcacheResponse ,
120
+ ) -> None :
121
+ if not self ._metrics :
122
+ return None
123
+
124
+ try :
125
+ labels = {"domain" : key .domain or "default" }
126
+ if command == MetaCommand .META_GET :
127
+ if isinstance (result , Value ):
128
+ self ._metrics .metric_inc ("hits" , labels = labels )
129
+ if result .value :
130
+ self ._metrics .metric_inc (
131
+ "read_bytes" , value = len (result .value .value ), labels = labels
132
+ )
133
+ elif isinstance (result , Miss ):
134
+ self ._metrics .metric_inc ("misses" , labels = labels )
135
+ if command == MetaCommand .META_SET :
136
+ self ._metrics .metric_inc ("write_count" , labels = labels )
137
+ if value_size :
138
+ self ._metrics .metric_inc (
139
+ "write_bytes" , value = value_size , labels = labels
140
+ )
141
+ except Exception as e :
142
+ _log .exception (f"Error collecting metrics" )
143
+
97
144
def exec_on_pool (
98
145
self ,
99
146
pool : ConnectionPool ,
@@ -120,7 +167,11 @@ def exec_on_pool(
120
167
value = cmd_value ,
121
168
flags = flags ,
122
169
)
123
- return self ._conn_recv_response (conn , flags = flags )
170
+ result = self ._conn_recv_response (conn , flags = flags )
171
+ self ._collect_metrics (
172
+ command , key , len (cmd_value ) if cmd_value else None , result
173
+ )
174
+ return result
124
175
except Exception as e :
125
176
error = True
126
177
raise MemcacheServerError (pool .server , "Memcache error" ) from e
@@ -155,13 +206,15 @@ def exec_multi_on_pool( # noqa: C901
155
206
conn = pool .pop_connection ()
156
207
error = False
157
208
try :
158
- # with pool.get_connection() as conn:
209
+ value_sizes = {}
159
210
for key , value in key_values :
160
211
cmd_value , flags = (
161
212
(None , flags )
162
213
if value is None
163
214
else self ._prepare_serialized_value_and_flags (key , value , flags )
164
215
)
216
+ if cmd_value :
217
+ value_sizes [key ] = len (cmd_value )
165
218
166
219
self ._conn_send_cmd (
167
220
conn ,
@@ -170,8 +223,14 @@ def exec_multi_on_pool( # noqa: C901
170
223
value = cmd_value ,
171
224
flags = flags ,
172
225
)
173
- for key , _ in key_values :
226
+ for key , value in key_values :
174
227
results [key ] = self ._conn_recv_response (conn , flags = flags )
228
+ self ._collect_metrics (
229
+ command ,
230
+ key ,
231
+ value_sizes .get (key ) if value else None ,
232
+ results [key ],
233
+ )
175
234
except Exception as e :
176
235
error = True
177
236
raise MemcacheServerError (pool .server , "Memcache error" ) from e
0 commit comments