diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 791de53354658..cf605d97da516 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2935,8 +2935,6 @@ components: $ref: "#/components/schemas/CatalogDiff" breakingChange: type: boolean - connectionStatus: - $ref: "#/components/schemas/ConnectionStatus" SourceSearch: type: object properties: diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index f4f3fd9eaf31c..e74c910944183 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -7,7 +7,6 @@ import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; -import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.lang.CloseableShutdownHook; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.temporal.ConnectionManagerUtils; @@ -214,8 +213,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final TrackingClient trackingClient = TrackingClientSingleton.get(); final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); - final EnvVariableFeatureFlags envVariableFeatureFlags = new EnvVariableFeatureFlags(); - final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl()); final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs); final JobErrorReporter jobErrorReporter = @@ -289,8 +286,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, configs.getWorkerEnvironment(), configs.getLogConfigs(), eventRunner, - connectionsHandler, - envVariableFeatureFlags); + connectionsHandler); final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 7621cb4baf936..49a4a75940e91 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -15,8 +15,6 @@ import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum; import io.airbyte.api.model.generated.ConnectionIdRequestBody; -import io.airbyte.api.model.generated.ConnectionRead; -import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; @@ -29,7 +27,6 @@ import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.LogRead; -import io.airbyte.api.model.generated.NonBreakingChangesPreference; import io.airbyte.api.model.generated.SourceCoreConfig; import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead; @@ -42,7 +39,6 @@ import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; -import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.ErrorCode; import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult; @@ -99,7 +95,6 @@ public class SchedulerHandler { private final JobPersistence jobPersistence; private final JobConverter jobConverter; private final EventRunner eventRunner; - private final EnvVariableFeatureFlags envVariableFeatureFlags; public SchedulerHandler(final ConfigRepository configRepository, final SecretsRepositoryReader secretsRepositoryReader, @@ -109,8 +104,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, final EventRunner eventRunner, - final ConnectionsHandler connectionsHandler, - final EnvVariableFeatureFlags envVariableFeatureFlags) { + final ConnectionsHandler connectionsHandler) { this( configRepository, secretsRepositoryWriter, @@ -120,8 +114,7 @@ public SchedulerHandler(final ConfigRepository configRepository, jobPersistence, eventRunner, new JobConverter(workerEnvironment, logConfigs), - connectionsHandler, - envVariableFeatureFlags); + connectionsHandler); } @VisibleForTesting @@ -133,8 +126,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final JobPersistence jobPersistence, final EventRunner eventRunner, final JobConverter jobConverter, - final ConnectionsHandler connectionsHandler, - final EnvVariableFeatureFlags envVariableFeatureFlags) { + final ConnectionsHandler connectionsHandler) { this.configRepository = configRepository; this.secretsRepositoryWriter = secretsRepositoryWriter; this.synchronousSchedulerClient = synchronousSchedulerClient; @@ -144,7 +136,6 @@ public SchedulerHandler(final ConfigRepository configRepository, this.eventRunner = eventRunner; this.jobConverter = jobConverter; this.connectionsHandler = connectionsHandler; - this.envVariableFeatureFlags = envVariableFeatureFlags; } public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody) @@ -369,32 +360,16 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered throws JsonValidationException, ConfigNotFoundException, IOException { final Optional catalogUsedToMakeConfiguredCatalog = connectionsHandler .getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); - final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()); final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = - connectionRead.getSyncCatalog(); + connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog(); CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), CatalogConverter.toProtocol(currentAirbyteCatalog)); boolean containsBreakingChange = containsBreakingChange(diff); ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); - ConnectionStatus connectionStatus; - if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) { - connectionStatus = ConnectionStatus.INACTIVE; - } else { - connectionStatus = ConnectionStatus.ACTIVE; - } - updateObject.status(connectionStatus); connectionsHandler.updateConnection(updateObject); - discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus); - - } - - private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) { - if (!envVariableFeatureFlags.autoDetectSchema()) { - return false; - } + discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange); - return containsBreakingChange || (preference == NonBreakingChangesPreference.DISABLE && !diff.getTransforms().isEmpty()); } private CheckConnectionRead reportConnectionStatus(final SynchronousResponse response) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index fa4822a6da676..a76d0e6bf61af 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -353,7 +353,6 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti */ diff = refreshedCatalog.get().getCatalogDiff(); connection.setBreakingChange(refreshedCatalog.get().getBreakingChange()); - connection.setStatus(refreshedCatalog.get().getConnectionStatus()); } else if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // reconstructs a full picture of the full schema at the time the catalog was configured. syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index f06b4d6a8d695..2eb3e9be56d7c 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -25,7 +25,6 @@ import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.ConnectionIdRequestBody; import io.airbyte.api.model.generated.ConnectionRead; -import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationCoreConfig; import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId; @@ -35,7 +34,6 @@ import io.airbyte.api.model.generated.FieldTransform; import io.airbyte.api.model.generated.JobIdRequestBody; import io.airbyte.api.model.generated.JobInfoRead; -import io.airbyte.api.model.generated.NonBreakingChangesPreference; import io.airbyte.api.model.generated.SourceCoreConfig; import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId; import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead; @@ -47,7 +45,6 @@ import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; -import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.temporal.ErrorCode; @@ -109,11 +106,9 @@ class SchedulerHandlerTest { private static final String DESTINATION_PROTOCOL_VERSION = "0.7.9"; private static final String NAME = "name"; private static final String DOGS = "dogs"; - private static final String SHOES = "shoes"; - private static final String SKU = "sku"; - private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog(SHOES, - Field.of(SKU, JsonSchemaType.STRING)); + private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog("shoes", + Field.of("sku", JsonSchemaType.STRING)); private static final SourceConnection SOURCE = new SourceConnection() .withName("my postgres db") @@ -150,7 +145,6 @@ class SchedulerHandlerTest { private EventRunner eventRunner; private JobConverter jobConverter; private ConnectionsHandler connectionsHandler; - private EnvVariableFeatureFlags envVariableFeatureFlags; @BeforeEach void setup() { @@ -168,7 +162,6 @@ void setup() { jobPersistence = mock(JobPersistence.class); eventRunner = mock(EventRunner.class); connectionsHandler = mock(ConnectionsHandler.class); - envVariableFeatureFlags = mock(EnvVariableFeatureFlags.class); jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY)); @@ -181,8 +174,7 @@ void setup() { jobPersistence, eventRunner, jobConverter, - connectionsHandler, - envVariableFeatureFlags); + connectionsHandler); } @Test @@ -563,7 +555,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); @@ -585,110 +577,6 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreaking() throws IOExcept assertEquals(actual.getCatalog(), expectedActorCatalog); } - @Test - void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceNoFeatureFlag() - throws IOException, JsonValidationException, ConfigNotFoundException { - final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - final UUID connectionId = UUID.randomUUID(); - final UUID discoveredCatalogId = UUID.randomUUID(); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; - final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); - final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) - .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); - final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); - when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(false); - when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) - .thenReturn(new StandardSourceDefinition() - .withDockerRepository(SOURCE_DOCKER_REPO) - .withDockerImageTag(SOURCE_DOCKER_TAG) - .withProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withSourceDefinitionId(source.getSourceDefinitionId())); - when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) - .thenReturn(discoverResponse); - - when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); - - final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), - CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); - - final ConnectionRead connectionRead = - new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( - NonBreakingChangesPreference.DISABLE); - when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); - when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); - - final ActorCatalog actorCatalog = new ActorCatalog() - .withCatalog(Jsons.jsonNode(airbyteCatalog)) - .withCatalogHash("") - .withId(discoveredCatalogId); - when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); - - final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), - io.airbyte.protocol.models.AirbyteCatalog.class); - final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); - - final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); - assertEquals(actual.getCatalogDiff(), catalogDiff); - assertEquals(actual.getCatalog(), expectedActorCatalog); - assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); - } - - @Test - void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceFeatureFlag() - throws IOException, JsonValidationException, ConfigNotFoundException { - final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - final UUID connectionId = UUID.randomUUID(); - final UUID discoveredCatalogId = UUID.randomUUID(); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; - final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); - final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.REMOVE_STREAM) - .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)); - final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); - when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); - when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) - .thenReturn(new StandardSourceDefinition() - .withDockerRepository(SOURCE_DOCKER_REPO) - .withDockerImageTag(SOURCE_DOCKER_TAG) - .withProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withSourceDefinitionId(source.getSourceDefinitionId())); - when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) - .thenReturn(discoverResponse); - - when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); - - final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), - CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); - - final ConnectionRead connectionRead = - new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( - NonBreakingChangesPreference.DISABLE); - when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); - when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); - - final ActorCatalog actorCatalog = new ActorCatalog() - .withCatalog(Jsons.jsonNode(airbyteCatalog)) - .withCatalogHash("") - .withId(discoveredCatalogId); - when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); - - final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), - io.airbyte.protocol.models.AirbyteCatalog.class); - final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); - - final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); - assertEquals(actual.getCatalogDiff(), catalogDiff); - assertEquals(actual.getCatalog(), expectedActorCatalog); - assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); - } - @Test void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException, JsonValidationException, ConfigNotFoundException { final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); @@ -715,7 +603,7 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), + CatalogHelpers.createAirbyteStream("shoes", Field.of("sku", JsonSchemaType.STRING)), CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); @@ -731,119 +619,14 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); - final ConnectionUpdate expectedConnectionUpdate = - new ConnectionUpdate().connectionId(connectionId).breakingChange(true).status(ConnectionStatus.ACTIVE); + final ConnectionUpdate expectedConnectionUpdate = new ConnectionUpdate().connectionId(connectionId).breakingChange(true); final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); assertEquals(actual.getCatalogDiff(), catalogDiff); assertEquals(actual.getCatalog(), expectedActorCatalog); - assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); verify(connectionsHandler).updateConnection(expectedConnectionUpdate); } - @Test - void testDiscoverSchemaFromSourceIdWithConnectionIdBreakingFeatureFlagOn() throws IOException, JsonValidationException, ConfigNotFoundException { - final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - final UUID connectionId = UUID.randomUUID(); - final UUID discoveredCatalogId = UUID.randomUUID(); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; - final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); - final StreamTransform streamTransform = new StreamTransform().transformType(TransformTypeEnum.UPDATE_STREAM) - .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name(DOGS)).addUpdateStreamItem(new FieldTransform().transformType( - FieldTransform.TransformTypeEnum.REMOVE_FIELD).breaking(true)); - final CatalogDiff catalogDiff = new CatalogDiff().addTransformsItem(streamTransform); - when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); - when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) - .thenReturn(new StandardSourceDefinition() - .withDockerRepository(SOURCE_DOCKER_REPO) - .withDockerImageTag(SOURCE_DOCKER_TAG) - .withProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withSourceDefinitionId(source.getSourceDefinitionId())); - when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) - .thenReturn(discoverResponse); - - when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); - - final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), - CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); - - final ConnectionRead connectionRead = new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)); - when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); - when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); - - final ActorCatalog actorCatalog = new ActorCatalog() - .withCatalog(Jsons.jsonNode(airbyteCatalog)) - .withCatalogHash("") - .withId(discoveredCatalogId); - when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); - - final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), - io.airbyte.protocol.models.AirbyteCatalog.class); - final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); - final ConnectionUpdate expectedConnectionUpdate = - new ConnectionUpdate().connectionId(connectionId).breakingChange(true).status(ConnectionStatus.INACTIVE); - - final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); - assertEquals(actual.getCatalogDiff(), catalogDiff); - assertEquals(actual.getCatalog(), expectedActorCatalog); - assertEquals(actual.getConnectionStatus(), ConnectionStatus.INACTIVE); - verify(connectionsHandler).updateConnection(expectedConnectionUpdate); - } - - @Test - void testDiscoverSchemaFromSourceIdWithConnectionIdNonBreakingDisableConnectionPreferenceFeatureFlagNoDiff() - throws IOException, JsonValidationException, ConfigNotFoundException { - final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - final UUID connectionId = UUID.randomUUID(); - final UUID discoveredCatalogId = UUID.randomUUID(); - final SynchronousResponse discoverResponse = (SynchronousResponse) jobResponse; - final SourceDiscoverSchemaRequestBody request = - new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId()).connectionId(connectionId).disableCache(true); - final CatalogDiff catalogDiff = new CatalogDiff(); - when(envVariableFeatureFlags.autoDetectSchema()).thenReturn(true); - when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) - .thenReturn(new StandardSourceDefinition() - .withDockerRepository(SOURCE_DOCKER_REPO) - .withDockerImageTag(SOURCE_DOCKER_TAG) - .withProtocolVersion(SOURCE_PROTOCOL_VERSION) - .withSourceDefinitionId(source.getSourceDefinitionId())); - when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); - when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION))) - .thenReturn(discoverResponse); - - when(discoverResponse.isSuccess()).thenReturn(true); - when(discoverResponse.getOutput()).thenReturn(discoveredCatalogId); - - final AirbyteCatalog airbyteCatalogCurrent = new AirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createAirbyteStream(SHOES, Field.of(SKU, JsonSchemaType.STRING)), - CatalogHelpers.createAirbyteStream(DOGS, Field.of(NAME, JsonSchemaType.STRING)))); - - final ConnectionRead connectionRead = - new ConnectionRead().syncCatalog(CatalogConverter.toApi(airbyteCatalogCurrent)).nonBreakingChangesPreference( - NonBreakingChangesPreference.DISABLE); - when(connectionsHandler.getConnection(request.getConnectionId())).thenReturn(connectionRead); - when(connectionsHandler.getDiff(any(), any(), any())).thenReturn(catalogDiff); - - final ActorCatalog actorCatalog = new ActorCatalog() - .withCatalog(Jsons.jsonNode(airbyteCatalog)) - .withCatalogHash("") - .withId(discoveredCatalogId); - when(configRepository.getActorCatalogById(discoveredCatalogId)).thenReturn(actorCatalog); - - final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(), - io.airbyte.protocol.models.AirbyteCatalog.class); - final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog); - - final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request); - assertEquals(actual.getCatalogDiff(), catalogDiff); - assertEquals(actual.getCatalog(), expectedActorCatalog); - assertEquals(actual.getConnectionStatus(), ConnectionStatus.ACTIVE); - } - @Test void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException { final SourceConnection source = new SourceConnection() diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index b390d7a4c3fc6..78b9211352987 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -76,7 +76,6 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.Status; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition; @@ -174,10 +173,9 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, destinationDefinition); final StandardSync standardSync = - ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false, Status.ACTIVE); + ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false); final StandardSync brokenStandardSync = - ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true, Status.INACTIVE); - + ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true); when(configRepository.listWorkspaceStandardSyncs(sourceRead.getWorkspaceId(), false)) .thenReturn(Collections.singletonList(standardSync)); when(configRepository.getSourceAndDefinitionsFromSourceIds(Collections.singletonList(source.getSourceId()))) @@ -279,7 +277,7 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio .streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1")) .updateStream(null)))); - expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(brokenConnectionRead, sourceRead, destinationRead, + expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(connectionRead, sourceRead, destinationRead, new OperationReadList().operations(expected.getOperations()), SchemaChange.BREAKING, now, modifiedCatalog, null) .catalogDiff(new CatalogDiff().transforms(List.of( new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM) @@ -420,7 +418,7 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchema() throws ConfigNotFoun when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); SourceDiscoverSchemaRead schemaRead = new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) - .breakingChange(false).connectionStatus(ConnectionStatus.ACTIVE); + .breakingChange(false); when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, @@ -437,10 +435,10 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchemaBreakingChange() throws when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID())); SourceDiscoverSchemaRead schemaRead = new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog()) - .breakingChange(true).connectionStatus(ConnectionStatus.INACTIVE); + .breakingChange(true); when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead); - final WebBackendConnectionRead result = testWebBackendGetConnection(true, brokenConnectionRead, + final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead, operationReadList); assertEquals(expectedWithNewSchemaAndBreakingChange, result); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 908a5749e72fd..ab742a079b367 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -33,7 +33,6 @@ import io.airbyte.config.Schedule.TimeUnit; import io.airbyte.config.ScheduleData; import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.Status; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -108,10 +107,7 @@ public static StandardSync generateSyncWithDestinationId(final UUID destinationI .withManual(true); } - public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, - final UUID destinationId, - final boolean isBroken, - final Status status) { + public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, final UUID destinationId, final boolean isBroken) { final UUID connectionId = UUID.randomUUID(); return new StandardSync() @@ -120,7 +116,7 @@ public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sou .withNamespaceDefinition(NamespaceDefinitionType.SOURCE) .withNamespaceFormat(null) .withPrefix(STANDARD_SYNC_PREFIX) - .withStatus(status) + .withStatus(StandardSync.Status.ACTIVE) .withCatalog(generateBasicConfiguredAirbyteCatalog()) .withSourceCatalogId(UUID.randomUUID()) .withSourceId(sourceId) @@ -170,6 +166,7 @@ public static ConnectionRead generateExpectedConnectionRead(final UUID connectio .namespaceDefinition(io.airbyte.api.model.generated.NamespaceDefinitionType.SOURCE) .namespaceFormat(null) .prefix("presto_to_hudi") + .status(ConnectionStatus.ACTIVE) .schedule(generateBasicConnectionSchedule()) .scheduleType(ConnectionScheduleType.BASIC) .scheduleData(generateBasicConnectionScheduleData()) @@ -202,14 +199,6 @@ public static ConnectionRead generateExpectedConnectionRead(final StandardSync s .units(standardSync.getSchedule().getUnits())); } - if (standardSync.getStatus() == Status.INACTIVE) { - connectionRead.setStatus(ConnectionStatus.INACTIVE); - } else if (standardSync.getStatus() == Status.ACTIVE) { - connectionRead.setStatus(ConnectionStatus.ACTIVE); - } else if (standardSync.getStatus() == Status.DEPRECATED) { - connectionRead.setStatus(ConnectionStatus.DEPRECATED); - } - return connectionRead; } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index b1b26c98dd653..24555ce4519da 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -11891,7 +11891,6 @@

SourceDiscoverSchemaRead - <
catalogId (optional)
UUID format: uuid
catalogDiff (optional)
breakingChange (optional)
-
connectionStatus (optional)
diff --git a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml index 81d2de72e7895..84c985df000f4 100644 --- a/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml +++ b/tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml @@ -2493,8 +2493,6 @@ components: $ref: "#/components/schemas/CatalogDiff" breakingChange: type: boolean - connectionStatus: - $ref: "#/components/schemas/ConnectionStatus" required: - jobInfo type: object