-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathHost.h
281 lines (235 loc) · 7.45 KB
/
Host.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#ifndef RAFTEX_HOST_H_
#define RAFTEX_HOST_H_
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>
#include "common/base/Base.h"
#include "common/base/ErrorOr.h"
#include "common/thrift/ThriftClientManager.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
#include "interface/gen-cpp2/raftex_types.h"
// Forward declaration only: within this header folly::EventBase appears solely as a
// pointer parameter type (folly::EventBase* eb).
namespace folly {
class EventBase;
} // namespace folly
namespace nebula {
namespace raftex {
// Forward declaration: within this header RaftPart is only named as a friend of Host and
// as the pointee of a std::shared_ptr member/constructor parameter.
class RaftPart;
/**
* @brief Host tracks how many logs have been sent to a raft peer. It sends logs and leader
* election requests to the remote peer by rpc
*/
class Host final : public std::enable_shared_from_this<Host> {
  friend class RaftPart;

 public:
  /**
   * @brief Build a Host bound to one remote raft peer
   *
   * @param addr Address of the target peer
   * @param part The RaftPart this Host is related to
   * @param isLearner True when the target peer is a learner
   */
  Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner = false);

  /**
   * @brief Destructor; the inline body only emits a verbose log line
   */
  ~Host() {
    VLOG(1) << idStr_ << " The host has been destroyed!";
  }

  /**
   * @brief Identifier string of this Host, meant for log messages
   *
   * @return const char* Pointer into the idStr_ member, so it is only valid while this Host
   * is alive
   */
  const char* idStr() const {
    return idStr_.c_str();
  }

  /**
   * @brief Mark the Host as paused. Called when the RaftPart loses its leadership
   */
  void pause() {
    std::lock_guard<std::mutex> guard{lock_};
    paused_ = true;
  }

  /**
   * @brief Clear the paused mark. Called when the RaftPart becomes the leader
   */
  void resume() {
    std::lock_guard<std::mutex> guard{lock_};
    paused_ = false;
  }

  /**
   * @brief Mark the Host as stopped. Called when the RaftPart is stopped
   */
  void stop() {
    std::lock_guard<std::mutex> guard{lock_};
    stopped_ = true;
  }

  /**
   * @brief Reset all sending state; should be called when the RaftPart becomes the leader.
   * Blocks on noMoreRequestCV_ until requestOnGoing_ is false before touching any state
   */
  void reset() {
    std::unique_lock<std::mutex> guard{lock_};
    // Same semantics as the predicate overload of wait(): re-check the flag after every wake-up
    while (requestOnGoing_) {
      noMoreRequestCV_.wait(guard);
    }

    // Position of the next batch
    logIdToSend_ = 0;
    logTermToSend_ = 0;

    // Position of the previous batch
    lastLogIdSent_ = 0;
    lastLogTermSent_ = 0;

    // Commit progress (local and follower side) and snapshot flag
    committedLogId_ = 0;
    followerCommittedLogId_ = 0;
    sendingSnapshot_ = false;
  }

  /**
   * @brief Wait until all requests in flight have finished or timed out
   */
  void waitForStop();

  /**
   * @brief Tell whether the target peer is a raft learner
   */
  bool isLearner() const {
    return isLearner_;
  }

  /**
   * @brief Record in this Host whether the raft peer is a learner
   *
   * @param isLearner New value of the learner flag
   */
  void setLearner(bool isLearner) {
    isLearner_ = isLearner;
  }

  /**
   * @brief Send the leader election rpc to the peer
   *
   * @param req The RPC request
   * @param eb The eventbase used to send the rpc
   * @return folly::Future<cpp2::AskForVoteResponse>
   */
  folly::Future<cpp2::AskForVoteResponse> askForVote(const cpp2::AskForVoteRequest& req,
                                                     folly::EventBase* eb);

  /**
   * @brief Send append log to the peer
   *
   * @param eb The eventbase used to send the rpc
   * @param term The term of RaftPart
   * @param logId The last log to be sent
   * @param committedLogId The last committed log id
   * @param lastLogTermSent The last log term being sent
   * @param lastLogIdSent The last log id being sent
   * @return folly::Future<cpp2::AppendLogResponse>
   */
  folly::Future<cpp2::AppendLogResponse> appendLogs(folly::EventBase* eb,
                                                    TermID term,
                                                    LogID logId,
                                                    LogID committedLogId,
                                                    TermID lastLogTermSent,
                                                    LogID lastLogIdSent);

  /**
   * @brief Send the heartbeat to the peer
   *
   * @param eb The eventbase used to send the rpc
   * @param term The term of RaftPart
   * @param commitLogId The last committed log id
   * @param lastLogTerm The last log term being sent
   * @param lastLogId The last log id being sent
   * @return folly::Future<cpp2::HeartbeatResponse>
   */
  folly::Future<cpp2::HeartbeatResponse> sendHeartbeat(
      folly::EventBase* eb, TermID term, LogID commitLogId, TermID lastLogTerm, LogID lastLogId);

  /**
   * @brief Address of the peer this Host talks to
   */
  const HostAddr& address() const {
    return addr_;
  }

 private:
  /**
   * @brief Check whether the Host can send rpc to the peer
   *
   * @return nebula::cpp2::ErrorCode
   */
  nebula::cpp2::ErrorCode canAppendLog() const;

  /**
   * @brief Send the append log rpc
   *
   * @param eb The eventbase used to send the rpc
   * @param req The rpc request
   * @return folly::Future<cpp2::AppendLogResponse>
   */
  folly::Future<cpp2::AppendLogResponse> sendAppendLogRequest(
      folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req);

  /**
   * @brief Send the append log rpc and handle the response
   *
   * @param eb The eventbase used to send the rpc
   * @param req The rpc request
   */
  void appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req);

  /**
   * @brief Send the heartbeat rpc
   * NOTE(review): described from the signature only; the implementation is not in this header
   *
   * @param eb The eventbase used to send the rpc
   * @param req The rpc request
   * @return folly::Future<cpp2::HeartbeatResponse>
   */
  folly::Future<cpp2::HeartbeatResponse> sendHeartbeatRequest(
      folly::EventBase* eb, std::shared_ptr<cpp2::HeartbeatRequest> req);

  /**
   * @brief Build the append log request based on the log id
   *
   * @return ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>>
   */
  ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>>
  prepareAppendLogRequest();

  /**
   * @brief Start sending a snapshot when the log is not available in the wal file
   *
   * @return nebula::cpp2::ErrorCode
   */
  nebula::cpp2::ErrorCode startSendSnapshot();

  /**
   * @brief Return true if there isn't a request in flight
   */
  bool noRequest() const;

  /**
   * @brief Notify the RaftPart of the result of sending logs to peers
   *
   * @param resp RPC response
   */
  void setResponse(const cpp2::AppendLogResponse& resp);

  /**
   * @brief Store the last heartbeat response time from the peer
   *
   * @param time Value written to lastHeartbeatTime_
   */
  void setLastHeartbeatTime(int64_t time) {
    lastHeartbeatTime_ = time;
  }

  /**
   * @brief Read back the value stored by setLastHeartbeatTime
   */
  int64_t getLastHeartbeatTime() const {
    return lastHeartbeatTime_;
  }

  /**
   * @brief Build the append log request if there are more logs to send
   *
   * @param self Shared ptr of the Host itself
   * @return std::shared_ptr<cpp2::AppendLogRequest> The request when there are logs to send,
   * nullptr when there are none
   */
  std::shared_ptr<cpp2::AppendLogRequest> getPendingReqIfAny(std::shared_ptr<Host> self);

 private:
  // Tuple layout: <term, logId, committedLogId>
  using Request = std::tuple<TermID, LogID, LogID>;

  std::shared_ptr<RaftPart> part_;
  const HostAddr addr_;
  bool isLearner_{false};
  const std::string idStr_;

  // Taken by pause(), resume(), stop() and reset()
  mutable std::mutex lock_;

  bool paused_{false};
  bool stopped_{false};

  // True while a batch of logs for the target host is ongoing
  bool requestOnGoing_{false};
  // True while a snapshot for the target host is ongoing
  bool sendingSnapshot_{false};

  // reset() waits on this until requestOnGoing_ becomes false
  std::condition_variable noMoreRequestCV_;
  folly::SharedPromise<cpp2::AppendLogResponse> promise_;
  folly::SharedPromise<cpp2::AppendLogResponse> cachingPromise_;

  Request pendingReq_{0, 0, 0};

  // Log id and term of the latest log we need to send
  LogID logIdToSend_{0};
  TermID logTermToSend_{0};

  // The log that precedes the current batch
  LogID lastLogIdSent_{0};
  TermID lastLogTermSent_{0};

  LogID committedLogId_{0};

  // CommittedLogId of the follower
  LogID followerCommittedLogId_{0};

  // Last heartbeat response time from the peer
  int64_t lastHeartbeatTime_{0};
};
} // namespace raftex
} // namespace nebula
#endif // RAFTEX_HOST_H_