Skip to content

HBASE-22887 Fix HFileOutputFormat2 writer roll #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into the base branch from the contributor's branch
Oct 8, 2019
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[]
// Map of families to writers and how much has been output on the writer.
private final Map<byte[], WriterLength> writers =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
private final Map<byte[], byte[]> previousRows =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final long now = EnvironmentEdgeManager.currentTime();
private boolean rollRequested = false;

@Override
public void write(ImmutableBytesWritable row, V cell)
Expand Down Expand Up @@ -286,12 +286,9 @@ public void write(ImmutableBytesWritable row, V cell)
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
}

if (wl != null && wl.written + length >= maxsize) {
this.rollRequested = true;
}

// This can only happen once a row is finished though
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
if (wl != null && wl.written + length >= maxsize
&& Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
rollWriters(wl);
}

Expand Down Expand Up @@ -348,7 +345,7 @@ public void write(ImmutableBytesWritable row, V cell)
wl.written += length;

// Copy the row so we know when a row transition.
this.previousRow = rowKey;
this.previousRows.put(family, rowKey);
}

private Path getTableRelativePath(byte[] tableNameBytes) {
Expand All @@ -368,7 +365,6 @@ private void rollWriters(WriterLength writerLength) throws IOException {
closeWriter(wl);
}
}
this.rollRequested = false;
}

private void closeWriter(WriterLength wl) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ public void testWritingPEData() throws Exception {
// Set down this value or we OOME in eclipse.
conf.setInt("mapreduce.task.io.sort.mb", 20);
// Write a few files.
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
long hregionMaxFilesize = 10 * 1024;
conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

Job job = new Job(conf, "testWritingPEData");
setupRandomGeneratorMapper(job, false);
Expand All @@ -457,6 +458,26 @@ public void testWritingPEData() throws Exception {
assertTrue(job.waitForCompletion(false));
FileStatus [] files = fs.listStatus(testDir);
assertTrue(files.length > 0);

//check output file num and size.
for (byte[] family : FAMILIES) {
long kvCount= 0;
RemoteIterator<LocatedFileStatus> iterator =
fs.listFiles(testDir.suffix("/" + new String(family)), true);
while (iterator.hasNext()) {
LocatedFileStatus keyFileStatus = iterator.next();
HFile.Reader reader =
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
HFileScanner scanner = reader.getScanner(false, false, false);

kvCount += reader.getEntries();
scanner.seekTo();
long perKVSize = scanner.getCell().getSerializedSize();
assertTrue("Data size of each file should not be too large.",
perKVSize * reader.getEntries() <= hregionMaxFilesize);
}
assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
}
}

/**
Expand Down