Commit 31b90fc
HBASE-27216 Revisit the ReplicationSyncUp tool
1 parent c1d126d · commit 31b90fc

22 files changed: +1025 −265 lines

hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java

Lines changed: 4 additions & 0 deletions
@@ -40,4 +40,8 @@ public static String writeMapAsString(Map<String, Object> map) throws IOException
   public static String writeObjectAsString(Object object) throws IOException {
     return GSON.toJson(object);
   }
+
+  public static <T> T fromJson(String json, Class<T> clazz) {
+    return GSON.fromJson(json, clazz);
+  }
 }
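The new fromJson complements the existing writeObjectAsString, giving a GSON-backed round trip for plain POJOs. A minimal sketch of how a caller might use the pair (the Config class below is hypothetical, purely for illustration):

import java.io.IOException;
import org.apache.hadoop.hbase.util.JsonMapper;

public class JsonMapperRoundTripSketch {
  // Hypothetical POJO; GSON serializes its fields directly.
  public static class Config {
    long startTimeMs;
  }

  public static void main(String[] args) throws IOException {
    Config in = new Config();
    in.startTimeMs = 1234567890L;
    String json = JsonMapper.writeObjectAsString(in); // e.g. {"startTimeMs":1234567890}
    Config out = JsonMapper.fromJson(json, Config.class);
    assert out.startTimeMs == in.startTimeMs;
  }
}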

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 1 addition & 0 deletions
@@ -717,6 +717,7 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
 enum AssignReplicationQueuesState {
   ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
   ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
+  ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3;
 }

 message AssignReplicationQueuesStateData {

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java

Lines changed: 21 additions & 0 deletions
@@ -203,4 +203,25 @@ void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
    * Add the given hfile refs to the given peer.
    */
   void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
+
+  // the below method is for cleaning up stale data after running ReplicationSyncUp
+  /**
+   * Remove all the last sequence ids and hfile reference data which were written before the given
+   * timestamp.
+   * <p/>
+   * The data of these two types are not used by replication directly.
+   * <p/>
+   * For last sequence ids, we check them in serial replication to make sure that we replicate all
+   * edits in order, so if there is stale data, the worst case is that we stop replicating because
+   * we think we still need to finish previous ranges first, although we have actually already
+   * replicated them out.
+   * <p/>
+   * For hfile references, they are only used by the hfile cleaner to avoid removing these hfiles
+   * before we replicate them out, so if there is stale data, the worst case is that we can not
+   * remove these hfiles, although they have actually already been replicated out.
+   * <p/>
+   * So it is OK for us to just bring up the cluster first, and then use this method to delete the
+   * stale data, i.e., the data which were written before a specific timestamp.
+   */
+  void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
 }
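The intended caller is the master after it comes back up (see the HMaster hunk below). As a minimal sketch, assuming a live Connection and the start timestamp recorded by ReplicationSyncUp:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;

public final class SyncUpCleanupSketch {
  // Prune last-pushed sequence ids and hfile refs that were recorded before the
  // ReplicationSyncUp run started; anything older is stale once the tool has finished.
  static void cleanupAfterSyncUp(Connection conn, Configuration conf, long syncUpStartTimeMs)
    throws ReplicationException {
    ReplicationQueueStorage storage =
      ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
    storage.removeLastSequenceIdsAndHFileRefsBefore(syncUpStartTimeMs);
  }
}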

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java

Lines changed: 23 additions & 4 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;

 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -27,20 +28,27 @@
 import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Used to create replication storage(peer, queue) classes.
  */
 @InterfaceAudience.Private
 public final class ReplicationStorageFactory {

+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class);
+
   public static final String REPLICATION_QUEUE_TABLE_NAME = "hbase.replication.queue.table.name";

   public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT =
     TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");

+  public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.impl";
+
   public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName)
     throws IOException {
     return TableDescriptorBuilder.newBuilder(tableName)
@@ -72,15 +80,26 @@ public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration
    */
   public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
     Configuration conf) {
-    return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
-      REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
+    return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf
+      .get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
   }

   /**
    * Create a new {@link ReplicationQueueStorage}.
    */
   public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
-    TableName tableName) {
-    return new TableReplicationQueueStorage(conn, tableName);
+    Configuration conf, TableName tableName) {
+    Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL,
+      TableReplicationQueueStorage.class, ReplicationQueueStorage.class);
+    try {
+      Constructor<? extends ReplicationQueueStorage> c =
+        clazz.getConstructor(Connection.class, TableName.class);
+      return c.newInstance(conn, tableName);
+    } catch (Exception e) {
+      LOG.debug(
+        "failed to create ReplicationQueueStorage with Connection, try creating with Configuration",
+        e);
+      return ReflectionUtils.newInstance(clazz, conf, tableName);
+    }
   }
 }
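The new hbase.replication.queue.impl key makes the queue storage implementation pluggable: the factory first looks for a (Connection, TableName) constructor and otherwise falls back to a (Configuration, TableName) style construction via ReflectionUtils. A hedged sketch of selecting a custom implementation (the implementation class name below is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;

public final class CustomQueueStorageSketch {
  static ReplicationQueueStorage create(Connection conn) {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical implementation class; it must implement ReplicationQueueStorage and
    // expose a (Connection, TableName) or (Configuration, TableName) constructor.
    conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
      "org.example.replication.MyReplicationQueueStorage");
    // When the key is unset, this resolves to the default TableReplicationQueueStorage.
    return ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
  }
}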

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java

Lines changed: 20 additions & 0 deletions
@@ -594,4 +594,24 @@ public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
       throw new ReplicationException("failed to batch update hfile references", e);
     }
   }
+
+  @Override
+  public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
+    try (Table table = conn.getTable(tableName);
+      ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY)
+        .addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) {
+      for (;;) {
+        Result r = scanner.next();
+        if (r == null) {
+          break;
+        }
+        Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts)
+          .addFamily(HFILE_REF_FAMILY, ts);
+        table.delete(delete);
+      }
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "failed to remove last sequence ids and hfile references before timestamp " + ts, e);
+    }
+  }
 }
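The "before the given timestamp" behavior comes from Delete.addFamily(byte[], long), which marks every cell in that family with a timestamp less than or equal to the given one for deletion. A minimal standalone sketch of the same scan-and-delete pattern against an arbitrary table and family (placeholders, not the replication table):

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;

public final class PruneOldCellsSketch {
  // Delete every cell in the given family whose timestamp is <= cutoffTs; the KeyOnlyFilter
  // keeps the scan cheap since only row keys are needed to build the Delete.
  static void pruneFamilyBefore(Connection conn, TableName tableName, byte[] family, long cutoffTs)
    throws IOException {
    try (Table table = conn.getTable(tableName);
      ResultScanner scanner =
        table.getScanner(new Scan().addFamily(family).setFilter(new KeyOnlyFilter()))) {
      for (Result r; (r = scanner.next()) != null;) {
        table.delete(new Delete(r.getRow()).addFamily(family, cutoffTs));
      }
    }
  }
}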

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 41 additions & 0 deletions
@@ -34,6 +34,9 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,6 +62,7 @@
 import javax.servlet.http.HttpServlet;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CatalogFamilyFormat;
@@ -226,6 +230,8 @@
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
 import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
 import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
@@ -246,6 +252,7 @@
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JsonMapper;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RetryCounter;
@@ -267,7 +274,9 @@
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
 import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
@@ -1278,6 +1287,38 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedException
     status.setStatus("Initializing MOB Cleaner");
     initMobCleaner();

+    // delete the stale data for replication sync up tool if necessary
+    status.setStatus("Cleanup ReplicationSyncUp status if necessary");
+    Path replicationSyncUpInfoFile =
+      new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
+    if (dataFs.exists(replicationSyncUpInfoFile)) {
+      // info file is available, load the timestamp and use it to clean up stale data in replication
+      // queue storage.
+      byte[] data;
+      try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
+        data = ByteStreams.toByteArray(in);
+      }
+      ReplicationSyncUpToolInfo info = null;
+      try {
+        info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
+      } catch (JsonParseException e) {
+        // usually this should be a partial file, which means the ReplicationSyncUp tool did not
+        // finish properly, so not a problem. Here we do not clean up the status as we do not know
+        // the reason why the tool did not finish properly, so let users clean the status up
+        // manually
+        LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
+      }
+      if (info != null) {
+        LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
+          info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
+            .format(Instant.ofEpochMilli(info.getStartTimeMs())));
+        replicationPeerManager.getQueueStorage()
+          .removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
+        // delete the file after removing the stale data, so next time we do not need to do this
+        // again.
+        dataFs.delete(replicationSyncUpInfoFile, false);
+      }
+    }
     status.setStatus("Calling postStartMaster coprocessors");
     if (this.cpHost != null) {
       // don't let cp initialization errors kill the master
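For reference, reading the info file back amounts to the few lines below; a self-contained sketch of what the master does in the hunk above, assuming ReplicationSyncUp.INFO_DIR/INFO_FILE and ReplicationSyncUpToolInfo.getStartTimeMs() as they appear in this commit:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JsonMapper;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;

public final class SyncUpInfoReaderSketch {
  // Load the JSON info file written by ReplicationSyncUp and return the recorded start time.
  static long readSyncUpStartTime(FileSystem fs, Path rootDir) throws IOException {
    Path infoFile =
      new Path(new Path(rootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
    byte[] data;
    try (FSDataInputStream in = fs.open(infoFile)) {
      data = ByteStreams.toByteArray(in);
    }
    return JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class)
      .getStartTimeMs();
  }
}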

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java

Lines changed: 42 additions & 6 deletions
@@ -24,7 +24,9 @@
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
@@ -37,6 +39,7 @@
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -102,7 +105,7 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationException
     }
   }

-  private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
+  private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
     Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
       .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
     ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
@@ -130,18 +133,51 @@ private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
     return Flow.HAS_MORE_STATE;
   }

+  // check whether ReplicationSyncUp has already done the work for us; if so, we should skip
+  // claiming the replication queues and delete them instead.
+  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
+    MasterFileSystem mfs = env.getMasterFileSystem();
+    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+    return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
+  }
+
+  private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
+    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
+    for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
+      storage.removeQueue(queueId);
+    }
+    MasterFileSystem mfs = env.getMasterFileSystem();
+    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+    // remove the region server record file
+    mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
     throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
     try {
       switch (state) {
         case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
-          addMissingQueues(env);
-          retryCounter = null;
-          setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
-          return Flow.HAS_MORE_STATE;
+          if (shouldSkip(env)) {
+            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
+            return Flow.HAS_MORE_STATE;
+          } else {
+            addMissingQueues(env);
+            retryCounter = null;
+            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
+            return Flow.HAS_MORE_STATE;
+          }
         case ASSIGN_REPLICATION_QUEUES_CLAIM:
-          return claimQueues(env);
+          if (shouldSkip(env)) {
+            retryCounter = null;
+            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
+            return Flow.HAS_MORE_STATE;
+          } else {
+            return claimQueues(env);
+          }
+        case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
+          removeQueues(env);
+          return Flow.NO_MORE_STATE;
         default:
           throw new UnsupportedOperationException("unhandled state=" + state);
       }
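Both this procedure and ClaimReplicationQueueRemoteProcedure below decide whether to skip claiming by probing for a per-server marker file that ReplicationSyncUp leaves under its info directory. A small sketch (the helper class is made up for illustration) of listing which crashed servers the tool has already handled:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;

public final class SyncUpMarkerSketch {
  // List the server names for which ReplicationSyncUp left a marker file, meaning their
  // replication queues were already drained and can simply be removed by the master.
  static List<String> serversAlreadySyncedUp(FileSystem fs, Path rootDir) throws IOException {
    List<String> servers = new ArrayList<>();
    Path syncUpDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
    if (fs.exists(syncUpDir)) {
      for (FileStatus status : fs.listStatus(syncUpDir)) {
        String name = status.getPath().getName();
        // skip the tool's own info file, which lives in the same directory
        if (!name.equals(ReplicationSyncUp.INFO_FILE)) {
          servers.add(name);
        }
      }
    }
    return servers;
  }
}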

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java

Lines changed: 32 additions & 0 deletions
@@ -19,16 +19,22 @@

 import java.io.IOException;
 import java.util.Optional;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
 import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +60,32 @@ public ClaimReplicationQueueRemoteProcedure(ReplicationQueueId queueId, ServerName
     this.targetServer = targetServer;
   }

+  // check whether ReplicationSyncUp has already done the work for us; if so, we should skip
+  // claiming the replication queue, and AssignReplicationQueuesProcedure will delete it instead.
+  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
+    MasterFileSystem mfs = env.getMasterFileSystem();
+    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+    return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
+  }
+
+  @Override
+  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    try {
+      if (shouldSkip(env)) {
+        LOG.info("Skip claiming {} because replication sync up has already done it for us",
+          getServerName());
+        return null;
+      }
+    } catch (IOException e) {
+      LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
+        getServerName(), e);
+      // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
+      return null;
+    }
+    return super.execute(env);
+  }
+
   @Override
   public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
     assert targetServer.equals(remote);
