Skip to content

Commit b7def4f

Browse files
ksravista and Sravi Kommineni authored
HBASE-28459 HFileOutputFormat2 ClassCastException with s3 magic committer (#5858)
Co-authored-by: Sravi Kommineni <skommineni@hubspot.com> Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 7da84c2 commit b7def4f

File tree

2 files changed

+62
-2
lines changed

2 files changed

+62
-2
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.hadoop.hbase.util.CommonFSUtils;
8080
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
8181
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
82+
import org.apache.hadoop.hbase.util.ReflectionUtils;
8283
import org.apache.hadoop.io.NullWritable;
8384
import org.apache.hadoop.io.SequenceFile;
8485
import org.apache.hadoop.io.Text;
@@ -87,7 +88,6 @@
8788
import org.apache.hadoop.mapreduce.OutputFormat;
8889
import org.apache.hadoop.mapreduce.RecordWriter;
8990
import org.apache.hadoop.mapreduce.TaskAttemptContext;
90-
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
9191
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
9292
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
9393
import org.apache.yetus.audience.InterfaceAudience;
@@ -215,11 +215,15 @@ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[]
215215
return combineTableNameSuffix(tableName, family);
216216
}
217217

218+
protected static Path getWorkPath(final OutputCommitter committer) {
219+
return (Path) ReflectionUtils.invokeMethod(committer, "getWorkPath");
220+
}
221+
218222
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
219223
final TaskAttemptContext context, final OutputCommitter committer) throws IOException {
220224

221225
// Get the path of the temporary output file
222-
final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
226+
final Path outputDir = getWorkPath(committer);
223227
final Configuration conf = context.getConfiguration();
224228
final boolean writeMultipleTables =
225229
conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,12 @@
112112
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
113113
import org.apache.hadoop.io.NullWritable;
114114
import org.apache.hadoop.mapreduce.Job;
115+
import org.apache.hadoop.mapreduce.JobContext;
115116
import org.apache.hadoop.mapreduce.Mapper;
117+
import org.apache.hadoop.mapreduce.OutputCommitter;
116118
import org.apache.hadoop.mapreduce.RecordWriter;
117119
import org.apache.hadoop.mapreduce.TaskAttemptContext;
120+
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
118121
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
119122
import org.junit.ClassRule;
120123
import org.junit.Ignore;
@@ -1599,6 +1602,59 @@ public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
15991602
}
16001603
}
16011604

1605+
@Test
1606+
public void itGetsWorkPathHadoop2() throws Exception {
1607+
Configuration conf = new Configuration(this.util.getConfiguration());
1608+
Job job = new Job(conf);
1609+
FileOutputCommitter committer =
1610+
new FileOutputCommitter(new Path("/test"), createTestTaskAttemptContext(job));
1611+
assertEquals(committer.getWorkPath(), HFileOutputFormat2.getWorkPath(committer));
1612+
}
1613+
1614+
@Test
1615+
public void itGetsWorkPathHadoo3() {
1616+
Hadoop3TestOutputCommitter committer = new Hadoop3TestOutputCommitter(new Path("/test"));
1617+
assertEquals(committer.getWorkPath(), HFileOutputFormat2.getWorkPath(committer));
1618+
}
1619+
1620+
static class Hadoop3TestOutputCommitter extends OutputCommitter {
1621+
1622+
Path path;
1623+
1624+
Hadoop3TestOutputCommitter(Path path) {
1625+
this.path = path;
1626+
}
1627+
1628+
public Path getWorkPath() {
1629+
return path;
1630+
}
1631+
1632+
@Override
1633+
public void setupJob(JobContext jobContext) throws IOException {
1634+
1635+
}
1636+
1637+
@Override
1638+
public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
1639+
1640+
}
1641+
1642+
@Override
1643+
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
1644+
return false;
1645+
}
1646+
1647+
@Override
1648+
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
1649+
1650+
}
1651+
1652+
@Override
1653+
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
1654+
1655+
}
1656+
}
1657+
16021658
private static class ConfigurationCaptorConnection implements Connection {
16031659
private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
16041660

0 commit comments

Comments
 (0)