Merge StorageWriteApi Upsert to Main #62
base: main
Conversation
I'm too lazy to figure out our new GitHub security flow
Thanks for driving this! I don't have the bandwidth to shepherd the PR through the whole review process, but I wanted to call out some high-level thoughts for the new maintainers, to give a picture of what sort of work might need to be done before we can complete the prototype and merge it into main.
@@ -57,11 +58,18 @@
public abstract class StorageWriteApiBase {

  private static final Logger logger = LoggerFactory.getLogger(StorageWriteApiBase.class);
  // TODO: This should be private
Should we address?
  protected final int retry;
  protected final long retryWait;
  private final boolean autoCreateTables;
  private final BigQueryWriteSettings writeSettings;
  private final boolean attemptSchemaUpdate;

  // TODO: These should be private
Should we address?
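A minimal sketch of one way to resolve both visibility TODOs (the getter names are assumptions, not code from this PR): keep the fields private and expose only what subclasses actually need through protected accessors.

  // Sketch only: accessor names are assumed, not taken from the PR.
  private final int retry;
  private final long retryWait;

  protected int getRetry() {
    return retry;
  }

  protected long getRetryWait() {
    return retryWait;
  }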
  BigQueryWriteClient writeClient = getWriteClient();

  // Copied from JsonStreamWriter::newBuilder
  // TODO: Extract logic into superclass and leverage in StorageWriteApiBatchApplicationStream
Should we address? Otherwise, this feature only works for the default stream, right?
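One possible shape for that extraction, as a sketch (the helper name buildJsonStreamWriter is an assumption; only JsonStreamWriter.newBuilder and getWriteClient come from the existing code): the shared writer setup lives in StorageWriteApiBase, and both the default-stream and batch application-stream implementations pass in their own stream or table name.

  // Hypothetical helper in StorageWriteApiBase, so the copied
  // JsonStreamWriter::newBuilder logic is no longer tied to the default stream.
  protected JsonStreamWriter buildJsonStreamWriter(String streamOrTableName)
      throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
    return JsonStreamWriter.newBuilder(streamOrTableName, getWriteClient()).build();
  }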
      keySchema.getFields()).setMode(Field.Mode.NULLABLE).build();
  result.add(kafkaKeyField);
  if (kafkaKeyFieldName.get().isEmpty()) {
    // TODO: Gracefully handle collisions with value/TPO field names
Should we address?
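For illustration, the collision handling could be a fail-fast check along these lines (a sketch, not the PR's code; it assumes result is the list of already-added value/TPO fields and that java.util.Set and java.util.stream.Collectors are imported):

  // Sketch: reject flattened key fields whose names clash with existing
  // top-level value/TPO fields already collected in `result`.
  Set<String> existingFieldNames = result.stream()
      .map(Field::getName)
      .collect(Collectors.toSet());
  for (Field keyField : keySchema.getFields()) {
    if (existingFieldNames.contains(keyField.getName())) {
      throw new ConnectException("Kafka key field '" + keyField.getName()
          + "' collides with an existing top-level field");
    }
  }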
  private static final ConfigDef.Importance KAFKA_KEY_FIELD_NAME_IMPORTANCE = ConfigDef.Importance.LOW;
  private static final String KAFKA_KEY_FIELD_NAME_DOC = "The name of the field of Kafka key. "
      + "Default to be null, which means Kafka Key Field will not be included. "
      + "To include all fields from the key in the top-level record, specify a blank string for this property.";
This is kind of a hack. Do we maybe want to consider some other options for enabling this kind of functionality? Specially-defined behavior reliant on an empty string being explicitly specified in a config file is a bit of an anti-pattern.
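For comparison, a dedicated boolean property would make the intent explicit instead of overloading the empty string. A sketch mirroring the style of the existing config constants (the name kafkaKeyFlatten and all of this wiring are purely illustrative, not something this PR defines):

  // Illustrative alternative: an explicit flag rather than a magic empty string.
  public static final String KAFKA_KEY_FLATTEN_CONFIG = "kafkaKeyFlatten";
  private static final ConfigDef.Type KAFKA_KEY_FLATTEN_TYPE = ConfigDef.Type.BOOLEAN;
  private static final Boolean KAFKA_KEY_FLATTEN_DEFAULT = false;
  private static final ConfigDef.Importance KAFKA_KEY_FLATTEN_IMPORTANCE = ConfigDef.Importance.LOW;
  private static final String KAFKA_KEY_FLATTEN_DOC =
      "If true, the fields of the Kafka key are written as top-level fields of the "
          + "record instead of being nested under the configured Kafka key field name.";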
@@ -60,6 +60,9 @@
    <compiler.plugin.version>3.8.1</compiler.plugin.version>
    <exec.plugin.version>3.2.0</exec.plugin.version>
    <google.cloud.bom.version>26.33.0</google.cloud.bom.version>
    <kafka.connect.plugin.version>0.11.1</kafka.connect.plugin.version>
Don't think we need this, unless Aiven is publishing this connector to Confluent Hub (which considering the lineage of this project would be... interesting)
import org.slf4j.LoggerFactory;

@Tag("integration")
public class StorageWriteApiUpsertDeleteIT extends BaseConnectorIT {
Should we have more unit testing coverage for the new logic introduced in this PR?
The prototype upsert feature is working correctly. I tested it locally by merging the prototype branch into the main branch. Data is streamed from the source to the BigQuery table using a connector build that includes the prototype commits, and records are upserted and deleted successfully via the Storage Write API.