Skip to content

Commit c7eb2ab

Browse files
committed
HBASE-27212 Implement a new table based replication queue storage and make the minimum replication system work (#4672)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
1 parent 21d61cf commit c7eb2ab

File tree

68 files changed

+2707
-3054
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+2707
-3054
lines changed

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -679,16 +679,13 @@ message ClaimReplicationQueueRemoteStateData {
679679
required ServerName crashed_server = 1;
680680
required string queue = 2;
681681
required ServerName target_server = 3;
682+
optional ServerName source_server = 4;
682683
}
683684

684685
message ClaimReplicationQueueRemoteParameter {
685686
required ServerName crashed_server = 1;
686687
required string queue = 2;
687-
}
688-
689-
enum ClaimReplicationQueuesState {
690-
CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
691-
CLAIM_REPLICATION_QUEUES_FINISH = 2;
688+
optional ServerName source_server = 3;
692689
}
693690

694691
enum ModifyTableDescriptorState {
@@ -715,3 +712,13 @@ message ModifyStoreFileTrackerStateData {
715712
message ModifyColumnFamilyStoreFileTrackerStateData {
716713
required bytes family = 1;
717714
}
715+
716+
enum AssignReplicationQueuesState {
717+
ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1;
718+
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2;
719+
ASSIGN_REPLICATION_QUEUES_CLAIM = 3;
720+
}
721+
722+
message AssignReplicationQueuesStateData {
723+
required ServerName crashed_server = 1;
724+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
@InterfaceAudience.Private
23+
public class ReplicationGroupOffset {
24+
25+
public static final ReplicationGroupOffset BEGIN = new ReplicationGroupOffset("", 0L);
26+
27+
private final String wal;
28+
29+
private final long offset;
30+
31+
public ReplicationGroupOffset(String wal, long offset) {
32+
this.wal = wal;
33+
this.offset = offset;
34+
}
35+
36+
public String getWal() {
37+
return wal;
38+
}
39+
40+
/**
41+
* A negative value means this file has already been fully replicated out
42+
*/
43+
public long getOffset() {
44+
return offset;
45+
}
46+
47+
@Override
48+
public String toString() {
49+
return wal + ":" + offset;
50+
}
51+
52+
public static ReplicationGroupOffset parse(String str) {
53+
int index = str.lastIndexOf(':');
54+
return new ReplicationGroupOffset(str.substring(0, index),
55+
Long.parseLong(str.substring(index + 1)));
56+
}
57+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
23+
24+
/**
25+
* Representing all the information for a replication queue.
26+
*/
27+
@InterfaceAudience.Private
28+
public class ReplicationQueueData {
29+
30+
private final ReplicationQueueId id;
31+
32+
private final ImmutableMap<String, ReplicationGroupOffset> offsets;
33+
34+
public ReplicationQueueData(ReplicationQueueId id,
35+
ImmutableMap<String, ReplicationGroupOffset> offsets) {
36+
this.id = id;
37+
this.offsets = offsets;
38+
}
39+
40+
public ReplicationQueueId getId() {
41+
return id;
42+
}
43+
44+
public ImmutableMap<String, ReplicationGroupOffset> getOffsets() {
45+
return offsets;
46+
}
47+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import java.util.Objects;
21+
import java.util.Optional;
22+
import org.apache.hadoop.hbase.ServerName;
23+
import org.apache.hadoop.hbase.util.Bytes;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
26+
@InterfaceAudience.Private
27+
public class ReplicationQueueId {
28+
29+
private final ServerName serverName;
30+
31+
private final String peerId;
32+
33+
private final Optional<ServerName> sourceServerName;
34+
35+
// we do not allow '-' in peer names so it is safe to use it as the separator for peer id and
36+
// server name
37+
private static final char PEER_ID_SEPARATOR = '-';
38+
39+
// The '/' character is not valid for a hostname or a nodename(FQDN, so it is safe to use it as
40+
// the separator for server names)
41+
private static final char SERVER_NAME_SEPARATOR = '/';
42+
43+
public ReplicationQueueId(ServerName serverName, String peerId) {
44+
this.serverName = Objects.requireNonNull(serverName);
45+
this.peerId = Objects.requireNonNull(peerId);
46+
this.sourceServerName = Optional.empty();
47+
}
48+
49+
public ReplicationQueueId(ServerName serverName, String peerId, ServerName sourceServerName) {
50+
this.serverName = Objects.requireNonNull(serverName);
51+
this.peerId = Objects.requireNonNull(peerId);
52+
this.sourceServerName = Optional.of(sourceServerName);
53+
}
54+
55+
public ServerName getServerName() {
56+
return serverName;
57+
}
58+
59+
public String getPeerId() {
60+
return peerId;
61+
}
62+
63+
public Optional<ServerName> getSourceServerName() {
64+
return sourceServerName;
65+
}
66+
67+
public ServerName getServerWALsBelongTo() {
68+
return sourceServerName.orElse(serverName);
69+
}
70+
71+
public boolean isRecovered() {
72+
return sourceServerName.isPresent();
73+
}
74+
75+
public ReplicationQueueId claim(ServerName targetServerName) {
76+
ServerName newSourceServerName = sourceServerName.orElse(serverName);
77+
return new ReplicationQueueId(targetServerName, peerId, newSourceServerName);
78+
}
79+
80+
@Override
81+
public int hashCode() {
82+
return Objects.hash(peerId, serverName, sourceServerName);
83+
}
84+
85+
@Override
86+
public boolean equals(Object obj) {
87+
if (this == obj) {
88+
return true;
89+
}
90+
if (!(obj instanceof ReplicationQueueId)) {
91+
return false;
92+
}
93+
ReplicationQueueId other = (ReplicationQueueId) obj;
94+
return Objects.equals(peerId, other.peerId) && Objects.equals(serverName, other.serverName)
95+
&& Objects.equals(sourceServerName, other.sourceServerName);
96+
}
97+
98+
@Override
99+
public String toString() {
100+
StringBuilder sb =
101+
new StringBuilder().append(peerId).append(PEER_ID_SEPARATOR).append(serverName);
102+
sourceServerName.ifPresent(s -> sb.append(SERVER_NAME_SEPARATOR).append(s.toString()));
103+
return sb.toString();
104+
}
105+
106+
public static ReplicationQueueId parse(String str) {
107+
int dashIndex = str.indexOf(PEER_ID_SEPARATOR);
108+
String peerId = str.substring(0, dashIndex);
109+
int slashIndex = str.indexOf(SERVER_NAME_SEPARATOR, dashIndex + 1);
110+
if (slashIndex < 0) {
111+
String serverName = str.substring(dashIndex + 1);
112+
return new ReplicationQueueId(ServerName.valueOf(serverName), peerId);
113+
} else {
114+
String serverName = str.substring(dashIndex + 1, slashIndex);
115+
String sourceServerName = str.substring(slashIndex + 1);
116+
return new ReplicationQueueId(ServerName.valueOf(serverName), peerId,
117+
ServerName.valueOf(sourceServerName));
118+
}
119+
}
120+
121+
public static String getPeerId(String str) {
122+
int dashIndex = str.indexOf(PEER_ID_SEPARATOR);
123+
return str.substring(0, dashIndex);
124+
}
125+
126+
public static byte[] getScanPrefix(ServerName serverName, String peerId) {
127+
return Bytes.toBytes(peerId + PEER_ID_SEPARATOR + serverName.toString());
128+
}
129+
130+
public static byte[] getScanPrefix(String peerId) {
131+
return Bytes.toBytes(peerId + PEER_ID_SEPARATOR);
132+
}
133+
134+
private static char getNextChar(char c) {
135+
return (char) ((int) c + 1);
136+
}
137+
138+
public static byte[] getScanStartRowForNextPeerId(String peerId) {
139+
return Bytes.toBytes(peerId + getNextChar(PEER_ID_SEPARATOR));
140+
}
141+
}

0 commit comments

Comments
 (0)