Skip to content

Commit 6d670db

Browse files
committed
HBASE-8458 Support for batch version of checkAndPut() and checkAndDelete()
1 parent 2feca0a commit 6d670db

File tree

9 files changed

+514
-1
lines changed

9 files changed

+514
-1
lines changed
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.IOException;
21+
import java.util.Arrays;
22+
import org.apache.hadoop.hbase.CompareOperator;
23+
import org.apache.hadoop.hbase.io.TimeRange;
24+
import org.apache.hadoop.hbase.util.Bytes;
25+
import org.apache.yetus.audience.InterfaceAudience;
26+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
27+
28+
/**
29+
* Used to perform CheckAndRowMutate operations on a single row.
30+
*/
31+
@InterfaceAudience.Public
32+
public class CheckAndRowMutate implements Row {
33+
private final byte[] row;
34+
private final byte[] family;
35+
private byte[] qualifier;
36+
private TimeRange timeRange = null;
37+
private CompareOperator op;
38+
private byte[] value;
39+
private Mutation mutation = null;
40+
41+
/**
42+
* Create a CheckAndRowMutate operation for the specified row.
43+
*
44+
* @param row row key
45+
* @param family family
46+
*/
47+
public CheckAndRowMutate(byte[] row, byte[] family) {
48+
this.row = Bytes.copy(Mutation.checkRow(row));
49+
this.family = Preconditions.checkNotNull(family, "family is null");
50+
}
51+
52+
/**
53+
* Create a CheckAndRowMutate operation for the specified row,
54+
* and an existing row lock.
55+
*
56+
* @param row row key
57+
* @param family family
58+
* @param qualifier qualifier
59+
* @param value value
60+
* @param mutation mutation
61+
*/
62+
public CheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator compareOp,
63+
byte[] value, Mutation mutation) {
64+
this.row = Bytes.copy(Mutation.checkRow(row));
65+
this.family = Preconditions.checkNotNull(family, "family is null");
66+
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null");
67+
this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
68+
this.value = Preconditions.checkNotNull(value, "value is null");
69+
this.mutation = mutation;
70+
}
71+
72+
public CheckAndRowMutate qualifier(byte[] qualifier) {
73+
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
74+
+ " an empty byte array, or just do not call this method if you want a null qualifier");
75+
return this;
76+
}
77+
78+
public CheckAndRowMutate timeRange(TimeRange timeRange) {
79+
this.timeRange = timeRange;
80+
return this;
81+
}
82+
83+
public CheckAndRowMutate ifNotExists() {
84+
this.op = CompareOperator.EQUAL;
85+
this.value = null;
86+
return this;
87+
}
88+
89+
public CheckAndRowMutate ifMatches(CompareOperator compareOp, byte[] value) {
90+
this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
91+
this.value = Preconditions.checkNotNull(value, "value is null");
92+
return this;
93+
}
94+
95+
public CheckAndRowMutate addMutation(Mutation mutation) throws IOException {
96+
this.mutation = mutation;
97+
return this;
98+
}
99+
100+
/**
101+
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
102+
* Use {@link Row#COMPARATOR} instead
103+
*/
104+
@Deprecated
105+
@Override
106+
public int compareTo(Row i) {
107+
return Bytes.compareTo(this.getRow(), i.getRow());
108+
}
109+
110+
/**
111+
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
112+
* No replacement
113+
*/
114+
@Deprecated
115+
@Override
116+
public boolean equals(Object obj) {
117+
if (obj == this) {
118+
return true;
119+
}
120+
if (obj instanceof CheckAndRowMutate) {
121+
CheckAndRowMutate other = (CheckAndRowMutate)obj;
122+
return compareTo(other) == 0;
123+
}
124+
return false;
125+
}
126+
127+
/**
128+
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
129+
* No replacement
130+
*/
131+
@Deprecated
132+
@Override
133+
public int hashCode(){
134+
return Arrays.hashCode(row);
135+
}
136+
137+
/**
138+
* Method for retrieving the delete's row
139+
*
140+
* @return row
141+
*/
142+
@Override
143+
public byte[] getRow() {
144+
return this.row;
145+
}
146+
147+
public byte[] getFamily() {
148+
return family;
149+
}
150+
151+
public byte[] getQualifier() {
152+
return qualifier;
153+
}
154+
155+
public TimeRange getTimeRange() {
156+
return timeRange;
157+
}
158+
159+
public CompareOperator getOp() {
160+
return op;
161+
}
162+
163+
public byte[] getValue() {
164+
return value;
165+
}
166+
167+
public Mutation getMutation() {
168+
return mutation;
169+
}
170+
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,25 @@ public <R extends Message> void batchCoprocessorService(
11921192
}
11931193
}
11941194

1195+
@Override
1196+
public Boolean [] checkAndRowMutate(final List<CheckAndRowMutate> checkAndRowMutates)
1197+
throws IOException {
1198+
try {
1199+
Object[] r1 = new Object[checkAndRowMutates.size()];
1200+
batch((List<? extends Row>)checkAndRowMutates, r1, readRpcTimeoutMs);
1201+
// Translate.
1202+
Boolean [] results = new Boolean[r1.length];
1203+
int i = 0;
1204+
for (Object obj: r1) {
1205+
// Batch ensures if there is a failure we get an exception instead
1206+
results[i++] = ((Result)obj).getExists();
1207+
}
1208+
return results;
1209+
} catch (InterruptedException e) {
1210+
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1211+
}
1212+
}
1213+
11951214
@Override
11961215
public RegionLocator getRegionLocator() {
11971216
return this.locator;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,11 @@ default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
479479
return checkAndMutate(row, family, qualifier, op, value, mutations);
480480
}
481481

482+
default Boolean[] checkAndRowMutate(final List<CheckAndRowMutate> checkAndRowMutates)
483+
throws IOException {
484+
throw new NotImplementedException("Add an implementation!");
485+
}
486+
482487
/**
483488
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
484489
* adds the Put/Delete/RowMutations.

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.hadoop.hbase.ServerName;
6666
import org.apache.hadoop.hbase.TableName;
6767
import org.apache.hadoop.hbase.client.Append;
68+
import org.apache.hadoop.hbase.client.CheckAndRowMutate;
6869
import org.apache.hadoop.hbase.client.ClientUtil;
6970
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
7071
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -89,6 +90,7 @@
8990
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
9091
import org.apache.hadoop.hbase.client.security.SecurityCapability;
9192
import org.apache.hadoop.hbase.exceptions.DeserializationException;
93+
import org.apache.hadoop.hbase.filter.BinaryComparator;
9294
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
9395
import org.apache.hadoop.hbase.filter.Filter;
9496
import org.apache.hadoop.hbase.io.TimeRange;
@@ -3299,4 +3301,30 @@ public static Set<String> toCompactedStoreFiles(byte[] bytes) throws IOException
32993301
}
33003302
return Collections.emptySet();
33013303
}
3304+
3305+
public static ClientProtos.CheckAndRowMutate toCheckAndRowMutate(
3306+
final CheckAndRowMutate checkAndRowMutate) throws IOException {
3307+
ClientProtos.CheckAndRowMutate.Builder builder =
3308+
ClientProtos.CheckAndRowMutate.newBuilder();
3309+
builder.setRow(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getRow()))
3310+
.setFamily(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getFamily()))
3311+
.setQualifier(UnsafeByteOperations.unsafeWrap(checkAndRowMutate.getQualifier() == null ?
3312+
HConstants.EMPTY_BYTE_ARRAY : checkAndRowMutate.getQualifier()))
3313+
.setComparator(ProtobufUtil.toComparator(
3314+
new BinaryComparator(checkAndRowMutate.getValue())))
3315+
.setCompareType(HBaseProtos.CompareType.valueOf(checkAndRowMutate.getOp().name()))
3316+
.setTimeRange(ProtobufUtil.toTimeRange(checkAndRowMutate.getTimeRange()));
3317+
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
3318+
MutationType mutateType = null;
3319+
Mutation mutation = checkAndRowMutate.getMutation();
3320+
if (mutation instanceof Put) {
3321+
mutateType = MutationType.PUT;
3322+
} else if (mutation instanceof Delete) {
3323+
mutateType = MutationType.DELETE;
3324+
} else {
3325+
throw new DoNotRetryIOException(mutation.getClass().getName() + " is not support");
3326+
}
3327+
builder.setMutation(ProtobufUtil.toMutation(mutateType, mutation, mutationBuilder));
3328+
return builder.build();
3329+
}
33023330
}

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hadoop.hbase.TableName;
3838
import org.apache.hadoop.hbase.client.Action;
3939
import org.apache.hadoop.hbase.client.Append;
40+
import org.apache.hadoop.hbase.client.CheckAndRowMutate;
4041
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
4142
import org.apache.hadoop.hbase.client.Delete;
4243
import org.apache.hadoop.hbase.client.Durability;
@@ -687,6 +688,9 @@ public static void buildRegionActions(final byte[] regionName,
687688
.setRequest(value)));
688689
} else if (row instanceof RowMutations) {
689690
rowMutationsList.add(action);
691+
} else if (row instanceof CheckAndRowMutate) {
692+
builder.addAction(actionBuilder.setCheckAndRowMutate(
693+
ProtobufUtil.toCheckAndRowMutate((CheckAndRowMutate)row)));
690694
} else {
691695
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
692696
}
@@ -810,6 +814,9 @@ public static void buildNoDataRegionActions(final byte[] regionName,
810814
.setRequest(value)));
811815
} else if (row instanceof RowMutations) {
812816
rowMutationsList.add(action);
817+
} else if (row instanceof CheckAndRowMutate) {
818+
builder.addAction(actionBuilder.setCheckAndRowMutate(
819+
ProtobufUtil.toCheckAndRowMutate((CheckAndRowMutate)row)));
813820
} else {
814821
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
815822
}

hbase-protocol-shaded/src/main/protobuf/Client.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ message Get {
9292
optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */
9393
}
9494

95+
message CheckAndRowMutate {
96+
required bytes row = 1;
97+
required bytes family = 2;
98+
required bytes qualifier = 3;
99+
required CompareType compare_type = 4;
100+
required Comparator comparator = 5;
101+
optional TimeRange time_range = 6;
102+
required MutationProto mutation = 7;
103+
}
104+
95105
message Result {
96106
// Result includes the Cells or else it just has a count of Cells
97107
// that are carried otherwise.
@@ -442,6 +452,7 @@ message Action {
442452
optional MutationProto mutation = 2;
443453
optional Get get = 3;
444454
optional CoprocessorServiceCall service_call = 4;
455+
optional CheckAndRowMutate checkAndRowMutate = 5;
445456
}
446457

447458
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -894,6 +894,23 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
894894
default:
895895
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
896896
}
897+
} else if (action.hasCheckAndRowMutate()) {
898+
ClientProtos.CheckAndRowMutate checkAndRowMutate = action.getCheckAndRowMutate();
899+
byte[] row = checkAndRowMutate.getRow().toByteArray();
900+
byte[] family = checkAndRowMutate.getFamily().toByteArray();
901+
byte[] qualifier = checkAndRowMutate.getQualifier().toByteArray();
902+
CompareOperator compareOp =
903+
CompareOperator.valueOf(checkAndRowMutate.getCompareType().name());
904+
ByteArrayComparable comparator =
905+
ProtobufUtil.toComparator(checkAndRowMutate.getComparator());
906+
TimeRange timeRange = checkAndRowMutate.hasTimeRange() ?
907+
ProtobufUtil.toTimeRange(checkAndRowMutate.getTimeRange()) :
908+
TimeRange.allTime();
909+
Mutation mutation = ProtobufUtil.toMutation(checkAndRowMutate.getMutation());
910+
boolean result = region.checkAndMutate(row, family,
911+
qualifier, compareOp, comparator, timeRange, mutation);
912+
r = new Result();
913+
r.setExists(result);
897914
} else {
898915
throw new HBaseIOException("Unexpected Action type");
899916
}

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCheckAndMutate.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static org.junit.Assert.assertTrue;
2121
import static org.junit.Assert.fail;
22-
2322
import java.io.IOException;
2423
import org.apache.hadoop.hbase.HBaseClassTestRule;
2524
import org.apache.hadoop.hbase.HBaseTestingUtility;

0 commit comments

Comments
 (0)