Merge StorageWriteAPi Upsert to Main #62

Open · wants to merge 3 commits into base: main
Conversation

abhisheksingh94

@abhisheksingh94 abhisheksingh94 commented Jul 11, 2025

The prototype Upsert feature is working correctly. I tested it locally by merging the prototype branch into the master branch. Data is streamed from the source to the BigQuery table using a connector build that includes the prototype commits, and records are upserted and deleted successfully via the Storage Write API.

@abhisheksingh94 abhisheksingh94 changed the title Merge StreamWriteAPi Upsert to Main Merge StorageWriteAPi Upsert to Main Jul 11, 2025
Contributor

@C0urante C0urante left a comment

Thanks for driving this! I don't have the bandwidth to shepherd the PR through the whole review process, but I wanted to call out some high-level thoughts for the new maintainers, to give a picture of what sort of work might need to be done before we can complete the prototype and merge into main.

@@ -57,11 +58,18 @@
public abstract class StorageWriteApiBase {

private static final Logger logger = LoggerFactory.getLogger(StorageWriteApiBase.class);
// TODO: This should be private
Contributor

Should we address?

protected final int retry;
protected final long retryWait;
private final boolean autoCreateTables;
private final BigQueryWriteSettings writeSettings;
private final boolean attemptSchemaUpdate;

// TODO: These should be private
Contributor

Should we address?

BigQueryWriteClient writeClient = getWriteClient();

// Copied from JsonStreamWriter::newBuilder
// TODO: Extract logic into superclass and leverage in StorageWriteApiBatchApplicationStream
Contributor

Should we address? Otherwise, this feature only works for the default stream, right?
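One shape the suggested extraction could take, sketched with purely illustrative names (these are not the connector's real classes or the BigQuery client API): the copied writer-construction logic is hoisted into a shared superclass method, which both the default-stream implementation and `StorageWriteApiBatchApplicationStream` could then parameterize with their own stream name.

```java
// Hypothetical sketch only: shared writer-construction logic lives in the
// superclass, so each stream type supplies just its stream identifier
// instead of duplicating the JsonStreamWriter::newBuilder-style logic.
abstract class StreamWriterFactory {
    // Common construction logic, parameterized by stream name.
    protected String buildWriterDescription(String tableName, String streamName) {
        return "writer(table=" + tableName + ", stream=" + streamName + ")";
    }
}

class DefaultStreamWriterFactory extends StreamWriterFactory {
    String create(String tableName) {
        // The default stream has a fixed, well-known name.
        return buildWriterDescription(tableName, "_default");
    }
}

class BatchApplicationStreamWriterFactory extends StreamWriterFactory {
    String create(String tableName, String applicationStream) {
        // Application-created streams pass their own stream name through.
        return buildWriterDescription(tableName, applicationStream);
    }
}
```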

keySchema.getFields()).setMode(Field.Mode.NULLABLE).build();
result.add(kafkaKeyField);
if (kafkaKeyFieldName.get().isEmpty()) {
// TODO: Gracefully handle collisions with value/TPO field names
Contributor

Should we address?
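Graceful collision handling here could be as simple as failing fast with a descriptive error when a key field name clashes with an existing value or topic/partition/offset field, rather than silently overwriting a column. A minimal, self-contained sketch (the class and method names are illustrative, not the connector's actual code):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: reject key field names that collide with fields
// already present in the record schema (value fields, TPO fields, etc.).
class KeyFieldCollisionChecker {
    static void checkCollisions(List<String> existingFieldNames, List<String> keyFieldNames) {
        // Seed the set with the fields already in the schema; Set.add returns
        // false when the element is already present, signaling a collision.
        Set<String> seen = new LinkedHashSet<>(existingFieldNames);
        for (String keyField : keyFieldNames) {
            if (!seen.add(keyField)) {
                throw new IllegalArgumentException(
                    "Kafka key field '" + keyField
                        + "' collides with an existing record field");
            }
        }
    }
}
```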

private static final ConfigDef.Importance KAFKA_KEY_FIELD_NAME_IMPORTANCE = ConfigDef.Importance.LOW;
private static final String KAFKA_KEY_FIELD_NAME_DOC = "The name of the field of Kafka key. "
+ "Default to be null, which means Kafka Key Field will not be included.";
+ "Default to be null, which means Kafka Key Field will not be included. "
+ "To include all fields from the key in the top-level record, specify a blank string for this property.";
Contributor

This is kind of a hack. Do we maybe want to consider some other options for enabling this kind of functionality? Specially-defined behavior reliant on an empty string being explicitly specified in a config file is a bit of an anti-pattern.
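One alternative to overloading the empty string, sketched here with purely hypothetical setting names (this is not the connector's actual configuration surface): keep the field-name setting for the nested case, and introduce a separate, explicitly named boolean for flattening the key into the top-level record.

```java
// Hypothetical sketch: resolve the key-handling mode from two plainly named
// settings instead of giving special meaning to an explicit blank string.
class KafkaKeyFieldMode {
    enum Mode { NONE, NESTED, FLATTENED }

    // kafkaKeyFieldName == null means "no nested key field"; a separate
    // boolean (illustrative name) opts into flattening the key's fields
    // into the top-level record.
    static Mode resolve(String kafkaKeyFieldName, boolean flattenKafkaKey) {
        if (kafkaKeyFieldName == null) {
            return flattenKafkaKey ? Mode.FLATTENED : Mode.NONE;
        }
        return Mode.NESTED;
    }
}
```

The advantage is that each behavior has a self-describing name in the config file, at the cost of one more setting and a validation rule forbidding both at once.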

@@ -60,6 +60,9 @@
<compiler.plugin.version>3.8.1</compiler.plugin.version>
<exec.plugin.version>3.2.0</exec.plugin.version>
<google.cloud.bom.version>26.33.0</google.cloud.bom.version>
<kafka.connect.plugin.version>0.11.1</kafka.connect.plugin.version>
Contributor

Don't think we need this, unless Aiven is publishing this connector to Confluent Hub (which considering the lineage of this project would be... interesting)

import org.slf4j.LoggerFactory;

@Tag("integration")
public class StorageWriteApiUpsertDeleteIT extends BaseConnectorIT {
Contributor

Should we have more unit testing coverage for the new logic introduced in this PR?
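Some of the new logic can likely be covered at the unit level without a live BigQuery connection. As one hedged example (illustrative names only, not the PR's actual classes): the decision of whether a record is an upsert or a delete can be isolated and tested directly, assuming the usual Kafka Connect convention that a tombstone (null value) signals a delete and that upsert/delete mode requires a record key.

```java
// Hypothetical sketch of a unit-testable decision point: a tombstone record
// (null value) maps to DELETE, a keyed record with a value maps to UPSERT,
// and a missing key is rejected because upsert/delete needs a key to match on.
class RowIntent {
    enum ChangeType { UPSERT, DELETE }

    static ChangeType forRecord(Object key, Object value) {
        if (key == null) {
            throw new IllegalArgumentException("Upsert/delete mode requires a record key");
        }
        return value == null ? ChangeType.DELETE : ChangeType.UPSERT;
    }
}
```

Extracting small decision points like this keeps the integration test (`StorageWriteApiUpsertDeleteIT`) focused on end-to-end behavior while the edge cases get cheap, fast unit coverage.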


2 participants