diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index d4fadd52c3989..9d4dd02ce2102 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2214,26 +2214,6 @@ paths: application/json: schema: $ref: "#/components/schemas/InternalOperationResult" - /v1/attempt/save_sync_config: - post: - tags: - - attempt - - internal - summary: For worker to save the AttemptSyncConfig for an attempt. - operationId: saveSyncConfig - requestBody: - content: - application/json: - schema: - $ref: "#/components/schemas/SaveAttemptSyncConfigRequestBody" - required: true - responses: - "200": - description: Successful Operation - content: - application/json: - schema: - $ref: "#/components/schemas/InternalOperationResult" components: securitySchemes: @@ -5045,31 +5025,6 @@ components: type: array items: $ref: "#/components/schemas/AttemptStreamStats" - AttemptSyncConfig: - type: object - required: - - sourceConfiguration - - destinationConfiguration - properties: - sourceConfiguration: - $ref: "#/components/schemas/SourceConfiguration" - destinationConfiguration: - $ref: "#/components/schemas/DestinationConfiguration" - state: - $ref: "#/components/schemas/ConnectionState" - SaveAttemptSyncConfigRequestBody: - type: object - required: - - jobId - - attemptNumber - - syncConfig - properties: - jobId: - $ref: "#/components/schemas/JobId" - attemptNumber: - $ref: "#/components/schemas/AttemptNumber" - syncConfig: - $ref: "#/components/schemas/AttemptSyncConfig" InternalOperationResult: type: object required: diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java index c31d6acdb644c..032ec01e66c43 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java @@ -82,8 +82,7 @@ class BootloaderTest { // ⚠️ This line should change with every new migration to show that you meant to make a new // migration to the prod database - private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.40.28.001"; - private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.40.28.001"; + private static final String CURRENT_MIGRATION_VERSION = "0.40.28.001"; @BeforeEach void setup() { @@ -148,10 +147,10 @@ void testBootloaderAppBlankDb() throws Exception { bootloader.load(); val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); - assertEquals(CURRENT_JOBS_MIGRATION_VERSION, jobsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); - assertEquals(CURRENT_CONFIGS_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals(CURRENT_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion()); assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get()); assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get()); diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java index 713aefd7d7154..ea63090306765 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java @@ -5,14 +5,11 @@ package io.airbyte.commons.server.converters; import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements; -import io.airbyte.api.model.generated.AttemptSyncConfig; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.ConnectionSchedule; import io.airbyte.api.model.generated.ConnectionScheduleData; import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule; import io.airbyte.api.model.generated.ConnectionScheduleDataCron; -import io.airbyte.api.model.generated.ConnectionState; -import io.airbyte.api.model.generated.ConnectionStateType; import io.airbyte.api.model.generated.ConnectionStatus; import io.airbyte.api.model.generated.Geography; import io.airbyte.api.model.generated.JobType; @@ -25,12 +22,6 @@ import io.airbyte.config.BasicSchedule; import io.airbyte.config.Schedule; import io.airbyte.config.StandardSync; -import io.airbyte.config.State; -import io.airbyte.config.StateWrapper; -import io.airbyte.config.helpers.StateMessageHelper; -import io.airbyte.workers.helper.StateConverter; -import java.util.Optional; -import java.util.UUID; import java.util.stream.Collectors; public class ApiPojoConverters { @@ -51,42 +42,6 @@ public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefReso .collect(Collectors.toList())); } - public static io.airbyte.config.AttemptSyncConfig attemptSyncConfigToInternal(final AttemptSyncConfig attemptSyncConfig) { - if (attemptSyncConfig == null) { - return null; - } - - final io.airbyte.config.AttemptSyncConfig internalAttemptSyncConfig = new io.airbyte.config.AttemptSyncConfig() - .withSourceConfiguration(attemptSyncConfig.getSourceConfiguration()) - .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration()); - - final ConnectionState connectionState = attemptSyncConfig.getState(); - if (connectionState != null && connectionState.getStateType() != ConnectionStateType.NOT_SET) { - final StateWrapper stateWrapper = StateConverter.toInternal(attemptSyncConfig.getState()); - final io.airbyte.config.State state = StateMessageHelper.getState(stateWrapper); - internalAttemptSyncConfig.setState(state); - } - - return internalAttemptSyncConfig; - } - - public static io.airbyte.api.client.model.generated.AttemptSyncConfig attemptSyncConfigToClient(final io.airbyte.config.AttemptSyncConfig attemptSyncConfig, - final UUID connectionId, - final boolean useStreamCapableState) { - if (attemptSyncConfig == null) { - return null; - } - - final State state = attemptSyncConfig.getState(); - final Optional optStateWrapper = state != null ? StateMessageHelper.getTypedState( - state.getState(), useStreamCapableState) : Optional.empty(); - - return new io.airbyte.api.client.model.generated.AttemptSyncConfig() - .sourceConfiguration(attemptSyncConfig.getSourceConfiguration()) - .destinationConfiguration(attemptSyncConfig.getDestinationConfiguration()) - .state(StateConverter.toClient(connectionId, optStateWrapper.orElse(null))); - } - public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) { if (actorDefResourceReqs == null) { return null; diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java index 0302bc63b37b1..c7132665fe927 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java @@ -5,10 +5,8 @@ package io.airbyte.commons.server.handlers; import io.airbyte.api.model.generated.InternalOperationResult; -import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody; import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; -import io.airbyte.commons.server.converters.ApiPojoConverters; import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.persistence.job.JobPersistence; @@ -65,17 +63,4 @@ public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody) return new InternalOperationResult().succeeded(true); } - public InternalOperationResult saveSyncConfig(final SaveAttemptSyncConfigRequestBody requestBody) { - try { - jobPersistence.writeAttemptSyncConfig( - requestBody.getJobId(), - requestBody.getAttemptNumber(), - ApiPojoConverters.attemptSyncConfigToInternal(requestBody.getSyncConfig())); - } catch (final IOException ioe) { - LOGGER.error("IOException when saving AttemptSyncConfig for attempt;", ioe); - return new InternalOperationResult().succeeded(false); - } - return new InternalOperationResult().succeeded(true); - } - } diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java index a3415ab24fff0..eeb2c0ff48c71 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java @@ -12,18 +12,9 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doThrow; -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.api.model.generated.AttemptSyncConfig; -import io.airbyte.api.model.generated.ConnectionState; -import io.airbyte.api.model.generated.ConnectionStateType; -import io.airbyte.api.model.generated.GlobalState; -import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.server.converters.ApiPojoConverters; import io.airbyte.persistence.job.JobPersistence; import java.io.IOException; -import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,7 +26,6 @@ class AttemptHandlerTest { JobPersistence jobPersistence; AttemptHandler handler; - private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final long JOB_ID = 10002L; private static final int ATTEMPT_NUMBER = 1; @@ -49,14 +39,14 @@ public void init() { @Test void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception { - final String workflowId = UUID.randomUUID().toString(); + String workflowId = UUID.randomUUID().toString(); final ArgumentCaptor attemptNumberCapture = ArgumentCaptor.forClass(Integer.class); final ArgumentCaptor jobIdCapture = ArgumentCaptor.forClass(Long.class); final ArgumentCaptor workflowIdCapture = ArgumentCaptor.forClass(String.class); final ArgumentCaptor queueCapture = ArgumentCaptor.forClass(String.class); - final SetWorkflowInAttemptRequestBody requestBody = + SetWorkflowInAttemptRequestBody requestBody = new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId) .processingTaskQueue(PROCESSING_TASK_QUEUE); @@ -73,7 +63,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception { @Test void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception { - final String workflowId = UUID.randomUUID().toString(); + String workflowId = UUID.randomUUID().toString(); doThrow(IOException.class).when(jobPersistence).setAttemptTemporalWorkflowInfo(anyLong(), anyInt(), any(), any()); @@ -83,7 +73,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception { final ArgumentCaptor workflowIdCapture = ArgumentCaptor.forClass(String.class); final ArgumentCaptor queueCapture = ArgumentCaptor.forClass(String.class); - final SetWorkflowInAttemptRequestBody requestBody = + SetWorkflowInAttemptRequestBody requestBody = new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId) .processingTaskQueue(PROCESSING_TASK_QUEUE); @@ -98,38 +88,4 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception { assertEquals(PROCESSING_TASK_QUEUE, queueCapture.getValue()); } - @Test - void testInternalHandlerSetsAttemptSyncConfig() throws Exception { - final ArgumentCaptor attemptNumberCapture = ArgumentCaptor.forClass(Integer.class); - final ArgumentCaptor jobIdCapture = ArgumentCaptor.forClass(Long.class); - final ArgumentCaptor attemptSyncConfigCapture = - ArgumentCaptor.forClass(io.airbyte.config.AttemptSyncConfig.class); - - final JsonNode sourceConfig = Jsons.jsonNode(Map.of("source_key", "source_val")); - final JsonNode destinationConfig = Jsons.jsonNode(Map.of("destination_key", "destination_val")); - final ConnectionState state = new ConnectionState() - .connectionId(CONNECTION_ID) - .stateType(ConnectionStateType.GLOBAL) - .streamState(null) - .globalState(new GlobalState().sharedState(Jsons.jsonNode(Map.of("state_key", "state_val")))); - - final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() - .destinationConfiguration(destinationConfig) - .sourceConfiguration(sourceConfig) - .state(state); - - final SaveAttemptSyncConfigRequestBody requestBody = - new SaveAttemptSyncConfigRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).syncConfig(attemptSyncConfig); - - assertTrue(handler.saveSyncConfig(requestBody).getSucceeded()); - - Mockito.verify(jobPersistence).writeAttemptSyncConfig(jobIdCapture.capture(), attemptNumberCapture.capture(), attemptSyncConfigCapture.capture()); - - final io.airbyte.config.AttemptSyncConfig expectedAttemptSyncConfig = ApiPojoConverters.attemptSyncConfigToInternal(attemptSyncConfig); - - assertEquals(ATTEMPT_NUMBER, attemptNumberCapture.getValue()); - assertEquals(JOB_ID, jobIdCapture.getValue()); - assertEquals(expectedAttemptSyncConfig, attemptSyncConfigCapture.getValue()); - } - } diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java index 81adf7f43ec05..3ab2582afdb45 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java @@ -157,7 +157,7 @@ private static AttemptRead toAttemptRead(final Attempt a) { } private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) { - return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, null, status, null, null, timestamps, timestamps, timestamps); + return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, null, timestamps, timestamps, timestamps); } @BeforeEach diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java index 9cf6a7b77e70f..e57d584961a32 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -16,7 +16,6 @@ import io.airbyte.commons.temporal.scheduling.SpecWorkflow; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; import io.airbyte.commons.temporal.scheduling.state.WorkflowState; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverCatalogConfig; @@ -373,11 +372,7 @@ public TemporalResponse submitDiscoverSchema(final UUID jobI () -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input)); } - public TemporalResponse submitSync(final long jobId, - final int attempt, - final JobSyncConfig config, - final AttemptSyncConfig attemptConfig, - final UUID connectionId) { + public TemporalResponse submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) { final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig() @@ -398,11 +393,11 @@ public TemporalResponse submitSync(final long jobId, .withNamespaceDefinition(config.getNamespaceDefinition()) .withNamespaceFormat(config.getNamespaceFormat()) .withPrefix(config.getPrefix()) - .withSourceConfiguration(attemptConfig.getSourceConfiguration()) - .withDestinationConfiguration(attemptConfig.getDestinationConfiguration()) + .withSourceConfiguration(config.getSourceConfiguration()) + .withDestinationConfiguration(config.getDestinationConfiguration()) .withOperationSequence(config.getOperationSequence()) .withCatalog(config.getConfiguredAirbyteCatalog()) - .withState(attemptConfig.getState()) + .withState(config.getState()) .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java index df7c7f100c82b..228c632cf8d6d 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -31,7 +31,6 @@ import io.airbyte.commons.temporal.scheduling.SpecWorkflow; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; import io.airbyte.commons.temporal.scheduling.state.WorkflowState; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.FailureReason; import io.airbyte.config.JobCheckConnectionConfig; @@ -272,27 +271,26 @@ void testSubmitSync() { final JobSyncConfig syncConfig = new JobSyncConfig() .withSourceDockerImage(IMAGE_NAME1) .withDestinationDockerImage(IMAGE_NAME2) + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(Jsons.emptyObject()) .withOperationSequence(List.of()) .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()); - final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() - .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(Jsons.emptyObject()); final StandardSyncInput input = new StandardSyncInput() .withNamespaceDefinition(syncConfig.getNamespaceDefinition()) .withNamespaceFormat(syncConfig.getNamespaceFormat()) .withPrefix(syncConfig.getPrefix()) - .withSourceConfiguration(attemptSyncConfig.getSourceConfiguration()) - .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration()) + .withSourceConfiguration(syncConfig.getSourceConfiguration()) + .withDestinationConfiguration(syncConfig.getDestinationConfiguration()) .withOperationSequence(syncConfig.getOperationSequence()) .withCatalog(syncConfig.getConfiguredAirbyteCatalog()) - .withState(attemptSyncConfig.getState()); + .withState(syncConfig.getState()); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(JOB_ID)) .withAttemptId((long) ATTEMPT_ID) .withDockerImage(IMAGE_NAME2); - temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID); + temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID); discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input, CONNECTION_ID); verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.SYNC)); } @@ -342,17 +340,15 @@ void testforceCancelConnection() { doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class)); when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow); - final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() - .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(Jsons.emptyObject()); - final JobSyncConfig syncConfig = new JobSyncConfig() .withSourceDockerImage(IMAGE_NAME1) .withDestinationDockerImage(IMAGE_NAME2) + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(Jsons.emptyObject()) .withOperationSequence(List.of()) .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()); - temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID); + temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID); temporalClient.forceDeleteWorkflow(CONNECTION_ID); verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java index ca4d17460edc6..47f2c7cded307 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java @@ -9,12 +9,10 @@ import com.auth0.jwt.algorithms.Algorithm; import com.google.auth.oauth2.ServiceAccountCredentials; import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.generated.AttemptApi; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.DestinationApi; import io.airbyte.api.client.generated.JobsApi; import io.airbyte.api.client.generated.SourceApi; -import io.airbyte.api.client.generated.StateApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.commons.temporal.config.WorkerMode; @@ -95,16 +93,6 @@ public WorkspaceApi workspaceApi(final ApiClient apiClient) { return new WorkspaceApi(apiClient); } - @Singleton - public AttemptApi attemptApi(final ApiClient apiClient) { - return new AttemptApi(apiClient); - } - - @Singleton - public StateApi stateApi(final ApiClient apiClient) { - return new StateApi(apiClient); - } - @Singleton public HttpClient httpClient() { return HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java index 3132631738258..b06eb17ee0e91 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/ProtocolConverters.java @@ -26,9 +26,4 @@ public static io.airbyte.protocol.models.StreamDescriptor streamDescriptorToProt .withNamespace(apiStreamDescriptor.getNamespace()); } - public static io.airbyte.protocol.models.StreamDescriptor clientStreamDescriptorToProtocol(final io.airbyte.api.client.model.generated.StreamDescriptor clientStreamDescriptor) { - return new io.airbyte.protocol.models.StreamDescriptor().withName(clientStreamDescriptor.getName()) - .withNamespace(clientStreamDescriptor.getNamespace()); - } - } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java index 0765e242d93f0..73fe752537ce2 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/StateConverter.java @@ -13,7 +13,6 @@ import io.airbyte.config.StateWrapper; import io.airbyte.protocol.models.AirbyteGlobalState; import io.airbyte.protocol.models.AirbyteStateMessage; -import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.AirbyteStreamState; import java.util.List; import java.util.Optional; @@ -69,15 +68,6 @@ public static StateWrapper toInternal(final @Nullable ConnectionState apiConnect } - public static StateWrapper clientToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState clientConnectionState) { - return new StateWrapper() - .withStateType(clientConnectionState != null ? convertClientStateTypeToInternal(clientConnectionState.getStateType()) : null) - .withGlobal(clientGlobalStateToInternal(clientConnectionState).orElse(null)) - .withLegacyState(clientConnectionState != null ? clientConnectionState.getState() : null) - .withStateMessages(clientStreamStateToInternal(clientConnectionState).orElse(null)); - - } - public static StateType convertClientStateTypeToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionStateType connectionStateType) { if (connectionStateType == null || connectionStateType.equals(io.airbyte.api.client.model.generated.ConnectionStateType.NOT_SET)) { return null; @@ -201,23 +191,6 @@ private static Optional globalStateToInternal(final @Nullab } } - private static Optional clientGlobalStateToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState connectionState) { - if (connectionState != null - && connectionState.getStateType() == io.airbyte.api.client.model.generated.ConnectionStateType.GLOBAL - && connectionState.getGlobalState() != null) { - return Optional.of(new AirbyteStateMessage() - .withType(AirbyteStateType.GLOBAL) - .withGlobal(new AirbyteGlobalState() - .withSharedState(connectionState.getGlobalState().getSharedState()) - .withStreamStates(connectionState.getGlobalState().getStreamStates() - .stream() - .map(StateConverter::clientStreamStateStructToInternal) - .toList()))); - } else { - return Optional.empty(); - } - } - /** * If wrapper is of type stream state, returns API representation of stream state. Otherwise, empty * optional. @@ -278,19 +251,6 @@ private static Optional> streamStateToInternal(final @ } } - private static Optional> clientStreamStateToInternal(final @Nullable io.airbyte.api.client.model.generated.ConnectionState connectionState) { - if (connectionState != null && connectionState.getStateType() == io.airbyte.api.client.model.generated.ConnectionStateType.STREAM - && connectionState.getStreamState() != null) { - return Optional.ofNullable(connectionState.getStreamState() - .stream() - .map(StateConverter::clientStreamStateStructToInternal) - .map(s -> new AirbyteStateMessage().withType(AirbyteStateType.STREAM).withStream(s)) - .toList()); - } else { - return Optional.empty(); - } - } - private static StreamState streamStateStructToApi(final AirbyteStreamState streamState) { return new StreamState() .streamDescriptor(ProtocolConverters.streamDescriptorToApi(streamState.getStreamDescriptor())) @@ -309,10 +269,4 @@ private static AirbyteStreamState streamStateStructToInternal(final StreamState .withStreamState(streamState.getStreamState()); } - private static AirbyteStreamState clientStreamStateStructToInternal(final io.airbyte.api.client.model.generated.StreamState streamState) { - return new AirbyteStreamState() - .withStreamDescriptor(ProtocolConverters.clientStreamDescriptorToProtocol(streamState.getStreamDescriptor())) - .withStreamState(streamState.getStreamState()); - } - } diff --git a/airbyte-config/config-models/src/main/resources/types/AttemptSyncConfig.yaml b/airbyte-config/config-models/src/main/resources/types/AttemptSyncConfig.yaml deleted file mode 100644 index 7b28faea7bbf0..0000000000000 --- a/airbyte-config/config-models/src/main/resources/types/AttemptSyncConfig.yaml +++ /dev/null @@ -1,22 +0,0 @@ ---- -"$schema": http://json-schema.org/draft-07/schema# -"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/AttemptSyncConfig.yaml -title: AttemptSyncConfig -description: attempt sync config -type: object -additionalProperties: false -required: - - sourceConfiguration - - destinationConfiguration -properties: - sourceConfiguration: - description: Integration specific blob. Must be a valid JSON string. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode - destinationConfiguration: - description: Integration specific blob. Must be a valid JSON string. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode - state: - description: optional state of the previous run. this object is defined per integration. - "$ref": State.yaml diff --git a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml index 73dcd898f93c9..462a8ab1229d4 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml @@ -6,6 +6,7 @@ description: job reset connection config type: object additionalProperties: false required: + - destinationConfiguration - configuredAirbyteCatalog - destinationDockerImage properties: @@ -18,6 +19,10 @@ properties: prefix: description: Prefix that will be prepended to the name of each stream when it is written to the destination. type: string + destinationConfiguration: + description: Integration specific blob. Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode configuredAirbyteCatalog: description: the configured airbyte catalog type: object @@ -44,6 +49,9 @@ properties: existingJavaType: io.airbyte.config.ResourceRequirements resetSourceConfiguration: "$ref": ResetSourceConfiguration.yaml + state: + description: optional current state of the connection + "$ref": State.yaml isSourceCustomConnector: description: determine if the running image of the source is a custom connector. type: boolean diff --git a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml index 7fe334ef5d0a5..652996a9b5c05 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobSyncConfig.yaml @@ -6,6 +6,8 @@ description: job sync config type: object additionalProperties: false required: + - sourceConfiguration + - destinationConfiguration - configuredAirbyteCatalog - sourceDockerImage - destinationDockerImage @@ -19,6 +21,14 @@ properties: prefix: description: Prefix that will be prepended to the name of each stream when it is written to the destination. type: string + sourceConfiguration: + description: Integration specific blob. Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode + destinationConfiguration: + description: Integration specific blob. Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode configuredAirbyteCatalog: description: the configured airbyte catalog type: object @@ -54,6 +64,9 @@ properties: description: The webhook operation configs belonging to this workspace. Must conform to WebhookOperationConfigs.yaml. type: object existingJavaType: com.fasterxml.jackson.databind.JsonNode + state: + description: optional state of the previous run. this object is defined per integration. + "$ref": State.yaml resourceRequirements: type: object description: optional resource requirements to run sync workers - this is used for containers other than the source/dest containers diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_28_001__AddAttemptSyncConfig.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_28_001__AddAttemptSyncConfig.java deleted file mode 100644 index 076e879368b4d..0000000000000 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_28_001__AddAttemptSyncConfig.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.db.instance.jobs.migrations; - -import org.flywaydb.core.api.migration.BaseJavaMigration; -import org.flywaydb.core.api.migration.Context; -import org.jooq.DSLContext; -import org.jooq.impl.DSL; -import org.jooq.impl.SQLDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class V0_40_28_001__AddAttemptSyncConfig extends BaseJavaMigration { - - private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_28_001__AddAttemptSyncConfig.class); - - @Override - public void migrate(final Context context) throws Exception { - LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); - - // Warning: please do not use any jOOQ generated code to write a migration. - // As database schema changes, the generated jOOQ code can be deprecated. So - // old migration may not compile if there is any generated code. - try (final DSLContext ctx = DSL.using(context.getConnection())) { - addAttemptSyncConfigToAttempts(ctx); - } - } - - private static void addAttemptSyncConfigToAttempts(final DSLContext ctx) { - ctx.alterTable("attempts") - .addColumnIfNotExists(DSL.field( - "attempt_sync_config", - SQLDataType.JSONB.nullable(true))) - .execute(); - } - -} diff --git a/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml b/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml index 5efdc4ef1097f..758f53c322f6f 100644 --- a/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml +++ b/airbyte-db/db-lib/src/main/resources/jobs_database/Attempts.yaml @@ -19,8 +19,6 @@ properties: type: number attempt_number: type: number - attempt_sync_config: - type: ["null", object] log_path: type: string output: diff --git a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt index 6e959d581f7af..100af44d08938 100644 --- a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt @@ -35,7 +35,6 @@ create table "public"."attempts"( "temporal_workflow_id" varchar(256) null, "failure_summary" jsonb null, "processing_task_queue" varchar(255) null, - "attempt_sync_config" jsonb null, constraint "attempts_pkey" primary key ("id") ); diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java index 4b02f2c518842..e09056525a282 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java @@ -19,6 +19,9 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.State; +import io.airbyte.config.helpers.StateMessageHelper; +import io.airbyte.config.persistence.StatePersistence; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.DestinationSyncMode; @@ -36,11 +39,14 @@ public class DefaultJobCreator implements JobCreator { private final JobPersistence jobPersistence; private final ResourceRequirements workerResourceRequirements; + private final StatePersistence statePersistence; public DefaultJobCreator(final JobPersistence jobPersistence, - final ResourceRequirements workerResourceRequirements) { + final ResourceRequirements workerResourceRequirements, + final StatePersistence statePersistence) { this.jobPersistence = jobPersistence; this.workerResourceRequirements = workerResourceRequirements; + this.statePersistence = statePersistence; } @Override @@ -79,11 +85,14 @@ public Optional createSyncJob(final SourceConnection source, .withPrefix(standardSync.getPrefix()) .withSourceDockerImage(sourceDockerImageName) .withSourceProtocolVersion(sourceProtocolVersion) + .withSourceConfiguration(source.getConfiguration()) .withDestinationDockerImage(destinationDockerImageName) .withDestinationProtocolVersion(destinationProtocolVersion) + .withDestinationConfiguration(destination.getConfiguration()) .withOperationSequence(standardSyncOperations) .withWebhookOperationConfigs(webhookOperationConfigs) .withConfiguredAirbyteCatalog(standardSync.getCatalog()) + .withState(null) .withResourceRequirements(mergedOrchestratorResourceReq) .withSourceResourceRequirements(mergedSrcResourceReq) .withDestinationResourceRequirements(mergedDstResourceReq) @@ -91,6 +100,8 @@ public Optional createSyncJob(final SourceConnection source, .withIsDestinationCustomConnector(destinationDefinition.getCustom()) .withWorkspaceId(workspaceId); + getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); + final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.SYNC) .withSync(jobSyncConfig); @@ -130,6 +141,7 @@ public Optional createResetConnectionJob(final DestinationConnection desti .withPrefix(standardSync.getPrefix()) .withDestinationDockerImage(destinationDockerImage) .withDestinationProtocolVersion(destinationProtocolVersion) + .withDestinationConfiguration(destination.getConfiguration()) .withOperationSequence(standardSyncOperations) .withConfiguredAirbyteCatalog(configuredAirbyteCatalog) .withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements( @@ -139,10 +151,16 @@ public Optional createResetConnectionJob(final DestinationConnection desti .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(isDestinationCustomConnector); + getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(resetConnectionConfig::withState); + final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.RESET_CONNECTION) .withResetConnection(resetConnectionConfig); return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig); } + private Optional getCurrentConnectionState(final UUID connectionId) throws IOException { + return statePersistence.getCurrentState(connectionId).map(StateMessageHelper::getState); + } + } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index e3d96022048ba..e3bbdc7362bd3 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -31,7 +31,6 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.commons.version.Version; import io.airbyte.config.AttemptFailureSummary; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.FailureReason; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; @@ -157,7 +156,6 @@ private static String jobSelectAndJoin(final String jobsSubquery) { + "jobs.created_at AS job_created_at,\n" + "jobs.updated_at AS job_updated_at,\n" + "attempts.attempt_number AS attempt_number,\n" - + "attempts.attempt_sync_config AS attempt_sync_config,\n" + "attempts.log_path AS log_path,\n" + "attempts.output AS attempt_output,\n" + "attempts.status AS attempt_status,\n" @@ -492,18 +490,6 @@ private static void saveToStreamStatsTable(final OffsetDateTime now, }); } - @Override - public void writeAttemptSyncConfig(final long jobId, final int attemptNumber, final AttemptSyncConfig attemptSyncConfig) throws IOException { - final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); - - jobDatabase.transaction( - ctx -> ctx.update(ATTEMPTS) - .set(ATTEMPTS.ATTEMPT_SYNC_CONFIG, JSONB.valueOf(Jsons.serialize(attemptSyncConfig))) - .set(ATTEMPTS.UPDATED_AT, now) - .where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber)) - .execute()); - } - @Override public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException { final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); @@ -958,8 +944,6 @@ private static Attempt getAttemptFromRecord(final Record record) { record.get(ATTEMPT_NUMBER, int.class), record.get(JOB_ID, Long.class), Path.of(record.get("log_path", String.class)), - record.get("attempt_sync_config", String.class) == null ? null - : Jsons.deserialize(record.get("attempt_sync_config", String.class), AttemptSyncConfig.class), attemptOutputString == null ? null : parseJobOutputFromString(attemptOutputString), Enums.toEnum(record.get("attempt_status", String.class), AttemptStatus.class).orElseThrow(), record.get("processing_task_queue", String.class), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index 98db2975077d8..da7b3a98474ea 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -8,7 +8,6 @@ import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.AttemptFailureSummary; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; @@ -192,17 +191,6 @@ void writeStats(long jobId, */ void writeAttemptFailureSummary(long jobId, int attemptNumber, AttemptFailureSummary failureSummary) throws IOException; - /** - * Writes the attempt-specific configuration used to build the sync input during the attempt. - * - * @param jobId job id - * @param attemptNumber attempt number - * @param attemptSyncConfig attempt-specific configuration used to build the sync input for this - * attempt - * @throws IOException exception due to interaction with persistence - */ - void writeAttemptSyncConfig(long jobId, int attemptNumber, AttemptSyncConfig attemptSyncConfig) throws IOException; - /** * @param configTypes - the type of config, e.g. sync * @param connectionId - ID of the connection for which the job count should be retrieved diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java index e0984bee6a774..5b585e42e39fe 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java @@ -5,7 +5,6 @@ package io.airbyte.persistence.job.models; import io.airbyte.config.AttemptFailureSummary; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobOutput; import java.nio.file.Path; import java.util.Objects; @@ -20,7 +19,6 @@ public class Attempt { private final AttemptStatus status; private final String processingTaskQueue; private final AttemptFailureSummary failureSummary; - private final AttemptSyncConfig syncConfig; private final Path logPath; private final long updatedAtInSecond; private final long createdAtInSecond; @@ -29,7 +27,6 @@ public class Attempt { public Attempt(final int attemptNumber, final long jobId, final Path logPath, - final @Nullable AttemptSyncConfig syncConfig, final @Nullable JobOutput output, final AttemptStatus status, final String processingTaskQueue, @@ -39,7 +36,6 @@ public Attempt(final int attemptNumber, final @Nullable Long endedAtInSecond) { this.attemptNumber = attemptNumber; this.jobId = jobId; - this.syncConfig = syncConfig; this.output = output; this.status = status; this.processingTaskQueue = processingTaskQueue; @@ -58,10 +54,6 @@ public long getJobId() { return jobId; } - public Optional getSyncConfig() { - return Optional.ofNullable(syncConfig); - } - public Optional getOutput() { return Optional.ofNullable(output); } @@ -111,7 +103,6 @@ public boolean equals(final Object o) { jobId == attempt.jobId && updatedAtInSecond == attempt.updatedAtInSecond && createdAtInSecond == attempt.createdAtInSecond && - Objects.equals(syncConfig, attempt.syncConfig) && Objects.equals(output, attempt.output) && status == attempt.status && Objects.equals(failureSummary, attempt.failureSummary) && @@ -121,8 +112,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(attemptNumber, jobId, syncConfig, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, - endedAtInSecond); + return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); } @Override @@ -130,7 +120,6 @@ public String toString() { return "Attempt{" + "id=" + attemptNumber + ", jobId=" + jobId + - ", syncConfig=" + syncConfig + ", output=" + output + ", status=" + status + ", failureSummary=" + failureSummary + diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java index 5911f28dcdb45..ada40c6ed08fe 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Job.java @@ -132,13 +132,6 @@ public Optional getLastAttempt() { .max(Comparator.comparing(Attempt::getCreatedAtInSecond)); } - public Optional getAttemptByNumber(final int attemptNumber) { - return getAttempts() - .stream() - .filter(a -> a.getAttemptNumber() == attemptNumber) - .findFirst(); - } - public boolean hasRunningAttempt() { return getAttempts().stream().anyMatch(a -> !Attempt.isAttemptInTerminalState(a)); } diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java index 25dd7d570acbe..86c0b154891c6 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java @@ -11,12 +11,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import edu.umd.cs.findbugs.annotations.Nullable; import io.airbyte.analytics.TrackingClient; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.map.MoreMaps; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.StandardCheckConnectionOutput; @@ -29,7 +27,6 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.JobPersistence; import io.airbyte.persistence.job.WorkspaceHelper; -import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.Job; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -42,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.UUID; public class JobTracker { @@ -128,9 +124,6 @@ public void trackSync(final Job job, final JobState jobState) { final boolean allowedJob = configType == ConfigType.SYNC || configType == ConfigType.RESET_CONNECTION; Preconditions.checkArgument(allowedJob, "Job type " + configType + " is not allowed!"); final long jobId = job.getId(); - final Optional lastAttempt = job.getLastAttempt(); - final Optional attemptSyncConfig = lastAttempt.flatMap(Attempt::getSyncConfig); - final UUID connectionId = UUID.fromString(job.getScope()); final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId); final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId); @@ -143,7 +136,6 @@ public void trackSync(final Job job, final JobState jobState) { final Map stateMetadata = generateStateMetadata(jobState); final Map syncConfigMetadata = generateSyncConfigMetadata( job.getConfig(), - attemptSyncConfig.orElse(null), sourceDefinition.getSpec().getConnectionSpecification(), destinationDefinition.getSpec().getConnectionSpecification()); @@ -192,27 +184,18 @@ public void trackSyncForInternalFailure(final Long jobId, }); } - private Map generateSyncConfigMetadata( - final JobConfig config, - @Nullable final AttemptSyncConfig attemptSyncConfig, + private Map generateSyncConfigMetadata(final JobConfig config, final JsonNode sourceConfigSchema, final JsonNode destinationConfigSchema) { if (config.getConfigType() == ConfigType.SYNC) { - final Map actorConfigMetadata = new HashMap<>(); - - if (attemptSyncConfig != null) { - final JsonNode sourceConfiguration = attemptSyncConfig.getSourceConfiguration(); - final JsonNode destinationConfiguration = attemptSyncConfig.getDestinationConfiguration(); - - final Map sourceMetadata = configToMetadata(CONFIG + ".source", sourceConfiguration, sourceConfigSchema); - final Map destinationMetadata = configToMetadata(CONFIG + ".destination", destinationConfiguration, destinationConfigSchema); - - actorConfigMetadata.putAll(sourceMetadata); - actorConfigMetadata.putAll(destinationMetadata); - } + final JsonNode sourceConfiguration = config.getSync().getSourceConfiguration(); + final JsonNode destinationConfiguration = config.getSync().getDestinationConfiguration(); + final Map sourceMetadata = configToMetadata(CONFIG + ".source", sourceConfiguration, sourceConfigSchema); + final Map destinationMetadata = configToMetadata(CONFIG + ".destination", destinationConfiguration, destinationConfigSchema); final Map catalogMetadata = getCatalogMetadata(config.getSync().getConfiguredAirbyteCatalog()); - return MoreMaps.merge(actorConfigMetadata, catalogMetadata); + + return MoreMaps.merge(sourceMetadata, destinationMetadata, catalogMetadata); } else { return emptyMap(); } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java index 3fdbf3e6d4ddc..7311ade09c974 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java @@ -34,6 +34,9 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; +import io.airbyte.config.State; +import io.airbyte.config.helpers.StateMessageHelper; +import io.airbyte.config.persistence.StatePersistence; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -44,6 +47,7 @@ import io.airbyte.protocol.models.SyncMode; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -74,6 +78,7 @@ class DefaultJobCreatorTest { private static final UUID WORKSPACE_ID = UUID.randomUUID(); private JobPersistence jobPersistence; + private StatePersistence statePersistence; private JobCreator jobCreator; private ResourceRequirements workerResourceRequirements; @@ -158,12 +163,13 @@ class DefaultJobCreatorTest { @BeforeEach void setup() { jobPersistence = mock(JobPersistence.class); + statePersistence = mock(StatePersistence.class); workerResourceRequirements = new ResourceRequirements() .withCpuLimit("0.2") .withCpuRequest("0.2") .withMemoryLimit("200Mi") .withMemoryRequest("200Mi"); - jobCreator = new DefaultJobCreator(jobPersistence, workerResourceRequirements); + jobCreator = new DefaultJobCreator(jobPersistence, workerResourceRequirements, statePersistence); } @Test @@ -172,8 +178,10 @@ void testCreateSyncJob() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -214,8 +222,10 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withDestinationProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -260,8 +270,10 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -307,8 +319,10 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -361,8 +375,10 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) .withSourceDockerImage(SOURCE_IMAGE_NAME) .withSourceProtocolVersion(SOURCE_PROTOCOL_VERSION) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) @@ -401,16 +417,22 @@ void testCreateResetConnectionJob() throws IOException { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND))); + final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); + when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId())) + .thenReturn(StateMessageHelper.getTypedState(connectionState.getState(), false)); + final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(expectedCatalog) .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) .withResourceRequirements(workerResourceRequirements) .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset)) + .withState(connectionState) .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(false); @@ -453,16 +475,22 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND))); + final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); + when(statePersistence.getCurrentState(STANDARD_SYNC.getConnectionId())) + .thenReturn(StateMessageHelper.getTypedState(connectionState.getState(), false)); + final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withDestinationProtocolVersion(DESTINATION_PROTOCOL_VERSION) .withConfiguredAirbyteCatalog(expectedCatalog) .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) .withResourceRequirements(workerResourceRequirements) .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset)) + .withState(connectionState) .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(false); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index fe02440aea08c..0652e2a8251ec 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; @@ -31,7 +30,6 @@ import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.AttemptFailureSummary; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.FailureReason.FailureType; @@ -43,7 +41,6 @@ import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; -import io.airbyte.config.State; import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.db.Database; @@ -153,7 +150,6 @@ private static Attempt createAttempt(final int id, final long jobId, final Attem jobId, logPath, null, - null, status, null, null, @@ -168,7 +164,6 @@ private static Attempt createUnfinishedAttempt(final int id, final long jobId, f jobId, logPath, null, - null, status, null, null, @@ -317,25 +312,6 @@ void testWriteOutput() throws IOException { assertEquals(List.of(failureReason1, failureReason2), storedNormalizationSummary.getFailures()); } - @Test - @DisplayName("Should be able to read AttemptSyncConfig that was written") - void testWriteAttemptSyncConfig() throws IOException { - final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); - final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); - final Job created = jobPersistence.getJob(jobId); - final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() - .withSourceConfiguration(Jsons.jsonNode(Map.of("source", "s_config_value"))) - .withDestinationConfiguration(Jsons.jsonNode(Map.of("destination", "d_config_value"))) - .withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("state_key", "state_value")))); - - when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242)); - jobPersistence.writeAttemptSyncConfig(jobId, attemptNumber, attemptSyncConfig); - - final Job updated = jobPersistence.getJob(jobId); - assertEquals(Optional.of(attemptSyncConfig), updated.getAttempts().get(0).getSyncConfig()); - assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond()); - } - @Test @DisplayName("Should be able to read attemptFailureSummary that was written") void testWriteAttemptFailureSummary() throws IOException { diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java index 92ea7c68dad69..f8660bd4cb571 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() { } private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) { - return new Attempt(1, 1L, null, null, null, attemptStatus, null, null, 0L, 0L, null); + return new Attempt(1, 1L, null, null, attemptStatus, null, null, 0L, 0L, null); } } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java index 335a7a30cfd6f..9dc147ae99b5a 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java @@ -43,7 +43,7 @@ void testHasRunningAttempt() { private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) { final List attempts = IntStream.range(0, attemptStatuses.length) - .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, null, attemptStatuses[idx], null, null, idx, 0L, null)) + .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, null, idx, 0L, null)) .collect(Collectors.toList()); return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L); } @@ -78,13 +78,6 @@ void testGetLastAttempt() { assertEquals(3, job.getLastAttempt().get().getAttemptNumber()); } - @Test - void testGetAttemptByNumber() { - final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED, AttemptStatus.SUCCEEDED); - assertTrue(job.getAttemptByNumber(2).isPresent()); - assertEquals(2, job.getAttemptByNumber(2).get().getAttemptNumber()); - } - @Test void testValidateStatusTransitionFromPending() { final Job pendingJob = jobWithStatus(JobStatus.PENDING); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java index 086a42eb87d12..b5e3361b2cec2 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java @@ -20,7 +20,6 @@ import io.airbyte.commons.map.MoreMaps; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.AttemptFailureSummary; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.FailureReason; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; @@ -551,11 +550,9 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con .withDestinationSyncMode(DestinationSyncMode.APPEND))); final JobSyncConfig jobSyncConfig = new JobSyncConfig() - .withConfiguredAirbyteCatalog(catalog); - - final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() .withSourceConfiguration(Jsons.jsonNode(ImmutableMap.of("key", "some_value"))) - .withDestinationConfiguration(Jsons.jsonNode(ImmutableMap.of("key", false))); + .withDestinationConfiguration(Jsons.jsonNode(ImmutableMap.of("key", false))) + .withConfiguredAirbyteCatalog(catalog); final JobConfig jobConfig = mock(JobConfig.class); when(jobConfig.getConfigType()).thenReturn(configType); @@ -564,15 +561,11 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con when(jobConfig.getSync()).thenReturn(jobSyncConfig); } - final Attempt attempt = mock(Attempt.class); - when(attempt.getSyncConfig()).thenReturn(Optional.of(attemptSyncConfig)); - final Job job = mock(Job.class); when(job.getId()).thenReturn(jobId); when(job.getConfig()).thenReturn(jobConfig); when(job.getConfigType()).thenReturn(configType); when(job.getScope()).thenReturn(CONNECTION_ID.toString()); - when(job.getLastAttempt()).thenReturn(Optional.of(attempt)); when(job.getAttemptsCount()).thenReturn(700); return job; } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java index 9dd50747ba51c..d8ffb69eaac18 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/TrackingMetadataTest.java @@ -8,7 +8,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobOutput; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSync; @@ -61,9 +60,8 @@ void testgenerateJobAttemptMetadataWithNulls() { .withMeanSecondsBeforeSourceStateMessageEmitted(2L).withMaxSecondsBetweenStateMessageEmittedandCommitted(null); final StandardSyncSummary standardSyncSummary = new StandardSyncSummary().withTotalStats(syncStats); final StandardSyncOutput standardSyncOutput = new StandardSyncOutput().withStandardSyncSummary(standardSyncSummary); - final AttemptSyncConfig attemptSyncConfig = mock(AttemptSyncConfig.class); final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput); - final Attempt attempt = new Attempt(0, 10L, Path.of("test"), attemptSyncConfig, jobOutput, AttemptStatus.SUCCEEDED, null, null, 100L, 100L, 99L); + final Attempt attempt = new Attempt(0, 10L, Path.of("test"), jobOutput, AttemptStatus.SUCCEEDED, null, null, 100L, 100L, 99L); final Job job = mock(Job.class); when(job.getAttempts()).thenReturn(List.of(attempt)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java index acc500ec9a54d..64e8c4730d8fa 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java @@ -8,7 +8,6 @@ import io.airbyte.api.generated.AttemptApi; import io.airbyte.api.model.generated.InternalOperationResult; -import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody; import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; import io.airbyte.commons.server.handlers.AttemptHandler; @@ -47,12 +46,4 @@ public InternalOperationResult setWorkflowInAttempt(@Body final SetWorkflowInAtt return ApiHelper.execute(() -> attemptHandler.setWorkflowInAttempt(requestBody)); } - @Override - @Post(uri = "/save_sync_config", - processes = MediaType.APPLICATION_JSON) - @Secured({ADMIN}) - public InternalOperationResult saveSyncConfig(@Body final SaveAttemptSyncConfigRequestBody requestBody) { - return ApiHelper.execute(() -> attemptHandler.saveSyncConfig(requestBody)); - } - } diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 597fa14e84f7f..2234d746c2515 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -59,7 +59,6 @@ dependencies { implementation project(':airbyte-commons-protocol') implementation project(':airbyte-commons-temporal') implementation project(':airbyte-commons-worker') - implementation project(':airbyte-commons-server') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') implementation project(':airbyte-config:init') diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java index 63fdd1bb3b965..49edebdf72662 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApplicationBeanFactory.java @@ -14,6 +14,7 @@ import io.airbyte.config.Configs.SecretPersistenceType; import io.airbyte.config.Configs.TrackingStrategy; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.StatePersistence; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricClientFactory; @@ -80,10 +81,12 @@ public Supplier currentSecondsSupplier() { @Singleton public DefaultJobCreator defaultJobCreator(final JobPersistence jobPersistence, - @Named("defaultWorkerConfigs") final WorkerConfigs defaultWorkerConfigs) { + @Named("defaultWorkerConfigs") final WorkerConfigs defaultWorkerConfigs, + final StatePersistence statePersistence) { return new DefaultJobCreator( jobPersistence, - defaultWorkerConfigs.getResourceRequirements()); + defaultWorkerConfigs.getResourceRequirements(), + statePersistence); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java index 8aaa51bb68b4b..739ba9d8efc0a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java @@ -6,17 +6,16 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.TemporalClient; import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.commons.temporal.TemporalResponse; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary.ReplicationStatus; -import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.Job; import io.airbyte.workers.JobStatus; import io.airbyte.workers.OutputAndStatus; @@ -45,18 +44,14 @@ public WorkerRun create(final Job job) { public CheckedSupplier, Exception> createSupplier(final Job job, final int attemptId) { final TemporalJobType temporalJobType = toTemporalJobType(job.getConfigType()); final UUID connectionId = UUID.fromString(job.getScope()); - return switch (job.getConfigType()) { case SYNC -> () -> { - final AttemptSyncConfig attemptConfig = getAttemptSyncConfig(job, attemptId); final TemporalResponse output = temporalClient.submitSync(job.getId(), - attemptId, job.getConfig().getSync(), attemptConfig, connectionId); + attemptId, job.getConfig().getSync(), connectionId); return toOutputAndStatus(output); }; case RESET_CONNECTION -> () -> { final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection(); - final AttemptSyncConfig attemptConfig = getAttemptSyncConfig(job, attemptId); - final JobSyncConfig config = new JobSyncConfig() .withNamespaceDefinition(resetConnection.getNamespaceDefinition()) .withNamespaceFormat(resetConnection.getNamespaceFormat()) @@ -64,6 +59,8 @@ public CheckedSupplier, Exception> createSupplier(fin .withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) .withDestinationDockerImage(resetConnection.getDestinationDockerImage()) .withDestinationProtocolVersion(resetConnection.getDestinationProtocolVersion()) + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(resetConnection.getDestinationConfiguration()) .withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog()) .withOperationSequence(resetConnection.getOperationSequence()) .withResourceRequirements(resetConnection.getResourceRequirements()) @@ -72,18 +69,13 @@ public CheckedSupplier, Exception> createSupplier(fin .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()); - final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config, attemptConfig, connectionId); + final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId); return toOutputAndStatus(output); }; default -> throw new IllegalArgumentException("Does not support job type: " + temporalJobType); }; } - private static AttemptSyncConfig getAttemptSyncConfig(final Job job, final int attemptId) { - return job.getAttemptByNumber(attemptId).flatMap(Attempt::getSyncConfig).orElseThrow( - () -> new IllegalStateException(String.format("AttemptSyncConfig for job %s attemptId %s not found", job.getId(), attemptId))); - } - private static TemporalJobType toTemporalJobType(final ConfigType jobType) { return switch (jobType) { case GET_SPEC -> TemporalJobType.GET_SPEC; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 7986a36e7c787..403c70fe33d6d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -9,34 +9,19 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; -import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.generated.AttemptApi; -import io.airbyte.api.client.generated.StateApi; -import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; -import io.airbyte.api.client.model.generated.ConnectionState; -import io.airbyte.api.client.model.generated.ConnectionStateType; -import io.airbyte.api.client.model.generated.SaveAttemptSyncConfigRequestBody; import io.airbyte.commons.docker.DockerUtils; -import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.server.converters.ApiPojoConverters; import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; -import io.airbyte.config.AttemptSyncConfig; -import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.ResetSourceConfiguration; -import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; -import io.airbyte.config.State; -import io.airbyte.config.StateWrapper; -import io.airbyte.config.helpers.StateMessageHelper; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.JobPersistence; @@ -44,13 +29,11 @@ import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; -import io.airbyte.workers.helper.StateConverter; import io.airbyte.workers.utils.ConfigReplacer; import io.micronaut.context.annotation.Requires; import jakarta.inject.Singleton; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,43 +44,12 @@ public class GenerateInputActivityImpl implements GenerateInputActivity { private final JobPersistence jobPersistence; private final ConfigRepository configRepository; - private final AttemptApi attemptApi; - private final StateApi stateApi; - private final FeatureFlags featureFlags; - private static final Logger LOGGER = LoggerFactory.getLogger(GenerateInputActivity.class); public GenerateInputActivityImpl(final JobPersistence jobPersistence, - final ConfigRepository configRepository, - final StateApi stateApi, - final AttemptApi attemptApi, - final FeatureFlags featureFlags) { + final ConfigRepository configRepository) { this.jobPersistence = jobPersistence; this.configRepository = configRepository; - this.stateApi = stateApi; - this.attemptApi = attemptApi; - this.featureFlags = featureFlags; - } - - private Optional getCurrentConnectionState(final UUID connectionId) { - final ConnectionState state = AirbyteApiClient.retryWithJitter( - () -> stateApi.getState(new ConnectionIdRequestBody().connectionId(connectionId)), - "get state"); - - if (state.getStateType() == ConnectionStateType.NOT_SET) - return Optional.empty(); - - final StateWrapper internalState = StateConverter.clientToInternal(state); - return Optional.of(StateMessageHelper.getState(internalState)); - } - - private void saveAttemptSyncConfig(final long jobId, final int attemptNumber, final UUID connectionId, final AttemptSyncConfig attemptSyncConfig) { - AirbyteApiClient.retryWithJitter( - () -> attemptApi.saveSyncConfig(new SaveAttemptSyncConfigRequestBody() - .jobId(jobId) - .attemptNumber(attemptNumber) - .syncConfig(ApiPojoConverters.attemptSyncConfigToClient(attemptSyncConfig, connectionId, featureFlags.useStreamCapableState()))), - "set attempt sync config"); } @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @@ -113,26 +65,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { final Job job = jobPersistence.getJob(jobId); final ConfigType jobConfigType = job.getConfig().getConfigType(); - - final UUID connectionId = UUID.fromString(job.getScope()); - final StandardSync standardSync = configRepository.getStandardSync(connectionId); - - final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig(); - getCurrentConnectionState(connectionId).ifPresent(attemptSyncConfig::setState); - if (ConfigType.SYNC.equals(jobConfigType)) { config = job.getConfig().getSync(); - final SourceConnection source = configRepository.getSourceConnection(standardSync.getSourceId()); - attemptSyncConfig.setSourceConfiguration(source.getConfiguration()); } else if (ConfigType.RESET_CONNECTION.equals(jobConfigType)) { final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection(); final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration(); - - // null check for backwards compatibility with reset jobs that did not have a - // resetSourceConfiguration - attemptSyncConfig - .setSourceConfiguration(resetSourceConfiguration == null ? Jsons.emptyObject() : Jsons.jsonNode(resetSourceConfiguration)); - config = new JobSyncConfig() .withNamespaceDefinition(resetConnection.getNamespaceDefinition()) .withNamespaceFormat(resetConnection.getNamespaceFormat()) @@ -140,9 +77,14 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) .withDestinationDockerImage(resetConnection.getDestinationDockerImage()) .withDestinationProtocolVersion(resetConnection.getDestinationProtocolVersion()) + // null check for backwards compatibility with reset jobs that did not have a + // resetSourceConfiguration + .withSourceConfiguration(resetSourceConfiguration == null ? Jsons.emptyObject() : Jsons.jsonNode(resetSourceConfiguration)) + .withDestinationConfiguration(resetConnection.getDestinationConfiguration()) .withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog()) .withOperationSequence(resetConnection.getOperationSequence()) .withResourceRequirements(resetConnection.getResourceRequirements()) + .withState(resetConnection.getState()) .withIsSourceCustomConnector(resetConnection.getIsSourceCustomConnector()) .withIsDestinationCustomConnector(resetConnection.getIsDestinationCustomConnector()) .withWorkspaceId(resetConnection.getWorkspaceId()); @@ -156,14 +98,14 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt); - final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); - attemptSyncConfig.setDestinationConfiguration(destination.getConfiguration()); + final UUID connectionId = UUID.fromString(job.getScope()); + final StandardSync standardSync = configRepository.getStandardSync(connectionId); final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromSource(standardSync.getSourceId()); final StandardDestinationDefinition destinationDefinition = - configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId()); + configRepository.getDestinationDefinitionFromDestination(standardSync.getDestinationId()); final String destinationNormalizationDockerImage = destinationDefinition.getNormalizationConfig() != null ? DockerUtils.getTaggedImageName(destinationDefinition.getNormalizationConfig().getNormalizationRepository(), destinationDefinition.getNormalizationConfig().getNormalizationTag()) @@ -179,7 +121,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withProtocolVersion(config.getSourceProtocolVersion()) .withIsCustomConnector(config.getIsSourceCustomConnector()) .withAllowedHosts(ConfigType.RESET_CONNECTION.equals(jobConfigType) ? null - : configReplacer.getAllowedHosts(sourceDefinition.getAllowedHosts(), attemptSyncConfig.getSourceConfiguration())); + : configReplacer.getAllowedHosts(sourceDefinition.getAllowedHosts(), config.getSourceConfiguration())); final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig() .withJobId(String.valueOf(jobId)) @@ -190,7 +132,7 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withNormalizationDockerImage(destinationNormalizationDockerImage) .withSupportsDbt(destinationDefinition.getSupportsDbt()) .withNormalizationIntegrationType(normalizationIntegrationType) - .withAllowedHosts(configReplacer.getAllowedHosts(destinationDefinition.getAllowedHosts(), attemptSyncConfig.getDestinationConfiguration())); + .withAllowedHosts(configReplacer.getAllowedHosts(destinationDefinition.getAllowedHosts(), config.getDestinationConfiguration())); final StandardSyncInput syncInput = new StandardSyncInput() .withNamespaceDefinition(config.getNamespaceDefinition()) @@ -198,20 +140,18 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withPrefix(config.getPrefix()) .withSourceId(standardSync.getSourceId()) .withDestinationId(standardSync.getDestinationId()) - .withSourceConfiguration(attemptSyncConfig.getSourceConfiguration()) - .withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration()) + .withSourceConfiguration(config.getSourceConfiguration()) + .withDestinationConfiguration(config.getDestinationConfiguration()) .withOperationSequence(config.getOperationSequence()) .withWebhookOperationConfigs(config.getWebhookOperationConfigs()) .withCatalog(config.getConfiguredAirbyteCatalog()) - .withState(attemptSyncConfig.getState()) + .withState(config.getState()) .withResourceRequirements(config.getResourceRequirements()) .withSourceResourceRequirements(config.getSourceResourceRequirements()) .withDestinationResourceRequirements(config.getDestinationResourceRequirements()) .withConnectionId(standardSync.getConnectionId()) .withWorkspaceId(config.getWorkspaceId()); - saveAttemptSyncConfig(jobId, attempt, connectionId, attemptSyncConfig); - return new GeneratedJobInput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); } catch (final Exception e) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java index 6dbc107926a7a..14d1f99749f0b 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/run/TemporalWorkerRunFactoryTest.java @@ -11,16 +11,16 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.TemporalClient; import io.airbyte.commons.temporal.TemporalResponse; -import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOutput; -import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.Job; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.WorkerConstants; @@ -28,7 +28,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,7 +43,6 @@ class TemporalWorkerRunFactoryTest { private TemporalClient temporalClient; private TemporalWorkerRunFactory workerRunFactory; private Job job; - private AttemptSyncConfig attemptSyncConfig; @BeforeEach void setup() throws IOException { @@ -57,11 +55,7 @@ void setup() throws IOException { "unknown airbyte version", mock(FeatureFlags.class)); job = mock(Job.class, RETURNS_DEEP_STUBS); - final Attempt attempt = mock(Attempt.class, RETURNS_DEEP_STUBS); - attemptSyncConfig = mock(AttemptSyncConfig.class); - when(attempt.getSyncConfig()).thenReturn(Optional.of(attemptSyncConfig)); when(job.getId()).thenReturn(JOB_ID); - when(job.getAttemptByNumber(ATTEMPT_ID)).thenReturn(Optional.of(attempt)); when(job.getAttemptsCount()).thenReturn(ATTEMPT_ID); when(job.getScope()).thenReturn(CONNECTION_ID.toString()); } @@ -71,11 +65,11 @@ void setup() throws IOException { void testSync() throws Exception { when(job.getConfigType()).thenReturn(ConfigType.SYNC); final TemporalResponse mockResponse = mock(TemporalResponse.class); - when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), attemptSyncConfig, + when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), CONNECTION_ID)).thenReturn(mockResponse); final WorkerRun workerRun = workerRunFactory.create(job); workerRun.call(); - verify(temporalClient).submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), attemptSyncConfig, CONNECTION_ID); + verify(temporalClient).submitSync(JOB_ID, ATTEMPT_ID, job.getConfig().getSync(), CONNECTION_ID); assertEquals(jobRoot, workerRun.getJobRoot()); } @@ -84,6 +78,7 @@ void testSync() throws Exception { void testResetConnection() throws Exception { final JobResetConnectionConfig resetConfig = new JobResetConnectionConfig() .withDestinationDockerImage("airbyte/fusion_reactor") + .withDestinationConfiguration(Jsons.jsonNode(ImmutableMap.of("a", 1))) .withOperationSequence(List.of(new StandardSyncOperation().withName("b"))) .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()) .withIsSourceCustomConnector(false) @@ -91,20 +86,22 @@ void testResetConnection() throws Exception { final JobSyncConfig syncConfig = new JobSyncConfig() .withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB) .withDestinationDockerImage(resetConfig.getDestinationDockerImage()) + .withDestinationConfiguration(resetConfig.getDestinationConfiguration()) .withOperationSequence(List.of(new StandardSyncOperation().withName("b"))) + .withSourceConfiguration(Jsons.emptyObject()) .withConfiguredAirbyteCatalog(resetConfig.getConfiguredAirbyteCatalog()) .withIsSourceCustomConnector(false) .withIsDestinationCustomConnector(false); when(job.getConfigType()).thenReturn(ConfigType.RESET_CONNECTION); when(job.getConfig().getResetConnection()).thenReturn(resetConfig); final TemporalResponse mockResponse = mock(TemporalResponse.class); - when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID)).thenReturn(mockResponse); + when(temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID)).thenReturn(mockResponse); final WorkerRun workerRun = workerRunFactory.create(job); workerRun.call(); final ArgumentCaptor argument = ArgumentCaptor.forClass(JobSyncConfig.class); - verify(temporalClient).submitSync(eq(JOB_ID), eq(ATTEMPT_ID), argument.capture(), eq(attemptSyncConfig), eq(CONNECTION_ID)); + verify(temporalClient).submitSync(eq(JOB_ID), eq(ATTEMPT_ID), argument.capture(), eq(CONNECTION_ID)); assertEquals(syncConfig, argument.getValue()); assertEquals(jobRoot, workerRun.getJobRoot()); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java deleted file mode 100644 index 1a71296660e22..0000000000000 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal.scheduling.activities; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.api.client.generated.AttemptApi; -import io.airbyte.api.client.generated.StateApi; -import io.airbyte.api.client.invoker.generated.ApiException; -import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; -import io.airbyte.api.client.model.generated.ConnectionState; -import io.airbyte.api.client.model.generated.ConnectionStateType; -import io.airbyte.api.client.model.generated.SaveAttemptSyncConfigRequestBody; -import io.airbyte.commons.features.FeatureFlags; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.server.converters.ApiPojoConverters; -import io.airbyte.config.AttemptSyncConfig; -import io.airbyte.config.DestinationConnection; -import io.airbyte.config.JobConfig; -import io.airbyte.config.JobConfig.ConfigType; -import io.airbyte.config.JobResetConnectionConfig; -import io.airbyte.config.JobSyncConfig; -import io.airbyte.config.SourceConnection; -import io.airbyte.config.StandardDestinationDefinition; -import io.airbyte.config.StandardSourceDefinition; -import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSyncInput; -import io.airbyte.config.State; -import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigRepository; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.models.IntegrationLauncherConfig; -import io.airbyte.persistence.job.models.Job; -import io.airbyte.persistence.job.models.JobRunConfig; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.validation.json.JsonValidationException; -import io.airbyte.workers.WorkerConstants; -import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput; -import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput; -import java.io.IOException; -import java.util.Map; -import java.util.UUID; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class GenerateInputActivityTest { - - static private AttemptApi attemptApi; - static private JobPersistence jobPersistence; - static private ConfigRepository configRepository; - static private GenerateInputActivityImpl generateInputActivity; - static private Job job; - - static private final JsonNode SOURCE_CONFIGURATION = Jsons.jsonNode(Map.of("source_key", "source_value")); - static private final JsonNode DESTINATION_CONFIGURATION = Jsons.jsonNode(Map.of("destination_key", "destination_value")); - static private final State STATE = new State().withState(Jsons.jsonNode(Map.of("state_key", "state_value"))); - - static private final long JOB_ID = 1; - static private final int ATTEMPT_ID = 1; - static private final UUID SOURCE_ID = UUID.randomUUID(); - static private final UUID DESTINATION_ID = UUID.randomUUID(); - static private final UUID CONNECTION_ID = UUID.randomUUID(); - - @BeforeEach - void setUp() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { - final StateApi stateApi = mock(StateApi.class); - final FeatureFlags featureFlags = mock(FeatureFlags.class); - - attemptApi = mock(AttemptApi.class); - jobPersistence = mock(JobPersistence.class); - configRepository = mock(ConfigRepository.class); - generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags); - - job = mock(Job.class); - - when(jobPersistence.getJob(JOB_ID)).thenReturn(job); - - final UUID destinationDefinitionId = UUID.randomUUID(); - - final DestinationConnection destinationConnection = new DestinationConnection() - .withDestinationId(DESTINATION_ID) - .withDestinationDefinitionId(destinationDefinitionId) - .withConfiguration(DESTINATION_CONFIGURATION); - when(configRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destinationConnection); - when(configRepository.getStandardDestinationDefinition(destinationDefinitionId)).thenReturn(mock(StandardDestinationDefinition.class)); - when(configRepository.getSourceDefinitionFromSource(SOURCE_ID)).thenReturn(mock(StandardSourceDefinition.class)); - - final StandardSync standardSync = new StandardSync() - .withSourceId(SOURCE_ID) - .withDestinationId(DESTINATION_ID); - when(configRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync); - - when(stateApi.getState(new ConnectionIdRequestBody().connectionId(CONNECTION_ID))) - .thenReturn(new ConnectionState() - .stateType(ConnectionStateType.LEGACY) - .state(STATE.getState())); - } - - @Test - void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundException, IOException, ApiException { - final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID); - - final SourceConnection sourceConnection = new SourceConnection() - .withSourceId(SOURCE_ID) - .withConfiguration(SOURCE_CONFIGURATION); - when(configRepository.getSourceConnection(SOURCE_ID)).thenReturn(sourceConnection); - - final JobSyncConfig jobSyncConfig = new JobSyncConfig() - .withWorkspaceId(UUID.randomUUID()) - .withDestinationDockerImage("destinationDockerImage") - .withSourceDockerImage("sourceDockerImage") - .withConfiguredAirbyteCatalog(mock(ConfiguredAirbyteCatalog.class)); - - final JobConfig jobConfig = new JobConfig() - .withConfigType(ConfigType.SYNC) - .withSync(jobSyncConfig); - - when(job.getConfig()).thenReturn(jobConfig); - when(job.getScope()).thenReturn(CONNECTION_ID.toString()); - - final StandardSyncInput expectedStandardSyncInput = new StandardSyncInput() - .withWorkspaceId(jobSyncConfig.getWorkspaceId()) - .withSourceId(SOURCE_ID) - .withDestinationId(DESTINATION_ID) - .withSourceConfiguration(SOURCE_CONFIGURATION) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) - .withState(STATE) - .withCatalog(jobSyncConfig.getConfiguredAirbyteCatalog()) - .withWorkspaceId(jobSyncConfig.getWorkspaceId()); - - final JobRunConfig expectedJobRunConfig = new JobRunConfig() - .withJobId(String.valueOf(JOB_ID)) - .withAttemptId((long) ATTEMPT_ID); - - final IntegrationLauncherConfig expectedSourceLauncherConfig = new IntegrationLauncherConfig() - .withJobId(String.valueOf(JOB_ID)) - .withAttemptId((long) ATTEMPT_ID) - .withDockerImage(jobSyncConfig.getSourceDockerImage()); - - final IntegrationLauncherConfig expectedDestinationLauncherConfig = new IntegrationLauncherConfig() - .withJobId(String.valueOf(JOB_ID)) - .withAttemptId((long) ATTEMPT_ID) - .withDockerImage(jobSyncConfig.getDestinationDockerImage()); - - final GeneratedJobInput expectedGeneratedJobInput = new GeneratedJobInput( - expectedJobRunConfig, - expectedSourceLauncherConfig, - expectedDestinationLauncherConfig, - expectedStandardSyncInput); - - final GeneratedJobInput generatedJobInput = generateInputActivity.getSyncWorkflowInput(syncInput); - assertEquals(expectedGeneratedJobInput, generatedJobInput); - - final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig() - .withSourceConfiguration(SOURCE_CONFIGURATION) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) - .withState(STATE); - - verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody() - .jobId(JOB_ID) - .attemptNumber(ATTEMPT_ID) - .syncConfig(ApiPojoConverters.attemptSyncConfigToClient(expectedAttemptSyncConfig, CONNECTION_ID, true))); - } - - @Test - void testGetResetSyncWorkflowInput() throws IOException, ApiException { - final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID); - - final JobResetConnectionConfig jobResetConfig = new JobResetConnectionConfig() - .withWorkspaceId(UUID.randomUUID()) - .withDestinationDockerImage("destinationDockerImage") - .withConfiguredAirbyteCatalog(mock(ConfiguredAirbyteCatalog.class)); - - final JobConfig jobConfig = new JobConfig() - .withConfigType(ConfigType.RESET_CONNECTION) - .withResetConnection(jobResetConfig); - - when(job.getConfig()).thenReturn(jobConfig); - when(job.getScope()).thenReturn(CONNECTION_ID.toString()); - - final StandardSyncInput expectedStandardSyncInput = new StandardSyncInput() - .withWorkspaceId(jobResetConfig.getWorkspaceId()) - .withSourceId(SOURCE_ID) - .withDestinationId(DESTINATION_ID) - .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) - .withState(STATE) - .withCatalog(jobResetConfig.getConfiguredAirbyteCatalog()) - .withWorkspaceId(jobResetConfig.getWorkspaceId()); - - final JobRunConfig expectedJobRunConfig = new JobRunConfig() - .withJobId(String.valueOf(JOB_ID)) - .withAttemptId((long) ATTEMPT_ID); - - final IntegrationLauncherConfig expectedSourceLauncherConfig = new IntegrationLauncherConfig() - .withJobId(String.valueOf(JOB_ID)) - .withAttemptId((long) ATTEMPT_ID) - .withDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB); - - final IntegrationLauncherConfig expectedDestinationLauncherConfig = new IntegrationLauncherConfig() - .withJobId(String.valueOf(JOB_ID)) - .withAttemptId((long) ATTEMPT_ID) - .withDockerImage(jobResetConfig.getDestinationDockerImage()); - - final GeneratedJobInput expectedGeneratedJobInput = new GeneratedJobInput( - expectedJobRunConfig, - expectedSourceLauncherConfig, - expectedDestinationLauncherConfig, - expectedStandardSyncInput); - - final GeneratedJobInput generatedJobInput = generateInputActivity.getSyncWorkflowInput(syncInput); - assertEquals(expectedGeneratedJobInput, generatedJobInput); - - final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig() - .withSourceConfiguration(Jsons.emptyObject()) - .withDestinationConfiguration(DESTINATION_CONFIGURATION) - .withState(STATE); - - verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody() - .jobId(JOB_ID) - .attemptNumber(ATTEMPT_ID) - .syncConfig(ApiPojoConverters.attemptSyncConfigToClient(expectedAttemptSyncConfig, CONNECTION_ID, true))); - } - -} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 75ecf4b043d5c..bf95d39b87962 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -195,7 +195,7 @@ void createResetJob() throws JsonValidationException, ConfigNotFoundException, I @Test void isLastJobOrAttemptFailureTrueTest() throws Exception { final int activeAttemptNumber = 0; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.SUCCEEDED, 4L, 4L, 5L); @@ -215,7 +215,7 @@ void isLastJobOrAttemptFailureTrueTest() throws Exception { @Test void isLastJobOrAttemptFailureFalseTest() throws Exception { final int activeAttemptNumber = 0; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.FAILED, 4L, 4L, 5L); @@ -234,9 +234,9 @@ void isLastJobOrAttemptFailureFalseTest() throws Exception { @Test void isLastJobOrAttemptFailurePreviousAttemptFailureTest() throws Exception { - final Attempt previousAttempt = new Attempt(0, 1, Path.of(""), null, null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); + final Attempt previousAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); final int activeAttemptNumber = 1; - final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt activeAttempt = new Attempt(activeAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job previousJob = new Job(PREVIOUS_JOB_ID, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(), JobStatus.SUCCEEDED, 4L, 4L, 5L); @@ -474,9 +474,9 @@ void setJobCancelledWrapException() throws IOException { @Test void ensureCleanJobState() throws IOException { - final Attempt failedAttempt = new Attempt(0, 1, Path.of(""), null, null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); + final Attempt failedAttempt = new Attempt(0, 1, Path.of(""), null, AttemptStatus.FAILED, null, null, 2L, 3L, 3L); final int runningAttemptNumber = 1; - final Attempt runningAttempt = new Attempt(runningAttemptNumber, 1, Path.of(""), null, null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); + final Attempt runningAttempt = new Attempt(runningAttemptNumber, 1, Path.of(""), null, AttemptStatus.RUNNING, null, null, 4L, 5L, null); final Job runningJob = new Job(1, ConfigType.SYNC, CONNECTION_ID.toString(), new JobConfig(), List.of(failedAttempt, runningAttempt), JobStatus.RUNNING, 2L, 2L, 3L); diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index feb39b876cdc4..c772bcb886bce 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -214,7 +214,6 @@

Table of Contents

Attempt

Connection

@@ -274,7 +273,6 @@

Internal

  • post /v1/state/create_or_update
  • post /v1/jobs/get_normalization_status
  • post /v1/attempt/save_stats
  • -
  • post /v1/attempt/save_sync_config
  • post /v1/attempt/set_workflow_in_attempt
  • post /v1/sources/write_discover_catalog_result
  • @@ -409,58 +407,6 @@

    Request body

    -

    Return type

    - - - - -

    Example data

    -
    Content-Type: application/json
    -
    {
    -  "succeeded" : true
    -}
    - -

    Produces

    - This API call produces the following media types according to the Accept request header; - the media type will be conveyed by the Content-Type response header. -
      -
    • application/json
    • -
    - -

    Responses

    -

    200

    - Successful Operation - InternalOperationResult - -
    -
    -
    - Up -
    post /v1/attempt/save_sync_config
    -
    For worker to save the AttemptSyncConfig for an attempt. (saveSyncConfig)
    -
    - - -

    Consumes

    - This API call consumes the following media types via the Content-Type request header: -
      -
    • application/json
    • -
    - -

    Request body

    -
    -
    SaveAttemptSyncConfigRequestBody SaveAttemptSyncConfigRequestBody (required)
    - -
    Body Parameter
    - -
    - - - -

    Return type

    InternalOperationResult @@ -4147,58 +4093,6 @@

    Request body

    -

    Return type

    - - - - -

    Example data

    -
    Content-Type: application/json
    -
    {
    -  "succeeded" : true
    -}
    - -

    Produces

    - This API call produces the following media types according to the Accept request header; - the media type will be conveyed by the Content-Type response header. -
      -
    • application/json
    • -
    - -

    Responses

    -

    200

    - Successful Operation - InternalOperationResult -
    -
    -
    -
    - Up -
    post /v1/attempt/save_sync_config
    -
    For worker to save the AttemptSyncConfig for an attempt. (saveSyncConfig)
    -
    - - -

    Consumes

    - This API call consumes the following media types via the Content-Type request header: -
      -
    • application/json
    • -
    - -

    Request body

    -
    -
    SaveAttemptSyncConfigRequestBody SaveAttemptSyncConfigRequestBody (required)
    - -
    Body Parameter
    - -
    - - - -

    Return type

    -
    -

    AttemptSyncConfig - Up

    -
    -
    -
    sourceConfiguration
    -
    destinationConfiguration
    -
    state (optional)
    -
    -
    -
    -

    SaveAttemptSyncConfigRequestBody - Up

    -
    -
    -
    jobId
    Long format: int64
    -
    attemptNumber
    Integer format: int32
    -
    syncConfig
    -
    -