Skip to content

Commit b09462e

Browse files
committed
Merge pull request confluentinc#51 from criccomini/add-validation-config
Add config to disable non-null checks.
2 parents c0747fc + 1b7b16a commit b09462e

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

src/main/java/io/confluent/connect/jdbc/JdbcSourceConnectorConfig.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
107107
"Prefix to prepend to table names to generate the name of the Kafka topic to publish data "
108108
+ "to, or in the case of a custom query, the full name of the topic to publish to.";
109109

110+
public static final String VALIDATE_NON_NULL_CONFIG = "validate.non.null";
111+
private static final String VALIDATE_NON_NULL_DOC =
112+
"By default, the JDBC connector will validate that all incrementing and timestamp tables have NOT NULL set for "
113+
+ "the columns being used as their ID/timestamp. If the tables don't, JDBC connector will fail to start. Setting "
114+
+ "this to false will disable these checks.";
115+
public static final boolean VALIDATE_NON_NULL_DEFAULT = true;
116+
110117
public static ConfigDef baseConfigDef() {
111118
return new ConfigDef()
112119
.define(CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH, CONNECTION_URL_DOC)
@@ -132,7 +139,9 @@ public static ConfigDef baseConfigDef() {
132139
.define(QUERY_CONFIG, Type.STRING, QUERY_DEFAULT,
133140
Importance.MEDIUM, QUERY_DOC)
134141
.define(TOPIC_PREFIX_CONFIG, Type.STRING,
135-
Importance.HIGH, TOPIC_PREFIX_DOC);
142+
Importance.HIGH, TOPIC_PREFIX_DOC)
143+
.define(VALIDATE_NON_NULL_CONFIG, Type.BOOLEAN, VALIDATE_NON_NULL_DEFAULT,
144+
Importance.LOW, VALIDATE_NON_NULL_DOC);
136145
}
137146

138147
static ConfigDef config = baseConfigDef();

src/main/java/io/confluent/connect/jdbc/JdbcSourceTask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,16 @@ public void start(Map<String, String> properties) {
123123
= config.getString(JdbcSourceTaskConfig.INCREMENTING_COLUMN_NAME_CONFIG);
124124
String timestampColumn
125125
= config.getString(JdbcSourceTaskConfig.TIMESTAMP_COLUMN_NAME_CONFIG);
126+
boolean validateNonNulls
127+
= config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG);
126128

127129
for (String tableOrQuery : tablesOrQuery) {
128130
final Map<String, String> partition;
129131
switch (queryMode) {
130132
case TABLE:
131-
validateNonNullable(mode, tableOrQuery, incrementingColumn, timestampColumn);
133+
if (validateNonNulls) {
134+
validateNonNullable(mode, tableOrQuery, incrementingColumn, timestampColumn);
135+
}
132136
partition = Collections.singletonMap(
133137
JdbcSourceConnectorConstants.TABLE_NAME_KEY, tableOrQuery);
134138
break;

0 commit comments

Comments
 (0)