Skip to content

Commit c671168

Browse files
committed
HBASE-27212 Implement a new table based replication queue storage and make the minimum replication system work
1 parent f8dcf07 commit c671168

File tree

70 files changed

+2698
-3045
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+2698
-3045
lines changed

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -676,16 +676,13 @@ message ClaimReplicationQueueRemoteStateData {
676676
required ServerName crashed_server = 1;
677677
required string queue = 2;
678678
required ServerName target_server = 3;
679+
optional ServerName source_server = 4;
679680
}
680681

681682
message ClaimReplicationQueueRemoteParameter {
682683
required ServerName crashed_server = 1;
683684
required string queue = 2;
684-
}
685-
686-
enum ClaimReplicationQueuesState {
687-
CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
688-
CLAIM_REPLICATION_QUEUES_FINISH = 2;
685+
optional ServerName source_server = 3;
689686
}
690687

691688
enum ModifyTableDescriptorState {
@@ -712,3 +709,13 @@ message ModifyStoreFileTrackerStateData {
712709
message ModifyColumnFamilyStoreFileTrackerStateData {
713710
required bytes family = 1;
714711
}
712+
713+
enum AssignReplicationQueuesState {
714+
ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1;
715+
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2;
716+
ASSIGN_REPLICATION_QUEUES_CLAIM = 3;
717+
}
718+
719+
message AssignReplicationQueuesStateData {
720+
required ServerName crashed_server = 1;
721+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
@InterfaceAudience.Private
23+
public class ReplicationGroupOffset {
24+
25+
public static final ReplicationGroupOffset BEGIN = new ReplicationGroupOffset("", 0L);
26+
27+
private final String wal;
28+
29+
private final long offset;
30+
31+
public ReplicationGroupOffset(String wal, long offset) {
32+
this.wal = wal;
33+
this.offset = offset;
34+
}
35+
36+
public String getWal() {
37+
return wal;
38+
}
39+
40+
/**
41+
* A negative value means this file has already been fully replicated out
42+
*/
43+
public long getOffset() {
44+
return offset;
45+
}
46+
47+
@Override
48+
public String toString() {
49+
return wal + ":" + offset;
50+
}
51+
52+
public static ReplicationGroupOffset parse(String str) {
53+
int index = str.lastIndexOf(':');
54+
return new ReplicationGroupOffset(str.substring(0, index),
55+
Long.parseLong(str.substring(index + 1)));
56+
}
57+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
23+
24+
/**
25+
* Representing all the information for a replication queue.
26+
*/
27+
@InterfaceAudience.Private
28+
public class ReplicationQueueData {
29+
30+
private final ReplicationQueueId id;
31+
32+
private final ImmutableMap<String, ReplicationGroupOffset> offsets;
33+
34+
public ReplicationQueueData(ReplicationQueueId id,
35+
ImmutableMap<String, ReplicationGroupOffset> offsets) {
36+
this.id = id;
37+
this.offsets = offsets;
38+
}
39+
40+
public ReplicationQueueId getId() {
41+
return id;
42+
}
43+
44+
public ImmutableMap<String, ReplicationGroupOffset> getOffsets() {
45+
return offsets;
46+
}
47+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.replication;
19+
20+
import java.util.Objects;
21+
import java.util.Optional;
22+
import org.apache.hadoop.hbase.ServerName;
23+
import org.apache.hadoop.hbase.util.Bytes;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
26+
@InterfaceAudience.Private
27+
public class ReplicationQueueId {
28+
29+
private final ServerName serverName;
30+
31+
private final String peerId;
32+
33+
private final Optional<ServerName> sourceServerName;
34+
35+
public ReplicationQueueId(ServerName serverName, String peerId) {
36+
this.serverName = Objects.requireNonNull(serverName);
37+
this.peerId = Objects.requireNonNull(peerId);
38+
this.sourceServerName = Optional.empty();
39+
}
40+
41+
public ReplicationQueueId(ServerName serverName, String peerId, ServerName sourceServerName) {
42+
this.serverName = Objects.requireNonNull(serverName);
43+
this.peerId = Objects.requireNonNull(peerId);
44+
this.sourceServerName = Optional.of(sourceServerName);
45+
}
46+
47+
public ServerName getServerName() {
48+
return serverName;
49+
}
50+
51+
public String getPeerId() {
52+
return peerId;
53+
}
54+
55+
public Optional<ServerName> getSourceServerName() {
56+
return sourceServerName;
57+
}
58+
59+
public ServerName getServerWALsBelongTo() {
60+
return sourceServerName.orElse(serverName);
61+
}
62+
63+
public boolean isRecovered() {
64+
return sourceServerName.isPresent();
65+
}
66+
67+
public ReplicationQueueId claim(ServerName targetServerName) {
68+
ServerName newSourceServerName = sourceServerName.orElse(serverName);
69+
return new ReplicationQueueId(targetServerName, peerId, newSourceServerName);
70+
}
71+
72+
@Override
73+
public int hashCode() {
74+
return Objects.hash(peerId, serverName, sourceServerName);
75+
}
76+
77+
@Override
78+
public boolean equals(Object obj) {
79+
if (this == obj) {
80+
return true;
81+
}
82+
if (!(obj instanceof ReplicationQueueId)) {
83+
return false;
84+
}
85+
ReplicationQueueId other = (ReplicationQueueId) obj;
86+
return Objects.equals(peerId, other.peerId) && Objects.equals(serverName, other.serverName)
87+
&& Objects.equals(sourceServerName, other.sourceServerName);
88+
}
89+
90+
@Override
91+
public String toString() {
92+
StringBuilder sb = new StringBuilder().append(peerId).append('-').append(serverName);
93+
sourceServerName.ifPresent(s -> sb.append('\t').append(s.toString()));
94+
return sb.toString();
95+
}
96+
97+
public static ReplicationQueueId parse(String str) {
98+
int dashIndex = str.indexOf('-');
99+
String peerId = str.substring(0, dashIndex);
100+
int tabIndex = str.indexOf('\t', dashIndex + 1);
101+
if (tabIndex < 0) {
102+
String serverName = str.substring(dashIndex + 1);
103+
return new ReplicationQueueId(ServerName.valueOf(serverName), peerId);
104+
} else {
105+
String serverName = str.substring(dashIndex + 1, tabIndex);
106+
String sourceServerName = str.substring(tabIndex + 1);
107+
return new ReplicationQueueId(ServerName.valueOf(serverName), peerId,
108+
ServerName.valueOf(sourceServerName));
109+
}
110+
}
111+
112+
public static String getPeerId(String str) {
113+
int dashIndex = str.indexOf('-');
114+
return str.substring(0, dashIndex);
115+
}
116+
117+
public static byte[] getScanPrefix(ServerName serverName, String peerId) {
118+
return Bytes.toBytes(peerId + "-" + serverName.toString());
119+
}
120+
121+
public static byte[] getScanPrefix(String peerId) {
122+
return Bytes.toBytes(peerId + "-");
123+
}
124+
125+
public static byte[] getScanStartRowForNextPeerId(String peerId) {
126+
// '.' is the next char after '-'
127+
return Bytes.toBytes(peerId + ".");
128+
}
129+
}

0 commit comments

Comments
 (0)