Skip to content

Commit

Permalink
KYLIN-3363 fix wrong partition condition appended in JDBC Source
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongsjtu committed May 27, 2018
1 parent e20e244 commit faf7064
Showing 1 changed file with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
import org.apache.kylin.metadata.model.TableRef;
Expand All @@ -53,7 +54,7 @@ public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide
// Constructs the JDBC-source batch cubing input side for the given
// flat-table descriptor; all initialization is delegated to the
// HiveMRInput parent implementation.
public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
super(flatDesc);
}

// Convenience accessor: returns the KylinConfig attached to this
// flat table's data model (used for sqoop/JDBC settings below).
private KylinConfig getConfig() {
return flatDesc.getDataModel().getConfig();
}
Expand Down Expand Up @@ -140,20 +141,19 @@ private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String
KylinConfig config = getConfig();
PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
String partCol = null;
String partitionString = null;

if (partitionDesc.isPartitioned()) {
partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
flatDesc.getSegment(), flatDesc.getSegRange());
}

String splitTable;
String splitTableAlias;
String splitColumn;
String splitDatabase;
TblColRef splitColRef = determineSplitColumn();
splitTable = splitColRef.getTableRef().getTableName();
splitColumn = splitColRef.getName();
splitTableAlias = splitColRef.getTableAlias();
splitColumn = splitColRef.getExpressionInSourceDB();
splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();

//using sqoop to extract data from jdbc source and dump them to hive
Expand All @@ -167,22 +167,29 @@ private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String
String filedDelimiter = config.getJdbcSourceFieldDelimiter();
int mapperNum = config.getSqoopMapperNum();

String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase,
splitTable);
if (partitionString != null) {
bquery += " WHERE " + partitionString;
String bquery = String.format("SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn, splitColumn,
splitDatabase, splitTable, splitTableAlias);
if (partitionDesc.isPartitioned()) {
SegmentRange segRange = flatDesc.getSegRange();
if (segRange != null && !segRange.isInfinite()) {
if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
&& (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
bquery += " WHERE " + partitionDesc.getPartitionConditionBuilder()
.buildDateRangeCondition(partitionDesc, flatDesc.getSegment(), segRange);
}
}
}

//related to "kylin.engine.mr.config-override.mapreduce.job.queuename"
String queueName = getSqoopJobQueueName(config);
String cmd = String.format(
"%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+ "-Dmapreduce.job.queuename=%s "
+ "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+ "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
+ "--fields-terminated-by '%s' --num-mappers %d",
sqoopHome, queueName, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
splitTable, splitColumn, bquery, filedDelimiter, mapperNum);
String cmd = String.format("%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
+ "-Dmapreduce.job.queuename=%s "
+ "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+ "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
+ "--fields-terminated-by '%s' --num-mappers %d", sqoopHome, queueName, connectionUrl, driverClass,
jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, splitTable, splitColumn, bquery,
filedDelimiter, mapperNum);
logger.debug(String.format("sqoop cmd:%s", cmd));
CmdStep step = new CmdStep();
step.setCmd(cmd);
Expand Down

0 comments on commit faf7064

Please sign in to comment.