@@ -99,7 +99,7 @@ void Listener::stop() {
99
99
bool Listener::preProcessLog (LogID logId,
100
100
TermID termId,
101
101
ClusterID clusterId,
102
- const std::string& log) {
102
+ folly::StringPiece log) {
103
103
UNUSED (logId);
104
104
UNUSED (termId);
105
105
UNUSED (clusterId);
@@ -147,6 +147,7 @@ void Listener::doApply() {
147
147
if (isStopped ()) {
148
148
return ;
149
149
}
150
+
150
151
if (needToCleanupSnapshot ()) {
151
152
cleanupSnapshot ();
152
153
}
@@ -156,87 +157,89 @@ void Listener::doApply() {
156
157
bgWorkers_->addDelayTask (
157
158
FLAGS_listener_commit_interval_secs * 1000 , &Listener::doApply, this );
158
159
};
160
+ processLogs ();
161
+ });
162
+ }
159
163
160
- std::unique_ptr<LogIterator> iter;
161
- {
162
- std::lock_guard<std::mutex> guard (raftLock_);
163
- if (lastApplyLogId_ >= committedLogId_) {
164
- return ;
165
- }
166
- iter = wal_->iterator (lastApplyLogId_ + 1 , committedLogId_);
164
+ void Listener::processLogs () {
165
+ std::unique_ptr<LogIterator> iter;
166
+ {
167
+ std::lock_guard<std::mutex> guard (raftLock_);
168
+ if (lastApplyLogId_ >= committedLogId_) {
169
+ return ;
167
170
}
171
+ iter = wal_->iterator (lastApplyLogId_ + 1 , committedLogId_);
172
+ }
168
173
169
- LogID lastApplyId = -1 ;
170
- // the kv pair which can sync to remote safely
171
- std::vector<KV> data;
172
- while (iter->valid ()) {
173
- lastApplyId = iter->logId ();
174
-
175
- auto log = iter->logMsg ();
176
- if (log .empty ()) {
177
- // skip the heartbeat
178
- ++(*iter);
179
- continue ;
180
- }
174
+ LogID lastApplyId = -1 ;
175
+ // the kv pair which can sync to remote safely
176
+ std::vector<KV> data;
177
+ while (iter->valid ()) {
178
+ lastApplyId = iter->logId ();
181
179
182
- DCHECK_GE ( log . size (), sizeof ( int64_t ) + 1 + sizeof ( uint32_t ) );
183
- switch (log [ sizeof ( int64_t )] ) {
184
- case OP_PUT: {
185
- auto pieces = decodeMultiValues ( log );
186
- DCHECK_EQ ( 2 , pieces. size ()) ;
187
- data. emplace_back (pieces[ 0 ], pieces[ 1 ]);
188
- break ;
189
- }
190
- case OP_MULTI_PUT: {
191
- auto kvs = decodeMultiValues ( log );
192
- DCHECK_EQ ((kvs. size () + 1 ) / 2 , kvs. size () / 2 );
193
- for ( size_t i = 0 ; i < kvs .size (); i += 2 ) {
194
- data.emplace_back (kvs[i ], kvs[i + 1 ]);
195
- }
196
- break ;
197
- }
198
- case OP_REMOVE:
199
- case OP_REMOVE_RANGE:
200
- case OP_MULTI_REMOVE: {
201
- break ;
180
+ auto log = iter-> logMsg ( );
181
+ if (log . empty () ) {
182
+ // skip the heartbeat
183
+ ++(*iter );
184
+ continue ;
185
+ }
186
+
187
+ DCHECK_GE ( log . size (), sizeof ( int64_t ) + 1 + sizeof ( uint32_t ));
188
+ switch ( log [ sizeof ( int64_t )]) {
189
+ case OP_PUT: {
190
+ auto pieces = decodeMultiValues ( log );
191
+ DCHECK_EQ ( 2 , pieces .size ());
192
+ data.emplace_back (pieces[ 0 ], pieces[ 1 ]);
193
+ break ;
194
+ }
195
+ case OP_MULTI_PUT: {
196
+ auto kvs = decodeMultiValues ( log );
197
+ DCHECK_EQ ( 0 , kvs. size () % 2 );
198
+ for ( size_t i = 0 ; i < kvs. size (); i += 2 ) {
199
+ data. emplace_back (kvs[i], kvs[i + 1 ]) ;
202
200
}
203
- case OP_BATCH_WRITE: {
204
- auto batch = decodeBatchValue (log );
205
- for (auto & op : batch) {
206
- // OP_BATCH_PUT and OP_BATCH_REMOVE_RANGE is ignored
207
- if (op.first == BatchLogType::OP_BATCH_PUT) {
208
- data.emplace_back (op.second .first , op.second .second );
209
- }
201
+ break ;
202
+ }
203
+ case OP_REMOVE:
204
+ case OP_REMOVE_RANGE:
205
+ case OP_MULTI_REMOVE: {
206
+ break ;
207
+ }
208
+ case OP_BATCH_WRITE: {
209
+ auto batch = decodeBatchValue (log );
210
+ for (auto & op : batch) {
211
+ // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored
212
+ if (op.first == BatchLogType::OP_BATCH_PUT) {
213
+ data.emplace_back (op.second .first , op.second .second );
210
214
}
211
- break ;
212
- }
213
- case OP_TRANS_LEADER:
214
- case OP_ADD_LEARNER:
215
- case OP_ADD_PEER:
216
- case OP_REMOVE_PEER: {
217
- break ;
218
- }
219
- default : {
220
- VLOG (2 ) << idStr_
221
- << " Should not reach here. Unknown operation: " << static_cast <int32_t >(log [0 ]);
222
215
}
216
+ break ;
223
217
}
224
-
225
- if (static_cast <int32_t >(data.size ()) > FLAGS_listener_commit_batch_size) {
218
+ case OP_TRANS_LEADER:
219
+ case OP_ADD_LEARNER:
220
+ case OP_ADD_PEER:
221
+ case OP_REMOVE_PEER: {
226
222
break ;
227
223
}
228
- ++(*iter);
224
+ default : {
225
+ VLOG (2 ) << idStr_ << " Unknown operation: " << static_cast <int32_t >(log [0 ]);
226
+ }
229
227
}
230
228
231
- // apply to state machine
232
- if (lastApplyId != -1 && apply (data)) {
233
- std::lock_guard<std::mutex> guard (raftLock_);
234
- lastApplyLogId_ = lastApplyId;
235
- persist (committedLogId_, term_, lastApplyLogId_);
236
- VLOG (2 ) << idStr_ << " Listener succeeded apply log to " << lastApplyLogId_;
237
- lastApplyTime_ = time ::WallClock::fastNowInMilliSec ();
229
+ if (static_cast <int32_t >(data.size ()) > FLAGS_listener_commit_batch_size) {
230
+ break ;
238
231
}
239
- });
232
+ ++(*iter);
233
+ }
234
+
235
+ // apply to state machine
236
+ if (lastApplyId != -1 && apply (data)) {
237
+ std::lock_guard<std::mutex> guard (raftLock_);
238
+ lastApplyLogId_ = lastApplyId;
239
+ persist (committedLogId_, term_, lastApplyLogId_);
240
+ VLOG (2 ) << idStr_ << " Listener succeeded apply log to " << lastApplyLogId_;
241
+ lastApplyTime_ = time ::WallClock::fastNowInMilliSec ();
242
+ }
240
243
}
241
244
242
245
std::tuple<nebula::cpp2::ErrorCode, int64_t , int64_t > Listener::commitSnapshot (
@@ -303,5 +306,6 @@ bool Listener::pursueLeaderDone() {
303
306
" pursue leader : leaderCommitId={}, lastApplyLogId_={}" , leaderCommitId_, lastApplyLogId_);
304
307
return (leaderCommitId_ - lastApplyLogId_) <= FLAGS_listener_pursue_leader_threshold;
305
308
}
309
+
306
310
} // namespace kvstore
307
311
} // namespace nebula
0 commit comments