Skip to content

Commit 76516a6

Browse files
committed
HBASE-27214 Implement the new replication hfile/log cleaner (#4722)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
1 parent 2c12cc0 commit 76516a6

20 files changed

+1008
-253
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ default void preClean() {
5050
}
5151

5252
/**
53-
* Used to do some cleanup work
53+
* Will be called after cleaner run.
5454
*/
5555
default void postClean() {
5656
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ public static MasterRegion create(MasterRegionParams params) throws IOException
380380
params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
381381
walRoller.start();
382382

383-
WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
383+
WALFactory walFactory = new WALFactory(conf, server.getServerName(), server, false);
384384
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
385385
Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
386386
Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
2222
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
2323
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
24-
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
2524
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
2625
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
2726
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -84,15 +83,21 @@ protected ReplicationPeerConfig getNewPeerConfig() {
8483

8584
@Override
8685
protected void releaseLatch(MasterProcedureEnv env) {
86+
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
8787
if (peerConfig.isSyncReplication()) {
8888
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
8989
}
90-
ProcedurePrepareLatch.releaseLatch(latch, this);
90+
super.releaseLatch(env);
9191
}
9292

9393
@Override
9494
protected void prePeerModification(MasterProcedureEnv env)
9595
throws IOException, ReplicationException, ProcedureSuspendedException {
96+
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
97+
throw suspend(env.getMasterConfiguration(),
98+
backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
99+
peerId, backoff / 1000));
100+
}
96101
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
97102
if (cpHost != null) {
98103
cpHost.preAddReplicationPeer(peerId, peerConfig);
@@ -128,9 +133,13 @@ protected void postPeerModification(MasterProcedureEnv env)
128133
@Override
129134
protected void afterReplay(MasterProcedureEnv env) {
130135
if (getCurrentState() == getInitialState()) {
131-
// will try to acquire the lock when executing the procedure, no need to acquire it here
136+
// do not need to disable log cleaner or acquire lock if we are in the initial state, later
137+
// when executing the procedure we will try to disable and acquire.
132138
return;
133139
}
140+
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
141+
throw new IllegalStateException("can not disable log cleaner, this should not happen");
142+
}
134143
if (peerConfig.isSyncReplication()) {
135144
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
136145
throw new IllegalStateException(

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
6161
import org.apache.hadoop.hbase.replication.ReplicationUtils;
6262
import org.apache.hadoop.hbase.replication.SyncReplicationState;
63+
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
6364
import org.apache.hadoop.hbase.util.Pair;
6465
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
6566
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -102,6 +103,9 @@ public class ReplicationPeerManager implements ConfigurationObserver {
102103
// Only allow to add one sync replication peer concurrently
103104
private final Semaphore syncReplicationPeerLock = new Semaphore(1);
104105

106+
private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
107+
new ReplicationLogCleanerBarrier();
108+
105109
private final String clusterId;
106110

107111
private volatile Configuration conf;
@@ -705,6 +709,10 @@ public void releaseSyncReplicationPeerLock() {
705709
syncReplicationPeerLock.release();
706710
}
707711

712+
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
713+
return replicationLogCleanerBarrier;
714+
}
715+
708716
@Override
709717
public void onConfigurationChange(Configuration conf) {
710718
this.conf = conf;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1734,7 +1734,7 @@ public boolean isOnline() {
17341734
* be hooked up to WAL.
17351735
*/
17361736
private void setupWALAndReplication() throws IOException {
1737-
WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
1737+
WALFactory factory = new WALFactory(conf, serverName, this, true);
17381738
// TODO Replication make assumptions here based on the default filesystem impl
17391739
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
17401740
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
21+
import org.apache.yetus.audience.InterfaceAudience;
22+
23+
@InterfaceAudience.Private
24+
public final class ReplicationOffsetUtil {
25+
26+
private ReplicationOffsetUtil() {
27+
}
28+
29+
public static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
30+
// if no offset or the offset is just a place marker, replicate
31+
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
32+
return true;
33+
}
34+
// otherwise, compare the timestamp
35+
long walTs = AbstractFSWALProvider.getTimestamp(wal);
36+
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
37+
if (walTs < startWalTs) {
38+
return false;
39+
} else if (walTs > startWalTs) {
40+
return true;
41+
}
42+
// if the timestamp equals, usually it means we should include this wal but there is a special
43+
// case, a negative offset means the wal has already been fully replicated, so here we should
44+
// check the offset.
45+
return offset.getOffset() >= 0;
46+
}
47+
}

0 commit comments

Comments
 (0)