Destination bigquery: upgrade cdk #35315

Merged
merged 10 commits on Mar 4, 2024
Changes from 1 commit
remove state handling
edgao authored and gisripa committed Mar 4, 2024
commit 564f57b4077723f0cae570c7cf8d9e9d3fd5e690
@@ -22,7 +22,6 @@
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
-import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
@@ -239,7 +238,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode
final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride);
final BigQuery bigquery = getBigQuery(config);
final TyperDeduper typerDeduper =
-buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe, rawNamespaceOverride);
+buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);

AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config);
final JsonNode serviceAccountKey = config.get(BigQueryConsts.CONFIG_CREDS);
@@ -446,14 +445,13 @@ private TyperDeduper buildTyperDeduper(final BigQuerySqlGenerator sqlGenerator,
final ParsedCatalog parsedCatalog,
final BigQuery bigquery,
final String datasetLocation,
-final boolean disableTypeDedupe,
-final Optional<String> rawNamespaceOverride) {
+final boolean disableTypeDedupe) {
final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(
bigquery,
-datasetLocation,
-rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
+datasetLocation
+);

if (disableTypeDedupe) {
return new NoOpTyperDeduperWithV1V2Migrations<>(
@@ -12,7 +12,6 @@
import static io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.toDialectType;
import static java.util.stream.Collectors.toMap;

-import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
@@ -24,20 +23,16 @@
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.QueryJobConfiguration;
-import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Streams;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
-import io.airbyte.cdk.integrations.base.JavaBaseConstants;
-import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
@@ -47,7 +42,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
-import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
@@ -61,31 +56,21 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
import org.apache.commons.text.StringSubstitutor;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-// TODO this stuff almost definitely exists somewhere else in our codebase.
-public class BigQueryDestinationHandler implements DestinationHandler<BigqueryState> {
+public class BigQueryDestinationHandler implements DestinationHandler<MinimumDestinationState.Impl> {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationHandler.class);

-private static final String DESTINATION_STATE_TABLE_NAME = "_airbyte_destination_state";
-private static final String DESTINATION_STATE_TABLE_COLUMN_NAME = "name";
-private static final String DESTINATION_STATE_TABLE_COLUMN_NAMESPACE = "namespace";
-private static final String DESTINATION_STATE_TABLE_COLUMN_STATE = "destination_state";
-private static final String DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT = "updated_at";

private final BigQuery bq;
private final String datasetLocation;
-private final String rawTableDataset;

-public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocation, String rawTableDataset) {
+public BigQueryDestinationHandler(final BigQuery bq, final String datasetLocation) {
this.bq = bq;
this.datasetLocation = datasetLocation;
-this.rawTableDataset = rawTableDataset;
}

public Optional<TableDefinition> findExistingTable(final StreamId id) {
@@ -206,83 +191,27 @@ public void execute(final Sql sql) throws InterruptedException {
}

@Override
-public List<DestinationInitialState<BigqueryState>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
-// Would be nice to use bq.create(), but it doesn't support `create table if not exists`.
-bq.query(QueryJobConfiguration.newBuilder(
-"CREATE TABLE IF NOT EXISTS " + getStateTableName() + " ("
-+ DESTINATION_STATE_TABLE_COLUMN_NAME + " STRING, "
-+ DESTINATION_STATE_TABLE_COLUMN_NAMESPACE + " STRING, "
-+ DESTINATION_STATE_TABLE_COLUMN_STATE + " JSON, "
-+ DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT + " TIMESTAMP)").build());
-
-Map<AirbyteStreamNameNamespacePair, BigqueryState> destinationStates = StreamSupport.stream(
-bq.query(QueryJobConfiguration.newBuilder(
-"SELECT * FROM " + getStateTableName()).build()).iterateAll().spliterator(),
-false).collect(
-toMap(
-fvList -> {
-final FieldValue nameFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAME);
-final FieldValue namespaceFieldValue = fvList.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE);
-return new AirbyteStreamNameNamespacePair(
-nameFieldValue.isNull() ? null : nameFieldValue.getStringValue(),
-namespaceFieldValue.isNull() ? null : namespaceFieldValue.getStringValue());
-},
-fvList -> {
-JsonNode json = Jsons.deserialize(fvList.get(DESTINATION_STATE_TABLE_COLUMN_STATE).getStringValue());
-return toBigqueryState(json);
-}));
-
-final List<DestinationInitialState<BigqueryState>> initialStates = new ArrayList<>();
+public List<DestinationInitialState<MinimumDestinationState.Impl>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
+final List<DestinationInitialState<MinimumDestinationState.Impl>> initialStates = new ArrayList<>();
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.id();
final Optional<TableDefinition> finalTable = findExistingTable(id);
final InitialRawTableState rawTableState = getInitialRawTableState(id);
-final BigqueryState bigqueryState = destinationStates.getOrDefault(streamConfig.id().asPair(), toBigqueryState(Jsons.emptyObject()));
initialStates.add(new DestinationInitialState<>(
streamConfig,
finalTable.isPresent(),
rawTableState,
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
!finalTable.isPresent() || isFinalTableEmpty(id),
-bigqueryState));
+// Return a default state blob since we don't actually track state.
+new MinimumDestinationState.Impl(false)));
}
return initialStates;
}

@Override
-public void commitDestinationStates(Map<StreamId, BigqueryState> destinationStates) throws Exception {
-if (destinationStates.isEmpty()) {
-return;
-}
-
-final String deleteStates = "DELETE FROM " + getStateTableName() + " WHERE "
-+ destinationStates.keySet().stream()
-.map(streamId -> String.format("(%s = ? AND %s = ?)",
-DESTINATION_STATE_TABLE_COLUMN_NAME,
-DESTINATION_STATE_TABLE_COLUMN_NAMESPACE))
-.collect(Collectors.joining(" OR "));
-final QueryJobConfiguration.Builder deleteQueryConfig = QueryJobConfiguration.newBuilder(deleteStates);
-destinationStates.forEach((key, value) -> {
-deleteQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalName()));
-deleteQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalNamespace()));
-});
-bq.query(deleteQueryConfig.build());
-
-final String insertStates = "INSERT INTO " + getStateTableName() + " ("
-+ DESTINATION_STATE_TABLE_COLUMN_NAME + ", "
-+ DESTINATION_STATE_TABLE_COLUMN_NAMESPACE + ", "
-+ DESTINATION_STATE_TABLE_COLUMN_STATE + ", "
-+ DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT + ") VALUES "
-+ destinationStates.keySet().stream()
-.map(streamId -> "(?, ?, ?, CURRENT_TIMESTAMP)")
-.collect(Collectors.joining(", "));
-final QueryJobConfiguration.Builder insertQueryConfig = QueryJobConfiguration.newBuilder(insertStates);
-destinationStates.forEach((key, value) -> {
-insertQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalName()));
-insertQueryConfig.addPositionalParameter(QueryParameterValue.string(key.originalNamespace()));
-insertQueryConfig.addPositionalParameter(QueryParameterValue.json(Jsons.serialize(value)));
-});
-bq.query(insertQueryConfig.build());
+public void commitDestinationStates(Map<StreamId, MinimumDestinationState.Impl> destinationStates) throws Exception {
+// Intentionally do nothing. Bigquery doesn't actually support destination states.
}

private boolean existingSchemaMatchesStreamConfig(final StreamConfig stream,
@@ -390,15 +319,4 @@ private static Set<String> getPks(final StreamConfig stream) {
return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
}

-@NotNull
-private String getStateTableName() {
-return QUOTE + rawTableDataset + QUOTE + "." + DESTINATION_STATE_TABLE_NAME;
-}
-
-@NotNull
-private static BigqueryState toBigqueryState(JsonNode json) {
-return new BigqueryState(
-json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
-}

}

This file was deleted.
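The deleted file is most likely the old BigqueryState class, since this commit removes its last references above (the DestinationHandler<BigqueryState> type parameter and the toBigqueryState helper). Below is a minimal sketch, not part of the PR, of the default state blob that gatherInitialState now fabricates for every stream; the needsSoftReset() accessor is an assumption carried over from the flag the removed toBigqueryState parsed, not something this diff confirms.

import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;

class DefaultStateSketch {

  // The old BigqueryState carried a single needsSoftReset flag;
  // MinimumDestinationState.Impl keeps that same flag, so a "clean" stream
  // is just Impl(false), as in the new gatherInitialState above.
  static MinimumDestinationState.Impl defaultState() {
    final MinimumDestinationState.Impl state = new MinimumDestinationState.Impl(false);
    // needsSoftReset() is assumed from the old flag name; when true, the
    // TyperDeduper would rebuild the final table from raw records.
    if (state.needsSoftReset()) {
      throw new IllegalStateException("default state must not request a soft reset");
    }
    return state;
  }

}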

@@ -33,6 +33,7 @@
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.destination.bigquery.BigQueryConsts;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
@@ -56,7 +57,7 @@
import org.slf4j.LoggerFactory;

@Execution(ExecutionMode.CONCURRENT)
-public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<BigqueryState> {
+public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest<MinimumDestinationState.Impl> {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class);

@@ -81,7 +82,7 @@ protected BigQuerySqlGenerator getSqlGenerator() {

@Override
protected BigQueryDestinationHandler getDestinationHandler() {
return new BigQueryDestinationHandler(bq, "US", namespace);
return new BigQueryDestinationHandler(bq, "US");
}

@Override
@@ -362,7 +363,7 @@ public void testCreateTableIncremental() throws Exception {

@Test
public void testCreateTableInOtherRegion() throws InterruptedException {
-final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1", namespace);
+final BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1");
// We're creating the dataset in the wrong location in the @BeforeEach block. Explicitly delete it.
bq.getDataset(namespace).delete();
final var sqlGenerator = new BigQuerySqlGenerator(projectId, "asia-east1");
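Taken together, the production and test changes shrink state handling to three moves: a constructor without the raw-dataset argument, a gatherInitialState that fabricates default state blobs, and a no-op commitDestinationStates. The sketch below exercises that surface end to end. It is not part of the PR; the package paths are assumed from the imports shown in the diffs above, and streamConfigs stands in for the streams of a parsed catalog.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import java.util.List;
import java.util.Map;

class StateHandlingSketch {

  void run(final List<StreamConfig> streamConfigs) throws Exception {
    final BigQuery bq = BigQueryOptions.getDefaultInstance().getService();

    // The raw-dataset argument is gone from the constructor after this commit.
    final BigQueryDestinationHandler handler = new BigQueryDestinationHandler(bq, "US");

    // Initial state is no longer read back from a _airbyte_destination_state
    // table; every stream gets a default MinimumDestinationState.Impl(false).
    final List<DestinationInitialState<MinimumDestinationState.Impl>> initialStates =
        handler.gatherInitialState(streamConfigs);
    System.out.println("gathered initial state for " + initialStates.size() + " streams");

    // Committing states is now intentionally a no-op, so this does nothing.
    handler.commitDestinationStates(Map.of());
  }

}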