org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
@@ -277,6 +278,15 @@ default void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment>
* {@link InternalScanner} with a custom implementation that is returned from this method. The
* custom scanner can then inspect {@link org.apache.hadoop.hbase.Cell}s from the wrapped scanner,
* applying its own policy to what gets written.
+ * <p>
+ * If an implementation wraps the passed-in {@link InternalScanner}, it can also implement
+ * {@link Shipper} and delegate to the original scanner. This allows compactions to free up
+ * memory as they progress, which is especially important for users of off-heap memory pools.
+ * <p>
+ * Keep in mind that when {@link Shipper#shipped()} is called, any cell references your
+ * implementation maintains may be invalidated. You should therefore deep clone any cells
+ * that you need to keep a reference to across invocations of {@link Shipper#shipped()}.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanner the scanner over existing data used in the store file rewriting
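To make the new javadoc guidance concrete, here is a minimal sketch (not part of this PR) of a coprocessor scanner that wraps the compaction scanner, implements Shipper, and deep-clones the one cell it retains across shipped() calls. The class name is hypothetical, and KeyValueUtil.copyToNewKeyValue is assumed to be the available deep-clone helper.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;

// Hypothetical example, not from this PR.
public class CloningShipperScanner implements InternalScanner, Shipper {
  private final InternalScanner delegate;
  private Cell lastCell; // retained across batches, e.g. for custom policy checks

  public CloningShipperScanner(InternalScanner delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
    boolean more = delegate.next(result, scannerContext);
    if (!result.isEmpty()) {
      // deep clone before retaining: once shipped() runs, the blocks backing
      // these cells may be released or reused
      lastCell = KeyValueUtil.copyToNewKeyValue(result.get(result.size() - 1));
    }
    return more;
  }

  @Override
  public void shipped() throws IOException {
    if (delegate instanceof Shipper) {
      ((Shipper) delegate).shipped(); // lets the compaction free finished blocks
    }
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}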
org/apache/hadoop/hbase/regionserver/Shipper.java
@@ -18,14 +18,15 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * This interface denotes a scanner as one which can ship cells. A scan operation makes many RPC
 * requests to the server, fetching N rows per RPC; these are then shipped to the client. At the
 * end of every such batch, {@link #shipped()} will get called.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public interface Shipper {

/**
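For context on the contract this audience change exposes to coprocessors, a hedged caller-side sketch (illustrative only, not the region server's actual code path): the server drains the scanner in batches, ships each batch to the client, then signals the scanner via shipped().

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;

final class ShipperContractSketch { // illustrative holder class
  static void drainInBatches(InternalScanner scanner, ScannerContext context) throws IOException {
    List<Cell> batch = new ArrayList<>();
    boolean more;
    do {
      more = scanner.next(batch, context);
      // ... encode and send `batch` to the client here (transport omitted) ...
      batch.clear();
      if (scanner instanceof Shipper) {
        // end of a shipped batch: the scanner may now release or reuse the
        // buffers backing the cells that were just sent
        ((Shipper) scanner).shipped();
      }
    } while (more);
  }
}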
org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -46,10 +46,10 @@
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
@@ -433,7 +433,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

throughputController.start(compactionName);
-KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
+Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
long shippedCallSizeLimit =
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
try {
@@ -473,7 +473,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
return false;
}
}
-if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
+if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
if (lastCleanCell != null) {
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
// ShipperListener will do a clone of the last cells it refers to, so we need to set back
@@ -489,7 +489,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
// we are doing the similar thing. In between the compaction (after every N cells
// written with collective size of 'shippedCallSizeLimit') we will call shipped which
// may clear prevBlocks list.
-kvs.shipped();
+shipper.shipped();
bytesWrittenProgressForShippedCall = 0;
}
if (lastCleanCell != null) {
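A quick worked example of the shipped-call threshold above, with assumed values (10 input files and the default 64 KB block size; neither number comes from this PR):

// Worked example for the threshold above (assumed values, not from this PR):
//   shippedCallSizeLimit = request.getFiles().size() * columnFamily.getBlocksize()
long shippedCallSizeLimit = 10L * 64 * 1024; // 10 files * 64 KB = 655,360 bytes
// so shipped() fires roughly every 640 KB of compacted output, bounding how
// long already-written blocks stay pinned in memory.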
org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.MatcherAssert.assertThat;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ MediumTests.class, CoprocessorTests.class })
public class TestCompactionWithShippingCoprocessor {

  private static final AtomicInteger SHIPPED_COUNT = new AtomicInteger();

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactionWithShippingCoprocessor.class);

  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Verifies that if a coprocessor returns an InternalScanner which implements Shipper, the
   * shipped() method is appropriately called in Compactor.
   */
  @Test
  public void testCoprocScannersExtendingShipperGetShipped() throws Exception {
    int shippedCountBefore = SHIPPED_COUNT.get();
    final TableName tableName = TableName.valueOf(name.getMethodName());
    // create a table with a block size of 1024
    final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024,
      CompactionObserver.class.getName());
    TEST_UTIL.loadTable(table, FAMILY);
    TEST_UTIL.flush();
    try {
      // get the region that will be compacted
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      // trigger a major compaction
      TEST_UTIL.compact(true);
      assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore));
    } finally {
      table.close();
    }
  }

  public static class CompactionObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
      return new ShippedObservingScanner(scanner);
    }
  }

  public static class ShippedObservingScanner implements InternalScanner, Shipper {

    protected final InternalScanner scanner;

    public ShippedObservingScanner(InternalScanner scanner) {
      this.scanner = scanner;
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return scanner.next(result, scannerContext);
    }

    @Override
    public void close() throws IOException {
      scanner.close();
    }

    @Override
    public void shipped() throws IOException {
      if (scanner instanceof Shipper) {
        SHIPPED_COUNT.incrementAndGet();
        ((Shipper) scanner).shipped();
      }
    }
  }
}
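Outside the test, a hedged sketch of attaching this observer to a table in a running cluster so that its preCompact hook wraps compaction scanners. The helper class, table name, and family name are hypothetical; only the Admin/TableDescriptorBuilder calls are standard HBase client API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class AttachObserverExample { // hypothetical helper, not in this PR
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      TableDescriptor td = TableDescriptorBuilder
        .newBuilder(TableName.valueOf("shipping_demo")) // hypothetical table
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f"))
        // register the RegionObserver so preCompact can wrap compaction scanners
        .setCoprocessor(
          TestCompactionWithShippingCoprocessor.CompactionObserver.class.getName())
        .build();
      admin.createTable(td);
    }
  }
}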