Skip to content

Commit

Permalink
add connection status to sourceDiscoverSchemaRead
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Nov 23, 2022
1 parent 75cfdd8 commit 0def914
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 14 deletions.
2 changes: 2 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2935,6 +2935,8 @@ components:
$ref: "#/components/schemas/CatalogDiff"
breakingChange:
type: boolean
connectionStatus:
$ref: "#/components/schemas/ConnectionStatus"
SourceSearch:
type: object
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId;
Expand All @@ -39,6 +40,7 @@
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.ErrorCode;
import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult;
Expand Down Expand Up @@ -95,6 +97,7 @@ public class SchedulerHandler {
private final JobPersistence jobPersistence;
private final JobConverter jobConverter;
private final EventRunner eventRunner;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public SchedulerHandler(final ConfigRepository configRepository,
final SecretsRepositoryReader secretsRepositoryReader,
Expand All @@ -114,7 +117,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
jobPersistence,
eventRunner,
new JobConverter(workerEnvironment, logConfigs),
connectionsHandler);
connectionsHandler,
new EnvVariableFeatureFlags());
}

@VisibleForTesting
Expand All @@ -126,7 +130,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final EventRunner eventRunner,
final JobConverter jobConverter,
final ConnectionsHandler connectionsHandler) {
final ConnectionsHandler connectionsHandler,
final EnvVariableFeatureFlags envVariableFeatureFlags) {
this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -136,6 +141,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.eventRunner = eventRunner;
this.jobConverter = jobConverter;
this.connectionsHandler = connectionsHandler;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -367,8 +373,15 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
ConnectionStatus connectionStatus;
if (envVariableFeatureFlags.autoDetectSchema() && containsBreakingChange) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = ConnectionStatus.ACTIVE;
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange);
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
*/
diff = refreshedCatalog.get().getCatalogDiff();
connection.setBreakingChange(refreshedCatalog.get().getBreakingChange());
connection.setStatus(refreshedCatalog.get().getConnectionStatus());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId;
Expand All @@ -45,6 +46,7 @@
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.temporal.ErrorCode;
Expand Down Expand Up @@ -145,6 +147,7 @@ class SchedulerHandlerTest {
private EventRunner eventRunner;
private JobConverter jobConverter;
private ConnectionsHandler connectionsHandler;
private EnvVariableFeatureFlags envVariableFeatureFlags;

@BeforeEach
void setup() {
Expand All @@ -162,6 +165,7 @@ void setup() {
jobPersistence = mock(JobPersistence.class);
eventRunner = mock(EventRunner.class);
connectionsHandler = mock(ConnectionsHandler.class);
envVariableFeatureFlags = mock(EnvVariableFeatureFlags.class);

jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY));

Expand All @@ -174,7 +178,8 @@ void setup() {
jobPersistence,
eventRunner,
jobConverter,
connectionsHandler);
connectionsHandler,
envVariableFeatureFlags);
}

@Test
Expand Down Expand Up @@ -619,7 +624,8 @@ void testDiscoverSchemaFromSourceIdWithConnectionIdBreaking() throws IOException
final AirbyteCatalog persistenceCatalog = Jsons.object(actorCatalog.getCatalog(),
io.airbyte.protocol.models.AirbyteCatalog.class);
final io.airbyte.api.model.generated.AirbyteCatalog expectedActorCatalog = CatalogConverter.toApi(persistenceCatalog);
final ConnectionUpdate expectedConnectionUpdate = new ConnectionUpdate().connectionId(connectionId).breakingChange(true);
final ConnectionUpdate expectedConnectionUpdate =
new ConnectionUpdate().connectionId(connectionId).breakingChange(true).status(ConnectionStatus.ACTIVE);

final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request);
assertEquals(actual.getCatalogDiff(), catalogDiff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition;
Expand Down Expand Up @@ -172,9 +173,10 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio
final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, destinationDefinition);

final StandardSync standardSync =
ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false);
ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), false, Status.ACTIVE);
final StandardSync brokenStandardSync =
ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true);
ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId(), true, Status.INACTIVE);

when(configRepository.listWorkspaceStandardSyncs(sourceRead.getWorkspaceId(), false))
.thenReturn(Collections.singletonList(standardSync));
when(configRepository.getSourceAndDefinitionsFromSourceIds(Collections.singletonList(source.getSourceId())))
Expand Down Expand Up @@ -276,7 +278,7 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio
.streamDescriptor(new io.airbyte.api.model.generated.StreamDescriptor().name("users-data1"))
.updateStream(null))));

expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(connectionRead, sourceRead, destinationRead,
expectedWithNewSchemaAndBreakingChange = expectedWebBackendConnectionReadObject(brokenConnectionRead, sourceRead, destinationRead,
new OperationReadList().operations(expected.getOperations()), SchemaChange.BREAKING, now, modifiedCatalog, null)
.catalogDiff(new CatalogDiff().transforms(List.of(
new StreamTransform().transformType(TransformTypeEnum.ADD_STREAM)
Expand Down Expand Up @@ -417,7 +419,7 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchema() throws ConfigNotFoun
when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID()));
SourceDiscoverSchemaRead schemaRead =
new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog())
.breakingChange(false);
.breakingChange(false).connectionStatus(ConnectionStatus.ACTIVE);
when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead);

final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead,
Expand All @@ -434,10 +436,10 @@ void testWebBackendGetConnectionWithDiscoveryAndNewSchemaBreakingChange() throws
when(configRepository.getActorCatalogById(any())).thenReturn(new ActorCatalog().withId(UUID.randomUUID()));
SourceDiscoverSchemaRead schemaRead =
new SourceDiscoverSchemaRead().catalogDiff(expectedWithNewSchema.getCatalogDiff()).catalog(expectedWithNewSchema.getSyncCatalog())
.breakingChange(true);
.breakingChange(true).connectionStatus(ConnectionStatus.INACTIVE);
when(schedulerHandler.discoverSchemaForSourceFromSourceId(any())).thenReturn(schemaRead);

final WebBackendConnectionRead result = testWebBackendGetConnection(true, connectionRead,
final WebBackendConnectionRead result = testWebBackendGetConnection(true, brokenConnectionRead,
operationReadList);
assertEquals(expectedWithNewSchemaAndBreakingChange, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.config.Schedule.TimeUnit;
import io.airbyte.config.ScheduleData;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand Down Expand Up @@ -107,7 +108,10 @@ public static StandardSync generateSyncWithDestinationId(final UUID destinationI
.withManual(true);
}

public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, final UUID destinationId, final boolean isBroken) {
public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId,
final UUID destinationId,
final boolean isBroken,
final Status status) {
final UUID connectionId = UUID.randomUUID();

return new StandardSync()
Expand All @@ -116,7 +120,7 @@ public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sou
.withNamespaceDefinition(NamespaceDefinitionType.SOURCE)
.withNamespaceFormat(null)
.withPrefix(STANDARD_SYNC_PREFIX)
.withStatus(StandardSync.Status.ACTIVE)
.withStatus(status)
.withCatalog(generateBasicConfiguredAirbyteCatalog())
.withSourceCatalogId(UUID.randomUUID())
.withSourceId(sourceId)
Expand Down Expand Up @@ -166,7 +170,6 @@ public static ConnectionRead generateExpectedConnectionRead(final UUID connectio
.namespaceDefinition(io.airbyte.api.model.generated.NamespaceDefinitionType.SOURCE)
.namespaceFormat(null)
.prefix("presto_to_hudi")
.status(ConnectionStatus.ACTIVE)
.schedule(generateBasicConnectionSchedule())
.scheduleType(ConnectionScheduleType.BASIC)
.scheduleData(generateBasicConnectionScheduleData())
Expand Down Expand Up @@ -199,6 +202,14 @@ public static ConnectionRead generateExpectedConnectionRead(final StandardSync s
.units(standardSync.getSchedule().getUnits()));
}

if (standardSync.getStatus() == Status.INACTIVE) {
connectionRead.setStatus(ConnectionStatus.INACTIVE);
} else if (standardSync.getStatus() == Status.ACTIVE) {
connectionRead.setStatus(ConnectionStatus.ACTIVE);
} else if (standardSync.getStatus() == Status.DEPRECATED) {
connectionRead.setStatus(ConnectionStatus.DEPRECATED);
}

return connectionRead;
}

Expand Down
1 change: 1 addition & 0 deletions docs/reference/api/generated-api-html/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -11891,6 +11891,7 @@ <h3><a name="SourceDiscoverSchemaRead"><code>SourceDiscoverSchemaRead</code> - <
<div class="param">catalogId (optional)</div><div class="param-desc"><span class="param-type"><a href="#UUID">UUID</a></span> format: uuid</div>
<div class="param">catalogDiff (optional)</div><div class="param-desc"><span class="param-type"><a href="#CatalogDiff">CatalogDiff</a></span> </div>
<div class="param">breakingChange (optional)</div><div class="param-desc"><span class="param-type"><a href="#boolean">Boolean</a></span> </div>
<div class="param">connectionStatus (optional)</div><div class="param-desc"><span class="param-type"><a href="#ConnectionStatus">ConnectionStatus</a></span> </div>
</div> <!-- field-items -->
</div>
<div class="model">
Expand Down
2 changes: 2 additions & 0 deletions tools/openapi2jsonschema/examples/airbyte.local/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2493,6 +2493,8 @@ components:
$ref: "#/components/schemas/CatalogDiff"
breakingChange:
type: boolean
connectionStatus:
$ref: "#/components/schemas/ConnectionStatus"
required:
- jobInfo
type: object
Expand Down

0 comments on commit 0def914

Please sign in to comment.