
Commit 4b7815d

HBASE-27649 WALPlayer does not properly dedupe overridden cell versions (#5058)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 6b672cc commit 4b7815d

4 files changed: +233 additions, -3 deletions
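For context on the bug being fixed: when a client overwrites a cell at the same explicit timestamp, the two versions share every coordinate and differ only by WAL sequenceId, and a WALPlayer bulk-load run could previously surface the stale value. A minimal, illustrative sketch of such an overwrite (not part of this commit; it assumes a pre-existing table "t1" with column family "f"):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class OverwriteAtSameTimestamp {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
      Table table = conn.getTable(TableName.valueOf("t1"))) {
      long ts = 1234567890L; // an explicit, client-chosen timestamp
      byte[] row = Bytes.toBytes("row");
      byte[] fam = Bytes.toBytes("f");
      byte[] qual = Bytes.toBytes("q");
      // Two versions of the same cell, identical in every coordinate except sequenceId.
      table.put(new Put(row).addColumn(fam, qual, ts, Bytes.toBytes("v1")));
      // The live table serves "v2"; before this fix, HFiles produced by a WALPlayer
      // bulk-output job could serve "v1" instead, because the sequenceId was lost
      // between the mapper and CellSortReducer.
      table.put(new Put(row).addColumn(fam, qual, ts, Bytes.toBytes("v2")));
    }
  }
}

Replaying the WALs for such a table with WALPlayer's bulk output and loading the resulting HFiles is exactly the path exercised by the new test added below.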
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Similar to CellSerialization, but includes the sequenceId from an ExtendedCell. This is necessary
 * so that CellSortReducer can sort by sequenceId, if applicable. Note that these two serializations
 * are not compatible -- data serialized by CellSerialization cannot be deserialized with
 * ExtendedCellSerialization and vice versa. This is ok for {@link HFileOutputFormat2} because the
 * serialization is not actually used for the actual written HFiles, just intermediate data (between
 * mapper and reducer of a single job).
 */
@InterfaceAudience.Private
public class ExtendedCellSerialization implements Serialization<ExtendedCell> {
  @Override
  public boolean accept(Class<?> c) {
    return ExtendedCell.class.isAssignableFrom(c);
  }

  @Override
  public ExtendedCellDeserializer getDeserializer(Class<ExtendedCell> t) {
    return new ExtendedCellDeserializer();
  }

  @Override
  public ExtendedCellSerializer getSerializer(Class<ExtendedCell> c) {
    return new ExtendedCellSerializer();
  }

  public static class ExtendedCellDeserializer implements Deserializer<ExtendedCell> {
    private DataInputStream dis;

    @Override
    public void close() throws IOException {
      this.dis.close();
    }

    @Override
    public KeyValue deserialize(ExtendedCell ignore) throws IOException {
      KeyValue kv = KeyValueUtil.create(this.dis);
      PrivateCellUtil.setSequenceId(kv, this.dis.readLong());
      return kv;
    }

    @Override
    public void open(InputStream is) throws IOException {
      this.dis = new DataInputStream(is);
    }
  }

  public static class ExtendedCellSerializer implements Serializer<ExtendedCell> {
    private DataOutputStream dos;

    @Override
    public void close() throws IOException {
      this.dos.close();
    }

    @Override
    public void open(OutputStream os) throws IOException {
      this.dos = new DataOutputStream(os);
    }

    @Override
    public void serialize(ExtendedCell kv) throws IOException {
      dos.writeInt(PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT);
      PrivateCellUtil.writeCell(kv, dos, true);
      dos.writeLong(kv.getSequenceId());
    }
  }
}
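To make the intent of the new class concrete, here is a rough, self-contained round-trip sketch (not part of the commit; it relies on the InterfaceAudience.Private classes ExtendedCell and PrivateCellUtil, just as the commit does internally). It writes one KeyValue through ExtendedCellSerialization and reads it back, showing that the sequenceId survives, which CellSerialization would drop:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;

public class ExtendedCellSerializationRoundTrip {
  public static void main(String[] args) throws Exception {
    // A cell whose coordinates could collide with another write at the same timestamp.
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("q"),
      1234567890L, Bytes.toBytes("value"));
    PrivateCellUtil.setSequenceId(kv, 42L); // in WALPlayer this comes from the WALKey

    ExtendedCellSerialization serialization = new ExtendedCellSerialization();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Serializer<ExtendedCell> serializer = serialization.getSerializer(ExtendedCell.class);
    serializer.open(out);
    serializer.serialize(kv);
    serializer.close();

    Deserializer<ExtendedCell> deserializer = serialization.getDeserializer(ExtendedCell.class);
    deserializer.open(new ByteArrayInputStream(out.toByteArray()));
    ExtendedCell copy = deserializer.deserialize(null);
    deserializer.close();

    // Unlike CellSerialization, the sequenceId survives the round trip, so
    // CellSortReducer can still order same-timestamp versions correctly.
    System.out.println(copy.getSequenceId()); // prints 42
  }
}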

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 38 additions & 3 deletions
@@ -30,6 +30,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -159,6 +160,15 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
   static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
     "hbase.mapreduce.use.multi.table.hfileoutputformat";

+  /**
+   * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
+   * package-private for internal usage for jobs like WALPlayer which need to use features of
+   * ExtendedCell.
+   */
+  static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
+    "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
+  static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
+
   public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
   public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
     REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -619,9 +629,7 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
     }

-    conf.setStrings("io.serializations", conf.get("io.serializations"),
-      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-      CellSerialization.class.getName());
+    mergeSerializations(conf);

     if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
       LOG.info("bulkload locality sensitive enabled");
@@ -670,6 +678,33 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
   }

+  private static void mergeSerializations(Configuration conf) {
+    List<String> serializations = new ArrayList<>();
+
+    // add any existing values that have been set
+    String[] existing = conf.getStrings("io.serializations");
+    if (existing != null) {
+      Collections.addAll(serializations, existing);
+    }
+
+    serializations.add(MutationSerialization.class.getName());
+    serializations.add(ResultSerialization.class.getName());
+
+    // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
+    // SerializationFactory runs through serializations in the order they are registered.
+    // We want to register ExtendedCellSerialization before CellSerialization because both
+    // work for ExtendedCells but only ExtendedCellSerialization handles them properly.
+    if (
+      conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
+        EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
+    ) {
+      serializations.add(ExtendedCellSerialization.class.getName());
+    }
+    serializations.add(CellSerialization.class.getName());
+
+    conf.setStrings("io.serializations", serializations.toArray(new String[0]));
+  }
+
   public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
     throws IOException {
     Configuration conf = job.getConfiguration();
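The ordering that mergeSerializations produces can be summarized with a small sketch (illustrative only; mergeSerializations itself is private and is invoked from configureIncrementalLoad, so this just sets the flag the way WALPlayer does and documents the resulting order):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ExtendedCellSerializationFlagDemo {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // WALPlayer sets this before calling HFileOutputFormat2.configureIncrementalLoad, so that
    // mergeSerializations registers ExtendedCellSerialization ahead of CellSerialization.
    conf.setBoolean("hbase.mapreduce.hfileoutputformat.extendedcell.enabled", true);

    // After configureIncrementalLoad runs, io.serializations is rebuilt in this order:
    //   any pre-existing serializations,
    //   MutationSerialization, ResultSerialization,
    //   ExtendedCellSerialization (only when the flag above is true),
    //   CellSerialization
    // Hadoop's SerializationFactory picks the first registered serialization that accepts a
    // class, so listing ExtendedCellSerialization first is what lets sequenceIds survive
    // the map-to-reduce hop.
    System.out.println(
      conf.getBoolean("hbase.mapreduce.hfileoutputformat.extendedcell.enabled", false));
  }
}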

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java

Lines changed: 13 additions & 0 deletions
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -105,6 +106,13 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
         if (WALEdit.isMetaEditFamily(cell)) {
           continue;
         }
+
+        // Set sequenceId from WALKey, since it is not included by WALCellCodec. The sequenceId
+        // on WALKey is the same value that was on the cells in the WALEdit. This enables
+        // CellSortReducer to use sequenceId to disambiguate duplicate cell timestamps.
+        // See HBASE-27649
+        PrivateCellUtil.setSequenceId(cell, key.getSequenceId());
+
         byte[] outKey = multiTableSupport
           ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
           : CellUtil.cloneRow(cell);
@@ -308,6 +316,11 @@ public Job createSubmittableJob(String[] args) throws IOException {
     if (hfileOutPath != null) {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);

+      // WALPlayer needs ExtendedCellSerialization so that sequenceId can be propagated when
+      // sorting cells in CellSortReducer
+      job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
+        true);
+
       // the bulk HFile case
       List<TableName> tableNames = getTableNameList(tables);
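Callers do not need to set the new flag themselves; any WALPlayer run that requests bulk output picks it up via createSubmittableJob. A hedged sketch of driving WALPlayer programmatically, mirroring what the test below does (the WAL input directory, output path, and table name are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class ReplayWalsToHFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Ask WALPlayer for HFile output instead of live writes; createSubmittableJob then
    // turns on ExtendedCellSerialization so sequenceIds reach CellSortReducer intact.
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/walplayer-out");
    int exitCode = ToolRunner.run(conf, new WALPlayer(conf),
      new String[] { "/hbase/oldWALs", "my_table" });
    System.exit(exitCode);
  }
}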

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java

Lines changed: 81 additions & 0 deletions
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.mapreduce;

+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -29,6 +33,7 @@
 import java.io.File;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,8 +55,10 @@
 import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -131,6 +138,80 @@ public void testPlayingRecoveredEdit() throws Exception {
     assertTrue(TEST_UTIL.countRows(tn) > 0);
   }

+  /**
+   * Tests that when you write multiple cells with the same timestamp they are properly sorted by
+   * their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from
+   * the resulting bulkloaded HFiles. See HBASE-27649
+   */
+  @Test
+  public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + "1");
+    final byte[] family = Bytes.toBytes("family");
+    final byte[] column1 = Bytes.toBytes("c1");
+    final byte[] column2 = Bytes.toBytes("c2");
+    final byte[] row = Bytes.toBytes("row");
+    Table table = TEST_UTIL.createTable(tableName, family);
+
+    long now = EnvironmentEdgeManager.currentTime();
+    // put a row into the first table
+    Put p = new Put(row);
+    p.addColumn(family, column1, now, column1);
+    p.addColumn(family, column2, now, column2);
+
+    table.put(p);
+
+    byte[] lastVal = null;
+
+    for (int i = 0; i < 50; i++) {
+      lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
+      p = new Put(row);
+      p.addColumn(family, column1, now, lastVal);
+
+      table.put(p);
+
+      // wal rolling is necessary to trigger the bug. otherwise no sorting
+      // needs to occur in the reducer because it's all sorted and coming from a single file.
+      if (i % 10 == 0) {
+        WAL log = cluster.getRegionServer(0).getWAL(null);
+        log.rollWriter();
+      }
+    }
+
+    WAL log = cluster.getRegionServer(0).getWAL(null);
+    log.rollWriter();
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();

+    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    String outPath = "/tmp/" + name.getMethodName();
+    configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath);
+    configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+
+    WALPlayer player = new WALPlayer(configuration);
+    assertEquals(0, ToolRunner.run(configuration, player,
+      new String[] { walInputDir, tableName.getNameAsString() }));
+
+    Get g = new Get(row);
+    Result result = table.get(g);
+    byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+
+    table = TEST_UTIL.truncateTable(tableName);
+    g = new Get(row);
+    result = table.get(g);
+    assertThat(result.listCells(), nullValue());
+
+    BulkLoadHFiles.create(configuration).bulkLoad(tableName,
+      new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));
+
+    g = new Get(row);
+    result = table.get(g);
+    value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+
+    assertThat(result.listCells(), notNullValue());
+    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+  }
+
   /**
    * Simple end-to-end test
    */
