forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfailure_detector.hh
219 lines (171 loc) · 6.25 KB
/
failure_detector.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "unimplemented.hh"
#include "db_clock.hh"
#include "core/sstring.hh"
#include "core/shared_ptr.hh"
#include "core/distributed.hh"
#include "utils/bounded_stats_deque.hh"
#include "gms/i_failure_detector.hh"
#include <iostream>
#include <cmath>
#include <list>
#include <map>
namespace gms {
class inet_address;
class i_failure_detection_event_listener;
class endpoint_state;
/**
 * Namespace-like holder for static failure-detector helpers.
 * Both accessors are only declared here; their definitions live out of line.
 * NOTE(review): presumably these expose the initial inter-arrival value the
 * detector is seeded with — confirm against the implementation file.
 */
class failure_detector_helper {
public:
    // Initial value expressed in nanoseconds (per the name) — confirm.
    static long INITIAL_VALUE_NANOS();
    static long get_initial_value();
};
/**
 * Bounded history of inter-arrival intervals for a single monitored endpoint,
 * from which a phi (suspicion) value is derived via phi().
 */
class arrival_window {
private:
    // Initialized to 0. NOTE(review): presumably the timestamp of the most
    // recent arrival recorded by add() — confirm in the implementation.
    long _tlast = 0;
    // Recorded intervals; constructed with the `size` ctor argument
    // (presumably the deque's capacity).
    utils::bounded_stats_deque _arrival_intervals;
    // this is useless except to provide backwards compatibility in phi_convict_threshold,
    // because everyone seems pretty accustomed to the default of 8, and users who have
    // already tuned their phi_convict_threshold for their own environments won't need to
    // change.
    //
    // The value is 1 / ln(10) == log10(e) (~0.434). It is spelled out as a
    // literal because std::log is not a constant expression in standard C++
    // (before C++26); `1.0 / std::log(10.0)` in a constexpr initializer only
    // compiles as a GCC builtin-folding extension and is rejected by Clang.
    static constexpr double PHI_FACTOR{0.43429448190325182765};
    // in the event of a long partition, never record an interval longer than the rpc timeout,
    // since if a host is regularly experiencing connectivity problems lasting this long we'd
    // rather mark it down quickly instead of adapting
    // this value defaults to the same initial value the FD is seeded with
    long MAX_INTERVAL_IN_NANO = get_max_interval();
public:
    // size: forwarded to the bounded_stats_deque constructor.
    arrival_window(int size)
        : _arrival_intervals(size) {
    }
    // Source of the per-instance MAX_INTERVAL_IN_NANO cap (defined out of line).
    static long get_max_interval();
    void add(long value);
    double mean();
    // see CASSANDRA-2597 for an explanation of the math at work here.
    double phi(long tnow);
    friend std::ostream& operator<<(std::ostream& os, const arrival_window& w);
};
/**
 * This failure detector is an implementation of the paper titled
 * "The Phi Accrual Failure Detector" by Hayashibara et al.
 * Check the paper and the i_failure_detector interface for details.
 */
class failure_detector : public i_failure_detector {
private:
    static constexpr int SAMPLE_SIZE = 1000;
    // this is useless except to provide backwards compatibility in phi_convict_threshold,
    // because everyone seems pretty accustomed to the default of 8, and users who have
    // already tuned their phi_convict_threshold for their own environments won't need to
    // change.
    //
    // The value is 1 / ln(10) == log10(e) (~0.434). It is written as a literal
    // because std::log is not a constant expression in standard C++ (before
    // C++26); the former `1.0 / std::log(10.0)` initializer only compiled via a
    // GCC builtin-folding extension and is rejected by Clang.
    static constexpr double PHI_FACTOR{0.43429448190325182765};
    // One arrival_window per endpoint, keyed by address.
    std::map<inet_address, arrival_window> _arrival_samples;
    // Registered listeners, stored as raw pointers.
    // NOTE(review): presumably non-owning (registrant manages lifetime) — confirm.
    std::list<i_failure_detection_event_listener*> _fd_evnt_listeners;
public:
    failure_detector() = default;
    // Nothing to tear down asynchronously; resolves immediately.
    future<> stop() {
        return make_ready_future<>();
    }
    sstring get_all_endpoint_states();
    std::map<sstring, sstring> get_simple_states();
    int get_down_endpoint_count();
    int get_up_endpoint_count();
    sstring get_endpoint_state(sstring address);
private:
    void append_endpoint_state(std::stringstream& ss, endpoint_state& state);
public:
    /**
     * Dump the inter arrival times for examination if necessary.
     *
     * Not ported yet: the body below is the original Java and is compiled
     * out with #if 0; it is kept only as a reference for a future port.
     */
#if 0
    void dumpInterArrivalTimes() {
        File file = FileUtils.createTempFile("failuredetector-", ".dat");
        OutputStream os = null;
        try
        {
            os = new BufferedOutputStream(new FileOutputStream(file, true));
            os.write(toString().getBytes());
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, file);
        }
        finally
        {
            FileUtils.closeQuietly(os);
        }
    }
#endif
    void set_phi_convict_threshold(double phi);
    double get_phi_convict_threshold();
    bool is_alive(inet_address ep);
    void report(inet_address ep);
    void interpret(inet_address ep);
    void force_conviction(inet_address ep);
    void remove(inet_address ep);
    void register_failure_detection_event_listener(i_failure_detection_event_listener* listener);
    void unregister_failure_detection_event_listener(i_failure_detection_event_listener* listener);
    friend std::ostream& operator<<(std::ostream& os, const failure_detector& x);
};
// Sharded container of failure_detector instances; declared extern here and
// defined out of line (presumably in the implementation file — confirm).
extern distributed<failure_detector> _the_failure_detector;

/// Access to the whole sharded failure_detector container.
inline distributed<failure_detector>& get_failure_detector() {
    return _the_failure_detector;
}

/// The calling shard's failure_detector instance (distributed::local()).
inline failure_detector& get_local_failure_detector() {
    distributed<failure_detector>& all = get_failure_detector();
    return all.local();
}
inline future<> set_phi_convict_threshold(double phi) {
return smp::submit_to(0, [phi] {
get_local_failure_detector().set_phi_convict_threshold(phi);
});
}
inline future<double> get_phi_convict_threshold() {
return smp::submit_to(0, [] {
return get_local_failure_detector().get_phi_convict_threshold();
});
}
inline future<sstring> get_all_endpoint_states() {
return smp::submit_to(0, [] {
return get_local_failure_detector().get_all_endpoint_states();
});
}
inline future<sstring> get_endpoint_state(sstring address) {
return smp::submit_to(0, [address] {
return get_local_failure_detector().get_endpoint_state(address);
});
}
inline future<std::map<sstring, sstring>> get_simple_states() {
return smp::submit_to(0, [] {
return get_local_failure_detector().get_simple_states();
});
}
inline future<int> get_down_endpoint_count() {
return smp::submit_to(0, [] {
return get_local_failure_detector().get_down_endpoint_count();
});
}
inline future<int> get_up_endpoint_count() {
return smp::submit_to(0, [] {
return get_local_failure_detector().get_up_endpoint_count();
});
}
} // namespace gms