2121import static org .apache .hadoop .hbase .regionserver .HStoreFile .BULKLOAD_TIME_KEY ;
2222import static org .apache .hadoop .hbase .regionserver .HStoreFile .EXCLUDE_FROM_MINOR_COMPACTION_KEY ;
2323import static org .apache .hadoop .hbase .regionserver .HStoreFile .MAJOR_COMPACTION_KEY ;
24+ import static org .apache .hadoop .hbase .regionserver .HStoreFile .MAX_SEQ_ID_KEY ;
2425
2526import java .io .IOException ;
2627import java .io .UnsupportedEncodingException ;
@@ -209,6 +210,13 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
209210 public static final String REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY =
210211 REMOTE_CLUSTER_CONF_PREFIX + HConstants .ZOOKEEPER_ZNODE_PARENT ;
211212
213+ /**
214+ * Set the MAX_SEQ_ID metadata on the resulting HFile. This will ensure the HFiles will be sorted
215+ * properly when read by tools such as the ClientSideRegionScanner. Will have no effect if the
216+ * HFile is bulkloaded, as the sequence ID generated when bulkloading will override this metadata.
217+ */
218+ public static final String SET_MAX_SEQ_ID_KEY = "hbase.hfileoutputformat.set.max.seq.id" ;
219+
212220 public static final String STORAGE_POLICY_PROPERTY = HStore .BLOCK_STORAGE_POLICY_KEY ;
213221 public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "." ;
214222
@@ -270,7 +278,7 @@ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWrit
270278
271279 return new RecordWriter <ImmutableBytesWritable , V >() {
272280 // Map of families to writers and how much has been output on the writer.
273- private final Map <byte [], WriterLength > writers = new TreeMap <>(Bytes .BYTES_COMPARATOR );
281+ private final Map <byte [], WriterInfo > writers = new TreeMap <>(Bytes .BYTES_COMPARATOR );
274282 private final Map <byte [], byte []> previousRows = new TreeMap <>(Bytes .BYTES_COMPARATOR );
275283 private final long now = EnvironmentEdgeManager .currentTime ();
276284 private byte [] tableNameBytes = writeMultipleTables ? null : Bytes .toBytes (writeTableNames );
@@ -300,10 +308,10 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
300308 }
301309 byte [] tableAndFamily = getTableNameSuffixedWithFamily (tableNameBytes , family );
302310
303- WriterLength wl = this .writers .get (tableAndFamily );
311+ WriterInfo wi = this .writers .get (tableAndFamily );
304312
305313 // If this is a new column family, verify that the directory exists
306- if (wl == null ) {
314+ if (wi == null ) {
307315 Path writerPath = null ;
308316 if (writeMultipleTables ) {
309317 Path tableRelPath = getTableRelativePath (tableNameBytes );
@@ -317,14 +325,14 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
317325
318326 // This can only happen once a row is finished though
319327 if (
320- wl != null && wl .written + length >= maxsize
328+ wi != null && wi .written + length >= maxsize
321329 && Bytes .compareTo (this .previousRows .get (family ), rowKey ) != 0
322330 ) {
323- rollWriters (wl );
331+ rollWriters (wi );
324332 }
325333
326334 // create a new WAL writer, if necessary
327- if (wl == null || wl .writer == null ) {
335+ if (wi == null || wi .writer == null ) {
328336 InetSocketAddress [] favoredNodes = null ;
329337 if (conf .getBoolean (LOCALITY_SENSITIVE_CONF_KEY , DEFAULT_LOCALITY_SENSITIVE )) {
330338 HRegionLocation loc = null ;
@@ -355,14 +363,15 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
355363 }
356364 }
357365 }
358- wl = getNewWriter (tableNameBytes , family , conf , favoredNodes );
366+ wi = getNewWriter (tableNameBytes , family , conf , favoredNodes );
359367
360368 }
361369
362370 // we now have the proper WAL writer. full steam ahead
363371 PrivateCellUtil .updateLatestStamp (cell , this .now );
364- wl .writer .append (kv );
365- wl .written += length ;
372+ wi .writer .append (kv );
373+ wi .written += length ;
374+ wi .maxSequenceId = Math .max (kv .getSequenceId (), wi .maxSequenceId );
366375
367376 // Copy the row so we know when a row transition.
368377 this .previousRows .put (family , rowKey );
@@ -378,24 +387,25 @@ private Path getTableRelativePath(byte[] tableNameBytes) {
378387 return tableRelPath ;
379388 }
380389
381- private void rollWriters (WriterLength writerLength ) throws IOException {
382- if (writerLength != null ) {
383- closeWriter (writerLength );
390+ private void rollWriters (WriterInfo writerInfo ) throws IOException {
391+ if (writerInfo != null ) {
392+ closeWriter (writerInfo );
384393 } else {
385- for (WriterLength wl : this .writers .values ()) {
386- closeWriter (wl );
394+ for (WriterInfo wi : this .writers .values ()) {
395+ closeWriter (wi );
387396 }
388397 }
389398 }
390399
391- private void closeWriter (WriterLength wl ) throws IOException {
392- if (wl .writer != null ) {
400+ private void closeWriter (WriterInfo wi ) throws IOException {
401+ if (wi .writer != null ) {
393402 LOG .info (
394- "Writer=" + wl .writer .getPath () + ((wl .written == 0 ) ? "" : ", wrote=" + wl .written ));
395- close (wl .writer );
396- wl .writer = null ;
403+ "Writer=" + wi .writer .getPath () + ((wi .written == 0 ) ? "" : ", wrote=" + wi .written ));
404+ close (wi .writer , wi );
405+ wi .writer = null ;
397406 }
398- wl .written = 0 ;
407+ wi .written = 0 ;
408+ wi .maxSequenceId = -1 ;
399409 }
400410
401411 private Configuration createRemoteClusterConf (Configuration conf ) {
@@ -435,19 +445,19 @@ private Configuration createRemoteClusterConf(Configuration conf) {
435445
436446 /*
437447 * Create a new StoreFile.Writer.
438- * @return A WriterLength , containing a new StoreFile.Writer.
448+ * @return A WriterInfo , containing a new StoreFile.Writer.
439449 */
440450 @ edu .umd .cs .findbugs .annotations .SuppressWarnings (value = "BX_UNBOXING_IMMEDIATELY_REBOXED" ,
441451 justification = "Not important" )
442- private WriterLength getNewWriter (byte [] tableName , byte [] family , Configuration conf ,
452+ private WriterInfo getNewWriter (byte [] tableName , byte [] family , Configuration conf ,
443453 InetSocketAddress [] favoredNodes ) throws IOException {
444454 byte [] tableAndFamily = getTableNameSuffixedWithFamily (tableName , family );
445455 Path familydir = new Path (outputDir , Bytes .toString (family ));
446456 if (writeMultipleTables ) {
447457 familydir =
448458 new Path (outputDir , new Path (getTableRelativePath (tableName ), Bytes .toString (family )));
449459 }
450- WriterLength wl = new WriterLength ();
460+ WriterInfo wi = new WriterInfo ();
451461 Algorithm compression = overriddenCompression ;
452462 compression = compression == null ? compressionMap .get (tableAndFamily ) : compression ;
453463 compression = compression == null ? defaultCompression : compression ;
@@ -474,23 +484,26 @@ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
474484
475485 HFileContext hFileContext = contextBuilder .build ();
476486 if (null == favoredNodes ) {
477- wl .writer =
487+ wi .writer =
478488 new StoreFileWriter .Builder (conf , CacheConfig .DISABLED , fs ).withOutputDir (familydir )
479489 .withBloomType (bloomType ).withFileContext (hFileContext ).build ();
480490 } else {
481- wl .writer = new StoreFileWriter .Builder (conf , CacheConfig .DISABLED , new HFileSystem (fs ))
491+ wi .writer = new StoreFileWriter .Builder (conf , CacheConfig .DISABLED , new HFileSystem (fs ))
482492 .withOutputDir (familydir ).withBloomType (bloomType ).withFileContext (hFileContext )
483493 .withFavoredNodes (favoredNodes ).build ();
484494 }
485495
486- this .writers .put (tableAndFamily , wl );
487- return wl ;
496+ this .writers .put (tableAndFamily , wi );
497+ return wi ;
488498 }
489499
490- private void close (final StoreFileWriter w ) throws IOException {
500+ private void close (final StoreFileWriter w , final WriterInfo wl ) throws IOException {
491501 if (w != null ) {
492502 w .appendFileInfo (BULKLOAD_TIME_KEY , Bytes .toBytes (EnvironmentEdgeManager .currentTime ()));
493503 w .appendFileInfo (BULKLOAD_TASK_KEY , Bytes .toBytes (context .getTaskAttemptID ().toString ()));
504+ if (conf .getBoolean (SET_MAX_SEQ_ID_KEY , false ) && wl .maxSequenceId >= 0 ) {
505+ w .appendFileInfo (MAX_SEQ_ID_KEY , Bytes .toBytes (wl .maxSequenceId ));
506+ }
494507 w .appendFileInfo (MAJOR_COMPACTION_KEY , Bytes .toBytes (true ));
495508 w .appendFileInfo (EXCLUDE_FROM_MINOR_COMPACTION_KEY , Bytes .toBytes (compactionExclude ));
496509 w .appendTrackedTimestampsToMetadata ();
@@ -500,8 +513,8 @@ private void close(final StoreFileWriter w) throws IOException {
500513
501514 @ Override
502515 public void close (TaskAttemptContext c ) throws IOException , InterruptedException {
503- for (WriterLength wl : this .writers .values ()) {
504- close (wl .writer );
516+ for (WriterInfo wi : this .writers .values ()) {
517+ close (wi .writer , wi );
505518 }
506519 }
507520 };
@@ -524,8 +537,9 @@ static void configureStoragePolicy(final Configuration conf, final FileSystem fs
524537 /*
525538 * Data structure to hold a Writer and amount of data written on it.
526539 */
527- static class WriterLength {
540+ static class WriterInfo {
528541 long written = 0 ;
542+ long maxSequenceId = -1 ;
529543 StoreFileWriter writer = null ;
530544 }
531545
0 commit comments