Skip to content

Commit

Permalink
[BugFix] [InfluxDBSource] Resolve invalid SQL in initColumnsIndex met…
Browse files Browse the repository at this point in the history
…hod caused by direct QUERY_LIMIT appendage with 'tz' function. (apache#4829)
  • Loading branch information
zhengyuan-cn authored Nov 27, 2023
1 parent a6523ba commit deed9c6
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
Expand Down Expand Up @@ -128,7 +130,16 @@ public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> restoreEn

private List<Integer> initColumnsIndex(InfluxDB influxdb) {
// query one row to get column info
String query = sourceConfig.getSql() + QUERY_LIMIT;
String sql = sourceConfig.getSql();
String query = sql + QUERY_LIMIT;
// if sql contains tz(), can't be append QUERY_LIMIT at last . see bug #4231
int start = containTzFunction(sql.toLowerCase());
if (start > 0) {
StringBuilder tmpSql = new StringBuilder(sql);
tmpSql.insert(start - 1, QUERY_LIMIT).append(" ");
query = tmpSql.toString();
}

try {
QueryResult queryResult = influxdb.query(new Query(query, sourceConfig.getDatabase()));

Expand All @@ -145,4 +156,14 @@ private List<Integer> initColumnsIndex(InfluxDB influxdb) {
e);
}
}

private static int containTzFunction(String sql) {
Pattern pattern = Pattern.compile("tz\\(.*\\)");
Matcher matcher = pattern.matcher(sql);
if (matcher.find()) {
int start = matcher.start();
return start;
}
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,40 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte
}
}

@TestTemplate
public void testInfluxdbWithTz(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/influxdb-to-influxdb-with-tz.conf");
Assertions.assertEquals(0, execResult.getExitCode());
String sourceSql =
String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT);
String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT);
QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE));
QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE));
// assert data count
Assertions.assertEquals(
sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size());
// assert data values
List<List<Object>> sourceValues =
sourceQueryResult.getResults().get(0).getSeries().get(0).getValues();
List<List<Object>> sinkValues =
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues();
int rowSize = sourceValues.size();
int colSize = sourceValues.get(0).size();

for (int row = 0; row < rowSize; row++) {
for (int col = 0; col < colSize; col++) {
Object sourceColValue = sourceValues.get(row).get(col);
Object sinkColValue = sinkValues.get(row).get(col);

if (!Objects.deepEquals(sourceColValue, sinkColValue)) {
Assertions.assertEquals(sourceColValue, sinkColValue);
}
}
}
}

private void initializeInfluxDBClient() throws ConnectException {
InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl);
influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
InfluxDB {
url = "http://influxdb-host:8086"
sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source tz('Asia/Shanghai')"
database = "test"
schema {
fields {
label = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
}
}

transform {
}

sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "sink"
key_time = "time"
key_tags = ["label"]
batch_size = 1
}
}

0 comments on commit deed9c6

Please sign in to comment.