From 0805f9e94c62a0f544f4d3710da55d9b33dd455a Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Thu, 23 May 2024 11:09:48 -0700 Subject: [PATCH] replace all java collectors.toSet with kotlin construct (#37538) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ## How ## Review guide ## User Impact ## Can this PR be safely reverted and rolled back? - [ ] YES 💚 - [ ] NO ❌ --- .../airbyte/cdk/db/mongodb/MongoDatabase.kt | 4 +- .../source/jdbc/AbstractJdbcSource.kt | 5 +- .../relationaldb/DbSourceDiscoverUtil.kt | 5 +- .../relationaldb/RelationalDbReadUtil.kt | 4 +- .../relationaldb/state/CursorManager.kt | 3 +- .../relationaldb/state/GlobalStateManager.kt | 6 +- .../integrations/debezium/CdcSourceTest.kt | 58 ++++++------------- .../source/jdbc/test/JdbcStressTest.kt | 2 - .../kotlin/io/airbyte/commons/enums/Enums.kt | 10 +--- .../validation/json/JsonSchemaValidator.kt | 4 +- .../workers/helper/CatalogClientConverters.kt | 4 +- ...GcsAvroParquetDestinationAcceptanceTest.kt | 8 +-- .../S3AvroParquetDestinationAcceptanceTest.kt | 13 ++--- 13 files changed, 40 insertions(+), 86 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt index 7b36576ff5bd..02f11b3d0802 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt @@ -16,7 +16,6 @@ import io.airbyte.commons.util.MoreIterators import java.util.* import java.util.Spliterators.AbstractSpliterator import java.util.function.Consumer -import java.util.stream.Collectors import java.util.stream.Stream import java.util.stream.StreamSupport import org.bson.BsonDocument @@ -57,9 +56,8 @@ class MongoDatabase(connectionString: String, databaseName: String) : get() { val collectionNames = database.listCollectionNames() ?: return Collections.emptySet() return MoreIterators.toSet(collectionNames.iterator()) - .stream() .filter { c: String -> !c.startsWith(MONGO_RESERVED_COLLECTION_PREFIX) } - .collect(Collectors.toSet()) + .toSet() } fun getCollection(collectionName: String): MongoCollection { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index 4756f2aa158e..65ad0404847b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -423,10 +423,7 @@ abstract class AbstractJdbcSource( ): Map> { LOGGER.info( "Discover primary keys for tables: " + - tableInfos - .stream() - .map { obj: TableInfo> -> obj.name } - .collect(Collectors.toSet()) + tableInfos.map { obj: TableInfo> -> obj.name }.toSet() ) try { // Get all primary keys without specifying a table name diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt index 40c99432deef..7b6a023062a2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt @@ -143,9 +143,8 @@ object DbSourceDiscoverUtil { ) .withSourceDefinedPrimaryKey(primaryKeys) } - // This is ugly. Some of our tests change the streams on the AirbyteCatalog - // object... - .toMutableList() + .toMutableList() // This is ugly, but we modify this list in + // JdbcSourceAcceptanceTest.testDiscoverWithMultipleSchemas return AirbyteCatalog().withStreams(streams) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt index 2ea21ddcc7ac..255ca51c885e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt @@ -9,7 +9,6 @@ import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.SyncMode -import java.util.stream.Collectors object RelationalDbReadUtil { fun identifyStreamsToSnapshot( @@ -38,11 +37,10 @@ object RelationalDbReadUtil { ): List { val initialLoadStreamsNamespacePairs = streamsForInitialLoad - .stream() .map { stream: ConfiguredAirbyteStream -> AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.stream) } - .collect(Collectors.toSet()) + .toSet() return catalog.streams .stream() .filter { c: ConfiguredAirbyteStream -> c.syncMode == SyncMode.INCREMENTAL } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt index 90a086a68a14..3f78b9726e72 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt @@ -92,7 +92,6 @@ class CursorManager( ): Map { val allStreamNames = catalog.streams - .stream() .filter { c: ConfiguredAirbyteStream -> if (onlyIncludeIncrementalStreams) { return@filter c.syncMode == SyncMode.INCREMENTAL @@ -103,7 +102,7 @@ class CursorManager( .map { stream: AirbyteStream -> AirbyteStreamNameNamespacePair.fromAirbyteStream(stream) } - .collect(Collectors.toSet()) + .toMutableSet() allStreamNames.addAll( streamSupplier .get() diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt index a4c475b4bc06..9948a578b5dd 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt @@ -105,7 +105,6 @@ class GlobalStateManager( ): Set { if (airbyteStateMessage!!.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) { return airbyteStateMessage.global.streamStates - .stream() .map { streamState: AirbyteStreamState -> val cloned = Jsons.clone(streamState) AirbyteStreamNameNamespacePair( @@ -113,7 +112,7 @@ class GlobalStateManager( cloned.streamDescriptor.namespace ) } - .collect(Collectors.toSet()) + .toSet() } else { val legacyState: DbState? = Jsons.`object`(airbyteStateMessage.data, DbState::class.java) @@ -127,12 +126,11 @@ class GlobalStateManager( streams: List ): Set { return streams - .stream() .map { stream: DbStreamState -> val cloned = Jsons.clone(stream) AirbyteStreamNameNamespacePair(cloned.streamName, cloned.streamNamespace) } - .collect(Collectors.toSet()) + .toSet() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index 9a523b262244..677c95b30446 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -16,8 +16,6 @@ import io.airbyte.protocol.models.JsonSchemaType import io.airbyte.protocol.models.v0.* import java.util.* import java.util.function.Consumer -import java.util.stream.Collectors -import java.util.stream.Stream import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeEach @@ -123,9 +121,8 @@ abstract class CdcSourceTest> { private fun assertStateDoNotHaveDuplicateStreams(stateMessage: AirbyteStateMessage) { val dedupedStreamStates = stateMessage.global.streamStates - .stream() .map { streamState: AirbyteStreamState -> streamState.streamDescriptor } - .collect(Collectors.toSet()) + .toSet() Assertions.assertEquals(dedupedStreamStates.size, stateMessage.global.streamStates.size) } @@ -290,7 +287,7 @@ abstract class CdcSourceTest> { if (message.type == AirbyteMessage.Type.RECORD) { val recordMessage = message.record recordsPerStream - .computeIfAbsent(recordMessage.stream) { c: String -> ArrayList() } + .computeIfAbsent(recordMessage.stream) { _: String -> ArrayList() } .add(recordMessage) } } @@ -326,10 +323,7 @@ abstract class CdcSourceTest> { assertExpectedRecords( expectedRecords, actualRecords, - actualRecords - .stream() - .map { obj: AirbyteRecordMessage -> obj.stream } - .collect(Collectors.toSet()), + actualRecords.map { obj: AirbyteRecordMessage -> obj.stream }.toSet(), ) } @@ -356,8 +350,7 @@ abstract class CdcSourceTest> { ) { val actualData = actualRecords - .stream() - .map { recordMessage: AirbyteRecordMessage -> + .map { recordMessage: AirbyteRecordMessage -> Assertions.assertTrue(streamNames.contains(recordMessage.stream)) Assertions.assertNotNull(recordMessage.emittedAt) @@ -374,7 +367,7 @@ abstract class CdcSourceTest> { removeCDCColumns(data as ObjectNode) data } - .collect(Collectors.toSet()) + .toSet() Assertions.assertEquals(expectedRecords, actualData) } @@ -639,8 +632,7 @@ abstract class CdcSourceTest> { // Non resumeable full refresh does not get any state messages. assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong()) assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream()) - .collect(Collectors.toSet()), + (MODEL_RECORDS_2 + MODEL_RECORDS).toSet(), recordMessages1, setOf(MODELS_STREAM_NAME), names, @@ -657,8 +649,7 @@ abstract class CdcSourceTest> { assertExpectedStateMessagesFromIncrementalSync(stateMessages2) assertExpectedStateMessageCountMatches(stateMessages2, 1) assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord)) - .collect(Collectors.toSet()), + (MODEL_RECORDS_2 + puntoRecord).toSet(), recordMessages2, setOf(MODELS_STREAM_NAME), names, @@ -666,8 +657,7 @@ abstract class CdcSourceTest> { ) } else { assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream()) - .collect(Collectors.toSet()), + (MODEL_RECORDS_2 + MODEL_RECORDS).toSet(), recordMessages1, setOf(MODELS_STREAM_NAME), names, @@ -687,8 +677,7 @@ abstract class CdcSourceTest> { val stateMessages2 = extractStateMessages(actualRecords2) assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord)) - .collect(Collectors.toSet()), + (MODEL_RECORDS_2 + puntoRecord).toSet(), recordMessages2, setOf(MODELS_STREAM_NAME), names, @@ -706,7 +695,7 @@ abstract class CdcSourceTest> { val actualRecords3 = AutoCloseableIterators.toListAndClose(read3) val recordMessages3 = extractRecordMessages(actualRecords3) assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream()).collect(Collectors.toSet()), + MODEL_RECORDS_2.toSet(), recordMessages3, setOf(MODELS_STREAM_NAME), names, @@ -790,8 +779,7 @@ abstract class CdcSourceTest> { // Non resumeable full refresh does not get any state messages. assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong()) assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream()) - .collect(Collectors.toSet()), + (MODEL_RECORDS_2 + MODEL_RECORDS).toSet(), recordMessages1, setOf(MODELS_STREAM_NAME), names, @@ -808,8 +796,7 @@ abstract class CdcSourceTest> { assertExpectedStateMessagesFromIncrementalSync(stateMessages2) assertExpectedStateMessageCountMatches(stateMessages2, 1) assertExpectedRecords( - Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord)) - .collect(Collectors.toSet()), + (MODEL_RECORDS_2 + puntoRecord).toSet(), recordMessages2, setOf(MODELS_STREAM_NAME), names, @@ -917,9 +904,8 @@ abstract class CdcSourceTest> { Assertions.assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.global.sharedState) val streamsInStateAfterFirstSyncCompletion = stateMessageEmittedAfterFirstSyncCompletion.global.streamStates - .stream() .map { obj: AirbyteStreamState -> obj.streamDescriptor } - .collect(Collectors.toSet()) + .toSet() Assertions.assertEquals(1, streamsInStateAfterFirstSyncCompletion.size) Assertions.assertTrue( streamsInStateAfterFirstSyncCompletion.contains( @@ -1009,9 +995,8 @@ abstract class CdcSourceTest> { HashSet(MODEL_RECORDS_RANDOM), recordsForModelsRandomStreamFromSecondBatch, recordsForModelsRandomStreamFromSecondBatch - .stream() .map { obj: AirbyteRecordMessage -> obj.stream } - .collect(Collectors.toSet()), + .toSet(), Sets.newHashSet(RANDOM_TABLE_NAME), randomSchema(), ) @@ -1078,9 +1063,8 @@ abstract class CdcSourceTest> { ) val streamsInSyncCompletionStateAfterThirdSync = stateMessageEmittedAfterThirdSyncCompletion.global.streamStates - .stream() .map { obj: AirbyteStreamState -> obj.streamDescriptor } - .collect(Collectors.toSet()) + .toSet() Assertions.assertTrue( streamsInSyncCompletionStateAfterThirdSync.contains( StreamDescriptor().withName(RANDOM_TABLE_NAME).withNamespace(randomSchema()), @@ -1109,9 +1093,8 @@ abstract class CdcSourceTest> { recordsWrittenInRandomTable, recordsForModelsRandomStreamFromThirdBatch, recordsForModelsRandomStreamFromThirdBatch - .stream() .map { obj: AirbyteRecordMessage -> obj.stream } - .collect(Collectors.toSet()), + .toSet(), Sets.newHashSet(RANDOM_TABLE_NAME), randomSchema(), ) @@ -1138,9 +1121,8 @@ abstract class CdcSourceTest> { Assertions.assertNotNull(stateMessageEmittedAfterFirstSyncCompletion.global.sharedState) val streamsInStateAfterFirstSyncCompletion = stateMessageEmittedAfterFirstSyncCompletion.global.streamStates - .stream() .map { obj: AirbyteStreamState -> obj.streamDescriptor } - .collect(Collectors.toSet()) + .toSet() Assertions.assertEquals(1, streamsInStateAfterFirstSyncCompletion.size) Assertions.assertTrue( streamsInStateAfterFirstSyncCompletion.contains( @@ -1256,9 +1238,8 @@ abstract class CdcSourceTest> { ) val streamsInSnapshotState = stateMessageEmittedAfterSnapshotCompletionInSecondSync.global.streamStates - .stream() .map { obj: AirbyteStreamState -> obj.streamDescriptor } - .collect(Collectors.toSet()) + .toSet() Assertions.assertEquals(2, streamsInSnapshotState.size) Assertions.assertTrue( streamsInSnapshotState.contains( @@ -1283,9 +1264,8 @@ abstract class CdcSourceTest> { ) val streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.global.streamStates - .stream() .map { obj: AirbyteStreamState -> obj.streamDescriptor } - .collect(Collectors.toSet()) + .toSet() Assertions.assertEquals(2, streamsInSnapshotState.size) Assertions.assertTrue( streamsInSyncCompletionState.contains( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt index f231d01ab0c2..9fbd2d1252fc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt @@ -20,7 +20,6 @@ import io.airbyte.protocol.models.Field import io.airbyte.protocol.models.JsonSchemaType import io.airbyte.protocol.models.v0.* import java.math.BigDecimal -import java.nio.ByteBuffer import java.sql.Connection import java.util.* import org.junit.jupiter.api.Assertions @@ -172,7 +171,6 @@ abstract class JdbcStressTest { } .peek { m: AirbyteMessage -> assertExpectedMessage(m) } .count() - var a: ByteBuffer val expectedRoundedRecordsCount = TOTAL_RECORDS - TOTAL_RECORDS % 1000 LOGGER.info("expected records count: " + TOTAL_RECORDS) LOGGER.info("actual records count: $actualCount") diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt index 56a964761d41..9bee126061c8 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt @@ -7,11 +7,9 @@ package io.airbyte.commons.enums import com.google.common.base.Preconditions import com.google.common.collect.Maps import com.google.common.collect.Sets -import java.util.Arrays import java.util.Locale import java.util.Optional import java.util.concurrent.ConcurrentMap -import java.util.stream.Collectors class Enums { companion object { @@ -54,12 +52,8 @@ class Enums { Preconditions.checkArgument(c2.isEnum) return (c1.enumConstants.size == c2.enumConstants.size && Sets.difference( - Arrays.stream(c1.enumConstants) - .map { obj: T1 -> obj!!.name } - .collect(Collectors.toSet()), - Arrays.stream(c2.enumConstants) - .map { obj: T2 -> obj!!.name } - .collect(Collectors.toSet()), + c1.enumConstants.map { obj: T1 -> obj!!.name }.toSet(), + c2.enumConstants.map { obj: T2 -> obj!!.name }.toSet(), ) .isEmpty()) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt index a7b85b909708..c06001032560 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt @@ -11,7 +11,6 @@ import java.io.File import java.io.IOException import java.net.URI import java.net.URISyntaxException -import java.util.stream.Collectors import me.andrz.jackson.JsonContext import me.andrz.jackson.JsonReferenceException import me.andrz.jackson.JsonReferenceProcessor @@ -89,9 +88,8 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR fun validate(schemaJson: JsonNode, objectJson: JsonNode): Set { return validateInternal(schemaJson, objectJson) - .stream() .map { obj: ValidationMessage -> obj.message } - .collect(Collectors.toSet()) + .toSet() } fun getValidationMessageArgs(schemaJson: JsonNode, objectJson: JsonNode): List> { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt index 619f5be63677..a480ef7bab05 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt @@ -10,7 +10,6 @@ import io.airbyte.commons.text.Names import io.airbyte.protocol.models.SyncMode import io.airbyte.validation.json.JsonValidationException import java.util.* -import java.util.stream.Collectors /** * Utilities to convert Catalog protocol to Catalog API client. This class was similar to existing @@ -76,9 +75,8 @@ object CatalogClientConverters { // field path. val selectedFieldNames = config.selectedFields!! - .stream() .map { field: SelectedFieldInfo -> field.fieldPath!![0] } - .collect(Collectors.toSet()) + .toSet() // TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields // because we // don't support filtering nested fields yet. diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt index aa1ec7c96221..05fb1799c1dc 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt @@ -148,10 +148,9 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(fileUploadFormat: FileUpl field .schema() .types - .stream() .map { obj: Schema -> obj.type } .filter { type: Schema.Type -> type != Schema.Type.NULL } - .collect(Collectors.toSet()) + .toSet() } ) ) @@ -165,12 +164,11 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(fileUploadFormat: FileUpl field .schema() .types - .stream() .filter { type: Schema -> type.type != Schema.Type.NULL } - .flatMap { type: Schema -> type.elementType.types.stream() } + .flatMap { type: Schema -> type.elementType.types } .map { obj: Schema -> obj.type } .filter { type: Schema.Type -> type != Schema.Type.NULL } - .collect(Collectors.toSet()) + .toSet() } ) ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt index 19547cce3b74..8e0109311e7f 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt @@ -79,13 +79,14 @@ protected constructor(fileUploadFormat: FileUploadFormat) : else fieldDefinition["type"] val airbyteTypeProperty = fieldDefinition["airbyte_type"] val airbyteTypePropertyText = airbyteTypeProperty?.asText() - return Arrays.stream(JsonSchemaType.entries.toTypedArray()) + return JsonSchemaType.entries + .toTypedArray() .filter { value: JsonSchemaType -> value.jsonSchemaType == typeProperty.asText() && compareAirbyteTypes(airbyteTypePropertyText, value) } .map { obj: JsonSchemaType -> obj.avroType } - .collect(Collectors.toSet()) + .toSet() } private fun compareAirbyteTypes( @@ -138,10 +139,9 @@ protected constructor(fileUploadFormat: FileUploadFormat) : field .schema() .types - .stream() .map { obj: Schema -> obj.type } .filter { type: Schema.Type -> type != Schema.Type.NULL } - .collect(Collectors.toSet()) + .toSet() } ) ) @@ -155,12 +155,11 @@ protected constructor(fileUploadFormat: FileUploadFormat) : field .schema() .types - .stream() .filter { type: Schema -> type.type != Schema.Type.NULL } - .flatMap { type: Schema -> type.elementType.types.stream() } + .flatMap { type: Schema -> type.elementType.types } .map { obj: Schema -> obj.type } .filter { type: Schema.Type -> type != Schema.Type.NULL } - .collect(Collectors.toSet()) + .toSet() } ) )