Skip to content

Commit ed75111

Browse files
Zuulopenstack-gerrit
Zuul
authored andcommitted
Merge "proxy: refactor error limiter to a class"
2 parents b013072 + 51730f1 commit ed75111

File tree

6 files changed

+289
-106
lines changed

6 files changed

+289
-106
lines changed

swift/common/error_limiter.py

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Copyright (c) 2021 NVIDIA
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
# implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import collections
16+
from time import time
17+
18+
from swift.common.utils import node_to_string
19+
20+
21+
class ErrorLimiter(object):
22+
"""
23+
Tracks the number of errors that have occurred for nodes. A node will be
24+
considered to be error-limited for a given interval of time after it has
25+
accumulated more errors than a given limit.
26+
27+
:param suppression_interval: The number of seconds for which a node is
28+
error-limited once it has accumulated more than ``suppression_limit``
29+
errors. Should be a float value.
30+
:param suppression_limit: The number of errors that a node must accumulate
31+
before it is considered to be error-limited. Should be an int value.
32+
"""
33+
def __init__(self, suppression_interval, suppression_limit):
34+
self.suppression_interval = float(suppression_interval)
35+
self.suppression_limit = int(suppression_limit)
36+
self.stats = collections.defaultdict(dict)
37+
38+
def node_key(self, node):
39+
"""
40+
Get the key under which a node's error stats will be stored.
41+
42+
:param node: dictionary describing a node.
43+
:return: string key.
44+
"""
45+
return node_to_string(node)
46+
47+
def is_limited(self, node):
48+
"""
49+
Check if the node is currently error limited.
50+
51+
:param node: dictionary of node to check
52+
:returns: True if error limited, False otherwise
53+
"""
54+
now = time()
55+
node_key = self.node_key(node)
56+
error_stats = self.stats.get(node_key)
57+
58+
if error_stats is None or 'errors' not in error_stats:
59+
return False
60+
61+
if 'last_error' in error_stats and error_stats['last_error'] < \
62+
now - self.suppression_interval:
63+
self.stats.pop(node_key)
64+
return False
65+
return error_stats['errors'] > self.suppression_limit
66+
67+
def limit(self, node):
68+
"""
69+
Mark a node as error limited. This immediately pretends the
70+
node received enough errors to trigger error suppression. Use
71+
this for errors like Insufficient Storage. For other errors
72+
use :func:`increment`.
73+
74+
:param node: dictionary of node to error limit
75+
"""
76+
node_key = self.node_key(node)
77+
error_stats = self.stats[node_key]
78+
error_stats['errors'] = self.suppression_limit + 1
79+
error_stats['last_error'] = time()
80+
81+
def increment(self, node):
82+
"""
83+
Increment the error count and update the time of the last error for
84+
the given ``node``.
85+
86+
:param node: dictionary describing a node.
87+
"""
88+
node_key = self.node_key(node)
89+
error_stats = self.stats[node_key]
90+
error_stats['errors'] = error_stats.get('errors', 0) + 1
91+
error_stats['last_error'] = time()

swift/proxy/server.py

+10-30
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from swift.common.http import is_server_error
3232
from swift.common.storage_policy import POLICIES
3333
from swift.common.ring import Ring
34+
from swift.common.error_limiter import ErrorLimiter
3435
from swift.common.utils import Watchdog, get_logger, \
3536
get_remote_client, split_path, config_true_value, generate_trans_id, \
3637
affinity_key_function, affinity_locality_predicate, list_from_csv, \
@@ -204,7 +205,6 @@ def __init__(self, conf, logger=None, account_ring=None,
204205
statsd_tail_prefix='proxy-server')
205206
else:
206207
self.logger = logger
207-
self._error_limiting = {}
208208
self.backend_user_agent = 'proxy-server %s' % os.getpid()
209209

210210
swift_dir = conf.get('swift_dir', '/etc/swift')
@@ -218,10 +218,12 @@ def __init__(self, conf, logger=None, account_ring=None,
218218
self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
219219
self.trans_id_suffix = conf.get('trans_id_suffix', '')
220220
self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
221-
self.error_suppression_interval = \
221+
error_suppression_interval = \
222222
float(conf.get('error_suppression_interval', 60))
223-
self.error_suppression_limit = \
223+
error_suppression_limit = \
224224
int(conf.get('error_suppression_limit', 10))
225+
self.error_limiter = ErrorLimiter(error_suppression_interval,
226+
error_suppression_limit)
225227
self.recheck_container_existence = \
226228
int(conf.get('recheck_container_existence',
227229
DEFAULT_RECHECK_CONTAINER_EXISTENCE))
@@ -646,27 +648,14 @@ def set_node_timing(self, node, timing):
646648
timing = round(timing, 3) # sort timings to the millisecond
647649
self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
648650

649-
def _error_limit_node_key(self, node):
650-
return node_to_string(node)
651-
652651
def error_limited(self, node):
653652
"""
654653
Check if the node is currently error limited.
655654
656655
:param node: dictionary of node to check
657656
:returns: True if error limited, False otherwise
658657
"""
659-
now = time()
660-
node_key = self._error_limit_node_key(node)
661-
error_stats = self._error_limiting.get(node_key)
662-
663-
if error_stats is None or 'errors' not in error_stats:
664-
return False
665-
if 'last_error' in error_stats and error_stats['last_error'] < \
666-
now - self.error_suppression_interval:
667-
self._error_limiting.pop(node_key, None)
668-
return False
669-
limited = error_stats['errors'] > self.error_suppression_limit
658+
limited = self.error_limiter.is_limited(node)
670659
if limited:
671660
self.logger.debug(
672661
'Node error limited: %s', node_to_string(node))
@@ -677,32 +666,23 @@ def error_limit(self, node, msg):
677666
Mark a node as error limited. This immediately pretends the
678667
node received enough errors to trigger error suppression. Use
679668
this for errors like Insufficient Storage. For other errors
680-
use :func:`error_occurred`.
669+
use :func:`increment`.
681670
682671
:param node: dictionary of node to error limit
683672
:param msg: error message
684673
"""
685-
node_key = self._error_limit_node_key(node)
686-
error_stats = self._error_limiting.setdefault(node_key, {})
687-
error_stats['errors'] = self.error_suppression_limit + 1
688-
error_stats['last_error'] = time()
674+
self.error_limiter.limit(node)
689675
self.logger.error('%(msg)s %(node)s',
690676
{'msg': msg, 'node': node_to_string(node)})
691677

692-
def _incr_node_errors(self, node):
693-
node_key = self._error_limit_node_key(node)
694-
error_stats = self._error_limiting.setdefault(node_key, {})
695-
error_stats['errors'] = error_stats.get('errors', 0) + 1
696-
error_stats['last_error'] = time()
697-
698678
def error_occurred(self, node, msg):
699679
"""
700680
Handle logging, and handling of errors.
701681
702682
:param node: dictionary of node to handle errors for
703683
:param msg: error message
704684
"""
705-
self._incr_node_errors(node)
685+
self.error_limiter.increment(node)
706686
if isinstance(msg, bytes):
707687
msg = msg.decode('utf-8')
708688
self.logger.error('%(msg)s %(node)s',
@@ -721,7 +701,7 @@ def exception_occurred(self, node, typ, additional_info,
721701
:param typ: server type
722702
:param additional_info: additional information to log
723703
"""
724-
self._incr_node_errors(node)
704+
self.error_limiter.increment(node)
725705
if 'level' in kwargs:
726706
log = functools.partial(self.logger.log, kwargs.pop('level'))
727707
if 'exc_info' not in kwargs:
+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Copyright (c) 2021 NVIDIA
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
# implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import unittest
16+
import mock
17+
from time import time
18+
19+
from swift.common.error_limiter import ErrorLimiter
20+
from test.unit import FakeRing
21+
22+
23+
class TestErrorLimiter(unittest.TestCase):
24+
def setUp(self):
25+
self.ring = FakeRing()
26+
27+
def test_init_config(self):
28+
config = {'suppression_interval': 100.9,
29+
'suppression_limit': 5}
30+
limiter = ErrorLimiter(**config)
31+
self.assertEqual(limiter.suppression_interval, 100.9)
32+
self.assertEqual(limiter.suppression_limit, 5)
33+
34+
config = {'suppression_interval': '100.9',
35+
'suppression_limit': '5'}
36+
limiter = ErrorLimiter(**config)
37+
self.assertEqual(limiter.suppression_interval, 100.9)
38+
self.assertEqual(limiter.suppression_limit, 5)
39+
40+
def test_init_bad_config(self):
41+
with self.assertRaises(ValueError):
42+
ErrorLimiter(suppression_interval='bad',
43+
suppression_limit=1)
44+
45+
with self.assertRaises(TypeError):
46+
ErrorLimiter(suppression_interval=None,
47+
suppression_limit=1)
48+
49+
with self.assertRaises(ValueError):
50+
ErrorLimiter(suppression_interval=0,
51+
suppression_limit='bad')
52+
53+
with self.assertRaises(TypeError):
54+
ErrorLimiter(suppression_interval=0,
55+
suppression_limit=None)
56+
57+
def test_is_limited(self):
58+
node = self.ring.devs[-1]
59+
limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10)
60+
61+
now = time()
62+
with mock.patch('swift.common.error_limiter.time', return_value=now):
63+
self.assertFalse(limiter.is_limited(node))
64+
limiter.limit(node)
65+
self.assertTrue(limiter.is_limited(node))
66+
node_key = limiter.node_key(node)
67+
self.assertEqual(limiter.stats.get(node_key),
68+
{'errors': limiter.suppression_limit + 1,
69+
'last_error': now})
70+
71+
def test_increment(self):
72+
node = self.ring.devs[-1]
73+
limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10)
74+
last_errors = 0
75+
node_key = limiter.node_key(node)
76+
for _ in range(limiter.suppression_limit):
77+
limiter.increment(node)
78+
node_errors = limiter.stats.get(node_key)
79+
self.assertGreater(node_errors['errors'], last_errors)
80+
self.assertFalse(limiter.is_limited(node))
81+
last_errors = node_errors['errors']
82+
83+
# One more to make sure it is > suppression_limit
84+
limiter.increment(node)
85+
node_errors = limiter.stats.get(node_key)
86+
self.assertEqual(limiter.suppression_limit + 1,
87+
node_errors['errors'])
88+
self.assertTrue(limiter.is_limited(node))
89+
last_time = node_errors['last_error']
90+
91+
# Simulate time with no errors have gone by.
92+
now = last_time + limiter.suppression_interval + 1
93+
with mock.patch('swift.common.error_limiter.time',
94+
return_value=now):
95+
self.assertFalse(limiter.is_limited(node))
96+
self.assertFalse(limiter.stats.get(node_key))
97+
98+
def test_node_key(self):
99+
limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10)
100+
node = self.ring.devs[0]
101+
expected = '%s:%s/%s' % (node['ip'], node['port'], node['device'])
102+
self.assertEqual(expected, limiter.node_key(node))

test/unit/proxy/controllers/test_container.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ def test_node_errors(self):
318318

319319
for method in ('PUT', 'DELETE', 'POST'):
320320
def test_status_map(statuses, expected):
321-
self.app._error_limiting = {}
321+
self.app.error_limiter.stats.clear()
322322
req = Request.blank('/v1/a/c', method=method)
323323
with mocked_http_conn(*statuses) as fake_conn:
324324
resp = req.get_response(self.app)
@@ -355,7 +355,7 @@ def test_status_map(statuses, expected):
355355
test_status_map(base_status[:2] + [507] + base_status[2:], 201)
356356
self.assertEqual(node_error_count(
357357
self.app, self.container_ring.devs[2]),
358-
self.app.error_suppression_limit + 1)
358+
self.app.error_limiter.suppression_limit + 1)
359359

360360
def test_response_codes_for_GET(self):
361361
nodes = self.app.container_ring.replicas

test/unit/proxy/controllers/test_obj.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -1214,7 +1214,7 @@ def test_PUT_connect_exceptions(self):
12141214
self.app.sort_nodes = lambda n, *args, **kwargs: n # disable shuffle
12151215

12161216
def test_status_map(statuses, expected):
1217-
self.app._error_limiting = {}
1217+
self.app.error_limiter.stats.clear()
12181218
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
12191219
body=b'test body')
12201220
with set_http_connect(*statuses):
@@ -1249,7 +1249,7 @@ def test_status_map(statuses, expected):
12491249
test_status_map(((507, None), 201, 201, 201), 201)
12501250
self.assertEqual(
12511251
node_error_count(self.app, object_ring.devs[0]),
1252-
self.app.error_suppression_limit + 1)
1252+
self.app.error_limiter.suppression_limit + 1)
12531253
# response errors
12541254
test_status_map(((100, Timeout()), 201, 201), 201)
12551255
self.assertEqual(
@@ -1260,7 +1260,7 @@ def test_status_map(statuses, expected):
12601260
test_status_map((201, (100, 507), 201), 201)
12611261
self.assertEqual(
12621262
node_error_count(self.app, object_ring.devs[1]),
1263-
self.app.error_suppression_limit + 1)
1263+
self.app.error_limiter.suppression_limit + 1)
12641264

12651265
def test_PUT_connect_exception_with_unicode_path(self):
12661266
expected = 201
@@ -2385,7 +2385,13 @@ def make_key(r):
23852385
}
23862386
for r in log.requests[:4]
23872387
}
2388-
self.assertEqual(self.app._error_limiting, expected_error_limiting)
2388+
actual = {}
2389+
for n in self.app.get_object_ring(int(self.policy)).devs:
2390+
node_key = self.app.error_limiter.node_key(n)
2391+
stats = self.app.error_limiter.stats.get(node_key) or {}
2392+
if stats:
2393+
actual[self.app.error_limiter.node_key(n)] = stats
2394+
self.assertEqual(actual, expected_error_limiting)
23892395

23902396
def test_GET_not_found_when_404_newer(self):
23912397
# if proxy receives a 404, it keeps waiting for other connections until
@@ -2410,14 +2416,14 @@ def test_GET_primaries_error_during_rebalance(self):
24102416
with mocked_http_conn(*codes):
24112417
resp = req.get_response(self.app)
24122418
self.assertEqual(resp.status_int, 404)
2413-
self.app._error_limiting = {} # Reset error limiting
2419+
self.app.error_limiter.stats.clear() # Reset error limiting
24142420

24152421
# one more timeout is past the tipping point
24162422
codes[self.policy.object_ring.replica_count - 2] = Timeout()
24172423
with mocked_http_conn(*codes):
24182424
resp = req.get_response(self.app)
24192425
self.assertEqual(resp.status_int, 503)
2420-
self.app._error_limiting = {} # Reset error limiting
2426+
self.app.error_limiter.stats.clear() # Reset error limiting
24212427

24222428
# unless we have tombstones
24232429
with mocked_http_conn(*codes, headers={'X-Backend-Timestamp': '1'}):

0 commit comments

Comments
 (0)