diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/plugin.properties b/plugins/jobtype/jobtypes/hdfsToTeradata/plugin.properties index f2174328..961c77cb 100644 --- a/plugins/jobtype/jobtypes/hdfsToTeradata/plugin.properties +++ b/plugins/jobtype/jobtypes/hdfsToTeradata/plugin.properties @@ -7,4 +7,8 @@ hadoop-inject.mapreduce.job.user.classpath.first=true #Default value to drop error table td.error.table.drop=true -hadoop.config=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true' -Dmapreduce.job.user.classpath.first=true \ No newline at end of file +#Hive token +obtain.hcat.token=true + +hadoop.config.jvm=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true' +hadoop.config.1=-Dmapreduce.job.user.classpath.first=true \ No newline at end of file diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/private.properties b/plugins/jobtype/jobtypes/hdfsToTeradata/private.properties index e91729ef..04e3edde 100644 --- a/plugins/jobtype/jobtypes/hdfsToTeradata/private.properties +++ b/plugins/jobtype/jobtypes/hdfsToTeradata/private.properties @@ -1,5 +1,6 @@ jobtype.class=azkaban.jobtype.connectors.teradata.HdfsToTeradataJob -jobtype.classpath=lib/* +jobtype.classpath=lib/*,tdch_hive_lib/* +azkaban.no.user.classpath=true jobtype.lib.dir=${plugin.dir}/lib jobtype.tdwallet.jar=${jobtype.lib.dir}/teradata-connector-1.4.2.jar @@ -7,4 +8,7 @@ jobtype.tdwallet.jar=${jobtype.lib.dir}/teradata-connector-1.4.2.jar libjars.home=${plugin.dir}/libjars libjars=${jobtype.lib.dir}/teradata-connector-1.4.2.jar,${libjars.home}/* -whitelist.file.path=/resources/azkaban/hdfsToTeradata/whitelist.txt \ No newline at end of file +#Use a separate libjars list for the hive job type, since sharing one causes version conflicts. tdch_hive_lib contains cherry-picked hive jars, as bringing in all hive jars causes version conflicts.
+libjars.hive=${libjars},${plugin.dir}/tdch_hive_lib/* + +whitelist.file.path=/resources/azkaban/hdfsToTeradata/whitelist.txt diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/antlr-runtime-3.4.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/antlr-runtime-3.4.jar new file mode 100644 index 00000000..865a537b Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/antlr-runtime-3.4.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/commons-dbcp-1.4.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/commons-dbcp-1.4.jar new file mode 100644 index 00000000..c4c1c4f2 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/commons-dbcp-1.4.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-api-jdo-3.2.6.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-api-jdo-3.2.6.jar new file mode 100644 index 00000000..8c98aca1 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-api-jdo-3.2.6.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-core-3.2.10.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-core-3.2.10.jar new file mode 100644 index 00000000..7078bce0 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-core-3.2.10.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-rdbms-3.2.9.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-rdbms-3.2.9.jar new file mode 100644 index 00000000..a23c982f Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/datanucleus-rdbms-3.2.9.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/derby-10.12.1.1.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/derby-10.12.1.1.jar new file mode 100644 index 00000000..5841555f Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/derby-10.12.1.1.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-cli-0.13.1.62.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-cli-0.13.1.62.jar new file mode 100644 index 00000000..3a85c6fa Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-cli-0.13.1.62.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-exec-0.13.1.62.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-exec-0.13.1.62.jar new file mode 100644 index 00000000..c2607762 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-exec-0.13.1.62.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-metastore-0.13.1.62.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-metastore-0.13.1.62.jar new file mode 100644 index 00000000..20f45273 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/hive-metastore-0.13.1.62.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/jdo-api-3.0.1.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/jdo-api-3.0.1.jar new file mode 100644 index 00000000..6318d329 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/jdo-api-3.0.1.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/libfb303-0.9.0.jar 
b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/libfb303-0.9.0.jar new file mode 100644 index 00000000..abccf347 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/libfb303-0.9.0.jar differ diff --git a/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/libthrift-0.9.0.jar b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/libthrift-0.9.0.jar new file mode 100644 index 00000000..58d16357 Binary files /dev/null and b/plugins/jobtype/jobtypes/hdfsToTeradata/tdch_hive_lib/libthrift-0.9.0.jar differ diff --git a/plugins/jobtype/jobtypes/teradataToHdfs/plugin.properties b/plugins/jobtype/jobtypes/teradataToHdfs/plugin.properties index 2d69a3ee..12ccb79f 100644 --- a/plugins/jobtype/jobtypes/teradataToHdfs/plugin.properties +++ b/plugins/jobtype/jobtypes/teradataToHdfs/plugin.properties @@ -4,4 +4,5 @@ mr.listener.visualizer=false hadoop-inject.hadoop-conf.mapreduce.job.user.classpath.first=true hadoop-inject.mapreduce.job.user.classpath.first=true -hadoop.config=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true' -Dmapreduce.job.user.classpath.first=true \ No newline at end of file +hadoop.config.jvm=-Dmapreduce.map.child.java.opts='-Xmx1G -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true' +hadoop.config.1=-Dmapreduce.job.user.classpath.first=true \ No newline at end of file diff --git a/plugins/jobtype/jobtypes/teradataToHdfs/private.properties b/plugins/jobtype/jobtypes/teradataToHdfs/private.properties index 88ab0af9..627cb06a 100644 --- a/plugins/jobtype/jobtypes/teradataToHdfs/private.properties +++ b/plugins/jobtype/jobtypes/teradataToHdfs/private.properties @@ -1,5 +1,6 @@ jobtype.class=azkaban.jobtype.connectors.teradata.TeradataToHdfsJob jobtype.classpath=lib/* +azkaban.no.user.classpath=true jobtype.lib.dir=${plugin.dir}/lib jobtype.tdwallet.jar=${jobtype.lib.dir}/teradata-connector-1.4.2.jar diff --git a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJob.java b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJob.java index f54a83d0..d9dd9beb 100644 --- a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJob.java +++ b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJob.java @@ -16,7 +16,10 @@ package azkaban.jobtype.connectors.teradata; +import static azkaban.jobtype.connectors.teradata.TdchConstants.*; + import org.apache.log4j.Logger; + import azkaban.utils.Props; /** @@ -27,6 +30,7 @@ public class HdfsToTeradataJob extends TeradataJob { public HdfsToTeradataJob(String jobid, Props sysProps, Props jobProps, Logger log) { super(jobid, sysProps, jobProps, log); + getJobProps().put(LIB_JARS_HIVE_KEY, sysProps.get(LIB_JARS_HIVE_KEY)); } @Override diff --git a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJobRunnerMain.java b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJobRunnerMain.java index ec5a491b..ca35c5d4 100644 --- a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJobRunnerMain.java +++ b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/HdfsToTeradataJobRunnerMain.java @@ -25,9 +25,11 @@ import java.util.List; import java.util.Properties; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Level; import org.apache.log4j.Logger; 
import azkaban.jobExecutor.AbstractProcessJob; @@ -59,16 +61,19 @@ public HdfsToTeradataJobRunnerMain() throws FileNotFoundException, IOException { } private HdfsToTeradataJobRunnerMain(Properties jobProps) throws FileNotFoundException, IOException { - this(jobProps, - new Whitelist(new Props(null, jobProps), FileSystem.get(new Configuration())), - new Decryptions()); + this(jobProps, new Decryptions()); } @VisibleForTesting - HdfsToTeradataJobRunnerMain(Properties jobProps, Whitelist whitelist, Decryptions decryptions) throws FileNotFoundException, IOException { + HdfsToTeradataJobRunnerMain(Properties jobProps, Decryptions decryptions) throws FileNotFoundException, IOException { _logger = JobUtils.initJobLogger(); - _jobProps = jobProps; + _logger.info("Job properties: " + jobProps); + String logLevel = jobProps.getProperty(TdchConstants.TDCH_LOG_LEVEL); + if(!StringUtils.isEmpty(logLevel)) { + _logger.setLevel(Level.toLevel(logLevel)); + } + _jobProps = jobProps; Props props = new Props(null, _jobProps); HadoopConfigurationInjector.injectResources(props); @@ -76,7 +81,7 @@ private HdfsToTeradataJobRunnerMain(Properties jobProps) throws FileNotFoundExce UserGroupInformation.setConfiguration(conf); if (props.containsKey(Whitelist.WHITE_LIST_FILE_PATH_KEY)) { - whitelist.validateWhitelisted(props); + new Whitelist(props, FileSystem.get(conf)).validateWhitelisted(props); } String encryptedCredential = _jobProps.getProperty(TdchConstants.TD_ENCRYPTED_CREDENTIAL_KEY); @@ -88,28 +93,37 @@ private HdfsToTeradataJobRunnerMain(Properties jobProps) throws FileNotFoundExce } _params = TdchParameters.builder() - .mrParams(_jobProps.getProperty(TdchConstants.HADOOP_CONFIG_KEY)) - .libJars(props.getString(TdchConstants.LIB_JARS_KEY)) + .mrParams(props.getMapByPrefix(TdchConstants.HADOOP_CONFIG_PREFIX_KEY).values()) + .libJars(createLibJarStr(props)) .tdJdbcClassName(TdchConstants.TERADATA_JDBCDRIVER_CLASSNAME) .teradataHostname(props.getString(TdchConstants.TD_HOSTNAME_KEY)) .fileFormat(_jobProps.getProperty(TdchConstants.HDFS_FILE_FORMAT_KEY)) .fieldSeparator(_jobProps.getProperty(TdchConstants.HDFS_FIELD_SEPARATOR_KEY)) - .jobType(TdchConstants.TDCH_JOB_TYPE) + .jobType(props.getString(TdchConstants.TDCH_JOB_TYPE, TdchConstants.DEFAULT_TDCH_JOB_TYPE)) .userName(props.getString(TdchConstants.TD_USERID_KEY)) .credentialName(_jobProps.getProperty(TdchConstants.TD_CREDENTIAL_NAME_KEY)) .password(password) .avroSchemaPath(_jobProps.getProperty(TdchConstants.AVRO_SCHEMA_PATH_KEY)) .avroSchemaInline(_jobProps.getProperty(TdchConstants.AVRO_SCHEMA_INLINE_KEY)) - .sourceHdfsPath(props.getString(TdchConstants.SOURCE_HDFS_PATH_KEY)) + .sourceHdfsPath(_jobProps.getProperty(TdchConstants.SOURCE_HDFS_PATH_KEY)) .targetTdTableName(props.getString(TdchConstants.TARGET_TD_TABLE_NAME_KEY)) .errorTdDatabase(_jobProps.getProperty(TdchConstants.ERROR_DB_KEY)) .errorTdTableName(_jobProps.getProperty(TdchConstants.ERROR_TABLE_KEY)) .tdInsertMethod(_jobProps.getProperty(TdchConstants.TD_INSERT_METHOD_KEY)) - .numMapper(TdchConstants.DEFAULT_NO_MAPPERS) + .numMapper(props.getInt(TdchConstants.TD_NUM_MAPPERS, TdchConstants.DEFAULT_NO_MAPPERS)) + .hiveSourceDatabase(_jobProps.getProperty(TdchConstants.SOURCE_HIVE_DATABASE_NAME_KEY)) + .hiveSourceTable(_jobProps.getProperty(TdchConstants.SOURCE_HIVE_TABLE_NAME_KEY)) + .hiveConfFile(_jobProps.getProperty(TdchConstants.TDCH_HIVE_CONF_KEY)) .otherProperties(_jobProps.getProperty(TdchConstants.TD_OTHER_PROPERTIES_HOCON_KEY)) .build(); } + private String createLibJarStr(Props 
props) { + if (TdchConstants.TDCH_HIVE_JOB_TYPE.equals(props.getString(TdchConstants.TDCH_JOB_TYPE, TdchConstants.DEFAULT_TDCH_JOB_TYPE))) { + return props.getString(TdchConstants.LIB_JARS_HIVE_KEY); + } + return props.getString(TdchConstants.LIB_JARS_KEY); + } public void run() throws IOException, InterruptedException { String jobName = System.getenv(AbstractProcessJob.JOB_NAME_ENV); diff --git a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchConstants.java b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchConstants.java index 05fd0897..4c6328a4 100644 --- a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchConstants.java +++ b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchConstants.java @@ -14,14 +14,19 @@ public interface TdchConstants { public static final String TERADATA_JDBCDRIVER_CLASSNAME = "com.teradata.jdbc.TeraDriver"; public static final String TD_WALLET_FORMAT = "$tdwallet(%s)"; - public static final String TDCH_JOB_TYPE = "hdfs"; + public static final String DEFAULT_TDCH_JOB_TYPE = "hdfs"; + public static final String TDCH_HIVE_JOB_TYPE = "hive"; public static final String AVRO_FILE_FORMAT = "avrofile"; public static final String LIB_JAR_DELIMITER = ","; public static final int DEFAULT_NO_MAPPERS = 8; //Keys for the properties + public static final String TDCH_JOB_TYPE = "tdch.jobtype"; + public static final String TDCH_LOG_LEVEL = "tdch.log.level"; + public static final String TD_WALLET_JAR = "jobtype.tdwallet.jar"; public static final String LIB_JARS_KEY = "libjars"; + public static final String LIB_JARS_HIVE_KEY = "libjars.hive"; public static final String TD_HOSTNAME_KEY = "td.hostname"; public static final String TD_USERID_KEY = "td.userid"; @Deprecated @@ -31,6 +36,7 @@ public interface TdchConstants { public static final String AVRO_SCHEMA_PATH_KEY = "avro.schema.path"; public static final String AVRO_SCHEMA_INLINE_KEY = "avro.schema.inline"; + public static final String TD_NUM_MAPPERS = "tdch.num.mappers"; public static final String TD_INSERT_METHOD_KEY = "tdch.insert.method"; public static final String SOURCE_HDFS_PATH_KEY = "source.hdfs.path"; public static final String TARGET_TD_TABLE_NAME_KEY = "target.td.tablename"; @@ -42,6 +48,7 @@ public interface TdchConstants { public static final String HDFS_FILE_FORMAT_KEY = "hdfs.fileformat"; public static final String HDFS_FIELD_SEPARATOR_KEY = "hdfs.separator"; public static final String HADOOP_CONFIG_KEY = "hadoop.config"; + public static final String HADOOP_CONFIG_PREFIX_KEY = "hadoop.config."; public static final String TD_RETRIEVE_METHOD_KEY = "tdch.retrieve.method"; public static final String SOURCE_TD_TABLE_NAME_KEY = "source.td.tablename"; @@ -49,4 +56,8 @@ public interface TdchConstants { public static final String TARGET_HDFS_PATH_KEY = "target.hdfs.path"; public static final String TD_OTHER_PROPERTIES_HOCON_KEY = "tdch.other.properties.hocon"; public static final String JOB_OUTPUT_PROPERTIES_KEY = "output.property.keys"; + + public static final String SOURCE_HIVE_DATABASE_NAME_KEY = "source.hive.databasename"; + public static final String SOURCE_HIVE_TABLE_NAME_KEY = "source.hive.tablename"; + public static final String TDCH_HIVE_CONF_KEY = "tdch.hive.conf"; } \ No newline at end of file diff --git a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchParameters.java b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchParameters.java index e49a3dbd..29d05c80 100644 --- 
a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchParameters.java +++ b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TdchParameters.java @@ -42,7 +42,7 @@ public class TdchParameters { private static final String DEFAULT_RETRIEVE_METHOD = "split.by.amp"; private static final int ERROR_TABLE_NAME_LENGTH_LIMIT = 24; - private final String _mrParams; + private final List _mrParams; private final String _libJars; private final String _tdJdbcClassName; private final String _tdUrl; @@ -64,6 +64,10 @@ public class TdchParameters { private final String _targetTdTableName; private final Optional _targetTdDatabaseName; + private final String _hiveSourceDatabase; + private final String _hiveSourceTable; + private final Optional _hiveConfFile; + private final Optional _tdErrorDatabase; private final Optional _tdErrorTableName; private final Optional _tdInsertMethod; @@ -109,15 +113,20 @@ private TdchParameters(Builder builder) { this._sourceTdTableName = Optional.fromNullable(builder._sourceTdTableName); this._targetHdfsPath = builder._targetHdfsPath; this._tdRetrieveMethod = Optional.fromNullable(builder._tdRetrieveMethod); + + this._hiveSourceDatabase = builder._hiveSourceDatabase; + this._hiveSourceTable = builder._hiveSourceTable; + this._hiveConfFile = Optional.fromNullable(builder._hiveconfFile); } private enum TdchType { HDFS_TO_TERADATA, + HIVE_TO_TERADATA, TERADATA_TO_HDFS } public static class Builder { - private String _mrParams; + private List _mrParams; private String _libJars; private String _tdJdbcClassName; private String _tdHostName; @@ -148,8 +157,12 @@ public static class Builder { private String _targetHdfsPath; private String _tdRetrieveMethod; - public Builder mrParams(String mrParams) { - this._mrParams = mrParams; + private String _hiveSourceDatabase; + private String _hiveSourceTable; + private String _hiveconfFile; + + public Builder mrParams(Collection mrParams) { + this._mrParams = ImmutableList.builder().addAll(mrParams).build(); return this; } @@ -227,6 +240,11 @@ public Builder numMapper(int numMappers) { /** * Takes HOCON notation: https://github.com/typesafehub/config/blob/master/HOCON.md + * To override a TDCH parameter, add "key1=value1,key2=value2" to hoconInput, where each key must be one + * supported by TDCH itself. + * To remove a TDCH parameter, add key="" to hoconInput. + * + * Override and removal give full control over the final input parameters that TDCH receives. * * @param hoconInput * @return @@ -281,14 +299,34 @@ public Builder tdRetrieveMethod(String tdRetrieveMethod) { return this; } + public Builder hiveSourceDatabase(String hiveSourceDatabase) { + this._hiveSourceDatabase = hiveSourceDatabase; + return this; + } + + public Builder hiveSourceTable(String hiveSourceTable) { + this._hiveSourceTable = hiveSourceTable; + return this; + } + + public Builder hiveConfFile(String hiveConfFile) { + this._hiveconfFile = hiveConfFile; + return this; + } + public TdchParameters build() { validate(); - if (TdchType.HDFS_TO_TERADATA.equals(_tdchType)) { + if (TdchType.HDFS_TO_TERADATA.equals(_tdchType) + || TdchType.HIVE_TO_TERADATA.equals(_tdchType)) { assignErrorTbl(); } return new TdchParameters(this); } + /** + * Unless an error table name is specified, the target table name is used as the prefix of the error table, to which TDCH appends a suffix. + * If the target table name is too long, the error table name is left blank so that TDCH chooses a random one.
+ */ private void assignErrorTbl() { if (!StringUtils.isEmpty(_tdErrorTableName)) { return; @@ -345,6 +383,36 @@ private void validate() { String charSet = StringUtils.isEmpty(_tdCharSet) ? DEFAULT_CHARSET : _tdCharSet; _tdUrl = TERADATA_JDBC_URL_PREFIX + _tdHostName + TERADATA_JDBC_URL_CHARSET_KEY + charSet; + validateJobtype(); + + if (!StringUtils.isEmpty(_tdErrorTableName)) { + Preconditions.checkArgument(!new DatabaseTable(_tdErrorTableName).database.isPresent(), + "Error table name cannot have database prefix. Use " + TdchConstants.ERROR_DB_KEY); + Preconditions.checkArgument(_tdErrorTableName.length() <= ERROR_TABLE_NAME_LENGTH_LIMIT, + "Error table name cannot exceed " + ERROR_TABLE_NAME_LENGTH_LIMIT + " characters."); + } + } + + private void validateJobtype() { + if ("hdfs".equals(_jobType)) { + validateHdfsJobtype(); + } else if ("hive".equals(_jobType)) { + validateHiveJobtype(); + } else { + throw new IllegalArgumentException("Job type " + _jobType + " is not supported"); + } + + } + + private void validateHiveJobtype() { + ValidationUtils.validateNotEmpty(_targetTdTableName, "targetTdTableName"); + ValidationUtils.validateNotEmpty(_hiveSourceDatabase, "hiveSourceDatabase"); + ValidationUtils.validateNotEmpty(_hiveSourceTable, "hiveSourceTable"); + Preconditions.checkArgument(StringUtils.isEmpty(_sourceHdfsPath), "sourceHdfsPath should be empty for hive job."); + _tdchType = TdchType.HIVE_TO_TERADATA; + } + + private void validateHdfsJobtype() { boolean isHdfsToTd = !StringUtils.isEmpty(_sourceHdfsPath) && !StringUtils.isEmpty(_targetTdTableName); boolean isTdToHdfs = !(StringUtils.isEmpty(_sourceTdTableName) && StringUtils.isEmpty(_sourceQuery)) && !StringUtils.isEmpty(_targetHdfsPath); @@ -373,20 +441,13 @@ private void validate() { } else { _tdchType = TdchType.TERADATA_TO_HDFS; } - - if (!StringUtils.isEmpty(_tdErrorTableName)) { - Preconditions.checkArgument(!new DatabaseTable(_tdErrorTableName).database.isPresent(), - "Error table name cannot have database prefix.
Use " + TdchConstants.ERROR_DB_KEY); - Preconditions.checkArgument(_tdErrorTableName.length() <= ERROR_TABLE_NAME_LENGTH_LIMIT, - "Error table name cannot exceed " + ERROR_TABLE_NAME_LENGTH_LIMIT + " chracters."); - } } } public String[] toTdchParams() { ImmutableList.Builder listBuilder = ImmutableList.builder(); - if(!StringUtils.isEmpty(_mrParams)) { - listBuilder.add(_mrParams); + if(_mrParams != null && !_mrParams.isEmpty()) { + listBuilder.addAll(_mrParams); } Map keyValParams = buildKeyValParams(); @@ -432,9 +493,7 @@ private Map buildKeyValParams() { map.put("-separator", _fieldSeparator.get()); } - if(TdchType.HDFS_TO_TERADATA.equals(_tdchType)) { - map.put("-sourcepaths", _sourceHdfsPath); - + if(TdchType.HDFS_TO_TERADATA.equals(_tdchType) || TdchType.HIVE_TO_TERADATA.equals(_tdchType)) { if(_targetTdDatabaseName.isPresent()) { map.put("-targettable", String.format(TeradataCommands.DATABASE_TABLE_FORMAT, _targetTdDatabaseName.get(), _targetTdTableName)); @@ -453,6 +512,16 @@ private Map buildKeyValParams() { if(_tdInsertMethod.isPresent()) { map.put("-method", _tdInsertMethod.get()); } + + if (TdchType.HDFS_TO_TERADATA.equals(_tdchType)) { + map.put("-sourcepaths", _sourceHdfsPath); + } else { + map.put("-sourcedatabase", _hiveSourceDatabase); + map.put("-sourcetable", _hiveSourceTable); + if (_hiveConfFile.isPresent()) { + map.put("-hiveconf", _hiveConfFile.get()); + } + } } else if (TdchType.TERADATA_TO_HDFS.equals(_tdchType)){ map.put("-targetpaths",_targetHdfsPath); @@ -484,12 +553,14 @@ private Map buildKeyValParams() { Map.Entry entry = it.next(); String key = "-" + entry.getKey(); if (map.containsKey(key)) { - _logger.warn("Duplicate entry detected on key: " + entry.getKey() - + " . Skipping value from duplicate: " + entry.getValue().asText() - + " . Proceed with existing value: " + map.get(key)); - continue; + _logger.info("Duplicate entry detected on key: " + entry.getKey() + + " . 
Overwriting value from " + map.get(key) + " to " + entry.getValue().asText()); + } + if (StringUtils.isEmpty(entry.getValue().asText())) { + map.remove(key); + } else { + map.put(key, entry.getValue().asText()); } - map.put(key, entry.getValue().asText()); } } catch (Exception e) { throw new RuntimeException(e); @@ -570,6 +641,9 @@ public String toString() { .append(", _sourceTdTableName=").append(_sourceTdTableName) .append(", _tdRetrieveMethod=").append(_tdRetrieveMethod) .append(", _targetHdfsPath=").append(_targetHdfsPath) + .append(", _hiveSourceDatabase=").append(_hiveSourceDatabase) + .append(", _hiveSourceTable=").append(_hiveSourceTable) + .append(", _hiveConfFile=").append(_hiveConfFile) .append(", _otherProperties").append(_otherProperties) .append("]"); return builder.toString(); diff --git a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TeradataToHdfsJobRunnerMain.java b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TeradataToHdfsJobRunnerMain.java index 5c305e23..06c0a85e 100644 --- a/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TeradataToHdfsJobRunnerMain.java +++ b/plugins/jobtype/src/azkaban/jobtype/connectors/teradata/TeradataToHdfsJobRunnerMain.java @@ -20,11 +20,14 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Properties; + +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Level; import org.apache.log4j.Logger; import azkaban.utils.Props; @@ -34,6 +37,7 @@ import azkaban.jobtype.javautils.JobUtils; import azkaban.jobtype.javautils.Whitelist; +import com.google.common.annotations.VisibleForTesting; import com.teradata.hadoop.tool.TeradataImportTool; import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION; @@ -44,8 +48,24 @@ public class TeradataToHdfsJobRunnerMain { private final Logger _logger; public TeradataToHdfsJobRunnerMain() throws FileNotFoundException, IOException { + this(HadoopSecureWrapperUtils.loadAzkabanProps()); + } + + private TeradataToHdfsJobRunnerMain(Properties jobProps) throws FileNotFoundException, IOException { + this(jobProps, new Decryptions()); + } + + @VisibleForTesting + TeradataToHdfsJobRunnerMain(Properties jobProps, Decryptions decryptions) throws FileNotFoundException, IOException { _logger = JobUtils.initJobLogger(); - _jobProps = HadoopSecureWrapperUtils.loadAzkabanProps(); + _logger.info("Job properties: " + jobProps); + + _jobProps = jobProps; + + String logLevel = jobProps.getProperty(TdchConstants.TDCH_LOG_LEVEL); + if(!StringUtils.isEmpty(logLevel)) { + _logger.setLevel(Level.toLevel(logLevel)); + } Props props = new Props(null, _jobProps); HadoopConfigurationInjector.injectResources(props); @@ -63,13 +83,13 @@ public TeradataToHdfsJobRunnerMain() throws FileNotFoundException, IOException { } _params = TdchParameters.builder() - .mrParams(_jobProps.getProperty(TdchConstants.HADOOP_CONFIG_KEY)) + .mrParams(props.getMapByPrefix(TdchConstants.HADOOP_CONFIG_PREFIX_KEY).values()) .libJars(props.getString(TdchConstants.LIB_JARS_KEY)) .tdJdbcClassName(TdchConstants.TERADATA_JDBCDRIVER_CLASSNAME) .teradataHostname(props.getString(TdchConstants.TD_HOSTNAME_KEY)) .fileFormat(_jobProps.getProperty(TdchConstants.HDFS_FILE_FORMAT_KEY)) 
.fieldSeparator(_jobProps.getProperty(TdchConstants.HDFS_FIELD_SEPARATOR_KEY)) - .jobType(TdchConstants.TDCH_JOB_TYPE) + .jobType(props.getString(TdchConstants.TDCH_JOB_TYPE, TdchConstants.DEFAULT_TDCH_JOB_TYPE)) .userName(props.getString(TdchConstants.TD_USERID_KEY)) .credentialName(_jobProps.getProperty(TdchConstants.TD_CREDENTIAL_NAME_KEY)) .password(password) diff --git a/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestHdfsToTeradata.java b/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestHdfsToTeradata.java index dc7290fc..ca1fafc1 100644 --- a/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestHdfsToTeradata.java +++ b/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestHdfsToTeradata.java @@ -28,12 +28,10 @@ import azkaban.crypto.Decryptions; import azkaban.jobtype.connectors.jdbc.JdbcCommands; -import azkaban.jobtype.javautils.Whitelist; public class TestHdfsToTeradata { private Properties properties; - private Whitelist whitelist; private Decryptions decryptions; @Before @@ -47,7 +45,6 @@ public void initialize() throws IOException { properties.put(SOURCE_HDFS_PATH_KEY, "test"); properties.put(LIB_JARS_KEY, "test"); - whitelist = mock(Whitelist.class); decryptions = mock(Decryptions.class); when(decryptions.decrypt(any(), any(), any())).thenReturn("password"); } @@ -57,7 +54,7 @@ public void testDropErrorTbl() throws FileNotFoundException, IOException, Interr properties.put(TARGET_TD_TABLE_NAME_KEY, "db.target_table"); properties.put(DROP_ERROR_TABLE_KEY, Boolean.toString(true)); - HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, whitelist, decryptions)); + HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, decryptions)); Connection conn = mock(Connection.class); doReturn(conn).when(job).newConnection(); @@ -84,7 +81,7 @@ public void skipDropErrorTbl() throws FileNotFoundException, IOException, Interr properties.put(TARGET_TD_TABLE_NAME_KEY, "db.target_table"); properties.put(DROP_ERROR_TABLE_KEY, Boolean.toString(false)); - HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, whitelist, decryptions)); + HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, decryptions)); Connection conn = mock(Connection.class); doReturn(conn).when(job).newConnection(); @@ -109,7 +106,7 @@ public void errorTblNotExist() throws FileNotFoundException, IOException, Interr properties.put(TARGET_TD_TABLE_NAME_KEY, "db.target_table"); properties.put(DROP_ERROR_TABLE_KEY, Boolean.toString(true)); - HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, whitelist, decryptions)); + HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, decryptions)); Connection conn = mock(Connection.class); doReturn(conn).when(job).newConnection(); @@ -134,7 +131,7 @@ public void truncateTable() throws FileNotFoundException, IOException, Interrupt properties.put(TARGET_TD_TABLE_NAME_KEY, "db.target_table"); properties.put(REPLACE_TARGET_TABLE_KEY, Boolean.toString(true)); - HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, whitelist, decryptions)); + HdfsToTeradataJobRunnerMain job = spy(new HdfsToTeradataJobRunnerMain(properties, decryptions)); Connection conn = mock(Connection.class); doReturn(conn).when(job).newConnection(); diff --git a/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestTdchParameters.java 
b/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestTdchParameters.java index 5c87accd..321bba9a 100644 --- a/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestTdchParameters.java +++ b/plugins/jobtype/test/azkaban/jobtype/connectors/teradata/TestTdchParameters.java @@ -14,16 +14,22 @@ import static azkaban.jobtype.connectors.teradata.TdchConstants.*; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Set; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import azkaban.jobtype.connectors.teradata.TdchParameters.Builder; +//@Ignore public class TestTdchParameters { private static String PASSWORD = "password"; private Builder builder; @@ -31,11 +37,11 @@ public class TestTdchParameters { @Before public void setup() { builder = TdchParameters.builder() - .mrParams("mrParams") + .mrParams(Collections.emptyList()) .tdJdbcClassName(TdchConstants.TERADATA_JDBCDRIVER_CLASSNAME) .teradataHostname("teradataHostname") .fileFormat(AVRO_FILE_FORMAT) - .jobType(TdchConstants.TDCH_JOB_TYPE) + .jobType(TdchConstants.DEFAULT_TDCH_JOB_TYPE) .userName("userName") .avroSchemaPath("avroSchemaPath") .sourceHdfsPath("sourceHdfsPath") @@ -51,36 +57,80 @@ public void testToTdchParam() { List expected = getExpectedTdchParams(); - Assert.assertEquals(expected, Arrays.asList(params.toTdchParams())); + assertEqual(expected, Arrays.asList(params.toTdchParams())); } - private ImmutableList getExpectedTdchParams() { - ImmutableList expected = ImmutableList.builder() - .add("mrParams") - .add("-url") - .add("jdbc:teradata://teradataHostname/CHARSET=UTF8") - .add("-classname") - .add("com.teradata.jdbc.TeraDriver") - .add("-fileformat") - .add("avrofile") - .add("-jobtype") - .add("hdfs") - .add("-username") - .add("userName") - .add("-nummappers") - .add(Integer.toString(DEFAULT_NO_MAPPERS)) - .add("-password") - .add("password") - .add("-avroschemafile") - .add("avroSchemaPath") - .add("-sourcepaths") - .add("sourceHdfsPath") - .add("-targettable") - .add("db.target") - .add("-errortablename") - .add("target") - .build(); + @Test + public void testHiveToTdchParam() { + builder = TdchParameters.builder() + .mrParams(Collections.emptyList()) + .tdJdbcClassName(TdchConstants.TERADATA_JDBCDRIVER_CLASSNAME) + .teradataHostname("teradataHostname") + .fileFormat("orcfile") + .jobType(TdchConstants.TDCH_HIVE_JOB_TYPE) + .userName("userName") + .hiveSourceDatabase("hive_database") + .hiveSourceTable("hive_table") + .hiveConfFile("hive_conf") + .numMapper(TdchConstants.DEFAULT_NO_MAPPERS); + + String targetTableName = "db.target"; + TdchParameters params = builder.targetTdTableName(targetTableName) + .password(PASSWORD) + .build(); + + List expected = getExpectedTdchParams("-avroschemafile", "-sourcepaths", "-jobtype", "-fileformat"); + expected.add("-sourcedatabase"); + expected.add("hive_database"); + expected.add("-sourcetable"); + expected.add("hive_table"); + expected.add("-hiveconf"); + expected.add("hive_conf"); + expected.add("-jobtype"); + expected.add("hive"); + expected.add("-fileformat"); + expected.add("orcfile"); + assertEqual(expected, Arrays.asList(params.toTdchParams())); + } + + private List getExpectedTdchParams(String... 
exceptKeys) { + List expected = Lists.newArrayList("-url" , + "jdbc:teradata://teradataHostname/CHARSET=UTF8", + "-classname", + "com.teradata.jdbc.TeraDriver", + "-fileformat", + "avrofile", + "-jobtype", + "hdfs", + "-username", + "userName", + "-nummappers", + Integer.toString(DEFAULT_NO_MAPPERS), + "-password", + "password", + "-avroschemafile", + "avroSchemaPath", + "-sourcepaths", + "sourceHdfsPath", + "-targettable", + "db.target", + "-errortablename", + "target"); + + if (exceptKeys != null) { + Set removalKeys = Sets.newHashSet(exceptKeys); + Iterator it = expected.iterator(); + expected = Lists.newArrayList(); + while (it.hasNext()) { + String s = it.next(); + if (removalKeys.contains(s)) { + it.next(); + continue; + } + expected.add(s); + } + } return expected; } @@ -94,13 +144,15 @@ public void testOtherParams() { .otherProperties(otherParams) .build(); - ImmutableList expected = getExpectedTdchParams(); + List expected = getExpectedTdchParams("-nummappers"); expected = ImmutableList.builder().addAll(expected) .add("-testKey1") .add("testVal1") + .add("-nummappers") + .add("24") .build(); - Assert.assertEquals(expected, Arrays.asList(params.toTdchParams())); + assertEqual(expected, Arrays.asList(params.toTdchParams())); } @Test @@ -113,7 +165,28 @@ public void testOtherParamsWithDuplicateKey() { .otherProperties(otherParams) .build(); - ImmutableList expected = getExpectedTdchParams(); + List expected = getExpectedTdchParams(); + expected = ImmutableList.builder().addAll(expected) + .add("-testKey1") + .add("testVal1") + .add("-testKey2") + .add("testVal2") + .build(); + + assertEqual(expected, Arrays.asList(params.toTdchParams())); + } + + @Test + public void testOtherParamsForRemoval() { + String targetTableName = "db.target"; + String otherParams = "testKey1=testVal1,testKey2=testVal2,errortablename=\"\""; + + TdchParameters params = builder.targetTdTableName(targetTableName) + .password(PASSWORD) + .otherProperties(otherParams) + .build(); + + List expected = getExpectedTdchParams("-errortablename"); expected = ImmutableList.builder().addAll(expected) .add("-testKey1") .add("testVal1") @@ -121,7 +194,7 @@ public void testOtherParamsWithDuplicateKey() { .add("testVal2") .build(); - Assert.assertEquals(expected, Arrays.asList(params.toTdchParams())); + assertEqual(expected, Arrays.asList(params.toTdchParams())); } @Test @@ -189,4 +262,9 @@ public void testFailWithMultipleCredentials() { Assert.assertEquals("Please use either credential name or password, not all of them.", e.getMessage()); } } + + private void assertEqual(List expected, List actual) { + Assert.assertTrue("Should not be null. Expected: " + expected + " , actual: " + actual, expected != null && actual != null); + Assert.assertTrue("Expected: " + expected + " , actual: " + actual, expected.size() == actual.size() && expected.containsAll(actual)); + } }
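
For illustration of the new Hive source mode wired up above (tdch.jobtype=hive, source.hive.databasename, source.hive.tablename, tdch.hive.conf), a minimal user .job file for the hdfsToTeradata job type might look like the sketch below. This is an assumption-laden example, not part of the patch: the type name is inferred from the plugin directory, the host, database, table, and user values are placeholders, and credential/password settings are omitted.

# hypothetical hive-to-teradata export job (all values are placeholders)
type=hdfsToTeradata
tdch.jobtype=hive
source.hive.databasename=example_db
source.hive.tablename=example_table
hdfs.fileformat=orcfile
td.hostname=teradata.example.com
td.userid=example_user
target.td.tablename=example_td_db.example_table
tdch.num.mappers=8
tdch.log.level=DEBUG
# optional hive configuration file passed to TDCH as -hiveconf
#tdch.hive.conf=hive-site.xml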
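
The mrParams change above replaces the single hadoop.config property with every value found under the hadoop.config. prefix, and the HOCON property handled by TdchParameters.Builder#otherProperties can override or remove individual TDCH arguments. A rough sketch of both, using placeholder values (hadoop.config.queue and the queue name are invented for illustration; the override/removal behaviour follows testOtherParams and testOtherParamsForRemoval):

# every hadoop.config.<suffix> entry is forwarded to TDCH as a leading MR argument
hadoop.config.jvm=-Dmapreduce.map.child.java.opts='-Xmx1G'
hadoop.config.1=-Dmapreduce.job.user.classpath.first=true
hadoop.config.queue=-Dmapreduce.job.queuename=default

# override -nummappers to 24 and drop -errortablename from the final TDCH command line
tdch.other.properties.hocon=nummappers=24,errortablename=""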