[Improve][Connector-v2] The Hive connector supports multiple filesystems (
dailai authored Apr 18, 2024
1 parent 79bb701 commit 8a4c01f
Showing 40 changed files with 1,992 additions and 25 deletions.
177 changes: 177 additions & 0 deletions docs/en/connector-v2/sink/Hive.md
@@ -37,6 +37,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| hive.hadoop.conf | Map | no | - |
| hive.hadoop.conf-path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
@@ -57,6 +59,16 @@ The path of `hdfs-site.xml`, used to load HA configuration of namenodes

### hive_site_path [string]

The path of `hive-site.xml`

### hive.hadoop.conf [map]

Properties to set in the Hadoop configuration ('core-site.xml', 'hdfs-site.xml', 'hive-site.xml')

### hive.hadoop.conf-path [string]

The directory from which to load the 'core-site.xml', 'hdfs-site.xml', and 'hive-site.xml' files
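
The two options are typically used together: the directory given by `hive.hadoop.conf-path` supplies the XML files, while `hive.hadoop.conf` sets individual Hadoop properties directly in the job config. A minimal sketch of a sink block using both (the metastore address, path, and bucket are placeholders, not values taken from this change):

```hocon
sink {
  Hive {
    table_name = "default.example_table"
    metastore_uri = "thrift://metastore-host:9083"
    hive.hadoop.conf-path = "/path/to/hadoop-conf"
    hive.hadoop.conf = {
      bucket = "s3a://example-bucket"
    }
  }
}
```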

### krb5_path [string]

The path of `krb5.conf`, used for Kerberos authentication
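
When Kerberos is enabled, this option is usually set together with `kerberos_principal` and `kerberos_keytab_path` from the table above. A minimal sketch (the principal and keytab path are illustrative placeholders):

```hocon
sink {
  Hive {
    table_name = "default.example_table"
    metastore_uri = "thrift://metastore-host:9083"
    krb5_path = "/etc/krb5.conf"
    kerberos_principal = "hive/metastore-host@EXAMPLE.COM"
    kerberos_keytab_path = "/path/to/hive.keytab"
  }
}
```
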
@@ -162,6 +174,171 @@ sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
    }
  }
}
```

## Hive on S3

### Step 1

Create the lib directory for the Hive connector on EMR.

```shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 2

Download the required jars from Maven Central into the lib directory.

```shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
```

### Step 3

Copy the jars from your EMR environment into the lib directory.

```shell
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 4

Run the example job.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_s3"
    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
    hive.hadoop.conf = {
      bucket = "s3://ws-package"
    }
  }
}
```

## Hive on OSS

### Step 1

Create the lib directory for the Hive connector on EMR.

```shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
```

### Step 2

Download the required jars from Maven Central into the lib directory.

```shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
```

### Step 3

Copy the jars from your EMR environment into the lib directory and delete the conflicting jar.

```shell
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
```

### Step 4

Run the example job.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_oss"
    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    hive.hadoop.conf = {
      bucket = "oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
    }
  }
}
```
@@ -29,6 +29,8 @@
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;

import java.util.Objects;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;

public abstract class BaseHdfsFileSink extends BaseFileSink {
@@ -44,7 +46,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                            getPluginName(), PluginType.SINK, result.getMsg()));
        }
        super.prepare(pluginConfig);
        hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
        // Avoid overwriting a hadoopConf that a subclass has already initialized;
        // only create the default HadoopConf here if it is still null.
        if (Objects.isNull(hadoopConf)) {
            hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
        }
        if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
            hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
        }
@@ -37,6 +37,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;

import java.io.IOException;
import java.util.Objects;

public abstract class BaseHdfsFileSource extends BaseFileSource {

@@ -56,8 +57,13 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
                            getPluginName(), PluginType.SOURCE, result.getMsg()));
        }
        String path = pluginConfig.getString(HdfsSourceConfigOptions.FILE_PATH.key());
        hadoopConf =
                new HadoopConf(pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
        // Avoid overwriting a hadoopConf that a subclass has already initialized;
        // only create the default HadoopConf here if it is still null.
        if (Objects.isNull(hadoopConf)) {
            hadoopConf =
                    new HadoopConf(
                            pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
        }
        if (pluginConfig.hasPath(HdfsSourceConfigOptions.HDFS_SITE_PATH.key())) {
            hadoopConf.setHdfsSitePath(
                    pluginConfig.getString(HdfsSourceConfigOptions.HDFS_SITE_PATH.key()));
@@ -60,22 +60,50 @@ public String getSchema() {

    public void setExtraOptionsForConfiguration(Configuration configuration) {
        if (!extraOptions.isEmpty()) {
            removeUnwantedOverwritingProps(extraOptions);
            extraOptions.forEach(configuration::set);
        }
        if (hdfsSitePath != null) {
            configuration.addResource(new Path(hdfsSitePath));
            Configuration hdfsSiteConfiguration = new Configuration();
            hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
            unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
            configuration.addResource(hdfsSiteConfiguration);
        }
    }

    private void removeUnwantedOverwritingProps(Map<String, String> extraOptions) {
        extraOptions.remove(getFsDefaultNameKey());
        extraOptions.remove(getHdfsImplKey());
        extraOptions.remove(getHdfsImplDisableCacheKey());
    }

    public void unsetUnwantedOverwritingProps(Configuration hdfsSiteConfiguration) {
        hdfsSiteConfiguration.unset(getFsDefaultNameKey());
        hdfsSiteConfiguration.unset(getHdfsImplKey());
        hdfsSiteConfiguration.unset(getHdfsImplDisableCacheKey());
    }

    public Configuration toConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(READ_INT96_AS_FIXED, true);
        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
        configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
        configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
        configuration.setBoolean(getHdfsImplDisableCacheKey(), true);
        configuration.set(getFsDefaultNameKey(), getHdfsNameKey());
        configuration.set(getHdfsImplKey(), getFsHdfsImpl());
        return configuration;
    }

    public String getFsDefaultNameKey() {
        return CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
    }

    public String getHdfsImplKey() {
        return String.format("fs.%s.impl", getSchema());
    }

    public String getHdfsImplDisableCacheKey() {
        return String.format("fs.%s.impl.disable.cache", getSchema());
    }
}
@@ -29,8 +29,8 @@
public class S3Conf extends HadoopConf {
    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
    private static final String S3A_SCHEMA = "s3a";
    private static final String DEFAULT_SCHEMA = "s3n";
    protected static final String S3A_SCHEMA = "s3a";
    protected static final String DEFAULT_SCHEMA = "s3n";
    private String schema = DEFAULT_SCHEMA;

    @Override
@@ -47,7 +47,7 @@ public void setSchema(String schema) {
        this.schema = schema;
    }

    private S3Conf(String hdfsNameKey) {
    protected S3Conf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

@@ -80,7 +80,7 @@ public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig)
        return buildWithConfig(config);
    }

    private String switchHdfsImpl() {
    protected String switchHdfsImpl() {
        switch (this.schema) {
            case S3A_SCHEMA:
                return HDFS_S3A_IMPL;
15 changes: 15 additions & 0 deletions seatunnel-connectors-v2/connector-hive/pom.xml
@@ -46,6 +46,21 @@
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-file-s3</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-file-oss</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-file-cos</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
@@ -26,6 +26,9 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.HashMap;
import java.util.Map;

public class HiveConfig {
    public static final Option<String> TABLE_NAME =
            Options.key("table_name")
@@ -51,6 +54,19 @@ public class HiveConfig {
                    .noDefaultValue()
                    .withDescription("The path of hive-site.xml");

    public static final Option<Map<String, String>> HADOOP_CONF =
            Options.key("hive.hadoop.conf")
                    .mapType()
                    .defaultValue(new HashMap<>())
                    .withDescription("Properties in hadoop conf");

    public static final Option<String> HADOOP_CONF_PATH =
            Options.key("hive.hadoop.conf-path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The specified loading path for the 'core-site.xml', 'hdfs-site.xml' files");

    public static final String TEXT_INPUT_FORMAT_CLASSNAME =
            "org.apache.hadoop.mapred.TextInputFormat";
    public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =