Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination bigquery: upgrade cdk #35315

Merged
merged 10 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix compilation post rebase
  • Loading branch information
gisripa committed Mar 4, 2024
commit c001be8246ef401ce95ca9f397b7b606390e5098
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
return new BigQueryRecordStandardConsumer(
outputRecordCollector,
() -> {
typerDeduper.prepareSchemasAndRawTables();
typerDeduper.prepareSchemasAndRunMigrations();

// Set up our raw tables
writeConfigs.get().forEach((streamId, uploader) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
final TyperDeduper typerDeduper) {
return () -> {
LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
typerDeduper.prepareSchemasAndRawTables();
typerDeduper.prepareSchemasAndRunMigrations();

for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState.Impl;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -82,11 +83,11 @@ public boolean isFinalTableEmpty(final StreamId id) {
return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.finalNamespace(), id.finalName())).getNumRows());
}

public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
final Table rawTable = bq.getTable(TableId.of(id.rawNamespace(), id.rawName()));
if (rawTable == null) {
// Table doesn't exist. There are no unprocessed records, and no timestamp.
return new InitialRawTableState(false, false, Optional.empty());
return new InitialRawTableStatus(false, false, Optional.empty());
}

final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
Expand All @@ -102,7 +103,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
// If it's not null, then we can return immediately - we've found some unprocessed records and their
// timestamp.
if (!unloadedRecordTimestamp.isNull()) {
return new InitialRawTableState(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
return new InitialRawTableStatus(true, true, Optional.of(unloadedRecordTimestamp.getTimestampInstant()));
}

final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
Expand All @@ -116,10 +117,10 @@ SELECT MAX(_airbyte_extracted_at)
// So we just need to get the timestamp of the most recent record.
if (loadedRecordTimestamp.isNull()) {
// Null timestamp because the table is empty. T+D can process the entire raw table during this sync.
return new InitialRawTableState(true, false, Optional.empty());
return new InitialRawTableStatus(true, false, Optional.empty());
} else {
// The raw table already has some records. T+D can skip all records with timestamp <= this value.
return new InitialRawTableState(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
return new InitialRawTableStatus(true, false, Optional.of(loadedRecordTimestamp.getTimestampInstant()));
}
}

Expand Down Expand Up @@ -191,18 +192,18 @@ public void execute(final Sql sql) throws InterruptedException {
}

@Override
public List<DestinationInitialState<MinimumDestinationState.Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final List<DestinationInitialState<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
public List<DestinationInitialStatus<Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
final List<DestinationInitialStatus<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.id();
final Optional<TableDefinition> finalTable = findExistingTable(id);
final InitialRawTableState rawTableState = getInitialRawTableState(id);
initialStates.add(new DestinationInitialState<>(
final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
initialStates.add(new DestinationInitialStatus<>(
streamConfig,
finalTable.isPresent(),
rawTableState,
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
!finalTable.isPresent() || isFinalTableEmpty(id),
finalTable.isEmpty() || isFinalTableEmpty(id),
// Return a default state blob since we don't actually track state.
new MinimumDestinationState.Impl(false)));
}
Expand Down