Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,9 @@ protected String buildQuerySql(InputSplit inputSplit) {

querySql = buildQuerySqlBySplit(jdbcInputSplit, whereList);

if (!jdbcConfig.isPolling()) {
querySql = querySql + SqlUtil.buildOrderSql(jdbcConfig, jdbcDialect, "ASC");
}
log.info("Executing sql is: '{}'", querySql);
return querySql;
String finalSql = SqlUtil.buildOrderSql(querySql, jdbcConfig, jdbcDialect, "ASC");
log.info("Executing sql is: '{}'", finalSql);
return finalSql;
}

/**
Expand Down Expand Up @@ -691,17 +689,19 @@ protected void executeQuery(String startLocation) throws SQLException {
builder.append(" WHERE ");
}
builder.append(jdbcDialect.quoteIdentifier(jdbcConfig.getIncreColumn()))
.append(" > ?")
.append(SqlUtil.buildOrderSql(jdbcConfig, jdbcDialect, "ASC"));
jdbcConfig.setQuerySql(builder.toString());
.append(" > ?");

String querySQL =
SqlUtil.buildOrderSql(builder.toString(), jdbcConfig, jdbcDialect, "ASC");
jdbcConfig.setQuerySql(querySQL);
initPrepareStatement(jdbcConfig.getQuerySql());
log.info("update querySql, sql = {}", jdbcConfig.getQuerySql());
} else {
// if the job have startLocation
// sql will be like "select ... from ... where increColumn > ?"
jdbcConfig.setQuerySql(
jdbcConfig.getQuerySql()
+ SqlUtil.buildOrderSql(jdbcConfig, jdbcDialect, "ASC"));
SqlUtil.buildOrderSql(
jdbcConfig.getQuerySql(), jdbcConfig, jdbcDialect, "ASC"));
initPrepareStatement(jdbcConfig.getQuerySql());
queryForPolling(startLocation);
state = restoreKeyUtil.transLocationStrToSqlValue(startLocation);
Expand Down Expand Up @@ -736,7 +736,7 @@ public void initPrepareStatement(String querySql) throws SQLException {
protected void queryPollingWithOutStartLocation() throws SQLException {
// add order by to query SQL avoid duplicate data
initPrepareStatement(
jdbcConfig.getQuerySql() + SqlUtil.buildOrderSql(jdbcConfig, jdbcDialect, "ASC"));
SqlUtil.buildOrderSql(jdbcConfig.getQuerySql(), jdbcConfig, jdbcDialect, "ASC"));
resultSet = ps.executeQuery();
hasNext = resultSet.next();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,24 +151,22 @@ public static String buildQuerySqlBySplit(
return querySql;
}

/**
* build order sql
*
* @param sortRule
* @return
*/
public static String buildOrderSql(
JdbcConfig jdbcConfig, JdbcDialect jdbcDialect, String sortRule) {
String originalSql, JdbcConfig jdbcConf, JdbcDialect jdbcDialect, String sortRule) {
String column;
// 增量任务
if (jdbcConfig.isIncrement()) {
column = jdbcConfig.getIncreColumn();
if (jdbcConf.isIncrement() && jdbcConf.isOrderBy() && !originalSql.contains("ORDER BY")) {
column = jdbcConf.getIncreColumn();
} else {
column = jdbcConfig.getOrderByColumn();
column = jdbcConf.getOrderByColumn();
}
return StringUtils.isBlank(column)
? ""
: String.format(" ORDER BY %s %s", jdbcDialect.quoteIdentifier(column), sortRule);

String additional =
StringUtils.isBlank(column)
? ""
: String.format(
" ORDER BY %s %s", jdbcDialect.quoteIdentifier(column), sortRule);
return originalSql + additional;
}

/* 是否添加自定义函数column 作为分片key ***/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ public void buildQuerySqlTest() {
.thenCallRealMethod();
when(SqlUtil.buildQuerySqlBySplit(any(), any(), anyList(), anyList(), any()))
.thenAnswer(invocation -> "select id from table where id > 10");
when(SqlUtil.buildOrderSql(jdbcConfig, jdbcDialect, "ASC"))
when(SqlUtil.buildOrderSql(
jdbcInputFormat.buildQuerySql(inputSplit), jdbcConfig, jdbcDialect, "ASC"))
.thenAnswer(invocation -> " order by id ASC");
String except = "select id from table where id > 10 order by id ASC";
Assert.assertEquals(except, jdbcInputFormat.buildQuerySql(inputSplit));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected void queryPollingWithOutStartLocation() throws SQLException {
// , the query will report an error after the method
// #setFetchDirection(ResultSet.FETCH_REVERSE) is called.
String querySql =
jdbcConfig.getQuerySql() + SqlUtil.buildOrderSql(jdbcConfig, jdbcDialect, "ASC");
SqlUtil.buildOrderSql(jdbcConfig.getQuerySql(), jdbcConfig, jdbcDialect, "ASC");
ps =
dbConn.prepareStatement(
querySql, ResultSet.TYPE_SCROLL_INSENSITIVE, resultSetConcurrency);
Expand Down