Skip to content

HBASE-26273 Force ReadType.STREAM when the user does not explicitly s… #3675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
Expand Down Expand Up @@ -128,6 +129,14 @@ public class TableSnapshotInputFormatImpl {

public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;

/**
 * Configuration key naming the {@link ReadType} to set on the {@link Scan} used to read the
 * HBase Snapshot. {@code extractScanFromConf} consults this key only when the Scan it decoded
 * still reports {@link ReadType#DEFAULT}; a Scan carrying any other ReadType is returned with
 * that ReadType unchanged.
 */
public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
"hbase.TableSnapshotInputFormat.scanner.readtype";
/**
 * ReadType applied when {@link #SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE} is not set in the
 * configuration: {@link ReadType#STREAM}.
 */
public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;

/**
* Implementation class for InputSplit logic common between mapred and mapreduce.
*/
Expand Down Expand Up @@ -382,6 +391,15 @@ public static Scan extractScanFromConf(Configuration conf) throws IOException {
} else {
throw new IllegalArgumentException("Unable to create scan");
}

if (scan.getReadType() == ReadType.DEFAULT) {
  // The caller did not pick a ReadType. Apply the one named by
  // SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, falling back to STREAM when that key is unset.
  // Resolve it before logging so the message reports the value actually applied, which is
  // not necessarily STREAM.
  ReadType configuredReadType = conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
    SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT);
  LOG.info("Provided Scan has DEFAULT ReadType, updating to " + configuredReadType
    + " for Snapshot-based InputFormat");
  scan.setReadType(configuredReadType);
}

return scan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -39,6 +41,7 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
Expand Down Expand Up @@ -407,6 +410,36 @@ public void testNoDuplicateResultsWhenSplitting() throws Exception {
}
}

/**
 * Verifies how the ReadType of a Scan is treated when the Scan is round-tripped through the
 * Configuration: explicit PREAD/STREAM values are kept, DEFAULT becomes STREAM when nothing is
 * configured, and DEFAULT becomes the configured ReadType when the override key is set.
 */
@Test
public void testScannerReadTypeConfiguration() throws IOException {
  final Configuration conf = new Configuration(false);

  // A ReadType chosen explicitly on the Scan must come back unchanged.
  final ReadType[] explicitTypes = { ReadType.PREAD, ReadType.STREAM };
  for (ReadType explicitType : explicitTypes) {
    Scan explicitScan = new Scan();
    explicitScan.setReadType(explicitType);
    ReadType expected = explicitScan.getReadType();
    ReadType actual = serializeAndReturn(conf, explicitScan).getReadType();
    assertEquals(expected, actual);
  }

  // With no override configured, only a DEFAULT ReadType is rewritten, and it becomes STREAM.
  final Scan defaultScan = new Scan();
  assertEquals(ReadType.DEFAULT, defaultScan.getReadType());
  Scan streamed = serializeAndReturn(conf, defaultScan);
  assertEquals(ReadType.STREAM, streamed.getReadType());

  // With an override configured, a DEFAULT ReadType is rewritten to the configured value.
  conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
  // The source Scan itself must still be DEFAULT; only the deserialized copy is adjusted.
  assertEquals(ReadType.DEFAULT, defaultScan.getReadType());
  Scan preadScan = serializeAndReturn(conf, defaultScan);
  assertEquals(ReadType.PREAD, preadScan.getReadType());
}

/**
 * Round-trips a Scan through the given Configuration: the Scan is encoded to its String form,
 * stored under {@code TableInputFormat.SCAN}, and then read back with
 * {@code TableSnapshotInputFormatImpl.extractScanFromConf}, in the same manner that
 * TableSnapshotInputFormat does.
 *
 * @param conf the Configuration to write the encoded Scan into and read it back from
 * @param s the Scan to encode
 * @return the Scan produced by {@code extractScanFromConf} for {@code conf}
 * @throws IOException if encoding or extracting the Scan fails
 */
private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
  final String encodedScan = TableMapReduceUtil.convertScanToString(s);
  conf.set(TableInputFormat.SCAN, encodedScan);
  Scan extracted = TableSnapshotInputFormatImpl.extractScanFromConf(conf);
  return extracted;
}

private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
byte[] startRow, byte[] stopRow)
throws IOException, InterruptedException {
Expand Down