-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_worker.py
More file actions
111 lines (82 loc) · 3.06 KB
/
test_worker.py
File metadata and controls
111 lines (82 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import threading
import time
from queue import Queue
import requests
from verdin.datasource import Datasource
from verdin.worker import QueuingDatasourceAppender
class QueueingDatasource(Datasource):
    """Test double for ``Datasource`` that captures appended batches in a queue.

    Instead of sending records anywhere, ``append`` places each non-empty
    batch on ``self.queue`` so a test can block on ``queue.get`` and inspect
    exactly what was appended.
    """

    def __init__(self, name, queue=None):
        """Create the datasource.

        :param name: datasource name, forwarded to ``Datasource.__init__``.
        :param queue: optional queue that receives appended batches; a fresh
            ``Queue`` is created when omitted.
        """
        # The second positional argument of Datasource.__init__ is passed as
        # None (presumably the API/client handle, unused here -- TODO confirm).
        super().__init__(name, None)
        # Compare against None explicitly: a caller-supplied queue-like object
        # that happens to evaluate falsy must not be silently swapped out for
        # a new Queue the caller holds no reference to.
        self.queue = queue if queue is not None else Queue()

    def append(self, records) -> requests.Response:
        """Record ``records`` as one batch and report success.

        Falsy batches (``None``, empty list) are not enqueued, but the call
        still returns a 200 response.

        :param records: the batch of records to capture.
        :return: a ``requests.Response`` whose ``status_code`` is 200.
        """
        if records:
            self.queue.put(records)

        response = requests.Response()
        response.status_code = 200
        return response
class TestQueuingDatasourceAppender:
    """Tests for ``QueuingDatasourceAppender`` driven from a background thread.

    Each test feeds records through a source ``Queue`` and observes the
    batches that arrive on the destination ``QueueingDatasource``'s queue.
    The worker is always closed and joined in a ``finally`` block so that a
    failing assertion cannot leave the thread running behind the test.
    """

    def test_batching(self):
        """Records queued before start form one batch; a later record its own."""
        source = Queue()
        destination = QueueingDatasource("datasource")
        appender = QueuingDatasourceAppender(source, destination)
        appender.min_interval = 0

        # Queue three records before the worker starts.
        source.put(("a", 1))
        source.put(("b", 2))
        source.put(("c", 3))

        thread = threading.Thread(target=appender.run)
        thread.start()
        try:
            batch = destination.queue.get(timeout=1)
            assert len(batch) == 3
            assert batch[0] == ("a", 1)
            assert batch[1] == ("b", 2)
            assert batch[2] == ("c", 3)

            source.put(("d", 4))

            batch = destination.queue.get(timeout=1)
            assert len(batch) == 1
            assert batch[0] == ("d", 4)
        finally:
            # Stop the worker even if an assertion or queue.get timeout above
            # raised; otherwise the thread would keep running after the test.
            appender.close()
            thread.join(timeout=2)

        assert appender.stopped.is_set()

    def test_stop_while_running(self):
        """Closing an idle, running appender sets its ``stopped`` event."""
        source = Queue()
        destination = QueueingDatasource("datasource")
        appender = QueuingDatasourceAppender(source, destination)
        appender.min_interval = 0

        thread = threading.Thread(target=appender.run)
        thread.start()
        try:
            # Let the worker run for a moment with nothing queued before
            # asking it to stop.
            time.sleep(0.2)
        finally:
            appender.close()
            thread.join(timeout=2)

        assert appender.stopped.is_set()

    def test_retry(self):
        """A batch rejected with HTTP 429 is retried and not merged with later records."""

        class MockQueueingDatasource(QueueingDatasource):
            """Rejects the first ``append`` with 429, then behaves normally."""

            # Class-level default; the first call shadows it on the instance.
            first_call = True

            def append(self, records) -> requests.Response:
                if self.first_call:
                    self.first_call = False
                    response = requests.Response()
                    response.status_code = 429
                    response.headers["Retry-After"] = "1"
                    return response

                return super().append(records)

        source = Queue()
        destination = MockQueueingDatasource("datasource")
        appender = QueuingDatasourceAppender(source, destination)
        appender.min_interval = 0
        appender.wait_after_rate_limit = 0.5

        source.put(("a", 1))
        source.put(("b", 2))

        thread = threading.Thread(target=appender.run)
        thread.start()
        try:
            time.sleep(0.5)

            # should not be batched because we're still retrying with the
            # previous batch
            source.put(("c", 3))

            batch = destination.queue.get(timeout=5)
            assert len(batch) == 2

            batch = destination.queue.get(timeout=5)
            assert len(batch) == 1
            assert batch[0] == ("c", 3)
        finally:
            # Stop the worker even if an assertion or queue.get timeout above
            # raised; otherwise the thread would keep running after the test.
            appender.close()
            thread.join(timeout=5)

        assert appender.stopped.is_set()