forked from airbytehq/airbyte
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
:tada Namespace support. Supported source-destination pairs will now …
…sync data into the same namespace as the source. (airbytehq#2862) This PR introduces the following behavior for JDBC sources: Instead of streamName = schema.tableName, this is now streamName = tableName and namespace = schema. This means that, when replicating from these sources, data will be replicated into a form matching the source. e.g. public.users (postgres source) -> public.users (postgres destination) instead of current behaviour of public.public_users. Since MySQL does not have schemas, the MySQL source uses the database as it's namespace. To do so: - Make namespace a field class concept in Airbyte Protocol. This allows the source to propagate namespace and destinations to write to a source-defined namespace. Also sets us up for future namespace related configurability. - Add an optional namespace field to the AirbyteRecordMessage. This field will be set by sources that support namespace. - Introduce AirbyteStreamNameNamespacePair as a type-safe manner of identifying streams throughout our code base. - Modify base_normalisation to better support source defined namespace, specifically allowing normalisation of tables with the same name to different schemas.
- Loading branch information
Showing
52 changed files
with
1,054 additions
and
450 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
43 changes: 43 additions & 0 deletions
43
airbyte-commons/src/main/java/io/airbyte/commons/type/Types.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.commons.type; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.stream.Collectors; | ||
|
||
public class Types { | ||
|
||
/** | ||
* Convenience method converting a list to a list of lists of the same type. Each item in the | ||
* original list is inserted into its own list. | ||
*/ | ||
public static <T> List<List<T>> boxToListofList(List<T> list) { | ||
var nonNullEntries = list.stream().filter(Objects::nonNull); | ||
return nonNullEntries.map(Collections::singletonList).collect(Collectors.toList()); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
.../base-java/src/main/java/io/airbyte/integrations/base/AirbyteStreamNameNamespacePair.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.base; | ||
|
||
import io.airbyte.protocol.models.AirbyteRecordMessage; | ||
import io.airbyte.protocol.models.AirbyteStream; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteStream; | ||
import java.util.HashSet; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
|
||
/** | ||
* Wraps an {@link io.airbyte.protocol.models.AirbyteStream}'s name and namespace fields to simplify | ||
* comparison checks. This is helpful since these two fields are often used as an Airbyte Stream's | ||
* unique identifiers. | ||
*/ | ||
public class AirbyteStreamNameNamespacePair implements Comparable<AirbyteStreamNameNamespacePair> { | ||
|
||
final private String name; | ||
final private String namespace; | ||
|
||
public AirbyteStreamNameNamespacePair(String name, String namespace) { | ||
this.name = name; | ||
this.namespace = namespace; | ||
} | ||
|
||
public String getName() { | ||
return name; | ||
} | ||
|
||
public String getNamespace() { | ||
return namespace; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "AirbyteStreamNameNamespacePair{" + | ||
"name='" + name + '\'' + | ||
", namespace='" + namespace + '\'' + | ||
'}'; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
AirbyteStreamNameNamespacePair that = (AirbyteStreamNameNamespacePair) o; | ||
return Objects.equals(name, that.name) && Objects.equals(namespace, that.namespace); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(name, namespace); | ||
} | ||
|
||
@Override | ||
public int compareTo(AirbyteStreamNameNamespacePair o) { | ||
if (o == null) { | ||
return 1; | ||
} | ||
|
||
// first sort by name | ||
int nameCheck = name.compareTo(o.getName()); | ||
if (nameCheck != 0) { | ||
return nameCheck; | ||
} | ||
|
||
// then sort by namespace | ||
if (namespace == null && o.getNamespace() == null) { | ||
return 0; | ||
} | ||
if (namespace == null && o.getNamespace() != null) { | ||
return -1; | ||
} | ||
if (namespace != null && o.getNamespace() == null) { | ||
return 1; | ||
} | ||
return namespace.compareTo(o.getNamespace()); | ||
} | ||
|
||
public static void main(String[] args) { | ||
System.out.println("test".compareTo(null)); | ||
} | ||
|
||
public static AirbyteStreamNameNamespacePair fromRecordMessage(AirbyteRecordMessage msg) { | ||
return new AirbyteStreamNameNamespacePair(msg.getStream(), msg.getNamespace()); | ||
} | ||
|
||
public static AirbyteStreamNameNamespacePair fromAirbyteSteam(AirbyteStream stream) { | ||
return new AirbyteStreamNameNamespacePair(stream.getName(), stream.getNamespace()); | ||
} | ||
|
||
public static AirbyteStreamNameNamespacePair fromConfiguredAirbyteSteam(ConfiguredAirbyteStream stream) { | ||
return fromAirbyteSteam(stream.getStream()); | ||
} | ||
|
||
public static Set<AirbyteStreamNameNamespacePair> fromConfiguredCatalog(ConfiguredAirbyteCatalog catalog) { | ||
var pairs = new HashSet<AirbyteStreamNameNamespacePair>(); | ||
|
||
for (ConfiguredAirbyteStream stream : catalog.getStreams()) { | ||
var pair = fromAirbyteSteam(stream.getStream()); | ||
pairs.add(pair); | ||
} | ||
|
||
return pairs; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.