From 31ead24485432eaa5e389ad7db33cbfd83f792ed Mon Sep 17 00:00:00 2001
From: Wenjun Ruan
Date: Fri, 8 Mar 2024 12:07:28 +0800
Subject: [PATCH] [Bug] Fix OrcWriteStrategy/ParquetWriteStrategy not logging
 in with Kerberos

---
 release-note.md                            |  1 +
 .../seatunnel/file/config/HadoopConf.java  | 18 +++++
 .../file/hadoop/HadoopFileSystemProxy.java | 79 +++++--------------
 .../sink/writer/AbstractWriteStrategy.java | 15 +---
 .../file/sink/writer/OrcWriteStrategy.java |  1 +
 .../sink/writer/ParquetWriteStrategy.java  | 46 ++++++-----
 6 files changed, 66 insertions(+), 94 deletions(-)

diff --git a/release-note.md b/release-note.md
index b399c161e39..4f8f54f2134 100644
--- a/release-note.md
+++ b/release-note.md
@@ -52,6 +52,7 @@
 - [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)
 - [Connector-v2] [CDC] Fix jdbc connection leak for mysql (#5037)
 - [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
+- [Connector-v2] [File] Fix OrcWriteStrategy/ParquetWriteStrategy not logging in with Kerberos

 ### Zeta(ST-Engine)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index b60ab47c52c..11bbe4d3ab6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.config;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;

 import lombok.Data;
@@ -26,6 +27,11 @@
 import java.util.HashMap;
 import java.util.Map;

+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+
 @Data
 public class HadoopConf implements Serializable {
     private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
@@ -60,4 +66,16 @@ public void setExtraOptionsForConfiguration(Configuration configuration) {
             configuration.addResource(new Path(hdfsSitePath));
         }
     }
+
+    public Configuration toConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(READ_INT96_AS_FIXED, true);
+        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
+        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
+        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
+        configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
+        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
+        configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
+        return configuration;
+    }
 }
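The new HadoopConf.toConfiguration() centralizes the Configuration that HadoopFileSystemProxy and AbstractWriteStrategy previously each built by hand (see the next two files). As a minimal standalone sketch of the equivalent setup, assuming hypothetical placeholder values for the scheme, default-FS URI, and implementation class (HadoopConf derives the real ones from the sink options):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class ToConfigurationSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder values; HadoopConf supplies the real ones.
            String schema = "hdfs";
            String defaultFs = "hdfs://namenode:8020";
            String fsImpl = "org.apache.hadoop.hdfs.DistributedFileSystem";

            Configuration conf = new Configuration();
            // Disabling the JVM-wide FileSystem cache matters for this fix:
            // a cached instance may predate the Kerberos login and carry no
            // credentials, so each proxy must create its own instance.
            conf.setBoolean(String.format("fs.%s.impl.disable.cache", schema), true);
            conf.set("fs.defaultFS", defaultFs);
            conf.set(String.format("fs.%s.impl", schema), fsImpl);

            try (FileSystem fs = FileSystem.get(conf)) {
                System.out.println("resolved: " + fs.getUri());
            }
        }
    }

Note that toConfiguration() keeps WRITE_OLD_LIST_STRUCTURE = true, matching the proxy's old createConfiguration() rather than the old AbstractWriteStrategy value of false.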
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index 61f4520e4b8..79df7ef3fa7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -23,7 +23,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,11 +42,6 @@
 import java.util.ArrayList;
 import java.util.List;

-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 @Slf4j
 public class HadoopFileSystemProxy implements Serializable, Closeable {

@@ -64,30 +58,19 @@ public HadoopFileSystemProxy(@NonNull HadoopConf hadoopConf) {
     }

     public boolean fileExist(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path fileName = new Path(filePath);
-        return fileSystem.exists(fileName);
+        return getFileSystem().exists(new Path(filePath));
     }

     public void createFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path path = new Path(filePath);
-        if (!fileSystem.createNewFile(path)) {
+        if (!getFileSystem().createNewFile(new Path(filePath))) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }

     public void deleteFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path path = new Path(filePath);
-        if (fileSystem.exists(path)) {
-            if (!fileSystem.delete(path, true)) {
+        if (getFileSystem().exists(path)) {
+            if (!getFileSystem().delete(path, true)) {
                 throw CommonError.fileOperationFailed("SeaTunnel", "delete", filePath);
             }
         }
@@ -98,9 +81,6 @@ public void renameFile(
             @NonNull String newFilePath,
             boolean removeWhenNewFilePathExist)
             throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path oldPath = new Path(oldFilePath);
         Path newPath = new Path(newFilePath);

@@ -116,7 +96,7 @@ public void renameFile(

         if (removeWhenNewFilePathExist) {
             if (fileExist(newFilePath)) {
-                fileSystem.delete(newPath, true);
+                getFileSystem().delete(newPath, true);
                 log.info("Delete already file: {}", newPath);
             }
         }
@@ -124,7 +104,7 @@
             createDir(newPath.getParent().toString());
         }

-        if (fileSystem.rename(oldPath, newPath)) {
+        if (getFileSystem().rename(oldPath, newPath)) {
             log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
         } else {
             throw CommonError.fileOperationFailed(
@@ -133,26 +113,20 @@ public void createDir(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path dfs = new Path(filePath);
-        if (!fileSystem.mkdirs(dfs)) {
+        if (!getFileSystem().mkdirs(dfs)) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }

     public List<LocatedFileStatus> listFile(String path) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<LocatedFileStatus> fileList = new ArrayList<>();
         if (!fileExist(path)) {
             return fileList;
         }
         Path fileName = new Path(path);
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
-                fileSystem.listFiles(fileName, false);
+                getFileSystem().listFiles(fileName, false);
         while (locatedFileStatusRemoteIterator.hasNext()) {
             fileList.add(locatedFileStatusRemoteIterator.next());
         }
@@ -160,15 +134,12 @@ public List<LocatedFileStatus> listFile(String path) throws IOException {
     }

     public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<Path> pathList = new ArrayList<>();
         if (!fileExist(filePath)) {
             return pathList;
         }
         Path fileName = new Path(filePath);
-        FileStatus[] status = fileSystem.listStatus(fileName);
+        FileStatus[] status = getFileSystem().listStatus(fileName);
         if (status != null) {
             for (FileStatus fileStatus : status) {
                 if (fileStatus.isDirectory()) {
@@ -180,31 +151,26 @@ public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
     }

     public FileStatus[] listStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.listStatus(new Path(filePath));
+        return getFileSystem().listStatus(new Path(filePath));
     }

     public FileStatus getFileStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.getFileStatus(new Path(filePath));
+        return getFileSystem().getFileStatus(new Path(filePath));
     }

     public FSDataOutputStream getOutputStream(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.create(new Path(filePath), true);
+        return getFileSystem().create(new Path(filePath), true);
     }

     public FSDataInputStream getInputStream(String filePath) throws IOException {
+        return getFileSystem().open(new Path(filePath));
+    }
+
+    public FileSystem getFileSystem() {
         if (fileSystem == null) {
             initialize();
         }
-        return fileSystem.open(new Path(filePath));
+        return fileSystem;
     }

     @SneakyThrows
@@ -258,16 +224,7 @@ private void initialize() {
     }

     private Configuration createConfiguration() {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.setBoolean(
-                String.format("fs.%s.impl.disable.cache", hadoopConf.getSchema()), true);
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
         hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
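Every public operation in the proxy above now funnels through the new getFileSystem() accessor instead of repeating the lazy-initialization preamble each method previously carried. A condensed sketch of that pattern, with the Kerberos login the real initialize() performs reduced to a comment (LazyFileSystemHolder is an illustrative name, not a class from this patch):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Illustrative stand-in for HadoopFileSystemProxy's lazy accessor.
    public class LazyFileSystemHolder {
        private final Configuration configuration;
        private FileSystem fileSystem;

        public LazyFileSystemHolder(Configuration configuration) {
            this.configuration = configuration;
        }

        // Single choke point: the first caller triggers initialization and
        // later callers reuse the authenticated instance. As in the patch,
        // this assumes calls stay confined to one writer thread.
        public FileSystem getFileSystem() throws IOException {
            if (fileSystem == null) {
                // The real initialize() first logs in through
                // UserGroupInformation when Kerberos is configured,
                // then creates the FileSystem under that identity.
                fileSystem = FileSystem.get(configuration);
            }
            return fileSystem;
        }
    }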
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index ab88b2256dc..68476488a55 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -36,7 +36,6 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -58,11 +57,6 @@
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;

-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
     protected final FileSinkConfig fileSinkConfig;
@@ -148,14 +142,7 @@ protected SeaTunnelRowType buildSchemaWithRowType(
      */
     @Override
     public Configuration getConfiguration(HadoopConf hadoopConf) {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
         this.hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 3a800b942ae..79ca193ea3d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -122,6 +122,7 @@ private Writer getOrCreateWriter(@NonNull String filePath) {
                             .compress(compressFormat.getOrcCompression())
                             // use orc version 0.12
                             .version(OrcFile.Version.V_0_12)
+                            .fileSystem(hadoopFileSystemProxy.getFileSystem())
                             .overwrite(true);
             Writer newWriter = OrcFile.createWriter(path, options);
             this.beingWrittenWriter.put(filePath, newWriter);
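The one-line ORC fix above injects the proxy's already-authenticated FileSystem into the writer options; without it, OrcFile resolves a FileSystem from the path on its own and bypasses the Kerberos login. A minimal sketch against the ORC core API, with a local FileSystem and a throwaway schema standing in for the kerberized HDFS instance:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class OrcFileSystemInjectionSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Stand-in for hadoopFileSystemProxy.getFileSystem().
            FileSystem fs = FileSystem.getLocal(conf);
            TypeDescription schema = TypeDescription.fromString("struct<id:int,name:string>");

            OrcFile.WriterOptions options =
                    OrcFile.writerOptions(conf)
                            .setSchema(schema)
                            // Use the injected instance instead of letting
                            // OrcFile look one up from the path.
                            .fileSystem(fs)
                            .overwrite(true);
            Writer writer = OrcFile.createWriter(new Path("/tmp/orc-sketch.orc"), options);
            writer.close(); // rows would be appended via writer.addRowBatch(...)
        }
    }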
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 84ad0a75408..95343ae571f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -139,25 +139,33 @@ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath)
         dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
         if (writer == null) {
             Path path = new Path(filePath);
-            try {
-                HadoopOutputFile outputFile =
-                        HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
-                ParquetWriter<GenericRecord> newWriter =
-                        AvroParquetWriter.<GenericRecord>builder(outputFile)
-                                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                                .withDataModel(dataModel)
-                                // use parquet v1 to improve compatibility
-                                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
-                                .withCompressionCodec(compressFormat.getParquetCompression())
-                                .withSchema(schema)
-                                .build();
-                this.beingWrittenWriter.put(filePath, newWriter);
-                return newWriter;
-            } catch (IOException e) {
-                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
-            }
+            // initialize the kerberos login
+            return hadoopFileSystemProxy.doWithHadoopAuth(
+                    (configuration, userGroupInformation) -> {
+                        try {
+                            HadoopOutputFile outputFile =
+                                    HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
+                            ParquetWriter<GenericRecord> newWriter =
+                                    AvroParquetWriter.<GenericRecord>builder(outputFile)
+                                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                                            .withDataModel(dataModel)
+                                            // use parquet v1 to improve compatibility
+                                            .withWriterVersion(
+                                                    ParquetProperties.WriterVersion.PARQUET_1_0)
+                                            .withCompressionCodec(
+                                                    compressFormat.getParquetCompression())
+                                            .withSchema(schema)
+                                            .build();
+                            this.beingWrittenWriter.put(filePath, newWriter);
+                            return newWriter;
+                        } catch (IOException e) {
+                            String errorMsg =
+                                    String.format(
+                                            "Get parquet writer for file [%s] error", filePath);
+                            throw new FileConnectorException(
+                                    CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
+                        }
+                    });
         }
         return writer;
     }
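Parquet has no writer option comparable to ORC's fileSystem(...), so the change above instead wraps writer creation in hadoopFileSystemProxy.doWithHadoopAuth(...), which runs the callback inside the authenticated Hadoop context. A sketch of the underlying Hadoop pattern, assuming a hypothetical principal and keytab (SeaTunnel reads the real ones from the sink's Kerberos options):

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosDoAsSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);

            // Hypothetical credentials, for illustration only.
            UserGroupInformation ugi =
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                            "seatunnel@EXAMPLE.COM", "/etc/security/seatunnel.keytab");

            // Anything that touches HDFS (here, where the Parquet writer
            // would be built) runs under the logged-in identity.
            ugi.doAs(
                    (PrivilegedExceptionAction<Void>)
                            () -> {
                                // HadoopOutputFile.fromPath(...) and
                                // AvroParquetWriter.builder(...) would go here.
                                return null;
                            });
        }
    }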