-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-io] Add a Pulsar IO connector for InfluxDB sink #4017
Conversation
|
||
@FieldDoc( | ||
required = false, | ||
defaultValue = "", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have sensitive=true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
@FieldDoc( | ||
required = false, | ||
defaultValue = "", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have sensitive=true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
// create the database if not exists | ||
List<String> databases = influxDB.describeDatabases(); | ||
if (!databases.contains(influxDatabase)) { | ||
influxDB.createDatabase(influxDatabase); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a little suggestion:
if (!databases.contains(influxDatabase)) {
if(influxDBSinkConfig.isCreateDatabase()) {
influxDB.createDatabase(influxDatabase);
}
else {
throw new IllegalArgumentException("This " + influxDBSinkConfig.getDatabase() + " database does not exist!");
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ambition119 Thanks for the reviewing, it's not necessary to do this as field database
is required and has been checked in method validate()
.
run java8 tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Motivation
Provides a builtin InfluxDB sink Connector, in order to persist pulsar messages to a InfluxDB database.
Modifications
Add a InfluxDB Sink and some unit tests.
Verifying this change
This change can be verified as follows:
influx
CLI to query messages in the specified database