Support using multiple Hadoop accounts
ruanwenjun committed Nov 30, 2023
1 parent fc91475 commit 624cee9
Showing 32 changed files with 485 additions and 219 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -51,6 +51,7 @@ Read data from hdfs file system.
| date_format | string | no | yyyy-MM-dd | Date type format, used to tell the connector how to convert a string to a date. Supported formats: `yyyy-MM-dd`, `yyyy.MM.dd`, `yyyy/MM/dd`. Default: `yyyy-MM-dd`. |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell the connector how to convert a string to a datetime. Supported formats: `yyyy-MM-dd HH:mm:ss`, `yyyy.MM.dd HH:mm:ss`, `yyyy/MM/dd HH:mm:ss`, `yyyyMMddHHmmss`. Default: `yyyy-MM-dd HH:mm:ss`. |
| time_format | string | no | HH:mm:ss | Time type format, used to tell the connector how to convert a string to a time. Supported formats: `HH:mm:ss`, `HH:mm:ss.SSS`. Default: `HH:mm:ss`. |
| remote_user | string | no | - | The login user used to connect to Hadoop. It is intended for remote users in RPC and carries no credentials. |
| kerberos_principal | string | no | - | The principal of Kerberos. |
| kerberos_keytab_path | string | no | - | The keytab path of Kerberos. |
| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for txt and csv files. For example, set `skip_header_row_number = 2` and SeaTunnel will skip the first 2 lines of the source files. |
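For illustration, a source block that logs in as a specific Hadoop user might look like the sketch below. This is a hedged example, not taken from the commit: the filesystem address, path, and user name are placeholders, and only `remote_user` is the option added here.

```hocon
source {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"
    path = "/apps/demo/student"
    file_format_type = "json"
    # Log in to Hadoop as this user; no credentials are attached.
    remote_user = "hdfs_user_a"
  }
}
```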
@@ -48,6 +48,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
        if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
            hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
        }

        if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
            hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
        }
        if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
            hadoopConf.setKerberosPrincipal(
                    pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
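The `remote_user` value stored on `hadoopConf` above presumably takes effect when the connector opens the `FileSystem`. A minimal sketch of the usual Hadoop pattern, assuming the standard `UserGroupInformation` API (the class and method names below are illustrative, not the connector's actual code):

```java
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class RemoteUserFileSystemSketch {

    // Illustrative only: open a FileSystem as the configured remote user.
    public static FileSystem open(String defaultFs, String remoteUser) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", defaultFs);
        // A remote user carries a name but no credentials, matching the docs above.
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
        // Filesystem calls made inside doAs are attributed to that user.
        return ugi.doAs(
                (PrivilegedExceptionAction<FileSystem>)
                        () -> FileSystem.get(URI.create(defaultFs), conf));
    }
}
```

Hadoop's `FileSystem.get(URI, Configuration, String user)` wraps this same `createRemoteUser`/`doAs` pattern, which is why a bare user name is enough on clusters using simple authentication.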
@@ -55,16 +55,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                            "PluginName: %s, PluginType: %s, Message: %s",
                            getPluginName(), PluginType.SOURCE, result.getMsg()));
        }
        readStrategy =
                ReadStrategyFactory.of(
                        pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
        readStrategy.setPluginConfig(pluginConfig);
        String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
        hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
        if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
            hadoopConf.setHdfsSitePath(
                    pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
        }

        if (pluginConfig.hasPath(HdfsSourceConfig.REMOTE_USER.key())) {
            hadoopConf.setRemoteUser(pluginConfig.getString(HdfsSourceConfig.REMOTE_USER.key()));
        }

        if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) {
            hadoopConf.setKerberosPrincipal(
                    pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key()));
@@ -73,6 +74,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
            hadoopConf.setKerberosKeytabPath(
                    pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key()));
        }
        readStrategy =
                ReadStrategyFactory.of(
                        pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
        readStrategy.setPluginConfig(pluginConfig);
        readStrategy.init(hadoopConf);
        try {
            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
        } catch (IOException e) {
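When `kerberos_principal` and `kerberos_keytab_path` are set instead of `remote_user`, the login is credentialed. A hedged sketch of the standard keytab login, again with illustrative names rather than the connector's own code:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginSketch {

    // Illustrative only: authenticate with a keytab before touching HDFS.
    public static void login(String principal, String keytabPath) throws Exception {
        Configuration conf = new Configuration();
        // Switch the client from simple auth to Kerberos.
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Logs the process in as the given principal using the keytab file.
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    }
}
```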
@@ -36,6 +36,7 @@
        <parquet-avro.version>1.12.3</parquet-avro.version>
        <poi.version>4.1.2</poi.version>
        <poi-ooxml.version>4.1.2</poi-ooxml.version>
        <hadoop-minikdc.version>3.1.4</hadoop-minikdc.version>
    </properties>

    <dependencyManagement>
@@ -136,6 +137,13 @@
            <artifactId>poi-ooxml</artifactId>
            <version>${poi-ooxml.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minikdc</artifactId>
            <version>${hadoop-minikdc.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
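`hadoop-minikdc` ships an embedded KDC, presumably added so the new Kerberos path can be exercised in tests without a real KDC. A minimal bootstrap sketch under that assumption (directory and principal names are illustrative):

```java
import java.io.File;
import java.util.Properties;

import org.apache.hadoop.minikdc.MiniKdc;

public class MiniKdcBootstrapSketch {

    public static void main(String[] args) throws Exception {
        Properties conf = MiniKdc.createConf();
        File workDir = new File("target/minikdc");
        workDir.mkdirs();

        MiniKdc kdc = new MiniKdc(conf, workDir);
        kdc.start();
        try {
            // Create a principal and a keytab that test configs can point at.
            File keytab = new File(workDir, "test.keytab");
            kdc.createPrincipal(keytab, "hdfs/localhost");
            System.out.println("Realm: " + kdc.getRealm());
        } finally {
            kdc.stop();
        }
    }
}
```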
@@ -209,6 +209,12 @@ public class BaseSinkConfig {
                    .noDefaultValue()
                    .withDescription("The path of hdfs-site.xml");

    public static final Option<String> REMOTE_USER =
            Options.key("remote_user")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The remote user name of hdfs");

    public static final Option<String> KERBEROS_PRINCIPAL =
            Options.key("kerberos_principal")
                    .stringType()
@@ -78,6 +78,12 @@ public class BaseSourceConfig {
                    .noDefaultValue()
                    .withDescription("The path of hdfs-site.xml");

    public static final Option<String> REMOTE_USER =
            Options.key("remote_user")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The remote user name of hdfs");

    public static final Option<String> KERBEROS_PRINCIPAL =
            Options.key("kerberos_principal")
                    .stringType()
@@ -33,6 +33,9 @@ public class HadoopConf implements Serializable {
    protected Map<String, String> extraOptions = new HashMap<>();
    protected String hdfsNameKey;
    protected String hdfsSitePath;

    protected String remoteUser;

    protected String kerberosPrincipal;
    protected String kerberosKeytabPath;
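With `remoteUser` sitting next to the Kerberos fields, a natural resolution order is: keytab login when Kerberos is configured, otherwise a credential-less remote user, otherwise the current OS user. The consuming code is not shown in this commit, so the following is only a sketch of that precedence (names are illustrative):

```java
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopLoginResolverSketch {

    // Illustrative only: choose a UserGroupInformation from HadoopConf-style fields.
    public static UserGroupInformation resolve(
            String kerberosPrincipal, String kerberosKeytabPath, String remoteUser)
            throws Exception {
        if (kerberosPrincipal != null && kerberosKeytabPath != null) {
            // Credentialed login via keytab.
            return UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                    kerberosPrincipal, kerberosKeytabPath);
        }
        if (remoteUser != null) {
            // Named but credential-less user, e.g. for simple-auth clusters.
            return UserGroupInformation.createRemoteUser(remoteUser);
        }
        // Fall back to whoever is running the process.
        return UserGroupInformation.getCurrentUser();
    }
}
```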
