Skip to content

Commit c25a63f

Browse files
committed
HBASE-27775 Use a separate WAL provider for hbase:replication table (#5157)
Signed-off-by: Liangjun He <heliangjun@apache.org>
1 parent 0fcc0f2 commit c25a63f

File tree

6 files changed

+260
-72
lines changed

6 files changed

+260
-72
lines changed

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,4 +136,10 @@ public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn
136136
return ReflectionUtils.newInstance(clazz, conf, tableName);
137137
}
138138
}
139+
140+
public static boolean isReplicationQueueTable(Configuration conf, TableName tableName) {
141+
TableName replicationQueueTableName = TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
142+
REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
143+
return replicationQueueTableName.equals(tableName);
144+
}
139145
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -982,12 +982,12 @@ synchronized public void run() {
982982

983983
lastRan = currentTime;
984984

985-
final WALProvider provider = regionServer.getWalFactory().getWALProvider();
986-
final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider();
987-
numWALFiles = (provider == null ? 0 : provider.getNumLogFiles())
988-
+ (metaProvider == null ? 0 : metaProvider.getNumLogFiles());
989-
walFileSize = (provider == null ? 0 : provider.getLogFileSize())
990-
+ (metaProvider == null ? 0 : metaProvider.getLogFileSize());
985+
List<WALProvider> providers = regionServer.getWalFactory().getAllWALProviders();
986+
for (WALProvider provider : providers) {
987+
numWALFiles += provider.getNumLogFiles();
988+
walFileSize += provider.getLogFileSize();
989+
}
990+
991991
// Copy over computed values so that no thread sees half computed values.
992992
numStores = tempNumStores;
993993
numStoreFiles = tempNumStoreFiles;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.wal;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.hbase.Abortable;
25+
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
26+
import org.apache.hadoop.hbase.wal.WALFactory.Providers;
27+
import org.apache.yetus.audience.InterfaceAudience;
28+
29+
/**
30+
* A lazy initialized WAL provider for holding the WALProvider for some special tables, such as
31+
* hbase:meta, hbase:replication, etc.
32+
*/
33+
@InterfaceAudience.Private
34+
class LazyInitializedWALProvider implements Closeable {
35+
36+
private final WALFactory factory;
37+
38+
private final String providerId;
39+
40+
private final String providerConfigName;
41+
42+
private final Abortable abortable;
43+
44+
private final AtomicReference<WALProvider> holder = new AtomicReference<>();
45+
46+
LazyInitializedWALProvider(WALFactory factory, String providerId, String providerConfigName,
47+
Abortable abortable) {
48+
this.factory = factory;
49+
this.providerId = providerId;
50+
this.providerConfigName = providerConfigName;
51+
this.abortable = abortable;
52+
}
53+
54+
WALProvider getProvider() throws IOException {
55+
Configuration conf = factory.getConf();
56+
for (;;) {
57+
WALProvider provider = this.holder.get();
58+
if (provider != null) {
59+
return provider;
60+
}
61+
Class<? extends WALProvider> clz = null;
62+
if (conf.get(providerConfigName) == null) {
63+
try {
64+
clz = conf.getClass(WALFactory.WAL_PROVIDER, Providers.defaultProvider.clazz,
65+
WALProvider.class);
66+
} catch (Throwable t) {
67+
// the WAL provider should be an enum. Proceed
68+
}
69+
}
70+
if (clz == null) {
71+
clz = factory.getProviderClass(providerConfigName,
72+
conf.get(WALFactory.WAL_PROVIDER, WALFactory.DEFAULT_WAL_PROVIDER));
73+
}
74+
provider = WALFactory.createProvider(clz);
75+
provider.init(factory, conf, providerId, this.abortable);
76+
provider.addWALActionsListener(new MetricsWAL());
77+
if (this.holder.compareAndSet(null, provider)) {
78+
return provider;
79+
} else {
80+
// someone is ahead of us, close and try again.
81+
provider.close();
82+
}
83+
}
84+
}
85+
86+
/**
87+
* Get the provider if it already initialized, otherwise just return {@code null} instead of
88+
* creating it.
89+
*/
90+
WALProvider getProviderNoCreate() {
91+
return holder.get();
92+
}
93+
94+
@Override
95+
public void close() throws IOException {
96+
WALProvider provider = this.holder.get();
97+
if (provider != null) {
98+
provider.close();
99+
}
100+
}
101+
102+
void shutdown() throws IOException {
103+
WALProvider provider = this.holder.get();
104+
if (provider != null) {
105+
provider.shutdown();
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)