Skip to content

Commit 3ba64f0

Browse files
committed
Merge pull request confluentinc#22 from confluentinc/nullable-column-check
Add check for NOT NULL columns on target tables when specified as increasing or timestamp column for offsets.
2 parents b1e1ab0 + 740da48 commit 3ba64f0

File tree

5 files changed

+104
-12
lines changed

5 files changed

+104
-12
lines changed

src/main/java/io/confluent/connect/jdbc/JdbcSourceConnectorConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,14 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
6464
public static final String INCREASING_COLUMN_NAME_CONFIG = "increasing.column.name";
6565
private static final String INCREASING_COLUMN_NAME_DOC =
6666
"The name of the strictly increasing column to use to detect new rows. Any empty value "
67-
+ "indicates the column should be autodetected by looking for an auto-incrementing column.";
67+
+ "indicates the column should be autodetected by looking for an auto-incrementing column. "
68+
+ "This column may not be nullable.";
6869
public static final String INCREASING_COLUMN_NAME_DEFAULT = "";
6970

7071
public static final String TIMESTAMP_COLUMN_NAME_CONFIG = "timestamp.column.name";
7172
private static final String TIMESTAMP_COLUMN_NAME_DOC =
72-
"The name of the timestamp column to use to detect new or modified rows.";
73+
"The name of the timestamp column to use to detect new or modified rows. This column may "
74+
+ "not be nullable.";
7375
public static final String TIMESTAMP_COLUMN_NAME_DEFAULT = "";
7476

7577
public static final String TABLE_POLL_INTERVAL_MS_CONFIG = "table.poll.interval.ms";

src/main/java/io/confluent/connect/jdbc/JdbcSourceTask.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,27 @@ public void start(Map<String, String> properties) {
108108
offsets = context.offsetStorageReader().offsets(partitions);
109109
}
110110

111+
// Must setup the connection now to validate NOT NULL columns. At this point we've already
112+
// caught any easy-to-find errors so deferring the connection creation won't save any effort
113+
String dbUrl = config.getString(JdbcSourceTaskConfig.CONNECTION_URL_CONFIG);
114+
log.debug("Trying to connect to {}", dbUrl);
115+
try {
116+
db = DriverManager.getConnection(dbUrl);
117+
} catch (SQLException e) {
118+
log.error("Couldn't open connection to {}: {}", dbUrl, e);
119+
throw new ConnectException(e);
120+
}
121+
111122
String increasingColumn
112123
= config.getString(JdbcSourceTaskConfig.INCREASING_COLUMN_NAME_CONFIG);
113124
String timestampColumn
114125
= config.getString(JdbcSourceTaskConfig.TIMESTAMP_COLUMN_NAME_CONFIG);
126+
115127
for (String tableOrQuery : tablesOrQuery) {
116128
final Map<String, String> partition;
117129
switch (queryMode) {
118130
case TABLE:
131+
validateNonNullable(mode, tableOrQuery, increasingColumn, timestampColumn);
119132
partition = Collections.singletonMap(
120133
JdbcSourceConnectorConstants.TABLE_NAME_KEY, tableOrQuery);
121134
break;
@@ -149,15 +162,6 @@ public void start(Map<String, String> properties) {
149162
}
150163
}
151164

152-
String dbUrl = config.getString(JdbcSourceTaskConfig.CONNECTION_URL_CONFIG);
153-
log.debug("Trying to connect to {}", dbUrl);
154-
try {
155-
db = DriverManager.getConnection(dbUrl);
156-
} catch (SQLException e) {
157-
log.error("Couldn't open connection to {}: {}", dbUrl, e);
158-
throw new ConnectException(e);
159-
}
160-
161165
stop = new AtomicBoolean(false);
162166
}
163167

@@ -234,4 +238,29 @@ public List<SourceRecord> poll() throws InterruptedException {
234238
return null;
235239
}
236240

241+
private void validateNonNullable(String incrementalMode, String table, String increasingColumn,
242+
String timestampColumn) {
243+
try {
244+
// Validate that requested columns for offsets are NOT NULL. Currently this is only performed
245+
// for table-based copying because custom query mode doesn't allow this to be looked up
246+
// without a query or parsing the query since we don't have a table name.
247+
if ((incrementalMode.equals(JdbcSourceConnectorConfig.MODE_INCREASING) ||
248+
incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREASING)) &&
249+
JdbcUtils.isColumnNullable(db, table, increasingColumn)) {
250+
throw new ConnectException("Cannot make incremental queries using incrementing column " +
251+
increasingColumn + " on " + table + " because this column is "
252+
+ "nullable.");
253+
}
254+
if ((incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP) ||
255+
incrementalMode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREASING)) &&
256+
JdbcUtils.isColumnNullable(db, table, timestampColumn)) {
257+
throw new ConnectException("Cannot make incremental queries using timestamp column " +
258+
timestampColumn + " on " + table + " because this column is "
259+
+ "nullable.");
260+
}
261+
} catch (SQLException e) {
262+
throw new ConnectException("Failed trying to validate that columns used for offsets are NOT"
263+
+ " NULL", e);
264+
}
265+
}
237266
}

src/main/java/io/confluent/connect/jdbc/JdbcUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class JdbcUtils {
5555
private static final int GET_TABLES_NAME_COLUMN = 3;
5656

5757
private static final int GET_COLUMNS_COLUMN_NAME = 4;
58+
private static final int GET_COLUMNS_IS_NULLABLE = 18;
5859
private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23;
5960

6061

@@ -147,6 +148,21 @@ public static String getAutoincrementColumn(Connection conn, String table) throw
147148
return (matches == 1 ? result : null);
148149
}
149150

151+
public static boolean isColumnNullable(Connection conn, String table, String column)
152+
throws SQLException {
153+
ResultSet rs = conn.getMetaData().getColumns(null, null, table, column);
154+
if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) {
155+
// Should only be one match
156+
if (!rs.next()) {
157+
return false;
158+
}
159+
String val = rs.getString(GET_COLUMNS_IS_NULLABLE);
160+
return rs.getString(GET_COLUMNS_IS_NULLABLE).equals("YES");
161+
}
162+
163+
return false;
164+
}
165+
150166
/**
151167
* Format the given Date assuming UTC timezone in a format supported by SQL.
152168
* @param date the date to convert to a String

src/test/java/io/confluent/connect/jdbc/JdbcSourceTaskUpdateTest.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.confluent.connect.jdbc;
1818

1919
import org.apache.kafka.connect.data.Struct;
20+
import org.apache.kafka.connect.errors.ConnectException;
2021
import org.apache.kafka.connect.source.SourceRecord;
2122
import org.junit.After;
2223
import org.junit.Test;
@@ -75,9 +76,36 @@ public void testBulkPeriodicLoad() throws Exception {
7576
assertRecordsTopic(records, TOPIC_PREFIX + SINGLE_TABLE_NAME);
7677
}
7778

79+
// Starting the task must fail with a ConnectException when the column named for the
// increasing offset is nullable. Because the exception is expected to escape from
// startTask(), the PowerMock.verifyAll() call after it does not run on the passing path.
// NOTE(review): startTask's parameters are presumably (timestampColumn, increasingColumn,
// query), judging by the arguments used in the sibling test -- confirm against the helper.
@Test(expected = ConnectException.class)
80+
public void testIncreasingInvalidColumn() throws Exception {
81+
expectInitializeNoOffsets(Arrays.asList(SINGLE_TABLE_PARTITION));
82+
83+
PowerMock.replayAll();
84+
85+
// Increasing column must be NOT NULL; "id" is created as a plain INT with no NOT NULL
// constraint, so validation is expected to reject it.
86+
db.createTable(SINGLE_TABLE_NAME, "id", "INT");
87+
88+
startTask(null, "id", null);
89+
90+
PowerMock.verifyAll();
91+
}
92+
93+
// Starting the task must fail with a ConnectException when the column named for the
// timestamp offset is nullable. Because the exception is expected to escape from
// startTask(), the PowerMock.verifyAll() call after it does not run on the passing path.
// NOTE(review): startTask's parameters are presumably (timestampColumn, increasingColumn,
// query), judging by the arguments used in the sibling test -- confirm against the helper.
@Test(expected = ConnectException.class)
94+
public void testTimestampInvalidColumn() throws Exception {
95+
expectInitializeNoOffsets(Arrays.asList(SINGLE_TABLE_PARTITION));
96+
97+
PowerMock.replayAll();
98+
99+
// Timestamp column must be NOT NULL; "modified" is created as a plain TIMESTAMP with no
// NOT NULL constraint, so validation is expected to reject it.
100+
db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP");
101+
102+
startTask("modified", null, null);
103+
104+
PowerMock.verifyAll();
105+
}
106+
78107
@Test
79108
public void testManualIncreasing() throws Exception {
80-
81109
expectInitializeNoOffsets(Arrays.asList(SINGLE_TABLE_PARTITION));
82110

83111
PowerMock.replayAll();

src/test/java/io/confluent/connect/jdbc/JdbcUtilsTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import java.util.HashSet;
2626

2727
import static org.junit.Assert.assertEquals;
28+
import static org.junit.Assert.assertFalse;
2829
import static org.junit.Assert.assertNull;
30+
import static org.junit.Assert.assertTrue;
2931

3032
public class JdbcUtilsTest {
3133

@@ -84,4 +86,19 @@ public void testGetAutoincrement() throws Exception {
8486
"bar", "INTEGER");
8587
assertEquals("id", JdbcUtils.getAutoincrementColumn(db.getConnection(), "mixed"));
8688
}
89+
90+
// Exercises JdbcUtils.isColumnNullable against two freshly created tables: columns declared
// NOT NULL must be reported as not nullable, and columns declared without that constraint
// (with or without DEFAULT NULL) must be reported as nullable.
@Test
91+
public void testIsColumnNullable() throws Exception {
92+
db.createTable("test", "id", "INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY", "bar", "INTEGER");
93+
assertFalse(JdbcUtils.isColumnNullable(db.getConnection(), "test", "id"));
94+
assertTrue(JdbcUtils.isColumnNullable(db.getConnection(), "test", "bar"));
95+
96+
// Derby does not seem to accept an explicit NULL constraint in a column definition, so
// the nullable timestamp columns are declared without one or with DEFAULT NULL.
// NOTE(review): this reading of the original remark is an assumption -- confirm.
97+
db.createTable("tstest", "ts", "TIMESTAMP NOT NULL", "tsdefault", "TIMESTAMP",
98+
"tsnull", "TIMESTAMP DEFAULT NULL");
99+
assertFalse(JdbcUtils.isColumnNullable(db.getConnection(), "tstest", "ts"));
100+
// The default for TIMESTAMP columns can vary between databases, but for Derby it is nullable
101+
assertTrue(JdbcUtils.isColumnNullable(db.getConnection(), "tstest", "tsdefault"));
102+
assertTrue(JdbcUtils.isColumnNullable(db.getConnection(), "tstest", "tsnull"));
103+
}
87104
}

0 commit comments

Comments
 (0)