Skip to content

HBASE-28186 Rebase CacheAwareBalance related commits #5551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,16 @@ public CompactionState getCompactionState() {
return metrics.getCompactionState();
}

/**
 * Returns the total size of the hfiles in this region, as a {@link Size} in megabytes.
 * Delegates to the wrapped {@code RegionMetrics} instance.
 */
@Override
public Size getRegionSizeMB() {
return metrics.getRegionSizeMB();
}

/**
 * Returns the ratio of this region's data currently cached on this server.
 * Delegates to the wrapped {@code RegionMetrics} instance.
 */
@Override
public float getCurrentRegionCachedRatio() {
return metrics.getCurrentRegionCachedRatio();
}

/**
* @see java.lang.Object#toString()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,10 @@ default String getNameAsString() {

/** Returns the compaction state of this region */
CompactionState getCompactionState();

/** Returns the total size of the hfiles in the region */
Size getRegionSizeMB();

/** Returns the current cached ratio of this region on this server */
float getCurrentRegionCachedRatio();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public static RegionMetrics toRegionMetrics(ClusterStatusProtos.RegionLoad regio
ClusterStatusProtos.StoreSequenceId::getSequenceId)))
.setUncompressedStoreFileSize(
new Size(regionLoadPB.getStoreUncompressedSizeMB(), Size.Unit.MEGABYTE))
.build();
.setRegionSizeMB(new Size(regionLoadPB.getRegionSizeMB(), Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionLoadPB.getCurrentRegionCachedRatio()).build();
}

private static List<ClusterStatusProtos.StoreSequenceId>
Expand Down Expand Up @@ -118,7 +119,8 @@ public static ClusterStatusProtos.RegionLoad toRegionLoad(RegionMetrics regionMe
.addAllStoreCompleteSequenceId(toStoreSequenceId(regionMetrics.getStoreSequenceId()))
.setStoreUncompressedSizeMB(
(int) regionMetrics.getUncompressedStoreFileSize().get(Size.Unit.MEGABYTE))
.build();
.setRegionSizeMB((int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE))
.setCurrentRegionCachedRatio(regionMetrics.getCurrentRegionCachedRatio()).build();
}

public static RegionMetricsBuilder newBuilder(byte[] name) {
Expand Down Expand Up @@ -151,6 +153,8 @@ public static RegionMetricsBuilder newBuilder(byte[] name) {
private long blocksLocalWithSsdWeight;
private long blocksTotalWeight;
private CompactionState compactionState;
private Size regionSizeMB = Size.ZERO;
private float currentRegionCachedRatio;

private RegionMetricsBuilder(byte[] name) {
this.name = name;
Expand Down Expand Up @@ -281,14 +285,24 @@ public RegionMetricsBuilder setCompactionState(CompactionState compactionState)
return this;
}

/**
 * Sets the total region size (in MB) to be carried by the built {@code RegionMetrics}.
 * @param value total size of the region's hfiles, in megabytes
 * @return this builder, for call chaining
 */
public RegionMetricsBuilder setRegionSizeMB(Size value) {
this.regionSizeMB = value;
return this;
}

/**
 * Sets the current cached ratio of the region on this server to be carried by the built
 * {@code RegionMetrics}.
 * @param value fraction of the region's data currently cached
 * @return this builder, for call chaining
 */
public RegionMetricsBuilder setCurrentRegionCachedRatio(float value) {
this.currentRegionCachedRatio = value;
return this;
}

public RegionMetrics build() {
return new RegionMetricsImpl(name, storeCount, storeFileCount, storeRefCount,
maxCompactedStoreFileRefCount, compactingCellCount, compactedCellCount, storeFileSize,
memStoreSize, indexSize, rootLevelIndexSize, uncompressedDataIndexSize, bloomFilterSize,
uncompressedStoreFileSize, writeRequestCount, readRequestCount, filteredReadRequestCount,
completedSequenceId, storeSequenceIds, dataLocality, lastMajorCompactionTimestamp,
dataLocalityForSsd, blocksLocalWeight, blocksLocalWithSsdWeight, blocksTotalWeight,
compactionState);
compactionState, regionSizeMB, currentRegionCachedRatio);
}

private static class RegionMetricsImpl implements RegionMetrics {
Expand Down Expand Up @@ -318,6 +332,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
private final long blocksLocalWithSsdWeight;
private final long blocksTotalWeight;
private final CompactionState compactionState;
private final Size regionSizeMB;
private final float currentRegionCachedRatio;

RegionMetricsImpl(byte[] name, int storeCount, int storeFileCount, int storeRefCount,
int maxCompactedStoreFileRefCount, final long compactingCellCount, long compactedCellCount,
Expand All @@ -326,7 +342,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
long writeRequestCount, long readRequestCount, long filteredReadRequestCount,
long completedSequenceId, Map<byte[], Long> storeSequenceIds, float dataLocality,
long lastMajorCompactionTimestamp, float dataLocalityForSsd, long blocksLocalWeight,
long blocksLocalWithSsdWeight, long blocksTotalWeight, CompactionState compactionState) {
long blocksLocalWithSsdWeight, long blocksTotalWeight, CompactionState compactionState,
Size regionSizeMB, float currentRegionCachedRatio) {
this.name = Preconditions.checkNotNull(name);
this.storeCount = storeCount;
this.storeFileCount = storeFileCount;
Expand All @@ -353,6 +370,8 @@ private static class RegionMetricsImpl implements RegionMetrics {
this.blocksLocalWithSsdWeight = blocksLocalWithSsdWeight;
this.blocksTotalWeight = blocksTotalWeight;
this.compactionState = compactionState;
this.regionSizeMB = regionSizeMB;
this.currentRegionCachedRatio = currentRegionCachedRatio;
}

@Override
Expand Down Expand Up @@ -485,6 +504,16 @@ public CompactionState getCompactionState() {
return compactionState;
}

/** Returns the total region size (in MB) captured when this immutable snapshot was built. */
@Override
public Size getRegionSizeMB() {
return regionSizeMB;
}

/** Returns the region's cached ratio captured when this immutable snapshot was built. */
@Override
public float getCurrentRegionCachedRatio() {
return currentRegionCachedRatio;
}

@Override
public String toString() {
StringBuilder sb =
Expand Down Expand Up @@ -524,6 +553,8 @@ public String toString() {
Strings.appendKeyValue(sb, "blocksLocalWithSsdWeight", blocksLocalWithSsdWeight);
Strings.appendKeyValue(sb, "blocksTotalWeight", blocksTotalWeight);
Strings.appendKeyValue(sb, "compactionState", compactionState);
Strings.appendKeyValue(sb, "regionSizeMB", regionSizeMB);
Strings.appendKeyValue(sb, "currentRegionCachedRatio", currentRegionCachedRatio);
return sb.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ public List<ServerTask> getTasks() {
return metrics.getTasks();
}

/**
 * Returns the per-region cache size information for regions hosted on this server.
 * Delegates to the wrapped {@code ServerMetrics} instance.
 */
@Override
public Map<String, Integer> getRegionCachedInfo() {
return metrics.getRegionCachedInfo();
}

/**
* Originally, this method factored in the effect of requests going to the server as well.
* However, this does not interact very well with the current region rebalancing code, which only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,10 @@ default String getVersion() {
@Nullable
List<ServerTask> getTasks();

/**
* Returns the region cache information for the regions hosted on this server.
* @return a map from region encoded name to the size (rounded to MB) of that region's data
* cached on this region server
*/
Map<String, Integer> getRegionCachedInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu
: null)
.setTasks(serverLoadPB.getTasksList().stream().map(ProtobufUtil::getServerTask)
.collect(Collectors.toList()))
.setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap())
.setReportTimestamp(serverLoadPB.getReportEndTime())
.setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber)
.setVersion(version).build();
Expand All @@ -109,6 +110,7 @@ public static ClusterStatusProtos.ServerLoad toServerLoad(ServerMetrics metrics)
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.addAllTasks(
metrics.getTasks().stream().map(ProtobufUtil::toServerTask).collect(Collectors.toList()))
.putAllRegionCachedInfo(metrics.getRegionCachedInfo())
.setReportStartTime(metrics.getLastReportTimestamp())
.setReportEndTime(metrics.getReportTimestamp());
if (metrics.getReplicationLoadSink() != null) {
Expand Down Expand Up @@ -138,6 +140,7 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) {
private long reportTimestamp = EnvironmentEdgeManager.currentTime();
private long lastReportTimestamp = 0;
private final List<ServerTask> tasks = new ArrayList<>();
private Map<String, Integer> regionCachedInfo = new HashMap<>();

private ServerMetricsBuilder(ServerName serverName) {
this.serverName = serverName;
Expand Down Expand Up @@ -218,10 +221,15 @@ public ServerMetricsBuilder setTasks(List<ServerTask> tasks) {
return this;
}

/**
 * Sets the per-region cache size map (region encoded name -> cached size in MB, per the
 * {@code ServerMetrics#getRegionCachedInfo} contract) to be carried by the built metrics.
 * @param value map of region encoded name to cached size; replaces the default empty map
 * @return this builder, for call chaining
 */
public ServerMetricsBuilder setRegionCachedInfo(Map<String, Integer> value) {
this.regionCachedInfo = value;
return this;
}

public ServerMetrics build() {
return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond,
requestCount, usedHeapSize, maxHeapSize, infoServerPort, sources, sink, regionStatus,
coprocessorNames, reportTimestamp, lastReportTimestamp, userMetrics, tasks);
coprocessorNames, reportTimestamp, lastReportTimestamp, userMetrics, tasks, regionCachedInfo);
}

private static class ServerMetricsImpl implements ServerMetrics {
Expand All @@ -242,12 +250,14 @@ private static class ServerMetricsImpl implements ServerMetrics {
private final long lastReportTimestamp;
private final Map<byte[], UserMetrics> userMetrics;
private final List<ServerTask> tasks;
private final Map<String, Integer> regionCachedInfo;

ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks) {
long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics, List<ServerTask> tasks,
Map<String, Integer> regionCachedInfo) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
Expand All @@ -264,6 +274,7 @@ private static class ServerMetricsImpl implements ServerMetrics {
this.reportTimestamp = reportTimestamp;
this.lastReportTimestamp = lastReportTimestamp;
this.tasks = tasks;
this.regionCachedInfo = regionCachedInfo;
}

@Override
Expand Down Expand Up @@ -356,6 +367,11 @@ public List<ServerTask> getTasks() {
return tasks;
}

/**
 * Returns the per-region cache size information captured at construction time.
 * Wrapped in {@link Collections#unmodifiableMap} so callers cannot mutate internal state.
 */
@Override
public Map<String, Integer> getRegionCachedInfo() {
return Collections.unmodifiableMap(regionCachedInfo);
}

@Override
public String toString() {
int storeCount = 0;
Expand Down
12 changes: 12 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,18 @@ public enum OperationStatusCode {
*/
public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";

/**
* If the chosen ioengine can persist its state across restarts, the path to the file to persist
* to. This file is NOT the data file. It is a file into which we will serialize the map of what
* is in the data file. For example, if you pass the following argument as
* BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
* <code>file:/tmp/bucketcache.data </code>, then we will write the bucketcache data to the file
* <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
* is an in-memory map that needs to be persisted across restarts. Where to store this in-memory
* state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
*/
public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path";

/**
* HConstants for fast fail on the client side follow
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ message BucketCacheEntry {
map<int32, string> deserializers = 4;
required BackingMap backing_map = 5;
optional bytes checksum = 6;
map<string, bool> prefetched_files = 7;
map<string, RegionFileSizeMap> cached_files = 7;
}

message BackingMap {
Expand Down Expand Up @@ -81,3 +81,9 @@ enum BlockPriority {
multi = 1;
memory = 2;
}

// Value type of the BucketCacheEntry.cached_files map: records, for one cached file,
// the region it belongs to and how much of that region's data is cached.
// NOTE(review): field names suggest region_cached_size is in bytes — confirm against writer.
message RegionFileSizeMap {
required string region_name = 1;
required uint64 region_cached_size = 2;
}

11 changes: 11 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ message RegionLoad {
MAJOR = 2;
MAJOR_AND_MINOR = 3;
}

/** Total region size in MB */
optional uint32 region_size_MB = 28;

/** Current region cache ratio on this server */
optional float current_region_cached_ratio = 29;
}

message UserLoad {
Expand Down Expand Up @@ -301,6 +307,11 @@ message ServerLoad {
*/
repeated UserLoad userLoads = 12;

/**
* The metrics for region cached on this region server
*/
map<string, uint32> regionCachedInfo = 13;

/**
* The active monitored tasks
*/
Expand Down
36 changes: 0 additions & 36 deletions hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ private ServerMetrics mockServerMetricsWithReadRequests(ServerName server,
when(rl.getWriteRequestCount()).thenReturn(0L);
when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
when(rl.getRegionSizeMB()).thenReturn(Size.ZERO);
when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f);
regionLoadMap.put(info.getRegionName(), rl);
}
when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -154,8 +156,8 @@ default boolean isMetaBlock(BlockType blockType) {
* made into the cache).
* @param fileName the file that has been completely cached.
*/
default void notifyFileCachingCompleted(String fileName, int totalBlockCount,
int dataBlockCount) {
default void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
long size) {
// noop
}

Expand Down Expand Up @@ -225,7 +227,7 @@ default Optional<Integer> getBlockSize(BlockCacheKey key) {
* @return empty optional if this method is not supported, otherwise the returned optional
* contains a map of all files that have been fully cached.
*/
default Optional<Map<String, Boolean>> getFullyCachedFiles() {
default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
return Optional.empty();
}
}
Loading