Skip to content

HBASE-22622 - WALKey Extended Attributes #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions hbase-protocol-shaded/src/main/protobuf/WAL.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,20 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11;
repeated Attribute extended_attributes = 12;

/*
optional CustomEntryType custom_entry_type = 9;
/*
optional CustomEntryType custom_entry_type = 9;

enum CustomEntryType {
COMPACTION = 0;
}
*/
enum CustomEntryType {
COMPACTION = 0;
}
*/
}

message Attribute {
required string key = 1;
required bytes value = 2;
}

enum ScopeType {
Expand Down
6 changes: 5 additions & 1 deletion hbase-protocol/src/main/protobuf/WAL.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11;

repeated Attribute extended_attributes = 12;
/*
optional CustomEntryType custom_entry_type = 9;

Expand All @@ -71,6 +71,10 @@ message WALKey {
}
*/
}
message Attribute {
required string key = 1;
required bytes value = 2;
}

enum ScopeType {
REPLICATION_SCOPE_LOCAL = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -71,7 +72,7 @@ public static WALKeyImpl writeCompactionMarker(WAL wal,
MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
Expand All @@ -87,7 +88,7 @@ public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer>
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri,
WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
Expand All @@ -103,7 +104,7 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal,
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
WALEdit.createRegionEventWALEdit(hri, r), mvcc);
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
Expand All @@ -125,19 +126,23 @@ public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
return walKey;
}

private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
final NavigableMap<byte[], Integer> replicationScope,
final RegionInfo hri,
final WALEdit edit,
final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true);
return doFullAppendTransaction(wal, replicationScope, hri,
edit, mvcc, extendedAttributes, true);
}

/**
Expand All @@ -150,11 +155,12 @@ private static WALKeyImpl writeMarker(final WAL wal,
*/
public static WALKeyImpl doFullAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final boolean sync)
throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
System.currentTimeMillis(), mvcc, replicationScope);
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.append(hri, walKey, edit, false);
Expand Down
21 changes: 21 additions & 0 deletions hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ default long getNonce() {
*/
long getOrigLogSeqNum();

/**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor
* @param attributeKey The key of a key / value pair
*/
default byte[] getExtendedAttribute(String attributeKey){
return null;
}

/**
* Returns a map of all extended attributes injected into this WAL key.
*/
default Map<String, byte[]> getExtendedAttributes() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing setters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apurtell - WALKey's interface comments specifically say that setters aren't permitted. They're meant to be immutable, so the extended attributes will be set during construction of the WALKeyImpl

return new HashMap<>();
}
/**
* Produces a string map for this key. Useful for programmatic use and
* manipulation of the data stored in an WALKeyImpl, for example, printing
Expand All @@ -98,6 +113,12 @@ default Map<String, Object> toStringMap() {
stringMap.put("table", getTableName());
stringMap.put("region", Bytes.toStringBinary(getEncodedRegionName()));
stringMap.put("sequence", getSequenceId());
Map<String, byte[]> extendedAttributes = getExtendedAttributes();
if (extendedAttributes != null){
for (Map.Entry<String, byte[]> entry : extendedAttributes.entrySet()){
stringMap.put(entry.getKey(), Bytes.toStringBinary(entry.getValue()));
}
}
return stringMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -116,14 +118,16 @@ public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry)
*/
private MultiVersionConcurrencyControl.WriteEntry writeEntry;

private Map<String, byte[]> extendedAttributes;

public WALKeyImpl() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null, null);
}

public WALKeyImpl(final NavigableMap<byte[], Integer> replicationScope) {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope, null);
}

@VisibleForTesting
Expand All @@ -132,7 +136,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, null);
HConstants.NO_NONCE, null, null, null);
}

@VisibleForTesting
Expand All @@ -141,7 +145,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, null);
HConstants.NO_NONCE, mvcc, null, null);
}

// TODO: Fix being able to pass in sequenceid.
Expand All @@ -153,20 +157,28 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, fin
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
null, null);
null, null, null);
}

// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, replicationScope);
HConstants.NO_NONCE, null, replicationScope, null);
}

public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope);
HConstants.NO_NONCE, mvcc, replicationScope, null);
}

public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc,
final NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope, extendedAttributes);
}

public WALKeyImpl(final byte[] encodedRegionName,
Expand All @@ -180,7 +192,7 @@ public WALKeyImpl(final byte[] encodedRegionName,
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
mvcc, null);
mvcc, null, null);
}

/**
Expand All @@ -206,7 +218,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, lon
final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
replicationScope, null);
}

/**
Expand All @@ -231,7 +243,8 @@ public WALKeyImpl(final byte[] encodedRegionName,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup,
nonce, mvcc, null, null);
}

/**
Expand All @@ -252,7 +265,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
null);
null, null);
}

/**
Expand All @@ -275,7 +288,7 @@ public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
replicationScope, null);
}

/**
Expand Down Expand Up @@ -304,7 +317,22 @@ public WALKeyImpl(final byte[] encodedRegionName,
EMPTY_UUIDS,
nonceGroup,
nonce,
mvcc, null);
mvcc, null, null);
}

public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes){
init(encodedRegionName,
tablename,
NO_SEQUENCE_ID,
now,
clusterIds,
nonceGroup,
nonce,
mvcc, replicationScope, extendedAttributes);
}

@InterfaceAudience.Private
Expand All @@ -316,7 +344,8 @@ protected void init(final byte[] encodedRegionName,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
NavigableMap<byte[], Integer> replicationScope,
Map<String, byte[]> extendedAttributes) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
Expand All @@ -329,6 +358,7 @@ protected void init(final byte[] encodedRegionName,
setSequenceId(logSeqNum);
}
this.replicationScope = replicationScope;
this.extendedAttributes = extendedAttributes;
}

// For deserialization. DO NOT USE. See setWriteEntry below.
Expand Down Expand Up @@ -434,6 +464,17 @@ public UUID getOriginatingClusterId(){
return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0);
}

@Override
public byte[] getExtendedAttribute(String attributeKey){
return extendedAttributes.get(attributeKey);
}

@Override
public Map<String, byte[]> getExtendedAttributes(){
return extendedAttributes != null ? new HashMap<String, byte[]>(extendedAttributes) :
new HashMap<String, byte[]>();
}

@Override
public String toString() {
return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId;
Expand Down Expand Up @@ -539,6 +580,14 @@ public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor com
.setScopeType(ScopeType.forNumber(e.getValue())));
}
}
if (extendedAttributes != null){
for (Map.Entry<String, byte[]> e : extendedAttributes.entrySet()){
WALProtos.Attribute attr = WALProtos.Attribute.newBuilder().
setKey(e.getKey()).setValue(compressor.compress(e.getValue(),
CompressionContext.DictionaryIndex.TABLE)).build();
builder.addExtendedAttributes(attr);
}
}
return builder;
}

Expand Down Expand Up @@ -573,6 +622,12 @@ public void readFieldsFromPb(WALProtos.WALKey walKey,
if (walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
if (walKey.getExtendedAttributesCount() > 0){
this.extendedAttributes = new HashMap<>(walKey.getExtendedAttributesCount());
for (WALProtos.Attribute attr : walKey.getExtendedAttributesList()){
extendedAttributes.put(attr.getKey(), attr.getValue().toByteArray());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed while backporting. Attribute values may have been compressed and need to be decompressed. Will fix up for commit.

}
}
}

@Override
Expand Down
Loading