Skip to content

Commit

Permalink
check connection preferences
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Nov 28, 2022
1 parent 524dd4e commit 6ed3996
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
Expand All @@ -28,6 +29,7 @@
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
import io.airbyte.api.model.generated.SourceCoreConfig;
import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId;
import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead;
Expand Down Expand Up @@ -366,15 +368,16 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered
throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog();
connectionRead.getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
ConnectionStatus connectionStatus;
if (envVariableFeatureFlags.autoDetectSchema() && containsBreakingChange) {
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference())) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = ConnectionStatus.ACTIVE;
Expand All @@ -385,6 +388,14 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered

}

private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return false;
}

return containsBreakingChange || preference == NonBreakingChangesPreference.DISABLE;
}

private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<StandardCheckConnectionOutput> response) {
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead()
.jobInfo(jobConverter.getSynchronousJobRead(response));
Expand Down

0 comments on commit 6ed3996

Please sign in to comment.