
Commit 31ead24

[Bug] Fix OrcWriteStrategy/ParquetWriteStrategy doesn't login with kerberos

ruanwenjun committed Mar 8, 2024
1 parent 0100bda commit 31ead24
Showing 6 changed files with 66 additions and 94 deletions.
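
What the change does: the ORC and Parquet write strategies previously built their own Hadoop Configuration and let the writer libraries resolve a fresh FileSystem from it, bypassing the Kerberos login held by HadoopFileSystemProxy. This commit centralizes the Configuration setup in a new HadoopConf.toConfiguration(), exposes the proxy's lazily initialized FileSystem through a new getFileSystem() accessor, injects that FileSystem into the ORC writer options, and wraps Parquet writer creation in doWithHadoopAuth so the writer is created under the authenticated user.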
release-note.md (1 addition, 0 deletions)

@@ -52,6 +52,7 @@
 - [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)
 - [Connector-v2] [CDC] Fix jdbc connection leak for mysql (#5037)
 - [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
+- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
 
 ### Zeta(ST-Engine)
 
HadoopConf.java

@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 
 import lombok.Data;
@@ -26,6 +27,11 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+
 @Data
 public class HadoopConf implements Serializable {
     private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
@@ -60,4 +66,16 @@ public void setExtraOptionsForConfiguration(Configuration configuration) {
             configuration.addResource(new Path(hdfsSitePath));
         }
     }
+
+    public Configuration toConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(READ_INT96_AS_FIXED, true);
+        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
+        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
+        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
+        configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
+        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
+        configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
+        return configuration;
+    }
 }
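
The new toConfiguration() consolidates settings that were previously duplicated in HadoopFileSystemProxy and AbstractWriteStrategy (both shown below). A minimal usage sketch of how this commit's call sites consume it; the trailing FileSystem.get call is not in the diff and is shown only to illustrate what the fs.<schema>.impl and cache keys control:

    Configuration configuration = hadoopConf.toConfiguration();
    hadoopConf.setExtraOptionsForConfiguration(configuration);
    // With fs.<schema>.impl.disable.cache=true, this returns a fresh FileSystem
    // bound to this Configuration instead of a JVM-wide cached instance, so each
    // proxy can hold a FileSystem tied to its own (possibly Kerberos) login.
    FileSystem fs = FileSystem.get(configuration);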
HadoopFileSystemProxy.java

@@ -23,7 +23,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,11 +42,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 @Slf4j
 public class HadoopFileSystemProxy implements Serializable, Closeable {
 
@@ -64,30 +58,19 @@ public HadoopFileSystemProxy(@NonNull HadoopConf hadoopConf) {
     }
 
     public boolean fileExist(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path fileName = new Path(filePath);
-        return fileSystem.exists(fileName);
+        return getFileSystem().exists(new Path(filePath));
     }
 
     public void createFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path path = new Path(filePath);
-        if (!fileSystem.createNewFile(path)) {
+        if (!getFileSystem().createNewFile(new Path(filePath))) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }
 
     public void deleteFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path path = new Path(filePath);
-        if (fileSystem.exists(path)) {
-            if (!fileSystem.delete(path, true)) {
+        if (getFileSystem().exists(path)) {
+            if (!getFileSystem().delete(path, true)) {
                 throw CommonError.fileOperationFailed("SeaTunnel", "delete", filePath);
             }
         }
@@ -98,9 +81,6 @@ public void renameFile(
             @NonNull String newFilePath,
             boolean removeWhenNewFilePathExist)
             throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path oldPath = new Path(oldFilePath);
         Path newPath = new Path(newFilePath);
 
@@ -116,15 +96,15 @@ public void renameFile(
 
         if (removeWhenNewFilePathExist) {
             if (fileExist(newFilePath)) {
-                fileSystem.delete(newPath, true);
+                getFileSystem().delete(newPath, true);
                 log.info("Delete already file: {}", newPath);
             }
         }
        if (!fileExist(newPath.getParent().toString())) {
            createDir(newPath.getParent().toString());
        }
 
-        if (fileSystem.rename(oldPath, newPath)) {
+        if (getFileSystem().rename(oldPath, newPath)) {
            log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
        } else {
            throw CommonError.fileOperationFailed(
@@ -133,42 +113,33 @@ public void renameFile(
     }
 
     public void createDir(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path dfs = new Path(filePath);
-        if (!fileSystem.mkdirs(dfs)) {
+        if (!getFileSystem().mkdirs(dfs)) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }
 
     public List<LocatedFileStatus> listFile(String path) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<LocatedFileStatus> fileList = new ArrayList<>();
         if (!fileExist(path)) {
             return fileList;
         }
         Path fileName = new Path(path);
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
-                fileSystem.listFiles(fileName, false);
+                getFileSystem().listFiles(fileName, false);
         while (locatedFileStatusRemoteIterator.hasNext()) {
             fileList.add(locatedFileStatusRemoteIterator.next());
         }
         return fileList;
     }
 
     public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<Path> pathList = new ArrayList<>();
         if (!fileExist(filePath)) {
             return pathList;
         }
         Path fileName = new Path(filePath);
-        FileStatus[] status = fileSystem.listStatus(fileName);
+        FileStatus[] status = getFileSystem().listStatus(fileName);
         if (status != null) {
             for (FileStatus fileStatus : status) {
                 if (fileStatus.isDirectory()) {
@@ -180,31 +151,26 @@ public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
     }
 
     public FileStatus[] listStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.listStatus(new Path(filePath));
+        return getFileSystem().listStatus(new Path(filePath));
     }
 
     public FileStatus getFileStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.getFileStatus(new Path(filePath));
+        return getFileSystem().getFileStatus(new Path(filePath));
     }
 
     public FSDataOutputStream getOutputStream(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.create(new Path(filePath), true);
+        return getFileSystem().create(new Path(filePath), true);
     }
 
     public FSDataInputStream getInputStream(String filePath) throws IOException {
+        return getFileSystem().open(new Path(filePath));
+    }
+
+    public FileSystem getFileSystem() {
         if (fileSystem == null) {
             initialize();
         }
-        return fileSystem.open(new Path(filePath));
+        return fileSystem;
     }
 
     @SneakyThrows
@@ -258,16 +224,7 @@ private void initialize() {
     }
 
     private Configuration createConfiguration() {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.setBoolean(
-                String.format("fs.%s.impl.disable.cache", hadoopConf.getSchema()), true);
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
         hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
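
The doWithHadoopAuth method that ParquetWriteStrategy calls below is collapsed out of this diff. For context, here is a minimal, hypothetical sketch of the standard Hadoop UGI pattern such a method relies on; every name except the Hadoop and JDK APIs is illustrative, not SeaTunnel's actual code:

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical helper, not the actual SeaTunnel implementation.
    public final class KerberosAuthSketch {
        public static <T> T doWithKerberos(
                Configuration conf,
                String principal,
                String keytabPath,
                PrivilegedExceptionAction<T> action)
                throws Exception {
            // Hadoop reads hadoop.security.authentication from this Configuration.
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation ugi =
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
            // Anything executed inside doAs (for example FileSystem.get, or a
            // writer's internal FileSystem lookup) carries the Kerberos credentials.
            return ugi.doAs(action);
        }
    }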
AbstractWriteStrategy.java

@@ -36,7 +36,6 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +57,6 @@
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
     protected final FileSinkConfig fileSinkConfig;
@@ -148,14 +142,7 @@ protected SeaTunnelRowType buildSchemaWithRowType(
      */
     @Override
     public Configuration getConfiguration(HadoopConf hadoopConf) {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
         this.hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
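
Worth noting: this consolidation is not purely mechanical. The write strategies previously set WRITE_OLD_LIST_STRUCTURE to false and left the FileSystem cache enabled, whereas the shared toConfiguration() sets WRITE_OLD_LIST_STRUCTURE to true and disables the cache, matching what HadoopFileSystemProxy already did.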
OrcWriteStrategy.java

@@ -122,6 +122,7 @@ private Writer getOrCreateWriter(@NonNull String filePath) {
                             .compress(compressFormat.getOrcCompression())
                             // use orc version 0.12
                             .version(OrcFile.Version.V_0_12)
+                            .fileSystem(hadoopFileSystemProxy.getFileSystem())
                             .overwrite(true);
             Writer newWriter = OrcFile.createWriter(path, options);
             this.beingWrittenWriter.put(filePath, newWriter);
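
The ORC side needs only this one line because OrcFile.WriterOptions accepts a FileSystem directly: with .fileSystem(hadoopFileSystemProxy.getFileSystem()) the writer reuses the proxy's already-authenticated, lazily initialized FileSystem instead of resolving a new, unauthenticated one from the path and configuration.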
ParquetWriteStrategy.java

@@ -139,25 +139,33 @@ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath)
         dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
         if (writer == null) {
             Path path = new Path(filePath);
-            try {
-                HadoopOutputFile outputFile =
-                        HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
-                ParquetWriter<GenericRecord> newWriter =
-                        AvroParquetWriter.<GenericRecord>builder(outputFile)
-                                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                                .withDataModel(dataModel)
-                                // use parquet v1 to improve compatibility
-                                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
-                                .withCompressionCodec(compressFormat.getParquetCompression())
-                                .withSchema(schema)
-                                .build();
-                this.beingWrittenWriter.put(filePath, newWriter);
-                return newWriter;
-            } catch (IOException e) {
-                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
-            }
+            // initialize the kerberos login
+            return hadoopFileSystemProxy.doWithHadoopAuth(
+                    (configuration, userGroupInformation) -> {
+                        try {
+                            HadoopOutputFile outputFile =
+                                    HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
+                            ParquetWriter<GenericRecord> newWriter =
+                                    AvroParquetWriter.<GenericRecord>builder(outputFile)
+                                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                                            .withDataModel(dataModel)
+                                            // use parquet v1 to improve compatibility
+                                            .withWriterVersion(
+                                                    ParquetProperties.WriterVersion.PARQUET_1_0)
+                                            .withCompressionCodec(
+                                                    compressFormat.getParquetCompression())
+                                            .withSchema(schema)
+                                            .build();
+                            this.beingWrittenWriter.put(filePath, newWriter);
+                            return newWriter;
+                        } catch (IOException e) {
+                            String errorMsg =
+                                    String.format(
+                                            "Get parquet writer for file [%s] error", filePath);
+                            throw new FileConnectorException(
+                                    CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
+                        }
+                    });
         }
         return writer;
     }
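
Parquet gets the heavier treatment because AvroParquetWriter offers no comparable FileSystem injection point: HadoopOutputFile.fromPath(path, conf) resolves its FileSystem internally from the Configuration. Wrapping the whole builder chain in doWithHadoopAuth makes that internal resolution happen under the logged-in UserGroupInformation, which is what the "initialize the kerberos login" comment refers to.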
