Skip to content

Commit 7c911fa

Browse files
committed
HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend
1 parent 11f30de commit 7c911fa

File tree

6 files changed

+172
-1
lines changed

6 files changed

+172
-1
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.hadoop.hbase.util.Pair;
6262
import org.apache.hadoop.hbase.wal.WALEdit;
6363
import org.apache.hadoop.hbase.wal.WALKey;
64+
import org.apache.hadoop.hbase.wal.WALKeyImpl;
6465
import org.apache.yetus.audience.InterfaceAudience;
6566
import org.apache.yetus.audience.InterfaceStability;
6667

@@ -1104,4 +1105,18 @@ default DeleteTracker postInstantiateDeleteTracker(
11041105
throws IOException {
11051106
return delTracker;
11061107
}
1108+
1109+
/**
1110+
* Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
1111+
* coprocessors to add extended attributes to the WALKey that then get persisted to the
1112+
* WAL, and are available to replication endpoints to use in processing WAL Entries.
1113+
* @param ctx
1114+
* @param key
1115+
* @return
1116+
* @throws IOException
1117+
*/
1118+
default WALKeyImpl preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKeyImpl key)
1119+
throws IOException {
1120+
return key;
1121+
}
11071122
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7933,7 +7933,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
79337933
/**
79347934
* @return writeEntry associated with this append
79357935
*/
7936-
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
7936+
private WriteEntry doWALAppend( WALEdit walEdit, Durability durability, List<UUID> clusterIds,
79377937
long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
79387938
Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(),
79397939
"WALEdit is null or empty!");
@@ -7952,6 +7952,9 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
79527952
if (walEdit.isReplay()) {
79537953
walKey.setOrigLogSeqNum(origLogSeqNum);
79547954
}
7955+
if (this.coprocessorHost != null) {
7956+
walKey = this.coprocessorHost.preWALAppend(walKey);
7957+
}
79557958
WriteEntry writeEntry = null;
79567959
try {
79577960
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.hadoop.hbase.util.Pair;
8282
import org.apache.hadoop.hbase.wal.WALEdit;
8383
import org.apache.hadoop.hbase.wal.WALKey;
84+
import org.apache.hadoop.hbase.wal.WALKeyImpl;
8485
import org.apache.yetus.audience.InterfaceAudience;
8586
import org.slf4j.Logger;
8687
import org.slf4j.LoggerFactory;
@@ -1720,6 +1721,19 @@ public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
17201721
});
17211722
}
17221723

1724+
public WALKeyImpl preWALAppend(WALKeyImpl key) throws IOException {
1725+
if (this.coprocEnvironments.isEmpty()){
1726+
return key;
1727+
}
1728+
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, WALKeyImpl>(
1729+
regionObserverGetter, key) {
1730+
@Override
1731+
public WALKeyImpl call(RegionObserver observer) throws IOException {
1732+
return observer.preWALAppend(this, key);
1733+
}
1734+
});
1735+
}
1736+
17231737
public Message preEndpointInvocation(final Service service, final String methodName,
17241738
Message request) throws IOException {
17251739
if (coprocEnvironments.isEmpty()) {

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,42 @@ public WALKeyImpl(final byte[] encodedRegionName,
195195
mvcc, null, null);
196196
}
197197

198+
/**
199+
* Copy constructor that takes in an existing WALKeyImpl plus some extended attributes.
200+
* Intended for coprocessors to add annotations to a system-generated WALKey
201+
* for persistence to the WAL.
202+
* @param key
203+
* @param extendedAttributes
204+
*/
205+
public WALKeyImpl(WALKeyImpl key,
206+
Map<String, byte[]> extendedAttributes){
207+
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
208+
key.getWriteTime(), key.getClusterIds(), key.getNonceGroup(), key.getNonce(),
209+
key.getMvcc(), key.getReplicationScopes(), extendedAttributes);
210+
211+
}
212+
213+
/**
214+
* Copy constructor that takes in an existing WALKey, the extra WALKeyImpl fields that the
215+
* parent interface is missing, plus some extended attributes. Intended
216+
* for coprocessors to add annotations to a system-generated WALKey for
217+
* persistence to the WAL.
218+
* @param key
219+
* @param clusterIds
220+
* @param mvcc
221+
* @param replicationScopes
222+
* @param extendedAttributes
223+
*/
224+
public WALKeyImpl(WALKey key,
225+
List<UUID> clusterIds,
226+
MultiVersionConcurrencyControl mvcc,
227+
final NavigableMap<byte[], Integer> replicationScopes,
228+
Map<String, byte[]> extendedAttributes){
229+
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
230+
key.getWriteTime(), clusterIds, key.getNonceGroup(), key.getNonce(),
231+
mvcc, replicationScopes, extendedAttributes);
232+
233+
}
198234
/**
199235
* Create the log key for writing to somewhere.
200236
* We maintain the tablename mainly for debugging purposes.

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.junit.Assert.assertTrue;
2626

2727
import java.io.IOException;
28+
import java.util.HashMap;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Optional;
@@ -66,6 +67,7 @@
6667
import org.apache.hadoop.hbase.util.Pair;
6768
import org.apache.hadoop.hbase.wal.WALEdit;
6869
import org.apache.hadoop.hbase.wal.WALKey;
70+
import org.apache.hadoop.hbase.wal.WALKeyImpl;
6971

7072
/**
7173
* A sample region observer that tests the RegionObserver interface.
@@ -124,7 +126,11 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
124126
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
125127
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
126128
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
129+
final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
130+
127131
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
132+
Map<String, byte[]> extendedAttributes = new HashMap<String,byte[]>();
133+
static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
128134

129135
public void setThrowOnPostFlush(Boolean val){
130136
throwOnPostFlush.set(val);
@@ -631,6 +637,16 @@ public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessor
631637
return reader;
632638
}
633639

640+
@Override
641+
public WALKeyImpl preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
642+
WALKeyImpl key) throws IOException {
643+
ctPreWALAppend.incrementAndGet();
644+
extendedAttributes = new HashMap<>();
645+
extendedAttributes.put(Integer.toString(ctPreWALAppend.get()),
646+
Bytes.toBytes("foo"));
647+
return new WALKeyImpl(key, extendedAttributes);
648+
}
649+
634650
public boolean hadPreGet() {
635651
return ctPreGet.get() > 0;
636652
}
@@ -864,6 +880,8 @@ public int getCtPostWALRestore() {
864880
return ctPostWALRestore.get();
865881
}
866882

883+
public int getCtPreWALAppend() { return ctPreWALAppend.get(); }
884+
867885
public boolean wasStoreFileReaderOpenCalled() {
868886
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
869887
}

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.IOException;
2626
import java.lang.reflect.Method;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.List;
2930
import java.util.Optional;
3031
import org.apache.hadoop.conf.Configuration;
@@ -70,20 +71,26 @@
7071
import org.apache.hadoop.hbase.regionserver.StoreFile;
7172
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
7273
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
74+
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
7375
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
7476
import org.apache.hadoop.hbase.testclassification.MediumTests;
7577
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
7678
import org.apache.hadoop.hbase.util.Bytes;
7779
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
7880
import org.apache.hadoop.hbase.util.JVMClusterUtil;
7981
import org.apache.hadoop.hbase.util.Threads;
82+
import org.apache.hadoop.hbase.wal.WALEdit;
83+
import org.apache.hadoop.hbase.wal.WALKey;
84+
import org.apache.hadoop.hbase.wal.WALKeyImpl;
8085
import org.junit.AfterClass;
86+
import org.junit.Assert;
8187
import org.junit.BeforeClass;
8288
import org.junit.ClassRule;
8389
import org.junit.Rule;
8490
import org.junit.Test;
8591
import org.junit.experimental.categories.Category;
8692
import org.junit.rules.TestName;
93+
import org.mockito.Mockito;
8794
import org.slf4j.Logger;
8895
import org.slf4j.LoggerFactory;
8996

@@ -663,6 +670,67 @@ public void testPreWALRestoreSkip() throws Exception {
663670
table.close();
664671
}
665672

673+
//called from testPreWALAppendIsWrittenToWAL
674+
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
675+
int expectedCalls = 0;
676+
String [] methodArray = new String[1];
677+
methodArray[0] = "getCtPreWALAppend";
678+
Object[] resultArray = new Object[1];
679+
680+
Put p = new Put(ROW);
681+
p.addColumn(A, A, A);
682+
table.put(p);
683+
resultArray[0] = ++expectedCalls;
684+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
685+
686+
Append a = new Append(ROW);
687+
a.addColumn(B, B, B);
688+
table.append(a);
689+
resultArray[0] = ++expectedCalls;
690+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
691+
692+
Increment i = new Increment(ROW);
693+
i.addColumn(C, C, 1);
694+
table.increment(i);
695+
resultArray[0] = ++expectedCalls;
696+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
697+
698+
Delete d = new Delete(ROW);
699+
table.delete(d);
700+
resultArray[0] = ++expectedCalls;
701+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
702+
}
703+
704+
@Test
705+
public void testPreWALAppend() throws Exception {
706+
SimpleRegionObserver sro = new SimpleRegionObserver();
707+
ObserverContext ctx = Mockito.mock(ObserverContext.class);
708+
WALKeyImpl key = new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE, EnvironmentEdgeManager.currentTime());
709+
WALKeyImpl postCPKey = sro.preWALAppend(ctx, key);
710+
Assert.assertEquals(key.getEncodedRegionName(), postCPKey.getEncodedRegionName());
711+
Assert.assertEquals(key.getTableName(), postCPKey.getTableName());
712+
Assert.assertEquals(key.getOriginatingClusterId(), postCPKey.getOriginatingClusterId());
713+
Assert.assertEquals(key.getClusterIds(), postCPKey.getClusterIds());
714+
Assert.assertEquals(1, postCPKey.getExtendedAttributes().size());
715+
Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
716+
postCPKey.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
717+
}
718+
719+
@Test
720+
public void testPreWALAppendIsWrittenToWAL() throws Exception {
721+
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + "." + name.getMethodName());
722+
Table table = util.createTable(tableName, new byte[][] { A, B, C });
723+
724+
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
725+
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
726+
//should be only one region
727+
HRegion region = regions.get(0);
728+
region.getWAL().registerWALActionsListener(listener);
729+
testPreWALAppendHook(table, tableName);
730+
boolean[] expectedResults = {true, true, true, true};
731+
Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
732+
733+
}
666734
// check each region whether the coprocessor upcalls are called or not.
667735
private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
668736
Object value[]) throws IOException {
@@ -711,4 +779,21 @@ private static void createHFile(Configuration conf, FileSystem fs, Path path, by
711779
writer.close();
712780
}
713781
}
782+
783+
private static class PreWALAppendWALActionsListener implements WALActionsListener {
784+
boolean[] walKeysCorrect = {false, false, false, false};
785+
786+
@Override
787+
public void postAppend(long entryLen, long elapsedTimeMillis,
788+
WALKey logKey, WALEdit logEdit) throws IOException {
789+
for (int k = 0; k < 4; k++) {
790+
if (!walKeysCorrect[k]) {
791+
walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
792+
logKey.getExtendedAttribute(Integer.toString(k + 1)));
793+
}
794+
}
795+
}
796+
797+
public boolean[] getWalKeysCorrectArray() { return walKeysCorrect; }
798+
}
714799
}

0 commit comments

Comments
 (0)