
Commit 1ed26b9

HBASE-26304: Reflect out of band locality improvements in metrics and balancer
1 parent 60254bc commit 1ed26b9

7 files changed: +309 −11 lines changed

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java

Lines changed: 60 additions & 1 deletion
@@ -32,6 +32,8 @@
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -58,6 +60,7 @@
 class RegionHDFSBlockLocationFinder extends Configured {
   private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
   private static final long CACHE_TIME = 240 * 60 * 1000;
+  private static final float EPSILON = 0.0001f;
   private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
     new HDFSBlocksDistribution();
   private volatile ClusterMetrics status;
@@ -110,12 +113,68 @@ void setClusterInfoProvider(ClusterInfoProvider provider) {
 
   void setClusterMetrics(ClusterMetrics status) {
     long currentTime = EnvironmentEdgeManager.currentTime();
-    this.status = status;
+
     if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
+      this.status = status;
       // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
       lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
+    } else {
+      refreshLocalityChangedRegions(this.status, status);
+      this.status = status;
+    }
+  }
+
+  /**
+   * If locality for a region has changed, that pretty certainly means our cache is out of date.
+   * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
+   */
+  private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
+    if (oldStatus == null || newStatus == null) {
+      LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", oldStatus, newStatus);
+      return;
+    }
+
+    Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
+    Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
+
+    Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
+    for (RegionInfo regionInfo : cache.asMap().keySet()) {
+      regionsByName.put(regionInfo.getEncodedName(), regionInfo);
+    }
+
+    for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
+      Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
+      for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
+        String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
+        RegionInfo region = regionsByName.get(encodedName);
+        if (region == null) {
+          continue;
+        }
+
+        float newLocality = regionEntry.getValue().getDataLocality();
+        float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
+
+        if (Math.abs(newLocality - oldLocality) > EPSILON) {
+          LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
+            region.getEncodedName(), oldLocality, newLocality);
+          cache.refresh(region);
+        }
+      }
+
+    }
+  }
+
+  private float getOldLocality(ServerName newServer, byte[] regionName, Map<ServerName, ServerMetrics> oldServers) {
+    ServerMetrics serverMetrics = oldServers.get(newServer);
+    if (serverMetrics == null) {
+      return -1f;
+    }
+    RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
+    if (regionMetrics == null) {
+      return -1f;
     }
 
+    return regionMetrics.getDataLocality();
   }
 
   /**
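A note on the cache.refresh(region) call in refreshLocalityChangedRegions: the finder's cache is a Guava-style LoadingCache (that is what the asMap() and refresh() calls above imply), so a refresh reloads one region's entry in place rather than evicting it. A minimal, hedged sketch of that interaction, with hypothetical key/value types and a stand-in loader replacing the finder's real HDFS lookup:

import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LocalityCacheSketch {
  public static void main(String[] args) {
    // Keyed by encoded region name, valued by a locality index; stands in for
    // the finder's RegionInfo -> HDFSBlocksDistribution cache.
    LoadingCache<String, Float> cache = CacheBuilder.newBuilder()
      .expireAfterWrite(240, TimeUnit.MINUTES) // mirrors CACHE_TIME above
      .build(new CacheLoader<String, Float>() {
        @Override
        public Float load(String encodedRegionName) {
          return 1.0f; // stand-in for an HDFS block-distribution lookup
        }
      });

    cache.getUnchecked("abc123");
    // What refreshLocalityChangedRegions does per changed region: reload the
    // entry in place, so readers keep getting the previous value until the
    // new one has been computed.
    cache.refresh("abc123");
  }
}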

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java

Lines changed: 54 additions & 0 deletions
@@ -19,6 +19,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -204,4 +206,56 @@ public void testGetTopBlockLocations() {
       }
     }
   }
+
+  @Test
+  public void testRefreshRegionsWithChangedLocality() {
+    ServerName testServer = ServerName.valueOf("host-0", 12345, 12345);
+    RegionInfo testRegion = REGIONS.get(0);
+
+    Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      assertHostAndWeightEquals(generate(region), hbd);
+      cache.put(region, hbd);
+    }
+
+    finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(), 0.123f));
+
+    // everything should be cached, because metrics were null before
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      assertSame(cache.get(region), hbd);
+    }
+
+    finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(), 0.345f));
+
+    // locality changed just for our test region, so it should no longer be the same
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      if (region.equals(testRegion)) {
+        assertNotSame(cache.get(region), hbd);
+      } else {
+        assertSame(cache.get(region), hbd);
+      }
+    }
+  }
+
+  private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region, float locality) {
+    RegionMetrics regionMetrics = mock(RegionMetrics.class);
+    when(regionMetrics.getDataLocality()).thenReturn(locality);
+
+    Map<byte[], RegionMetrics> regionMetricsMap = new HashMap<>();
+    regionMetricsMap.put(region, regionMetrics);
+
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap);
+
+    Map<ServerName, ServerMetrics> serverMetricsMap = new HashMap<>();
+    serverMetricsMap.put(serverName, serverMetrics);
+
+    ClusterMetrics metrics = mock(ClusterMetrics.class);
+    when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+
+    return metrics;
+  }
 }
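One hedged note on the mock-based helper above: it uses Mockito's mock and when, which do not appear in the added import hunks, so the test class presumably already carries static imports along these lines:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;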

hbase-common/src/main/resources/hbase-default.xml

Lines changed: 19 additions & 0 deletions
@@ -2042,4 +2042,23 @@ possible configurations would overwhelm and obscure the important.
     the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
   </description>
 </property>
+<property>
+  <name>hbase.locality.inputstream.derive.enabled</name>
+  <value>false</value>
+  <description>
+    If true, derive StoreFile locality metrics from the underlying DFSInputStream backing
+    reads for that StoreFile. This value will update as the DFSInputStream's block locations
+    are updated over time. Otherwise, locality is computed once when the StoreFile is
+    opened, and cached until the StoreFile is closed.
+  </description>
+</property>
+<property>
+  <name>hbase.locality.inputstream.derive.cache.period</name>
+  <value>60000</value>
+  <description>
+    If deriving StoreFile locality metrics from the underlying DFSInputStream, how long
+    the derived values should be cached for, in milliseconds. The derivation process may
+    involve hitting the namenode, if the DFSInputStream's block list is incomplete.
+  </description>
+</property>
 </configuration>
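Both properties default to the values shown, so the feature is off out of the box. A hedged example of what enabling it in hbase-site.xml could look like, with an illustrative (not recommended) five-minute cache period:

<configuration>
  <property>
    <name>hbase.locality.inputstream.derive.enabled</name>
    <value>true</value>
  </property>
  <property>
    <!-- illustrative value: cache derived locality for 5 minutes (300000 ms) -->
    <name>hbase.locality.inputstream.derive.cache.period</name>
    <value>300000</value>
  </property>
</configuration>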

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java

Lines changed: 17 additions & 1 deletion
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -127,6 +129,7 @@ public class HStoreFile implements StoreFile {
 
   // StoreFile.Reader
   private volatile StoreFileReader initialReader;
+  private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
 
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
@@ -344,7 +347,11 @@ public OptionalLong getBulkLoadTimestamp() {
    * file is opened.
    */
   public HDFSBlocksDistribution getHDFSBlockDistribution() {
-    return this.fileInfo.getHDFSBlockDistribution();
+    if (initialReaderBlockDistribution != null) {
+      return initialReaderBlockDistribution.getHDFSBlockDistribution();
+    } else {
+      return this.fileInfo.getHDFSBlockDistribution();
+    }
   }
 
   /**
@@ -362,6 +369,15 @@ private void open() throws IOException {
       fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
     }
     this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
+
+    if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
+      boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
+      FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
+      if (stream instanceof HdfsDataInputStream) {
+        this.initialReaderBlockDistribution = new InputStreamBlockDistribution((HdfsDataInputStream) stream, fileInfo);
+      }
+    }
+
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
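Why this change matters downstream: a region's reported data locality is aggregated from its store files' block distributions, so once getHDFSBlockDistribution() can return the stream-derived value, regionserver metrics (and in turn the balancer's inputs above) track block movement without reopening files. A hedged sketch of that aggregation, mirroring but not quoting the regionserver's metrics path:

import java.util.List;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;

final class RegionLocalitySketch {
  // storeFiles and serverHostName are hypothetical inputs standing in for a
  // region's open store files and the regionserver's hostname.
  static float regionDataLocality(List<HStoreFile> storeFiles, String serverHostName) {
    HDFSBlocksDistribution regionDist = new HDFSBlocksDistribution();
    for (HStoreFile storeFile : storeFiles) {
      // Stream-derived when hbase.locality.inputstream.derive.enabled is true,
      // otherwise the distribution computed when the StoreFile was opened.
      regionDist.add(storeFile.getHDFSBlockDistribution());
    }
    return regionDist.getBlockLocalityIndex(serverHostName);
  }
}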

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java

Lines changed: 91 additions & 0 deletions

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Computes the HDFSBlockDistribution for a file based on the underlying located blocks
+ * for an HdfsDataInputStream reading that file. This computation may involve a call to
+ * the namenode, so the value is cached based on
+ * {@link #HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD}.
+ */
+@InterfaceAudience.Private
+public class InputStreamBlockDistribution {
+  private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
+
+  private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
+    "hbase.locality.inputstream.derive.enabled";
+  private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
+
+  private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
+    "hbase.locality.inputstream.derive.cache.period";
+  private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
+
+  private final HdfsDataInputStream stream;
+  private final StoreFileInfo fileInfo;
+  private final int cachePeriodMs;
+
+  private HDFSBlocksDistribution hdfsBlocksDistribution;
+  private long lastCachedAt;
+
+  public InputStreamBlockDistribution(HdfsDataInputStream stream, StoreFileInfo fileInfo)
+    throws IOException {
+    this.stream = stream;
+    this.fileInfo = fileInfo;
+    this.cachePeriodMs = fileInfo.getConf().getInt(HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
+      DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
+    computeBlockDistribution();
+  }
+
+  /**
+   * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
+   */
+  public static boolean isEnabled(Configuration conf) {
+    return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
+      DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
+  }
+
+  /**
+   * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
+   * is expired.
+   */
+  public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
+    if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
+      try {
+        LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
+        computeBlockDistribution();
+      } catch (IOException e) {
+        LOG.warn("Failed to recompute block distribution for {}, falling back on last known value",
+          fileInfo, e);
+      }
+    }
+    return hdfsBlocksDistribution;
+  }
+
+  private void computeBlockDistribution() throws IOException {
+    lastCachedAt = EnvironmentEdgeManager.currentTime();
+    hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(stream);
+  }
+}
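The cache-period behavior above is clock-driven through EnvironmentEdgeManager, which makes it testable. A hedged sketch of exercising it (assuming an already-constructed stream and fileInfo, and HBase's ManualEnvironmentEdge test utility):

import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;

// ... inside a test, with `stream` and `fileInfo` already set up:
ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
clock.setValue(1L);
EnvironmentEdgeManager.injectEdge(clock); // remember to reset() in teardown

InputStreamBlockDistribution dist = new InputStreamBlockDistribution(stream, fileInfo);
HDFSBlocksDistribution first = dist.getHDFSBlockDistribution();  // computed at t=1, then cached

clock.incValue(60_000L + 1);                                     // step past the default cache period
HDFSBlocksDistribution second = dist.getHDFSBlockDistribution(); // triggers a recompute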

hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java

Lines changed: 36 additions & 0 deletions
@@ -81,6 +81,10 @@
 import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.Progressable;
@@ -703,6 +707,38 @@ public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOExc
     return fs.exists(metaRegionDir);
   }
 
+  /**
+   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
+   * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
+   * This method retrieves those blocks from the input stream and uses them to calculate
+   * HDFSBlocksDistribution.
+   *
+   * The underlying method in DFSInputStream does attempt to use locally cached blocks, but
+   * may hit the namenode if the cache is determined to be incomplete. The method also involves
+   * making copies of all LocatedBlocks rather than returning the underlying blocks themselves.
+   */
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(HdfsDataInputStream inputStream)
+    throws IOException {
+    List<LocatedBlock> blocks = inputStream.getAllBlocks();
+    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
+    for (LocatedBlock block : blocks) {
+      String[] hosts = getHostsForLocations(block);
+      long len = block.getBlockSize();
+      StorageType[] storageTypes = block.getStorageTypes();
+      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
+    }
+    return blocksDistribution;
+  }
+
+  private static String[] getHostsForLocations(LocatedBlock block) {
+    DatanodeInfo[] locations = block.getLocations();
+    String[] hosts = new String[locations.length];
+    for (int i = 0; i < hosts.length; i++) {
+      hosts[i] = locations[i].getHostName();
+    }
+    return hosts;
+  }
+
   /**
    * Compute HDFS blocks distribution of a given file, or a portion of the file
    * @param fs file system
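A hedged sketch of calling the new helper directly, assuming a FileSystem backed by HDFS (where open() returns an HdfsDataInputStream) and a hypothetical store file path and hostname:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

// ... given a FileSystem fs backed by HDFS:
try (FSDataInputStream in = fs.open(new Path("/hbase/example/storefile"))) { // hypothetical path
  if (in instanceof HdfsDataInputStream) {
    HDFSBlocksDistribution dist =
      FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) in);
    float locality = dist.getBlockLocalityIndex("datanode-host-1"); // hypothetical host, 0.0f..1.0f
  }
}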
