HBASE-26304 Reflect out of band locality improvements in metrics and balancer #3895

Merged · Dec 4, 2021 · 3 commits
19 changes: 19 additions & 0 deletions hbase-common/src/main/resources/hbase-default.xml
@@ -2046,4 +2046,23 @@ possible configurations would overwhelm and obscure the important.
the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
</description>
</property>
<property>
<name>hbase.locality.inputstream.derive.enabled</name>
<value>false</value>
<description>
If true, derive StoreFile locality metrics from the underlying DFSInputStream
backing reads for that StoreFile. This value will update as the DFSInputStream's
block locations are updated over time. Otherwise, locality is computed on StoreFile
open, and cached until the StoreFile is closed.
</description>
</property>
<property>
<name>hbase.locality.inputstream.derive.cache.period</name>
<value>60000</value>
<description>
If deriving StoreFile locality metrics from the underlying DFSInputStream, how
long the derived values should be cached. The derivation process may involve
hitting the namenode if the DFSInputStream's block list is incomplete.
</description>
</property>
</configuration>
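
For operators, a minimal sketch (not part of this patch) of setting the two new properties programmatically; the class name is illustrative, while the keys and defaults come from hbase-default.xml above.

import org.apache.hadoop.conf.Configuration;

public class LocalityDeriveConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Derive StoreFile locality from the live DFSInputStream rather than
    // computing it once when the StoreFile is opened.
    conf.setBoolean("hbase.locality.inputstream.derive.enabled", true);
    // Cache the derived distribution for 30s instead of the default 60s.
    conf.setInt("hbase.locality.inputstream.derive.cache.period", 30_000);
  }
}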
hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -126,6 +126,10 @@ public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int buf
this.in = tryOpen();
}

private FSDataInputStream getUnderlyingInputStream() {
return in;
}

@Override
public int read() throws IOException {
int res;
@@ -475,6 +479,17 @@ public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOExce
return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
}

/**
* If the passed FSDataInputStream is backed by a FileLink, returns the underlying
* InputStream for the resolved link target. Otherwise, returns null.
*/
public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
if (stream.getWrappedStream() instanceof FileLinkInputStream) {
return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
}
return null;
}

/**
* NOTE: This method must be used only in the constructor!
* It creates a List with the specified locations for the link.
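
A usage note on the new helper: getUnderlyingFileLinkInputStream returns null for streams that are not FileLink-backed, so callers branch on whether the file is a link first, as InputStreamBlockDistribution.computeBlockDistribution does later in this PR. A hypothetical caller sketch (the class and method names are illustrative):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.FileLink;

final class FileLinkStreamResolver {
  /**
   * Picks the stream whose block locations should be inspected: for a
   * FileLink-backed stream, the resolved link target's stream; for a plain
   * file, the stream itself.
   */
  static FSDataInputStream resolve(FSDataInputStream stream, boolean isLink) {
    return isLink ? FileLink.getUnderlyingFileLinkInputStream(stream) : stream;
  }
}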
hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java
@@ -23,13 +23,16 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -61,7 +64,10 @@
class RegionLocationFinder {
private static final Logger LOG = LoggerFactory.getLogger(RegionLocationFinder.class);
private static final long CACHE_TIME = 240 * 60 * 1000;
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION = new HDFSBlocksDistribution();
private static final float EPSILON = 0.0001f;
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
new HDFSBlocksDistribution();

private Configuration conf;
private volatile ClusterMetrics status;
private MasterServices services;
@@ -127,12 +133,70 @@ public void setServices(MasterServices services) {

public void setClusterMetrics(ClusterMetrics status) {
long currentTime = EnvironmentEdgeManager.currentTime();
this.status = status;

if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
this.status = status;
// Only count the refresh if it includes user tables (e.g. more than meta and namespace).
lastFullRefresh = scheduleFullRefresh()?currentTime:lastFullRefresh;
lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
} else {
refreshLocalityChangedRegions(this.status, status);
this.status = status;
}
}

/**
* If locality for a region has changed, that almost certainly means our cache is out of date.
* Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
*/
private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
if (oldStatus == null || newStatus == null) {
LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}",
oldStatus, newStatus);
return;
}

Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();

Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
for (RegionInfo regionInfo : cache.asMap().keySet()) {
regionsByName.put(regionInfo.getEncodedName(), regionInfo);
}

for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
RegionInfo region = regionsByName.get(encodedName);
if (region == null) {
continue;
}

float newLocality = regionEntry.getValue().getDataLocality();
float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);

if (Math.abs(newLocality - oldLocality) > EPSILON) {
LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
region.getEncodedName(), oldLocality, newLocality);
cache.refresh(region);
}
}

}
}

private float getOldLocality(ServerName newServer, byte[] regionName,
Map<ServerName, ServerMetrics> oldServers) {
ServerMetrics serverMetrics = oldServers.get(newServer);
if (serverMetrics == null) {
return -1f;
}
RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
if (regionMetrics == null) {
return -1f;
}

return regionMetrics.getDataLocality();
}

/**
@@ -199,8 +263,8 @@ protected HDFSBlocksDistribution internalGetTopBlockLocation(RegionInfo region)
return blocksDistribution;
}
} catch (IOException ioe) {
LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
+ region.getEncodedName(), ioe);
LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
region.getEncodedName(), ioe);
}

return EMPTY_BLOCK_DISTRIBUTION;
@@ -299,7 +363,8 @@ private ListenableFuture<HDFSBlocksDistribution> asyncGetBlockDistribution(
}

public void refreshAndWait(Collection<RegionInfo> hris) {
ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures = new ArrayList<>(hris.size());
ArrayList<ListenableFuture<HDFSBlocksDistribution>> regionLocationFutures =
new ArrayList<>(hris.size());
for (RegionInfo hregionInfo : hris) {
regionLocationFutures.add(asyncGetBlockDistribution(hregionInfo));
}
@@ -312,9 +377,8 @@ public void refreshAndWait(Collection<RegionInfo> hris) {
} catch (InterruptedException ite) {
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
LOG.debug(
"ExecutionException during HDFSBlocksDistribution computation. for region = "
+ hregionInfo.getEncodedName(), ee);
LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
hregionInfo.getEncodedName(), ee);
}
index++;
}
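
One detail worth spelling out: getOldLocality returns -1f when the region was not reported by that server in the previous ClusterMetrics, and since real locality values lie in [0, 1], the EPSILON comparison then always triggers a refresh for regions that have moved. A minimal sketch of that decision (the constant is copied from above; the class is illustrative):

final class LocalityRefreshCheck {
  private static final float EPSILON = 0.0001f;

  /**
   * Mirrors the refresh decision in refreshLocalityChangedRegions. oldLocality
   * is -1f for a region newly assigned to this server, so any real locality
   * in [0, 1] differs by more than EPSILON and forces a cache refresh.
   */
  static boolean shouldRefresh(float oldLocality, float newLocality) {
    return Math.abs(newLocality - oldLocality) > EPSILON;
  }
}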
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -29,6 +29,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -122,6 +123,7 @@ public class HStoreFile implements StoreFile {

// StoreFile.Reader
private volatile StoreFileReader initialReader;
private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;

// Block cache configuration and reference.
private final CacheConfig cacheConf;
@@ -347,7 +349,11 @@ public OptionalLong getBulkLoadTimestamp() {
* file is opened.
*/
public HDFSBlocksDistribution getHDFSBlockDistribution() {
return this.fileInfo.getHDFSBlockDistribution();
if (initialReaderBlockDistribution != null) {
return initialReaderBlockDistribution.getHDFSBlockDistribution();
} else {
return this.fileInfo.getHDFSBlockDistribution();
}
}

/**
@@ -365,6 +371,13 @@ private void open() throws IOException {
fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
}
this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);

if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
}

// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());

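
From a consumer's point of view (metrics, the balancer), the API is unchanged; only the freshness of the returned value differs. A hypothetical caller sketch; getBlockLocalityIndex is an existing HDFSBlocksDistribution method, while the class and host parameter are illustrative:

import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;

final class StoreFileLocalityReader {
  /**
   * Reads the store file's (possibly dynamically derived) block distribution.
   * With hbase.locality.inputstream.derive.enabled=true the result can change
   * over time as the DFSInputStream learns new block locations; otherwise it
   * is the value computed when the StoreFile was opened.
   */
  static float localityOn(HStoreFile storeFile, String hostName) {
    HDFSBlocksDistribution dist = storeFile.getHDFSBlockDistribution();
    return dist.getBlockLocalityIndex(hostName);
  }
}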
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java
@@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Computes the HDFSBlockDistribution for a file based on the underlying located blocks
* for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
* allocating an array of numBlocks size per call. It may also involve calling the namenode, if
* the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure,
* we cache the computed distribution for a configurable period of time.
* <p>
* This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e.
* the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the
* value returned by {@link HStoreFile#getHDFSBlockDistribution()}.
* <p>
* Once the backing FSDataInputStream is closed, we should not expect the distribution result
* to change anymore. This is ok because the initialReader's InputStream is only closed when the
* StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
* anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will
* be created for the new initialReader.
*/
@InterfaceAudience.Private
public class InputStreamBlockDistribution {
private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);

private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
"hbase.locality.inputstream.derive.enabled";
private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;

private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
"hbase.locality.inputstream.derive.cache.period";
private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;

private final FSDataInputStream stream;
private final StoreFileInfo fileInfo;
private final int cachePeriodMs;

private HDFSBlocksDistribution hdfsBlocksDistribution;
private long lastCachedAt;
private boolean streamUnsupported;

/**
* This should only be called for the first FSDataInputStream of a StoreFile,
* in {@link HStoreFile#open()}.
*
* @see InputStreamBlockDistribution
* @param stream the input stream to derive locality from
* @param fileInfo the StoreFileInfo for the related store file
*/
public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
this.stream = stream;
this.fileInfo = fileInfo;
this.cachePeriodMs = fileInfo.getConf().getInt(
HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
this.lastCachedAt = EnvironmentEdgeManager.currentTime();
this.streamUnsupported = false;
this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
}

/**
* True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
*/
public static boolean isEnabled(Configuration conf) {
return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
}

/**
* Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
* is expired.
*/
public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
try {
LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
computeBlockDistribution();
} catch (IOException e) {
LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
fileInfo, e);
}
}
return hdfsBlocksDistribution;
}

private void computeBlockDistribution() throws IOException {
lastCachedAt = EnvironmentEdgeManager.currentTime();

FSDataInputStream stream;
if (fileInfo.isLink()) {
stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
} else {
stream = this.stream;
}

if (!(stream instanceof HdfsDataInputStream)) {
if (!streamUnsupported) {
LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
+ "used to derive locality. Falling back on cached value.",
stream, fileInfo, fileInfo.isLink());
streamUnsupported = true;
}
return;
}

streamUnsupported = false;
hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
}

/**
* For tests only, sets lastCachedAt so we can force a refresh
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
synchronized void setLastCachedAt(long timestamp) {
lastCachedAt = timestamp;
}

/**
* For tests only, returns the configured cache period
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
long getCachePeriodMs() {
return cachePeriodMs;
}

/**
* For tests only, returns whether the passed stream was detected as unsupported
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
boolean isStreamUnsupported() {
return streamUnsupported;
}
}
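
To summarize the lifecycle, a minimal usage sketch; in the patch the constructor is only invoked from HStoreFile.open(), so the wiring below is illustrative:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.InputStreamBlockDistribution;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

final class InputStreamBlockDistributionUsage {
  static HDFSBlocksDistribution read(FSDataInputStream stream, StoreFileInfo fileInfo) {
    // Created once per StoreFile open, for the initial reader's stream.
    InputStreamBlockDistribution dist = new InputStreamBlockDistribution(stream, fileInfo);
    // Within the cache period this returns the value captured at construction;
    // afterwards it recomputes from the stream, falling back to the cached
    // value on IOException or when the stream is not an HdfsDataInputStream.
    return dist.getHDFSBlockDistribution();
  }
}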