Skip to content

Commit

Permalink
KYLIN-3707 add configuration for setting isolation-level for sqoop
Browse files Browse the repository at this point in the history
  • Loading branch information
woyumen4597 authored and shaofengshi committed Dec 15, 2018
1 parent 32a3150 commit fb54f38
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

public class JdbcConnector implements Closeable {
Expand Down Expand Up @@ -175,8 +174,7 @@ public String getPropertyValue(String key) {
return jdbcDs.getPropertyValue(key);
}

@VisibleForTesting
SqlConverter getSqlConverter() {
public SqlConverter getSqlConverter() {
return sqlConverter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ public String fixAfterDefaultConvert(String orig) {
if (this.adaptor == null) {
return orig;
}
// fix problem of case sensitive when generate sql.
// if (isCaseSensitive()) {
// orig = adaptor.fixCaseSensitiveSql(orig);
// }
return adaptor.fixSql(orig);
}

Expand Down Expand Up @@ -134,4 +130,9 @@ public String fixIdentifierCaseSensitve(String orig) {
}
return adaptor.fixIdentifierCaseSensitve(orig);
}

@Override
public String getTransactionIsolationLevel() {
return dsDef.getPropertyValue("transaction.isolation-level");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,11 @@ public interface IConfigurer {
boolean enableQuote();

String fixIdentifierCaseSensitve(String orig);

/**
* Only support following 3 types
* TRANSACTION_READ_COMMITTED,TRANSACTION_READ_UNCOMMITTED,TRANSACTION_READ_COMMITTED
*/
String getTransactionIsolationLevel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,20 @@ public boolean enableQuote() {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}

@Override
public String getTransactionIsolationLevel() {
return null;
}
}, master);

// escape default keywords
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select * from \"DEFAULT\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"Default\".\"FACT\"",
converter.convertSql("select * from \"Default\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"default\".\"FACT\"",
converter.convertSql("select * from \"default\".FACT"));
}

@Test
Expand Down Expand Up @@ -189,6 +197,11 @@ public boolean enableQuote() {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}

@Override
public String getTransactionIsolationLevel() {
return null;
}
}, master);

// normal cases
Expand All @@ -203,30 +216,35 @@ public String fixIdentifierCaseSensitve(String orig) {

// escape default keywords
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from DEFAULT.FACT"));
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"DEFAULT\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"Default\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"", converter.convertSql("select * from \"default\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select * from \"DEFAULT\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select * from \"Default\".FACT"));
Assert.assertEquals("SELECT *\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select * from \"default\".FACT"));

// function mapping
Assert.assertEquals("SELECT EXTRACT(DOY FROM \"PART_DT\")\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select DAYOFYEAR(PART_DT) from \"DEFAULT\".FACT"));
Assert.assertEquals(
"SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - " +
"CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n" +
"FROM \"DEFAULT\".\"FACT\"",
"SELECT 12 * (EXTRACT(YEAR FROM \"DT1\") - EXTRACT(YEAR FROM \"DT2\")) + EXTRACT(MONTH FROM \"DT1\") - EXTRACT(MONTH FROM \"DT2\") - "
+ "CASE WHEN EXTRACT(DAY FROM \"DT2\") > EXTRACT(DAY FROM \"DT1\") THEN 1 ELSE 0 END\n"
+ "FROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select TIMESTAMPDIFF(month,DT2, DT1) from \"DEFAULT\".FACT"));
Assert.assertEquals("SELECT TRUNC(\"ID\")\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql("select cast(ID as INT) from \"DEFAULT\".FACT"));
Assert.assertEquals("SELECT 1\nFROM \"A\"\nWHERE 1 BETWEEN ASYMMETRIC 0 AND 2",
converter.convertSql("select 1 from a where 1 BETWEEN 0 and 2"));
Assert.assertEquals("SELECT \"CURRENT_DATE\", TEST_CURR_TIME()",
converter.convertSql("select CURRENT_DATE, CURRENT_TIME"));
Assert.assertEquals("SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
Assert.assertEquals(
"SELECT EXP(AVG(LN(EXTRACT(DOY FROM CAST('2018-03-20' AS DATE)))))\nFROM \"DEFAULT\".\"FACT\"",
converter.convertSql(
"select exp(avg(ln(dayofyear(cast('2018-03-20' as date))))) from \"DEFAULT\".FACT"));

// over function
Assert.assertEquals("SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
Assert.assertEquals(
"SELECT STDDEVP(\"C1\") OVER (ORDER BY \"C1\")\nFROM \"TEST_SUITE\"\nFETCH NEXT 1 ROWS ONLY",
converter.convertSql("select stddev_pop(c1) over(order by c1) from test_suite limit 1"));

// type mapping
Expand Down Expand Up @@ -332,6 +350,11 @@ public boolean enableQuote() {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}

@Override
public String getTransactionIsolationLevel() {
return null;
}
}, master);

Assert.assertEquals("SELECT 1\nORDER BY 2\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
Expand All @@ -347,6 +370,7 @@ public String fixIdentifierCaseSensitve(String orig) {
Assert.assertEquals("SELECT 1\nORDER BY 1\nOFFSET 0 ROWS\nFETCH NEXT 1 ROWS ONLY",
converter.convertSql("SELECT 1 LIMIT 1"));
}

@Test
public void testConvertQuotedSqlWithEscape() throws SQLException {
DataSourceDefProvider provider = DataSourceDefProvider.getInstance();
Expand Down Expand Up @@ -417,6 +441,11 @@ public boolean enableQuote() {
public String fixIdentifierCaseSensitve(String orig) {
return orig;
}

@Override
public String getTransactionIsolationLevel() {
return null;
}
}, master);

Assert.assertEquals("SELECT SUM(\"A\"), COUNT(\"A\") AS \"AB\"\nFROM \"DEFAULT\".\"CUBE\"",
Expand All @@ -425,8 +454,10 @@ public String fixIdentifierCaseSensitve(String orig) {
converter.convertSql("select A(), B(`A`), cast(`PRICE@@` as `DDD`) from DEFAULT.`CUBE`"));
Assert.assertEquals("SELECT A(), B(\"A\"), CAST(\"PRICE@@\" AS DDD)\nFROM \"DEFAULT\".\"CUBE\"",
converter.convertSql("select A(), B(\"A\"), cast(\"PRICE@@\" as \"DDD\") from \"DEFAULT\".\"CUBE\""));
Assert.assertEquals("SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
converter.convertSql("select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50"));
Assert.assertEquals(
"SELECT \"kylin_sales\".\"price_@@\", \"kylin_sales\".\"count\"\nFROM \"cube\".\"kylin_sales\"\nWHERE \"kylin_sales\".\"price_@@\" > 1 AND \"kylin_sales\".\"count\" < 50",
converter.convertSql(
"select `kylin_sales`.`price_@@`, `kylin_sales`.`count` from `cube`.`kylin_sales` where `kylin_sales`.`price_@@` > 1 and `kylin_sales`.`count` < 50"));
Assert.assertEquals("SELECT COUNT(DISTINCT \"price_#@\")\nFROM \"cube\".\"kylin_sales\"",
converter.convertSql("select count(distinct `price_#@`) from `cube`.`kylin_sales`"));

Expand Down Expand Up @@ -502,6 +533,11 @@ public boolean enableQuote() {
public String fixIdentifierCaseSensitve(String orig) {
return orig.toUpperCase(Locale.ROOT);
}

@Override
public String getTransactionIsolationLevel() {
return null;
}
}, master);

Assert.assertEquals("\"TEST\".\"AA\"", converter.convertColumn("`test`.`aa`", "`"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
import org.apache.kylin.sdk.datasource.framework.conv.SqlConverter;
import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,15 +83,15 @@ protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, Str
String filedDelimiter = config.getJdbcSourceFieldDelimiter();
int mapperNum = config.getSqoopMapperNum();

String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
splitDatabase, splitTable, splitTableAlias);
String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn,
splitColumn, splitDatabase, splitTable, splitTableAlias);
bquery = dataSource.convertSql(bquery);
if (partitionDesc.isPartitioned()) {
SegmentRange segRange = flatDesc.getSegRange();
if (segRange != null && !segRange.isInfinite()) {
if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
&& (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
flatDesc.getSegment(), segRange),
Expand All @@ -110,6 +111,11 @@ protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, Str
dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), dataSource.getJdbcUser(),
dataSource.getJdbcPassword(), selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
filedDelimiter, mapperNum);
SqlConverter.IConfigurer configurer = dataSource.getSqlConverter().getConfigurer();
if (configurer.getTransactionIsolationLevel() != null) {
cmd = cmd + " --relaxed-isolation --metadata-transaction-isolation-level "
+ configurer.getTransactionIsolationLevel();
}
logger.debug("sqoop cmd: {}", cmd);

SqoopCmdStep step = new SqoopCmdStep();
Expand Down

0 comments on commit fb54f38

Please sign in to comment.