Skip to content

HBASE-8458 Support for batch version of checkAndPut() and checkAndDelete() #594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* Used to perform CheckAndRowMutate operations on a single row.
*/
@InterfaceAudience.Public
public class CheckAndRowMutate implements Row {
private final byte[] row;
private final byte[] family;
private byte[] qualifier;
private TimeRange timeRange = null;
private CompareOperator op;
private byte[] value;
private Mutation mutation = null;

/**
* Create a CheckAndRowMutate operation for the specified row.
*
* @param row row key
* @param family family
*/
public CheckAndRowMutate(byte[] row, byte[] family) {
this.row = Bytes.copy(Mutation.checkRow(row));
this.family = Preconditions.checkNotNull(family, "family is null");
}

/**
* Create a CheckAndRowMutate operation for the specified row,
* and an existing row lock.
*
* @param row row key
* @param family family
* @param qualifier qualifier
* @param value value
* @param mutation mutation
*/
public CheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
byte[] value, Mutation mutation) {
this.row = Bytes.copy(Mutation.checkRow(row));
this.family = Preconditions.checkNotNull(family, "family is null");
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null");
this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
this.value = Preconditions.checkNotNull(value, "value is null");
this.mutation = mutation;
}

public CheckAndRowMutate qualifier(byte[] qualifier) {
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
+ " an empty byte array, or just do not call this method if you want a null qualifier");
return this;
}

public CheckAndRowMutate timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}

public CheckAndRowMutate ifNotExists() {
this.op = CompareOperator.EQUAL;
this.value = null;
return this;
}

public CheckAndRowMutate ifMatches(CompareOperator compareOp, byte[] value) {
this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
this.value = Preconditions.checkNotNull(value, "value is null");
return this;
}

public CheckAndRowMutate addMutation(Mutation mutation) throws IOException {
this.mutation = mutation;
return this;
}

/**
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
* Use {@link Row#COMPARATOR} instead
*/
@Deprecated
@Override
public int compareTo(Row i) {
return Bytes.compareTo(this.getRow(), i.getRow());
}

/**
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
* No replacement
*/
@Deprecated
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof CheckAndRowMutate) {
CheckAndRowMutate other = (CheckAndRowMutate)obj;
return compareTo(other) == 0;
}
return false;
}

/**
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
* No replacement
*/
@Deprecated
@Override
public int hashCode(){
return Arrays.hashCode(row);
}

/**
* Method for retrieving the delete's row
*
* @return row
*/
@Override
public byte[] getRow() {
return this.row;
}

public byte[] getFamily() {
return family;
}

public byte[] getQualifier() {
return qualifier;
}

public TimeRange getTimeRange() {
return timeRange;
}

public CompareOperator getOp() {
return op;
}

public byte[] getValue() {
return value;
}

public Mutation getMutation() {
return mutation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,25 @@ public <R extends Message> void batchCoprocessorService(
}
}

@Override
public Boolean [] checkAndRowMutate(final List<CheckAndRowMutate> checkAndRowMutates)
throws IOException {
try {
Object[] r1 = new Object[checkAndRowMutates.size()];
batch((List<? extends Row>)checkAndRowMutates, r1, readRpcTimeoutMs);
// Translate.
Boolean [] results = new Boolean[r1.length];
int i = 0;
for (Object obj: r1) {
// Batch ensures if there is a failure we get an exception instead
results[i++] = ((Result)obj).getExists();
}
return results;
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}

@Override
public RegionLocator getRegionLocator() {
return this.locator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
return checkAndMutate(row, family, qualifier, op, value, mutations);
}

default Boolean[] checkAndRowMutate(final List<CheckAndRowMutate> checkAndRowMutates)
throws IOException {
throw new NotImplementedException("Add an implementation!");
}

/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the Put/Delete/RowMutations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndRowMutate;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
Expand All @@ -90,6 +91,7 @@
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
Expand Down Expand Up @@ -3357,4 +3359,30 @@ public static RegionStatesCount toTableRegionStatesCount(
.build();
}

public static ClientProtos.CheckAndRowMutate toCheckAndRowMutate(
final CheckAndRowMutate checkAndRowMutate) throws IOException {
ClientProtos.CheckAndRowMutate.Builder builder =
ClientProtos.CheckAndRowMutate.newBuilder();
builder.setRow(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getRow()))
.setFamily(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getFamily()))
.setQualifier(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getQualifier() == null ?
HConstants.EMPTY_BYTE_ARRAY : checkAndRowMutate.getQualifier()))
.setComparator(ProtobufUtil.toComparator(
new BinaryComparator(checkAndRowMutate.getValue())))
.setCompareType(HBaseProtos.CompareType.valueOf(checkAndRowMutate.getOp().name()))
.setTimeRange(ProtobufUtil.toTimeRange(checkAndRowMutate.getTimeRange()));
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
MutationType mutateType = null;
Mutation mutation = checkAndRowMutate.getMutation();
if (mutation instanceof Put) {
mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
mutateType = MutationType.DELETE;
} else {
throw new DoNotRetryIOException(mutation.getClass().getName() + " is not support");
}
builder.setMutation(ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder));
return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndRowMutate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
Expand Down Expand Up @@ -694,6 +695,9 @@ public static void buildRegionActions(final byte[] regionName,
.setRequest(value)));
} else if (row instanceof RowMutations) {
rowMutationsList.add(action);
} else if (row instanceof CheckAndRowMutate) {
builder.addAction(actionBuilder.setCheckAndRowMutate(
ProtobufUtil.toCheckAndRowMutate((CheckAndRowMutate)row)));
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
Expand Down Expand Up @@ -817,6 +821,9 @@ public static void buildNoDataRegionActions(final byte[] regionName,
.setRequest(value)));
} else if (row instanceof RowMutations) {
rowMutationsList.add(action);
} else if (row instanceof CheckAndRowMutate) {
builder.addAction(actionBuilder.setCheckAndRowMutate(
ProtobufUtil.toCheckAndRowMutate((CheckAndRowMutate)row)));
} else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
}
Expand Down
11 changes: 11 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ message Get {
optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */
}

message CheckAndRowMutate {
required bytes row = 1;
required bytes family = 2;
required bytes qualifier = 3;
required CompareType compare_type = 4;
required Comparator comparator = 5;
optional TimeRange time_range = 6;
required MutationProto mutation = 7;
}

message Result {
// Result includes the Cells or else it just has a count of Cells
// that are carried otherwise.
Expand Down Expand Up @@ -443,6 +453,7 @@ message Action {
optional MutationProto mutation = 2;
optional Get get = 3;
optional CoprocessorServiceCall service_call = 4;
optional CheckAndRowMutate checkAndRowMutate = 5;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,23 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
} else if (action.hasCheckAndRowMutate()) {
ClientProtos.CheckAndRowMutate checkAndRowMutate = action.getCheckAndRowMutate();
byte[] row = checkAndRowMutate.getRow().toByteArray();
byte[] family = checkAndRowMutate.getFamily().toByteArray();
byte[] qualifier = checkAndRowMutate.getQualifier().toByteArray();
CompareOperator compareOp =
CompareOperator.valueOf(checkAndRowMutate.getCompareType().name());
ByteArrayComparable comparator =
ProtobufUtil.toComparator(checkAndRowMutate.getComparator());
TimeRange timeRange = checkAndRowMutate.hasTimeRange() ?
ProtobufUtil.toTimeRange(checkAndRowMutate.getTimeRange()) :
TimeRange.allTime();
Mutation mutation = ProtobufUtil.toMutation(checkAndRowMutate.getMutation());
boolean result = region.checkAndMutate(row, family,
qualifier, compareOp, comparator, timeRange, mutation);
r = new Result();
r.setExists(result);
} else {
throw new HBaseIOException("Unexpected Action type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
Expand Down
Loading