Skip to content

Commit d287c16

Browse files
committed
HBASE-26304: Reflect out of band locality improvements in metrics and balancer
1 parent 558ab92 commit d287c16

File tree

10 files changed

+574
-11
lines changed

10 files changed

+574
-11
lines changed

hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.apache.hadoop.conf.Configured;
3333
import org.apache.hadoop.hbase.ClusterMetrics;
3434
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
35+
import org.apache.hadoop.hbase.RegionMetrics;
36+
import org.apache.hadoop.hbase.ServerMetrics;
3537
import org.apache.hadoop.hbase.ServerName;
3638
import org.apache.hadoop.hbase.TableName;
3739
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -58,6 +60,7 @@
5860
class RegionHDFSBlockLocationFinder extends Configured {
5961
private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
6062
private static final long CACHE_TIME = 240 * 60 * 1000;
63+
private static final float EPSILON = 0.0001f;
6164
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
6265
new HDFSBlocksDistribution();
6366
private volatile ClusterMetrics status;
@@ -110,12 +113,68 @@ void setClusterInfoProvider(ClusterInfoProvider provider) {
110113

111114
void setClusterMetrics(ClusterMetrics status) {
112115
long currentTime = EnvironmentEdgeManager.currentTime();
113-
this.status = status;
116+
114117
if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
118+
this.status = status;
115119
// Only count the refresh if it includes user tables ( eg more than meta and namespace ).
116120
lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
121+
} else {
122+
refreshLocalityChangedRegions(this.status, status);
123+
this.status = status;
124+
}
125+
}
126+
127+
/**
128+
* If locality for a region has changed, that pretty certainly means our cache is out of date.
129+
* Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
130+
*/
131+
private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
132+
if (oldStatus == null || newStatus == null) {
133+
LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}", oldStatus, newStatus);
134+
return;
135+
}
136+
137+
Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
138+
Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
139+
140+
Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
141+
for (RegionInfo regionInfo : cache.asMap().keySet()) {
142+
regionsByName.put(regionInfo.getEncodedName(), regionInfo);
143+
}
144+
145+
for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
146+
Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
147+
for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
148+
String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
149+
RegionInfo region = regionsByName.get(encodedName);
150+
if (region == null) {
151+
continue;
152+
}
153+
154+
float newLocality = regionEntry.getValue().getDataLocality();
155+
float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
156+
157+
if (Math.abs(newLocality - oldLocality) > EPSILON) {
158+
LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
159+
region.getEncodedName(), oldLocality, newLocality);
160+
cache.refresh(region);
161+
}
162+
}
163+
164+
}
165+
}
166+
167+
private float getOldLocality(ServerName newServer, byte[] regionName, Map<ServerName, ServerMetrics> oldServers) {
168+
ServerMetrics serverMetrics = oldServers.get(newServer);
169+
if (serverMetrics == null) {
170+
return -1f;
171+
}
172+
RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
173+
if (regionMetrics == null) {
174+
return -1f;
117175
}
118176

177+
return regionMetrics.getDataLocality();
119178
}
120179

121180
/**

hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertNotSame;
2223
import static org.junit.Assert.assertNull;
2324
import static org.junit.Assert.assertSame;
2425
import static org.junit.Assert.assertTrue;
@@ -37,6 +38,7 @@
3738
import org.apache.hadoop.hbase.HConstants;
3839
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
3940
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
41+
import org.apache.hadoop.hbase.RegionMetrics;
4042
import org.apache.hadoop.hbase.ServerMetrics;
4143
import org.apache.hadoop.hbase.ServerName;
4244
import org.apache.hadoop.hbase.TableName;
@@ -204,4 +206,56 @@ public void testGetTopBlockLocations() {
204206
}
205207
}
206208
}
209+
210+
@Test
211+
public void testRefreshRegionsWithChangedLocality() {
212+
ServerName testServer = ServerName.valueOf("host-0", 12345, 12345);
213+
RegionInfo testRegion = REGIONS.get(0);
214+
215+
Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
216+
for (RegionInfo region : REGIONS) {
217+
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
218+
assertHostAndWeightEquals(generate(region), hbd);
219+
cache.put(region, hbd);
220+
}
221+
222+
finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(), 0.123f));
223+
224+
// everything should be cached, because metrics were null before
225+
for (RegionInfo region : REGIONS) {
226+
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
227+
assertSame(cache.get(region), hbd);
228+
}
229+
230+
finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(), 0.345f));
231+
232+
// locality changed just for our test region, so it should no longer be the same
233+
for (RegionInfo region : REGIONS) {
234+
HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
235+
if (region.equals(testRegion)) {
236+
assertNotSame(cache.get(region), hbd);
237+
} else {
238+
assertSame(cache.get(region), hbd);
239+
}
240+
}
241+
}
242+
243+
private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region, float locality) {
244+
RegionMetrics regionMetrics = mock(RegionMetrics.class);
245+
when(regionMetrics.getDataLocality()).thenReturn(locality);
246+
247+
Map<byte[], RegionMetrics> regionMetricsMap = new HashMap<>();
248+
regionMetricsMap.put(region, regionMetrics);
249+
250+
ServerMetrics serverMetrics = mock(ServerMetrics.class);
251+
when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap);
252+
253+
Map<ServerName, ServerMetrics> serverMetricsMap = new HashMap<>();
254+
serverMetricsMap.put(serverName, serverMetrics);
255+
256+
ClusterMetrics metrics = mock(ClusterMetrics.class);
257+
when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
258+
259+
return metrics;
260+
}
207261
}

hbase-common/src/main/resources/hbase-default.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2042,4 +2042,23 @@ possible configurations would overwhelm and obscure the important.
20422042
the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
20432043
</description>
20442044
</property>
2045+
<property>
2046+
<name>hbase.locality.inputstream.derive.enabled</name>
2047+
<value>false</value>
2048+
<description>
2049+
If true, derive StoreFile locality metrics from the underlying DFSInputStream
2050+
backing reads for that StoreFile. This value will update as the DFSInputStream's
2051+
block locations are updated over time. Otherwise, locality is computed on StoreFile
2052+
open, and cached until the StoreFile is closed.
2053+
</description>
2054+
</property>
2055+
<property>
2056+
<name>hbase.locality.inputstream.derive.cache.period</name>
2057+
<value>60000</value>
2058+
<description>
2059+
If deriving StoreFile locality metrics from the underlying DFSInputStream, this is how
2060+
long the derived values are cached. The derivation process may involve
2061+
hitting the namenode if the DFSInputStream's block list is incomplete.
2062+
</description>
2063+
</property>
20452064
</configuration>

hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int buf
126126
this.in = tryOpen();
127127
}
128128

129+
private FSDataInputStream getUnderlyingInputStream() {
130+
return in;
131+
}
132+
129133
@Override
130134
public int read() throws IOException {
131135
int res;
@@ -475,6 +479,17 @@ public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOExce
475479
return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
476480
}
477481

482+
/**
483+
* If the passed FSDataInputStream is backed by a FileLink, returns the underlying
484+
* InputStream for the resolved link target. Otherwise, returns null.
485+
*/
486+
public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
487+
if (stream.getWrappedStream() instanceof FileLinkInputStream) {
488+
return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
489+
}
490+
return null;
491+
}
492+
478493
/**
479494
* NOTE: This method must be used only in the constructor!
480495
* It creates a List with the specified locations for the link.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Set;
3030
import java.util.concurrent.atomic.AtomicBoolean;
3131
import org.apache.hadoop.conf.Configuration;
32+
import org.apache.hadoop.fs.FSDataInputStream;
3233
import org.apache.hadoop.fs.FileSystem;
3334
import org.apache.hadoop.fs.Path;
3435
import org.apache.hadoop.hbase.Cell;
@@ -43,6 +44,7 @@
4344
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
4445
import org.apache.hadoop.hbase.util.BloomFilterFactory;
4546
import org.apache.hadoop.hbase.util.Bytes;
47+
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
4648
import org.apache.yetus.audience.InterfaceAudience;
4749
import org.slf4j.Logger;
4850
import org.slf4j.LoggerFactory;
@@ -127,6 +129,7 @@ public class HStoreFile implements StoreFile {
127129

128130
// StoreFile.Reader
129131
private volatile StoreFileReader initialReader;
132+
private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
130133

131134
// Block cache configuration and reference.
132135
private final CacheConfig cacheConf;
@@ -344,7 +347,11 @@ public OptionalLong getBulkLoadTimestamp() {
344347
* file is opened.
345348
*/
346349
public HDFSBlocksDistribution getHDFSBlockDistribution() {
347-
return this.fileInfo.getHDFSBlockDistribution();
350+
if (initialReaderBlockDistribution != null) {
351+
return initialReaderBlockDistribution.getHDFSBlockDistribution();
352+
} else {
353+
return this.fileInfo.getHDFSBlockDistribution();
354+
}
348355
}
349356

350357
/**
@@ -362,6 +369,13 @@ private void open() throws IOException {
362369
fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
363370
}
364371
this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
372+
373+
if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
374+
boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
375+
FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
376+
this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
377+
}
378+
365379
// Load up indices and fileinfo. This also loads Bloom filter type.
366380
metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
367381

0 commit comments

Comments
 (0)