Skip to content

Commit e5257f7

Browse files
committed
HBASE-27213 Add support for claim queue operation (#4708)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
1 parent 1effd1c commit e5257f7

File tree

8 files changed

+258
-26
lines changed

8 files changed

+258
-26
lines changed

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ message UpdatePeerConfigStateData {
515515

516516
message RemovePeerStateData {
517517
optional ReplicationPeer peer_config = 1;
518+
repeated int64 ongoing_assign_replication_queues_proc_ids = 2;
518519
}
519520

520521
message EnablePeerStateData {
@@ -714,9 +715,8 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
714715
}
715716

716717
enum AssignReplicationQueuesState {
717-
ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1;
718-
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2;
719-
ASSIGN_REPLICATION_QUEUES_CLAIM = 3;
718+
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
719+
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
720720
}
721721

722722
message AssignReplicationQueuesStateData {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Iterator;
2424
import java.util.List;
2525
import java.util.Set;
26+
import java.util.stream.Collectors;
2627
import org.apache.hadoop.hbase.ServerName;
2728
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
2829
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
@@ -102,8 +103,12 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationExceptio
102103
}
103104

104105
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
106+
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
107+
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
105108
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
106-
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer);
109+
// filter out replication queue for deleted peers
110+
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream()
111+
.filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList());
107112
if (queueIds.isEmpty()) {
108113
LOG.debug("Finish claiming replication queues for {}", crashedServer);
109114
// we are done
@@ -130,10 +135,6 @@ protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesS
130135
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
131136
try {
132137
switch (state) {
133-
case ASSIGN_REPLICATION_QUEUES_PRE_CHECK:
134-
// TODO: reserved for implementing the fencing logic with Add/Remove/UpdatePeerProcedure
135-
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES);
136-
return Flow.HAS_MORE_STATE;
137138
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
138139
addMissingQueues(env);
139140
retryCounter = null;
@@ -183,7 +184,7 @@ protected int getStateId(AssignReplicationQueuesState state) {
183184

184185
@Override
185186
protected AssignReplicationQueuesState getInitialState() {
186-
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_PRE_CHECK;
187+
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES;
187188
}
188189

189190
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected abstract void prePeerModification(MasterProcedureEnv env)
7474
* update the peer storage.
7575
*/
7676
protected abstract void postPeerModification(MasterProcedureEnv env)
77-
throws IOException, ReplicationException;
77+
throws IOException, ReplicationException, ProcedureSuspendedException;
7878

7979
protected void releaseLatch(MasterProcedureEnv env) {
8080
ProcedurePrepareLatch.releaseLatch(latch, this);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@
1818
package org.apache.hadoop.hbase.master.replication;
1919

2020
import java.io.IOException;
21+
import java.util.Arrays;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.stream.Collectors;
2125
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
2226
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
2327
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
28+
import org.apache.hadoop.hbase.procedure2.Procedure;
29+
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
2430
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
31+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
2532
import org.apache.hadoop.hbase.replication.ReplicationException;
2633
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
2734
import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +47,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
4047

4148
private ReplicationPeerConfig peerConfig;
4249

50+
private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList();
51+
4352
public RemovePeerProcedure() {
4453
}
4554

@@ -64,15 +73,43 @@ protected void prePeerModification(MasterProcedureEnv env) throws IOException {
6473
@Override
6574
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
6675
env.getReplicationPeerManager().removePeer(peerId);
76+
// record ongoing AssignReplicationQueuesProcedures after we update the peer storage
77+
ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor()
78+
.getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure)
79+
.filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList());
6780
}
6881

6982
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
7083
env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
7184
}
7285

86+
private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env)
87+
throws ProcedureSuspendedException {
88+
if (ongoingAssignReplicationQueuesProcIds.isEmpty()) {
89+
LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on",
90+
peerId);
91+
}
92+
ProcedureExecutor<MasterProcedureEnv> procExec =
93+
env.getMasterServices().getMasterProcedureExecutor();
94+
long[] unfinishedProcIds =
95+
ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure)
96+
.filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray();
97+
if (unfinishedProcIds.length == 0) {
98+
LOG.info(
99+
"All assign replication queues procedures are finished when removing peer {}, move on",
100+
peerId);
101+
} else {
102+
throw suspend(env.getMasterConfiguration(), backoff -> LOG.info(
103+
"There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs",
104+
unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000));
105+
}
106+
}
107+
73108
@Override
74109
protected void postPeerModification(MasterProcedureEnv env)
75-
throws IOException, ReplicationException {
110+
throws IOException, ReplicationException, ProcedureSuspendedException {
111+
checkAssignReplicationQueuesFinished(env);
112+
76113
if (peerConfig.isSyncReplication()) {
77114
removeRemoteWALs(env);
78115
}
@@ -94,6 +131,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
94131
if (peerConfig != null) {
95132
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
96133
}
134+
builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds);
97135
serializer.serialize(builder.build());
98136
}
99137

@@ -104,5 +142,6 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
104142
if (data.hasPeerConfig()) {
105143
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
106144
}
145+
ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList();
107146
}
108147
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
237237
*/
238238
void init() throws IOException {
239239
for (String id : this.replicationPeers.getAllPeerIds()) {
240-
addSource(id);
240+
addSource(id, true);
241241
}
242242
}
243243

@@ -257,7 +257,7 @@ public void addPeer(String peerId) throws IOException {
257257
throw new IOException(e);
258258
}
259259
if (added) {
260-
addSource(peerId);
260+
addSource(peerId, false);
261261
}
262262
}
263263

@@ -323,11 +323,16 @@ private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
323323
/**
324324
* Add a normal source for the given peer on this region server. Meanwhile, add new replication
325325
* queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
326-
* group and do replication
326+
* group and do replication.
327+
* <p/>
328+
* We add a {@code init} parameter to indicate whether this is part of the initialization process.
329+
* If so, we should skip adding the replication queues as this may introduce a deadlock on region
330+
* server start up and hbase:replication table online.
327331
* @param peerId the id of the replication peer
332+
* @param init whether this call is part of the initialization process
328333
* @return the source that was created
329334
*/
330-
void addSource(String peerId) throws IOException {
335+
void addSource(String peerId, boolean init) throws IOException {
331336
ReplicationPeer peer = replicationPeers.getPeer(peerId);
332337
if (
333338
ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
@@ -352,11 +357,16 @@ void addSource(String peerId) throws IOException {
352357
NavigableSet<String> wals = new TreeSet<>();
353358
wals.add(walPath.getName());
354359
walsByGroup.put(walPrefixAndPath.getKey(), wals);
355-
// Abort RS and throw exception to make add peer failed
356-
// TODO: can record the length of the current wal file so we could replicate less data
357-
abortAndThrowIOExceptionWhenFail(
358-
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
359-
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
360+
if (!init) {
361+
// Abort RS and throw exception to make add peer failed
362+
// Ideally we'd better use the current file size as offset so we can skip replicating
363+
// the data before adding replication peer, but the problem is that the file may not end
364+
// at a valid entry's ending, and the current WAL Reader implementation can not deal
365+
// with reading from the middle of a WAL entry. Can improve later.
366+
abortAndThrowIOExceptionWhenFail(
367+
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
368+
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
369+
}
360370
src.enqueueLog(walPath);
361371
LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
362372
}
@@ -795,9 +805,15 @@ public void postLogRoll(Path newLog) throws IOException {
795805
* @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
796806
*/
797807
private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
798-
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
808+
// skip replicating meta wals
809+
if (AbstractFSWALProvider.isMetaFile(wal)) {
799810
return false;
800811
}
812+
// if no offset or the offset is just a place marker, replicate
813+
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
814+
return true;
815+
}
816+
// otherwise, compare the timestamp
801817
long walTs = AbstractFSWALProvider.getTimestamp(wal);
802818
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
803819
if (walTs < startWalTs) {
@@ -892,7 +908,6 @@ Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getN
892908
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
893909
groupOffset);
894910
}
895-
walFilesPQ.add(file);
896911
}
897912
// the method is a bit long, so assign it to null here to avoid later we reuse it again by
898913
// mistake, we should use the sorted walFilesPQ instead

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void testClaim() throws Exception {
156156
hbaseAdmin.enableReplicationPeer(PEER_ID3);
157157

158158
EMPTY = false;
159-
// wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
159+
// wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP
160160
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
161161
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
162162

0 commit comments

Comments (0)