Skip to content

Commit

Permalink
Bug Fix: Handle missing data types in migrations. (#5089)
Browse files Browse the repository at this point in the history
Previously, when Airbyte exported configurations, even if there were not configurations of a type (e.g. operations) it would create a placeholder file for it in the export archive. The implementation has changed such that we cannot make that guarantee anymore. Specifically the postgres database only exports data that it actually has.

The migration assumed that every archive it processed would contain data from all types. This commit makes the change to make sure that the types represented in the archive just need to be a subset of the types declared in the schema.

In the future we can decide if we have an opinion on whether we should always export placeholder files. Since we have versions of Airbyte that don't do it though, we need to make this change in the migration code.
  • Loading branch information
cgardens authored Jul 30, 2021
1 parent 2fe0043 commit faa7f9a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static <T> void assertEqualsVerbose(Set<T> set1, Set<T> set2) {
Preconditions.checkNotNull(set2);

Preconditions.checkArgument(set1.equals(set2), String.format(
"Sets are not the same. Elements in set 1 and not in set 2: %s. Elements in set 2 and not in set 1 %s",
"Sets are not the same. Elements in set 1 and not in set 2: %s. Elements in set 2 and not in set 1: %s",
Sets.difference(set1, set2), Sets.difference(set2, set1)));
}

Expand Down
13 changes: 9 additions & 4 deletions airbyte-migration/src/main/java/io/airbyte/migrate/Migrate.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,15 @@ private Map<ResourceId, AutoCloseableIterator<JsonNode>> createInputStreams(Migr
createInputStreamsForResourceType(migrationInputRoot, ResourceType.CONFIG),
createInputStreamsForResourceType(migrationInputRoot, ResourceType.JOB));

try {
MoreSets.assertEqualsVerbose(migration.getInputSchema().keySet(), resourceIdToInputStreams.keySet());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Input record resources do not match declared schema resources", e);
System.out.println("\n\nschema = \n" + migration.getInputSchema().keySet().stream().map(ResourceId::getName).collect(Collectors.joining("\n")));
System.out.println("\n\nrecords = \n" + resourceIdToInputStreams.keySet().stream().map(ResourceId::getName).collect(Collectors.joining("\n")));
if (!migration.getInputSchema().keySet().containsAll(resourceIdToInputStreams.keySet())) {
try {
// we know something is wrong. check equality to get a full log message of the total difference.
MoreSets.assertEqualsVerbose(migration.getInputSchema().keySet(), resourceIdToInputStreams.keySet());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Input records contain resource not declared in schema resources", e);
}
}
return resourceIdToInputStreams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void testInvalidOutputRecord() throws IOException {
}

@Test
void testMissingRecordResource() throws IOException {
void testSchemaContainsTypeNotPresentInData() throws IOException {
final Set<String> configResourcesMissingWorkspace = Enums.valuesAsStrings(ConfigKeys.class);
configResourcesMissingWorkspace.remove(MigrationV0_14_0.ConfigKeys.STANDARD_WORKSPACE.name());
writeInputs(
Expand All @@ -182,7 +182,7 @@ void testMissingRecordResource() throws IOException {
inputRoot.resolve(ResourceType.CONFIG.getDirectoryName()),
V0_14_0_TEST_RECORDS);
writeInputs(
ResourceType.CONFIG,
ResourceType.JOB,
Enums.valuesAsStrings(MigrationV0_14_0.JobKeys.class),
inputRoot.resolve(ResourceType.JOB.getDirectoryName()),
V0_14_0_TEST_RECORDS);
Expand All @@ -193,23 +193,31 @@ void testMissingRecordResource() throws IOException {
final Migrate migrate = new Migrate(migrateRoot, TEST_MIGRATIONS);
final MigrateConfig config = new MigrateConfig(inputRoot, outputRoot, targetVersion);

assertThrows(IllegalArgumentException.class, () -> migrate.run(config));
migrate.run(config);
final Map<ResourceId, List<JsonNode>> expectedRecords = addFooBarToAllRecordsExceptMetadata(V0_14_0_TEST_RECORDS);
assertExpectedOutputVersion(outputRoot, targetVersion);
assertRecordsInOutput(expectedRecords, 1);
}

@Test
void testMissingSchemaResource() throws IOException {
final Set<String> configResourcesMissingWorkspace = Enums.valuesAsStrings(ConfigKeys.class);
configResourcesMissingWorkspace.add("FAKE");
void testRecordNotInSchema() throws IOException {
final Set<String> configResourceWithExtraResource = Enums.valuesAsStrings(ConfigKeys.class);
configResourceWithExtraResource.add("FAKE");
final Map<ResourceId, List<JsonNode>> mapWithFakeRecord = ImmutableMap.<ResourceId, List<JsonNode>>builder()
.putAll(V0_14_0_TEST_RECORDS)
.put(ResourceId.fromConstantCase(ResourceType.CONFIG, "FAKE"), List.of(Jsons.emptyObject()))
.build();

writeInputs(
ResourceType.CONFIG,
configResourcesMissingWorkspace,
configResourceWithExtraResource,
inputRoot.resolve(ResourceType.CONFIG.getDirectoryName()),
V0_14_0_TEST_RECORDS);
mapWithFakeRecord);
writeInputs(
ResourceType.JOB,
Enums.valuesAsStrings(MigrationV0_14_0.JobKeys.class),
inputRoot.resolve(ResourceType.JOB.getDirectoryName()),
V0_14_0_TEST_RECORDS);
mapWithFakeRecord);
IOs.writeFile(inputRoot, Migrate.VERSION_FILE_NAME, TEST_MIGRATIONS.get(0).getVersion());

final String targetVersion = TEST_MIGRATIONS.get(1).getVersion();
Expand Down

0 comments on commit faa7f9a

Please sign in to comment.