Hive (ORC) support on HdfsToTeradata job type. Other properties (tdch.other.properties.hocon) now support full override and removal of input parameters. Bug fix on passing Hadoop parameters. Whitelist can now be disabled by simply removing it from the config.
jinhyukchang committed Jul 29, 2016
1 parent a4674be commit 945f36d
Showing 23 changed files with 289 additions and 81 deletions.
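For context on the tdch.other.properties.hocon change called out in the commit message, here is a minimal sketch of override-and-remove semantics for TDCH input parameters, written against the Typesafe Config HOCON parser. The parameter names and the empty-value-means-remove convention are illustrative assumptions; the plugin's exact merge rules are not shown in this diff.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;

import java.util.HashMap;
import java.util.Map;

// Sketch only: merge a job's HOCON overrides into a base TDCH parameter map.
// Assumption for illustration: an empty value removes the parameter outright.
public class HoconOverrideSketch {
  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put("tdch.insert.method", "batch.insert");
    params.put("tdch.num.mappers", "8");

    // Job-supplied HOCON: override one parameter, blank out another.
    Config overrides = ConfigFactory.parseString(
        "tdch.insert.method = internal.fastload\n"
      + "tdch.num.mappers = \"\"");

    for (Map.Entry<String, ConfigValue> e : overrides.entrySet()) {
      String value = String.valueOf(e.getValue().unwrapped());
      if (value.isEmpty()) {
        params.remove(e.getKey());     // removal of an input parameter
      } else {
        params.put(e.getKey(), value); // full override
      }
    }
    System.out.println(params); // {tdch.insert.method=internal.fastload}
  }
}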
6 changes: 5 additions & 1 deletion plugins/jobtype/jobtypes/hdfsToTeradata/plugin.properties
@@ -7,4 +7,8 @@ hadoop-inject.mapreduce.job.user.classpath.first=true
#Default value to drop error table
td.error.table.drop=true

hadoop.config=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true' -Dmapreduce.job.user.classpath.first=true
#Hive token
obtain.hcat.token=true

hadoop.config.jvm=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true'
hadoop.config.1=-Dmapreduce.job.user.classpath.first=true
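The hunk above splits the single space-delimited hadoop.config value into individually keyed hadoop.config.* entries, which the runner now collects via Props.getMapByPrefix (see the Java changes further down). Splitting one string on whitespace would mangle the quoted -Xmx1G ... JVM options, which appears to be the Hadoop-parameter bug the commit message mentions. A standalone sketch of the prefix-gathering idea, using plain java.util.Properties in place of Azkaban's Props:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch: each hadoop.config.* value becomes one intact MR parameter,
// so quoted JVM option strings are never re-split on whitespace.
public class HadoopConfigPrefixSketch {
  static final String HADOOP_CONFIG_PREFIX = "hadoop.config.";

  static List<String> collectMrParams(Properties props) {
    List<String> params = new ArrayList<>();
    for (String name : props.stringPropertyNames()) {
      if (name.startsWith(HADOOP_CONFIG_PREFIX)) {
        params.add(props.getProperty(name)); // one parameter per key
      }
    }
    return params;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hadoop.config.jvm",
        "-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.net.preferIPv4Stack=true'");
    props.setProperty("hadoop.config.1", "-Dmapreduce.job.user.classpath.first=true");
    System.out.println(collectMrParams(props)); // two intact parameters
  }
}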
8 changes: 6 additions & 2 deletions plugins/jobtype/jobtypes/hdfsToTeradata/private.properties
@@ -1,10 +1,14 @@
jobtype.class=azkaban.jobtype.connectors.teradata.HdfsToTeradataJob
jobtype.classpath=lib/*
jobtype.classpath=lib/*,tdch_hive_lib/*
azkaban.no.user.classpath=true

jobtype.lib.dir=${plugin.dir}/lib
jobtype.tdwallet.jar=${jobtype.lib.dir}/teradata-connector-1.4.2.jar

libjars.home=${plugin.dir}/libjars
libjars=${jobtype.lib.dir}/teradata-connector-1.4.2.jar,${libjars.home}/*

whitelist.file.path=/resources/azkaban/hdfsToTeradata/whitelist.txt
#Separate libjars for the Hive job type, as sharing them causes version conflicts. tdch_hive_lib contains cherry-picked Hive jars, since bringing in all Hive jars creates conflicts.
libjars.hive=${libjars},${plugin.dir}/tdch_hive_lib/*

whitelist.file.path=/resources/azkaban/hdfsToTeradata/whitelist.txt
12 binary files not shown.
3 changes: 2 additions & 1 deletion plugins/jobtype/jobtypes/teradataToHdfs/plugin.properties
@@ -4,4 +4,5 @@ mr.listener.visualizer=false
hadoop-inject.hadoop-conf.mapreduce.job.user.classpath.first=true
hadoop-inject.mapreduce.job.user.classpath.first=true

hadoop.config=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true' -Dmapreduce.job.user.classpath.first=true
hadoop.config.jvm=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true'
hadoop.config.1=-Dmapreduce.job.user.classpath.first=true
1 change: 1 addition & 0 deletions plugins/jobtype/jobtypes/teradataToHdfs/private.properties
@@ -1,5 +1,6 @@
jobtype.class=azkaban.jobtype.connectors.teradata.TeradataToHdfsJob
jobtype.classpath=lib/*
azkaban.no.user.classpath=true

jobtype.lib.dir=${plugin.dir}/lib
jobtype.tdwallet.jar=${jobtype.lib.dir}/teradata-connector-1.4.2.jar
…
@@ -16,7 +16,10 @@

package azkaban.jobtype.connectors.teradata;

import static azkaban.jobtype.connectors.teradata.TdchConstants.*;

import org.apache.log4j.Logger;

import azkaban.utils.Props;

/**
@@ -27,6 +30,7 @@ public class HdfsToTeradataJob {

public HdfsToTeradataJob(String jobid, Props sysProps, Props jobProps, Logger log) {
super(jobid, sysProps, jobProps, log);
getJobProps().put(LIB_JARS_HIVE_KEY, sysProps.get(LIB_JARS_HIVE_KEY));
}

@Override
…
@@ -25,9 +25,11 @@
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import azkaban.jobExecutor.AbstractProcessJob;
@@ -59,24 +61,27 @@ public HdfsToTeradataJobRunnerMain() throws FileNotFoundException, IOException {
}

private HdfsToTeradataJobRunnerMain(Properties jobProps) throws FileNotFoundException, IOException {
this(jobProps,
new Whitelist(new Props(null, jobProps), FileSystem.get(new Configuration())),
new Decryptions());
this(jobProps, new Decryptions());
}

@VisibleForTesting
HdfsToTeradataJobRunnerMain(Properties jobProps, Whitelist whitelist, Decryptions decryptions) throws FileNotFoundException, IOException {
HdfsToTeradataJobRunnerMain(Properties jobProps, Decryptions decryptions) throws FileNotFoundException, IOException {
_logger = JobUtils.initJobLogger();
_jobProps = jobProps;
_logger.info("Job properties: " + jobProps);

String logLevel = jobProps.getProperty(TdchConstants.TDCH_LOG_LEVEL);
if(!StringUtils.isEmpty(logLevel)) {
_logger.setLevel(Level.toLevel(logLevel));
}
_jobProps = jobProps;
Props props = new Props(null, _jobProps);

HadoopConfigurationInjector.injectResources(props);
Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);

if (props.containsKey(Whitelist.WHITE_LIST_FILE_PATH_KEY)) {
whitelist.validateWhitelisted(props);
new Whitelist(props, FileSystem.get(conf)).validateWhitelisted(props);
}

String encryptedCredential = _jobProps.getProperty(TdchConstants.TD_ENCRYPTED_CREDENTIAL_KEY);
@@ -88,28 +93,37 @@ private HdfsToTeradataJobRunnerMain(Properties jobProps) throws FileNotFoundException, IOException {
}

_params = TdchParameters.builder()
.mrParams(_jobProps.getProperty(TdchConstants.HADOOP_CONFIG_KEY))
.libJars(props.getString(TdchConstants.LIB_JARS_KEY))
.mrParams(props.getMapByPrefix(TdchConstants.HADOOP_CONFIG_PREFIX_KEY).values())
.libJars(createLibJarStr(props))
.tdJdbcClassName(TdchConstants.TERADATA_JDBCDRIVER_CLASSNAME)
.teradataHostname(props.getString(TdchConstants.TD_HOSTNAME_KEY))
.fileFormat(_jobProps.getProperty(TdchConstants.HDFS_FILE_FORMAT_KEY))
.fieldSeparator(_jobProps.getProperty(TdchConstants.HDFS_FIELD_SEPARATOR_KEY))
.jobType(TdchConstants.TDCH_JOB_TYPE)
.jobType(props.getString(TdchConstants.TDCH_JOB_TYPE, TdchConstants.DEFAULT_TDCH_JOB_TYPE))
.userName(props.getString(TdchConstants.TD_USERID_KEY))
.credentialName(_jobProps.getProperty(TdchConstants.TD_CREDENTIAL_NAME_KEY))
.password(password)
.avroSchemaPath(_jobProps.getProperty(TdchConstants.AVRO_SCHEMA_PATH_KEY))
.avroSchemaInline(_jobProps.getProperty(TdchConstants.AVRO_SCHEMA_INLINE_KEY))
.sourceHdfsPath(props.getString(TdchConstants.SOURCE_HDFS_PATH_KEY))
.sourceHdfsPath(_jobProps.getProperty(TdchConstants.SOURCE_HDFS_PATH_KEY))
.targetTdTableName(props.getString(TdchConstants.TARGET_TD_TABLE_NAME_KEY))
.errorTdDatabase(_jobProps.getProperty(TdchConstants.ERROR_DB_KEY))
.errorTdTableName(_jobProps.getProperty(TdchConstants.ERROR_TABLE_KEY))
.tdInsertMethod(_jobProps.getProperty(TdchConstants.TD_INSERT_METHOD_KEY))
.numMapper(TdchConstants.DEFAULT_NO_MAPPERS)
.numMapper(props.getInt(TdchConstants.TD_NUM_MAPPERS, TdchConstants.DEFAULT_NO_MAPPERS))
.hiveSourceDatabase(_jobProps.getProperty(TdchConstants.SOURCE_HIVE_DATABASE_NAME_KEY))
.hiveSourceTable(_jobProps.getProperty(TdchConstants.SOURCE_HIVE_TABLE_NAME_KEY))
.hiveConfFile(_jobProps.getProperty(TdchConstants.TDCH_HIVE_CONF_KEY))
.otherProperties(_jobProps.getProperty(TdchConstants.TD_OTHER_PROPERTIES_HOCON_KEY))
.build();
}

private String createLibJarStr(Props props) {
if (TdchConstants.TDCH_HIVE_JOB_TYPE.equals(props.getString(TdchConstants.TDCH_JOB_TYPE, TdchConstants.DEFAULT_TDCH_JOB_TYPE))) {
return props.getString(TdchConstants.LIB_JARS_HIVE_KEY);
}
return props.getString(TdchConstants.LIB_JARS_KEY);
}

public void run() throws IOException, InterruptedException {
String jobName = System.getenv(AbstractProcessJob.JOB_NAME_ENV);
…
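One more behavior worth illustrating: the runner above now builds a Whitelist only when whitelist.file.path is present in the resolved properties, so deleting that key from private.properties disables validation entirely. A minimal sketch of the guard, with a hypothetical validate() standing in for Whitelist.validateWhitelisted:

import java.util.Properties;

// Sketch of the opt-in whitelist guard. WHITE_LIST_FILE_PATH_KEY mirrors
// Whitelist.WHITE_LIST_FILE_PATH_KEY; validate() is a hypothetical stand-in
// for new Whitelist(props, FileSystem.get(conf)).validateWhitelisted(props).
public class WhitelistGuardSketch {
  static final String WHITE_LIST_FILE_PATH_KEY = "whitelist.file.path";

  static void validateIfConfigured(Properties props) {
    String path = props.getProperty(WHITE_LIST_FILE_PATH_KEY);
    if (path == null) {
      return; // Key absent: whitelist is disabled, no validation runs.
    }
    validate(path, props); // Key present: enforce the whitelist as before.
  }

  static void validate(String whitelistFile, Properties props) {
    // Hypothetical: load whitelistFile from HDFS and throw if the
    // submitting user or target table is not listed.
  }
}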
@@ -14,14 +14,19 @@
public interface TdchConstants {
public static final String TERADATA_JDBCDRIVER_CLASSNAME = "com.teradata.jdbc.TeraDriver";
public static final String TD_WALLET_FORMAT = "$tdwallet(%s)";
public static final String TDCH_JOB_TYPE = "hdfs";
public static final String DEFAULT_TDCH_JOB_TYPE = "hdfs";
public static final String TDCH_HIVE_JOB_TYPE = "hive";
public static final String AVRO_FILE_FORMAT = "avrofile";
public static final String LIB_JAR_DELIMITER = ",";
public static final int DEFAULT_NO_MAPPERS = 8;

//Keys for the properties
public static final String TDCH_JOB_TYPE = "tdch.jobtype";
public static final String TDCH_LOG_LEVEL = "tdch.log.level";

public static final String TD_WALLET_JAR = "jobtype.tdwallet.jar";
public static final String LIB_JARS_KEY = "libjars";
public static final String LIB_JARS_HIVE_KEY = "libjars.hive";
public static final String TD_HOSTNAME_KEY = "td.hostname";
public static final String TD_USERID_KEY = "td.userid";
@Deprecated
@@ -31,6 +36,7 @@ public interface TdchConstants {
public static final String AVRO_SCHEMA_PATH_KEY = "avro.schema.path";
public static final String AVRO_SCHEMA_INLINE_KEY = "avro.schema.inline";

public static final String TD_NUM_MAPPERS = "tdch.num.mappers";
public static final String TD_INSERT_METHOD_KEY = "tdch.insert.method";
public static final String SOURCE_HDFS_PATH_KEY = "source.hdfs.path";
public static final String TARGET_TD_TABLE_NAME_KEY = "target.td.tablename";
@@ -42,11 +48,16 @@ public interface TdchConstants {
public static final String HDFS_FILE_FORMAT_KEY = "hdfs.fileformat";
public static final String HDFS_FIELD_SEPARATOR_KEY = "hdfs.separator";
public static final String HADOOP_CONFIG_KEY = "hadoop.config";
public static final String HADOOP_CONFIG_PREFIX_KEY = "hadoop.config.";

public static final String TD_RETRIEVE_METHOD_KEY = "tdch.retrieve.method";
public static final String SOURCE_TD_TABLE_NAME_KEY = "source.td.tablename";
public static final String SOURCE_TD_QUERY_NAME_KEY = "source.td.sourcequery";
public static final String TARGET_HDFS_PATH_KEY = "target.hdfs.path";
public static final String TD_OTHER_PROPERTIES_HOCON_KEY = "tdch.other.properties.hocon";
public static final String JOB_OUTPUT_PROPERTIES_KEY = "output.property.keys";

public static final String SOURCE_HIVE_DATABASE_NAME_KEY = "source.hive.databasename";
public static final String SOURCE_HIVE_TABLE_NAME_KEY = "source.hive.tablename";
public static final String TDCH_HIVE_CONF_KEY = "tdch.hive.conf";
}
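Tying the new constants together, a Hive-sourced export might be assembled roughly as follows. This is a sketch assuming only the builder methods visible in the hunks above; the host, credential, and table names are hypothetical, and the commit does not show which builder fields are mandatory.

import java.util.Arrays;

import azkaban.jobtype.connectors.teradata.TdchConstants;
import azkaban.jobtype.connectors.teradata.TdchParameters;

// Hedged sketch of a Hive (ORC) -> Teradata export using the builder calls
// visible in this commit. All concrete values below are hypothetical.
public class HiveToTeradataSketch {
  public static void main(String[] args) {
    TdchParameters params = TdchParameters.builder()
        .mrParams(Arrays.asList("-Dmapreduce.job.user.classpath.first=true"))
        .libJars("lib/teradata-connector-1.4.2.jar,tdch_hive_lib/*") // the libjars.hive set
        .tdJdbcClassName(TdchConstants.TERADATA_JDBCDRIVER_CLASSNAME)
        .teradataHostname("td-prod.example.com")    // hypothetical
        .userName("etl_user")                       // hypothetical
        .credentialName("etl_user_td_credential")   // hypothetical
        .jobType(TdchConstants.TDCH_HIVE_JOB_TYPE)  // "hive" instead of the default "hdfs"
        .hiveSourceDatabase("tracking")             // hypothetical
        .hiveSourceTable("page_views_orc")          // hypothetical ORC-backed table
        .targetTdTableName("dw.page_views")         // hypothetical
        .numMapper(TdchConstants.DEFAULT_NO_MAPPERS)
        .build();
    System.out.println(params);
  }
}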