diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index a58472a119da..81fced03f2fc 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -48,8 +48,7 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
     @Override
     public RecordReader getRecordReader(
             InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
-        FileStoreTable table = createFileStoreTable(jobConf);
         PaimonInputSplit split = (PaimonInputSplit) inputSplit;
-        return createRecordReader(table, split, jobConf);
+        return createRecordReader(split, jobConf);
     }
 }
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
index 5b639808f86a..64cae0aef998 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java
@@ -21,7 +21,9 @@
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -41,12 +43,16 @@ public class PaimonInputSplit extends FileSplit {
     private String path;
     private DataSplit split;
 
+    // Table used to read this split; null when the split was created without one.
+    private FileStoreTable table;
+
     // public no-argument constructor for deserialization
     public PaimonInputSplit() {}
 
-    public PaimonInputSplit(String path, DataSplit split) {
+    public PaimonInputSplit(String path, DataSplit split, FileStoreTable table) {
         this.path = path;
         this.split = split;
+        this.table = table;
     }
 
     public DataSplit split() {
@@ -73,6 +79,11 @@ public String[] getLocations() {
         return ANYWHERE;
     }
 
+    /** Returns the table attached to this split, or {@code null} if none was attached. */
+    public FileStoreTable getTable() {
+        return table;
+    }
+
     @Override
     public void write(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(path);
@@ -80,6 +91,18 @@ public void write(DataOutput dataOutput) throws IOException {
         split.serialize(out);
         dataOutput.writeInt(out.length());
         dataOutput.write(out.getCopyOfBuffer());
+        writeFileStoreTable(dataOutput);
+    }
+
+    /** Writes the table as a length-prefixed byte array; a length of 0 stands for no table. */
+    private void writeFileStoreTable(DataOutput dataOutput) throws IOException {
+        if (table == null) {
+            dataOutput.writeInt(0);
+        } else {
+            byte[] bytes = InstantiationUtil.serializeObject(table);
+            dataOutput.writeInt(bytes.length);
+            dataOutput.write(bytes);
+        }
     }
 
     @Override
@@ -89,6 +112,31 @@ public void readFields(DataInput dataInput) throws IOException {
         byte[] bytes = new byte[length];
         dataInput.readFully(bytes);
         split = DataSplit.deserialize(new DataInputDeserializer(bytes));
+        readFileStoreTable(dataInput);
+    }
+
+    /**
+     * Reads the optional table written by {@link #writeFileStoreTable(DataOutput)}.
+     *
+     * <p>The field is reset first, so calling {@code readFields} again on the same instance
+     * never leaves behind the table of an earlier call when the new payload carries none.
+     *
+     * @throws IOException if the input cannot be read or the table cannot be deserialized
+     */
+    private void readFileStoreTable(DataInput dataInput) throws IOException {
+        table = null;
+        int length = dataInput.readInt();
+        if (length > 0) {
+            byte[] bytes = new byte[length];
+            dataInput.readFully(bytes);
+            try {
+                table =
+                        InstantiationUtil.deserializeObject(
+                                bytes, Thread.currentThread().getContextClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to deserialize the table of the input split", e);
+            }
+        }
     }
 
     @Override
@@ -105,11 +153,13 @@ public boolean equals(Object o) {
             return false;
         }
         PaimonInputSplit that = (PaimonInputSplit) o;
-        return Objects.equals(path, that.path) && Objects.equals(split, that.split);
+        return Objects.equals(path, that.path)
+                && Objects.equals(split, that.split)
+                && Objects.equals(table, that.table);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(path, split);
+        return Objects.hash(path, split, table);
     }
 }
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
index 857e3e80584c..beb868c5be7a 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java
@@ -174,7 +174,12 @@ public float getProgress() throws IOException {
     }
 
     public static RecordReader createRecordReader(
-            FileStoreTable table, PaimonInputSplit split, JobConf jobConf) throws IOException {
+            PaimonInputSplit split, JobConf jobConf) throws IOException {
+        FileStoreTable table = split.getTable();
+        if (table == null) {
+            // A split may be created without a table; fail clearly instead of with an NPE below.
+            throw new IOException("The input split does not carry a table.");
+        }
         ReadBuilder readBuilder = table.newReadBuilder();
         createPredicate(table.schema(), jobConf, true).ifPresent(readBuilder::withFilter);
         List paimonColumns = table.schema().fieldNames();
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 32b1e4745d41..6dab4d4af91c 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -96,7 +96,10 @@ public static InputSplit[] generateSplits(FileStoreTable table, JobConf jobConf)
             scan.plan()
                     .splits()
                     .forEach(
-                            split -> splits.add(new PaimonInputSplit(location, (DataSplit) split)));
+                            split ->
+                                    splits.add(
+                                            new PaimonInputSplit(
+                                                    location, (DataSplit) split, table)));
         }
         return splits.toArray(new InputSplit[0]);
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index 908fce7c1a84..7d152c444778 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -19,8 +19,20 @@
 package org.apache.paimon.hive.mapred;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileTestDataGenerator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -29,8 +41,11 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -62,8 +77,12 @@ public void testWriteAndRead() throws Exception {
                                 .map(d -> d.meta)
                                 .collect(Collectors.toList()))
                         .build();
-        PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit);
+        PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit, null);
+
+        assertPaimonInputSplitSerialization(split);
+    }
 
+    private void assertPaimonInputSplitSerialization(PaimonInputSplit split) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(baos);
         split.write(output);
@@ -75,4 +94,37 @@ public void testWriteAndRead() throws Exception {
         actual.readFields(input);
         assertThat(actual).isEqualTo(split);
     }
+
+    @Test
+    public void testWriteAndReadWithTable() throws Exception {
+        Path path = new Path(tempDir.toString());
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE).getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        ""));
+
+        FileStoreTable fileStoreTable = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        writeData(fileStoreTable);
+
+        DataSplit split = (DataSplit) fileStoreTable.newScan().plan().splits().get(0);
+
+        PaimonInputSplit paimonInputSplit =
+                new PaimonInputSplit(path.toString(), split, fileStoreTable);
+
+        assertPaimonInputSplitSerialization(paimonInputSplit);
+    }
+
+    private void writeData(FileStoreTable fileStoreTable) throws Exception {
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl tableWrite = fileStoreTable.newWrite(commitUser);
+        tableWrite.write(GenericRow.of(BinaryString.fromString("1111")));
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+        commit.commit(0, tableWrite.prepareCommit(true, 0));
+        tableWrite.close();
+        commit.close();
+    }
 }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index 8597a84dc8b7..97cdc89ebb2c 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -155,7 +155,7 @@ private PaimonRecordReader read(
         List originalColumns = ((FileStoreTable) table).schema().fieldNames();
         return new PaimonRecordReader(
                 table.newReadBuilder(),
-                new PaimonInputSplit(tempDir.toString(), dataSplit),
+                new PaimonInputSplit(tempDir.toString(), dataSplit, (FileStoreTable) table),
                 originalColumns,
                 originalColumns,
                 selectedColumns,