# Copyright 2015 ClusterHQ Inc.  See LICENSE file for details.
"""
Driver for the control service benchmarks.
"""

from eliot import start_action
from eliot.twisted import DeferredContext

from twisted.internet.task import cooperate
from twisted.internet.defer import Deferred, logError, maybeDeferred

from flocker.common import gather_deferreds


def bypass(result, func, *args, **kw):
    """
    Perform the function, but fire with the result from the previous Deferred.

    :param result: Value with which to fire returned Deferred.
    :param func: Function to call. This function has its return value ignored,
        except that if it returns a Deferred, wait for the Deferred to fire
        and ignore that result.
    :param args: Positional arguments to function ``func``.
    :param kw: Keyword arguments to function ``func``.
    :return: a Deferred that fires with ``result``.
    """
    d = maybeDeferred(func, *args, **kw)
    d.addErrback(logError)
    d.addBoth(lambda ignored: result)
    return d
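
# A minimal usage sketch (hypothetical, not part of this module): ``bypass``
# lets a side-effecting step such as cleanup run inside a callback chain
# without replacing the value travelling down it::
#
#     d = maybeDeferred(probe.run)          # fires with the measurement
#     d.addCallback(bypass, probe.cleanup)  # cleanup runs; result unchanged
#
# Even if ``probe.cleanup`` fails, the failure is logged and the original
# measurement is still passed downstream.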


def sample(operation, metric, name):
    """
    Perform sampling of the operation.

    :param IOperation operation: An operation to perform.
    :param IMetric metric: A quantity to measure.
    :param int name: Identifier for individual sample.
    :return: Deferred firing with a sample. A sample is a dictionary
        containing a ``success`` boolean. If ``success is True``, the
        dictionary also contains a ``value`` for the sample measurement.
        If ``success is False``, the dictionary also contains a
        ``reason`` for failure.
    """
    with start_action(action_type=u'flocker:benchmark:sample', sample=name):
        sampling = DeferredContext(maybeDeferred(operation.get_probe))

        def run_probe(probe):
            probing = metric.measure(probe.run)
            probing.addCallback(
                lambda measurement: dict(success=True, value=measurement)
            )
            probing.addCallback(bypass, probe.cleanup)
            return probing
        sampling.addCallback(run_probe)

        # Convert an error running the probe into a failed sample.
        def convert_to_result(failure):
            return dict(success=False, reason=failure.getTraceback())
        sampling.addErrback(convert_to_result)

        return sampling.addActionFinish()
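
# The two sample shapes described in the docstring (illustrative values)::
#
#     {'success': True, 'value': 4.2}
#     {'success': False, 'reason': 'Traceback (most recent call last): ...'}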


def benchmark(scenario, operation, metric, num_samples):
    """
    Perform benchmarking of the operation within a scenario.

    :param IScenario scenario: A load scenario.
    :param IOperation operation: An operation to perform.
    :param IMetric metric: A quantity to measure.
    :param int num_samples: Number of samples to take.
    :return: Deferred firing with a tuple containing one list of
        benchmark samples and one scenario metrics result. See the
        ``sample`` function for the structure of the samples. The
        scenario metrics are a dictionary containing information about
        the scenario.
    """
    scenario_established = scenario.start()

    samples = []

    def collect_samples(ignored):
        collecting = Deferred()
        task = cooperate(
            sample(operation, metric, i).addCallback(samples.append)
            for i in range(num_samples))

        # If the scenario collapses, stop sampling.
        def stop_sampling_on_scenario_collapse(failure):
            task.stop()
            collecting.errback(failure)
        scenario.maintained().addErrback(stop_sampling_on_scenario_collapse)

        # On success, fire ``collecting`` with the accumulated samples.  On
        # failure, the errback above has already fired ``collecting``, so
        # swallow the duplicate here rather than leave an unhandled error
        # in the Deferred.
        task.whenDone().addCallbacks(
            lambda ignored: collecting.callback(samples),
            lambda ignored: None)

        return collecting
    benchmarking = scenario_established.addCallback(collect_samples)

    def stop_scenario(samples):
        d = scenario.stop()

        def combine_results(scenario_metrics):
            return (samples, scenario_metrics)
        d.addCallback(combine_results)
        return d
    benchmarking.addCallbacks(
        stop_scenario,
        bypass, errbackArgs=[scenario.stop]
    )

    return benchmarking
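
# A wiring sketch (hypothetical factory names; the real scenario, operation
# and metric implementations live elsewhere and are selected by the caller)::
#
#     scenario = scenario_factory(reactor, cluster)
#     operation = operation_factory(reactor, cluster)
#     metric = metric_factory(reactor, cluster)
#     d = benchmark(scenario, operation, metric, num_samples=3)
#     # d fires with (samples, scenario_metrics)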


def driver(
    reactor, cluster, scenario_factory, operation_factory, metric_factory,
    num_samples, result, output
):
    """
    :param reactor: Reactor to use.
    :param BenchmarkCluster cluster: Benchmark cluster.
    :param callable scenario_factory: A load scenario factory.
    :param callable operation_factory: An operation factory.
    :param callable metric_factory: A metric factory.
    :param int num_samples: Number of samples to take.
    :param result: A dictionary which will be updated with values to
        create a JSON result.
    :param output: A callable to receive the JSON structure, for
        printing or storage.
    """
    control_service = cluster.get_control_service(reactor)

    d = gather_deferreds([
        control_service.version(),
        control_service.list_nodes(),
        control_service.list_containers_configuration(),
        control_service.list_datasets_configuration(),
    ])

    def add_control_service(characteristics, result):
        version = characteristics[0]
        node_count = len(characteristics[1])
        container_count = len(characteristics[2])
        dataset_count = len(characteristics[3].datasets)
        result['control_service'] = dict(
            host=cluster.control_node_address().compressed,
            flocker_version=version[u"flocker"],
            node_count=node_count,
            container_count=container_count,
            dataset_count=dataset_count,
        )
    d.addCallback(add_control_service, result)

    def run_benchmark(ignored):
        return benchmark(
            scenario_factory(reactor, cluster),
            operation_factory(reactor, cluster),
            metric_factory(reactor, cluster),
            num_samples,
        )
    d.addCallback(run_benchmark)

    def add_samples(outputs, result):
        samples, scenario_metrics = outputs
        result['samples'] = samples
        if scenario_metrics:
            result['scenario']['metrics'] = scenario_metrics
        return result
    d.addCallback(add_samples, result)

    d.addCallback(output)
    return d
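
# An invocation sketch (hypothetical cluster, factories and ``json_output``
# callable).  Note that ``result`` must already contain any nested keys the
# callbacks update; ``add_samples`` writes into ``result['scenario']``::
#
#     result = {'scenario': {'name': 'no-load'}}
#     d = driver(
#         reactor, cluster,
#         scenario_factory, operation_factory, metric_factory,
#         num_samples=3, result=result, output=json_output,
#     )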