HBASE-26122: Implement an optional maximum size for Gets, after which a partial result is returned #3532

Merged (3 commits) on Aug 11, 2021
23 changes: 23 additions & 0 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -76,6 +76,7 @@ public class Get extends Query implements Row {
private boolean checkExistenceOnly = false;
private boolean closestRowBefore = false;
private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private long maxResultSize = -1;

/**
* Create a Get operation for the specified row.
@@ -339,6 +340,21 @@ public Get setFilter(Filter filter) {
return this;
}

/**
* Set the maximum result size. The default is -1; this means that no specific
* maximum result size will be set for this Get.
*
* If set to a value greater than zero, the server may respond with a Result where
* {@link Result#mayHaveMoreCellsInRow()} is true. The user is required to handle
* this case.
Contributor:

What do you do when mayHaveMoreCellsInRow is true, @bbeaudreault? How do you use this boolean in prod (if you don't mind me asking)?

Contributor (author):

At HubSpot we have a wrapper implementation of Table which all downstream users go through. This wrapper table enforces that setMaxResultSize is set to a standard value that we've deemed safe. If a result comes back and mayHaveMoreCellsInRow is true, we throw an exception. If a team gets such an exception they can request a temporary allowance which disables the check. In the meantime they are expected to add a filter to paginate so they don't hit the max limit.

This is a little draconian, but we used to have lots of OOM issues due to large gets/puts/scans. Another possible solution is to iterate with PageFilter, like I did in testGetPartialResults. We planned to do something like that eventually, but in the end we had rolled this out in such a way that the number of exceptions was so small that we never did the work.

Would you be open to an automatic stitching in the future, like we do with Scans? I can't do that now, but might be a reasonable followup jira.
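The enforcement pattern described above (reject any result the server truncated at the size limit, unless an allowance was granted) can be sketched in plain Java. The `FakeResult` and `PartialResultException` types below are hypothetical stand-ins for illustration, not HBase classes:

```java
// Minimal sketch of the wrapper-table guard described above: reject any
// result whose partial flag indicates the server hit the size limit mid-row.
public class PartialResultGuard {

    // Stand-in for org.apache.hadoop.hbase.client.Result; only the flag we need.
    public static final class FakeResult {
        private final boolean moreCells;
        public FakeResult(boolean moreCells) { this.moreCells = moreCells; }
        public boolean mayHaveMoreCellsInRow() { return moreCells; }
    }

    public static final class PartialResultException extends RuntimeException {
        public PartialResultException(String msg) { super(msg); }
    }

    // Throws unless the result is complete; callers either paginate with a
    // filter or request a temporary allowance that disables this check.
    public static FakeResult checkComplete(FakeResult r, boolean allowanceGranted) {
        if (!allowanceGranted && r.mayHaveMoreCellsInRow()) {
            throw new PartialResultException(
                "Get exceeded the configured max result size; add a paginating filter");
        }
        return r;
    }
}
```

The key design point is that the check is fail-fast: a truncated result never reaches application code silently.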

Contributor (author):

In terms of that last sentence, maybe it's better not to support stitching for Gets. Instead people should rewrite these large Gets as Scans or add filters like above. Stitching obviously increases the latency, and that could be very misleading for Gets. Multigets would be even worse (and harder to implement).

Contributor:

I want a Cell Streaming API.

*
* @param maxResultSize The maximum result size in bytes
*/
public Get setMaxResultSize(long maxResultSize) {
this.maxResultSize = maxResultSize;
return this;
}

/* Accessors */

/**
Expand Down Expand Up @@ -458,6 +474,13 @@ public Map<String, Object> getFingerprint() {
return map;
}

/**
* @return the maximum result size in bytes. See {@link #setMaxResultSize(long)}
*/
public long getMaxResultSize() {
return maxResultSize;
}

/**
* Compile the details beyond the scope of getFingerprint (row, columns,
* timestamps, etc.) into a Map along with the fingerprinted information.
@@ -1382,7 +1382,7 @@ public static Result toResult(final ClientProtos.Result proto, final CellScanner

return (cells == null || cells.isEmpty())
? (proto.getStale() ? EMPTY_RESULT_STALE : EMPTY_RESULT)
: Result.create(cells, null, proto.getStale());
: Result.create(cells, null, proto.getStale(), proto.getPartial());
}


@@ -592,6 +592,9 @@ public static Get toGet(final ClientProtos.Get proto) throws IOException {
if (proto.hasLoadColumnFamiliesOnDemand()) {
get.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand());
}
if (proto.hasMaxResultSize()) {
get.setMaxResultSize(proto.getMaxResultSize());
}
return get;
}

@@ -1256,6 +1259,9 @@ public static ClientProtos.Get toGet(
if (loadColumnFamiliesOnDemand != null) {
builder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
}
if (get.getMaxResultSize() > 0) {
builder.setMaxResultSize(get.getMaxResultSize());
}
return builder.build();
}

@@ -1457,6 +1463,7 @@ public static ClientProtos.Result toResultNoData(final Result result) {
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setAssociatedCellCount(size);
builder.setStale(result.isStale());
builder.setPartial(result.mayHaveMoreCellsInRow());
return builder.build();
}

@@ -1547,7 +1554,7 @@ public static Result toResult(final ClientProtos.Result proto, final CellScanner

return (cells == null || cells.isEmpty())
? (proto.getStale() ? EMPTY_RESULT_STALE : EMPTY_RESULT)
: Result.create(cells, null, proto.getStale());
: Result.create(cells, null, proto.getStale(), proto.getPartial());
}


2 changes: 2 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -90,6 +90,8 @@ message Get {
optional Consistency consistency = 12 [default = STRONG];
repeated ColumnFamilyTimeRange cf_time_range = 13;
optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */

optional uint64 max_result_size = 15;
}

message Result {
@@ -146,6 +146,7 @@
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
@@ -3864,8 +3865,7 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
Result result;
if (returnResults) {
// convert duplicate increment/append to get
List<Cell> results = region.get(toGet(mutation), false, nonceGroup, nonce);
result = Result.create(results);
result = region.get(toGet(mutation), false, nonceGroup, nonce);
} else {
result = Result.EMPTY_RESULT;
}
@@ -7497,9 +7497,7 @@ public static boolean rowIsInRange(RegionInfo info, final byte [] row, final int
@Override
public Result get(final Get get) throws IOException {
prepareGet(get);
List<Cell> results = get(get, true);
boolean stale = this.getRegionInfo().getReplicaId() != 0;
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
return get(get, true, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

void prepareGet(final Get get) throws IOException {
@@ -7518,11 +7516,31 @@ void prepareGet(final Get get) throws IOException {

@Override
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
return getInternal(get, null, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
throws IOException {
private Result get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
throws IOException {
ScannerContext scannerContext = get.getMaxResultSize() > 0
? ScannerContext.newBuilder()
.setSizeLimit(LimitScope.BETWEEN_CELLS, get.getMaxResultSize(), get.getMaxResultSize())
.build()
: null;
Contributor:

Thanks.


List<Cell> result = getInternal(get, scannerContext, withCoprocessor, nonceGroup, nonce);
boolean stale = this.getRegionInfo().getReplicaId() != 0;
boolean mayHaveMoreCellsInRow =
scannerContext != null && scannerContext.mayHaveMoreCellsInRow();

return Result.create(
result,
get.isCheckExistenceOnly() ? !result.isEmpty() : null,
stale,
mayHaveMoreCellsInRow);
}
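The size-limit accounting that the `ScannerContext` performs with `LimitScope.BETWEEN_CELLS` can be approximated in isolation. The class below is a toy stand-in written for illustration, not HBase's `ScannerContext`: it accumulates cell sizes until the byte budget is reached, and records that the row may have been truncated.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a BETWEEN_CELLS size limit: copy cells until the byte budget
// is exhausted, then report that more cells may remain in the row.
public class SizeLimitedRead {
    // Mirrors ScannerContext.mayHaveMoreCellsInRow() for the last read() call.
    public static boolean moreCellsInRow = false;

    public static List<byte[]> read(List<byte[]> rowCells, long maxResultSize) {
        List<byte[]> out = new ArrayList<>();
        long accumulated = 0;
        moreCellsInRow = false;
        for (int i = 0; i < rowCells.size(); i++) {
            byte[] cell = rowCells.get(i);
            out.add(cell);
            accumulated += cell.length;
            if (accumulated >= maxResultSize && i < rowCells.size() - 1) {
                moreCellsInRow = true;  // limit hit mid-row -> partial result
                break;
            }
        }
        return out;
    }
}
```

With `maxResultSize = 1` every call returns a single cell and flags the result partial, which is exactly the behavior the tests below rely on; with a limit larger than the row, the flag stays false.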

private List<Cell> getInternal(Get get, ScannerContext scannerContext, boolean withCoprocessor,
long nonceGroup, long nonce) throws IOException {
List<Cell> results = new ArrayList<>();
long before = EnvironmentEdgeManager.currentTime();

@@ -7539,7 +7557,7 @@ private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long n
}
try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) {
List<Cell> tmp = new ArrayList<>();
scanner.next(tmp);
scanner.next(tmp, scannerContext);
// Copy EC to heap, then close the scanner.
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers.
// See more details in HBASE-26036.
@@ -2651,10 +2651,15 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
}

ScannerContext scannerContext = ScannerContext.newBuilder()
.setSizeLimit(LimitScope.BETWEEN_CELLS, get.getMaxResultSize(), get.getMaxResultSize())
.build();

RegionScannerImpl scanner = null;
try {
scanner = region.getScanner(scan);
scanner.next(results);
scanner.next(results, scannerContext);
} finally {
if (scanner != null) {
if (closeCallBack == null) {
@@ -2679,7 +2684,8 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal
}
region.metricsUpdateForGet(results, before);

return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale,
scannerContext.mayHaveMoreCellsInRow());
}

private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException {
@@ -31,18 +31,26 @@
import java.util.Set;
import org.apache.hadoop.hbase.client.ClientScanner;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterListWithAND;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -136,6 +144,46 @@ public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testGetPartialResults() throws Exception {
byte[] row = ROWS[0];

Result result;
int cf = 0;
int qf = 0;
int total = 0;

do {
// this will ensure we always return only 1 result
Get get = new Get(row)
.setMaxResultSize(1);

// we want to page through the entire row; this filter ensures each Get starts at the next unread column
if (total > 0) {
get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
new ColumnRangeFilter(QUALIFIERS[qf], true, null, false),
new FamilyFilter(CompareOperator.GREATER_OR_EQUAL, new BinaryComparator(FAMILIES[cf]))));
}

// all values are the same, but there should be a value
result = TABLE.get(get);
assertTrue(String.format("Value for family %s (# %s) and qualifier %s (# %s)",
Bytes.toStringBinary(FAMILIES[cf]), cf, Bytes.toStringBinary(QUALIFIERS[qf]), qf),
Bytes.equals(VALUE, result.getValue(FAMILIES[cf], QUALIFIERS[qf])));

total++;
if (++qf >= NUM_QUALIFIERS) {
cf++;
qf = 0;
}
} while (result.mayHaveMoreCellsInRow());

// ensure we iterated all cells in row
assertEquals(NUM_COLS, total);
assertEquals(NUM_FAMILIES, cf);
assertEquals(0, qf);
}
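The cursor arithmetic the test above uses to walk every (family, qualifier) pair, one partial Get at a time, can be isolated as a plain-Java sketch (names are illustrative, not HBase API):

```java
// Advance a (family, qualifier) cursor the way testGetPartialResults does:
// the qualifier index rolls over first, then the family index increments.
public class ColumnCursor {
    public static int[] advance(int cf, int qf, int numQualifiers) {
        qf++;
        if (qf >= numQualifiers) {
            cf++;
            qf = 0;
        }
        return new int[] { cf, qf };
    }

    // Total Gets needed to cover numFamilies * numQualifiers cells,
    // one cell per Get (as when maxResultSize limits each result to 1 cell).
    public static int stepsToCover(int numFamilies, int numQualifiers) {
        int cf = 0, qf = 0, total = 0;
        while (cf < numFamilies) {
            int[] next = advance(cf, qf, numQualifiers);
            cf = next[0];
            qf = next[1];
            total++;
        }
        return total;
    }
}
```

This mirrors the test's invariant: after the loop, the family index equals NUM_FAMILIES, the qualifier index is back at 0, and the step count equals NUM_COLS.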

/**
* Ensure that the expected key values appear in a result returned from a scanner that is
* combining partial results into complete results
@@ -7861,4 +7861,89 @@ public void run() {
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
}

@Test
public void testOversizedGetsReturnPartialResult() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2)
.setMaxResultSize(1); // 0 doesn't count as a limit, according to HBase

Result r = region.get(get);

assertTrue("Expected partial result, but result was not marked as partial", r.mayHaveMoreCellsInRow());
}

@Test
public void testGetsWithoutResultSizeLimitAreNotPartial() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2);

Result r = region.get(get);

assertFalse("Expected full result, but it was marked as partial", r.mayHaveMoreCellsInRow());
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
assertTrue(Bytes.equals(value2, r.getValue(fam1, qual2)));
}

@Test
public void testGetsWithinResultSizeLimitAreNotPartial() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2)
.setMaxResultSize(Long.MAX_VALUE);

Result r = region.get(get);

assertFalse("Expected full result, but it was marked as partial", r.mayHaveMoreCellsInRow());
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
assertTrue(Bytes.equals(value2, r.getValue(fam1, qual2)));
}

@Test
public void testGetsWithResultSizeLimitReturnPartialResults() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);

Put p = new Put(row)
.addColumn(fam1, qual1, value1)
.addColumn(fam1, qual2, value2);

region.put(p);

Get get = new Get(row)
.addColumn(fam1, qual1)
.addColumn(fam1, qual2)
.setMaxResultSize(10);

Result r = region.get(get);

assertTrue("Expected partial result, but it was marked as complete", r.mayHaveMoreCellsInRow());
assertTrue(Bytes.equals(value1, r.getValue(fam1, qual1)));
assertEquals("Got more results than expected", 1, r.size());
}

}