Skip to content

Commit

Permalink
replace all java Collectors.toList with kotlin construct (#37537)
Browse files Browse the repository at this point in the history
more kotlin cleanup
  • Loading branch information
stephane-airbyte authored May 23, 2024
1 parent c304df3 commit b0ab148
Show file tree
Hide file tree
Showing 40 changed files with 116 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import io.airbyte.commons.json.Jsons
import io.airbyte.commons.string.Strings
import io.airbyte.commons.text.Names
import io.airbyte.commons.util.MoreIterators
import java.util.stream.Collectors

open class StandardNameTransformer : NamingConventionTransformer {
override fun getIdentifier(name: String): String {
Expand Down Expand Up @@ -77,7 +76,7 @@ open class StandardNameTransformer : NamingConventionTransformer {
MoreIterators.toList(root.elements())
.stream()
.map { r: JsonNode -> formatJsonPath(r) }
.collect(Collectors.toList())
.toList()
)
} else {
return root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ internal constructor(
}
.thenComparing { s: StreamDescriptor -> s.namespace + s.name },
)
.collect(Collectors.toList())
.toList()
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import java.util.*
import java.util.concurrent.*
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.function.Consumer
import java.util.stream.Collectors
import kotlin.math.min
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -85,7 +84,7 @@ class ConcurrentStreamConsumer(
.map { runnable: ConcurrentStreamRunnable ->
CompletableFuture.runAsync(runnable, executorService)
}
.collect(Collectors.toList())
.toList()

/*
* Wait for the submitted streams to complete before returning. This uses the join() method to allow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import io.airbyte.commons.string.Strings
import io.airbyte.protocol.models.JsonSchemaType
import java.math.BigDecimal
import java.sql.*
import java.util.stream.Collectors
import javax.sql.DataSource
import org.bouncycastle.util.encoders.Base64
import org.junit.jupiter.api.Assertions
Expand Down Expand Up @@ -122,7 +121,7 @@ internal class TestJdbcUtils {
JdbcDatabase.toUnsafeStream(rs) { queryContext: ResultSet ->
sourceOperations.rowToJson(queryContext)
}
.collect(Collectors.toList())
.toList()
Assertions.assertEquals(RECORDS_AS_JSON, actual)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import java.util.stream.Collectors
import org.apache.commons.lang3.ThreadUtils
import org.assertj.core.api.AssertionsForClassTypes
import org.junit.jupiter.api.Assertions
Expand Down Expand Up @@ -474,7 +473,7 @@ ${Jsons.serialize(message2)}""".toByteArray(
ThreadUtils.getAllThreads()
.stream()
.filter(IntegrationRunner::filterOrphanedThread)
.collect(Collectors.toList())
.toList()
// all threads should be interrupted
Assertions.assertEquals(listOf<Any>(), runningThreads)
Assertions.assertEquals(1, caughtExceptions.size)
Expand Down Expand Up @@ -502,7 +501,7 @@ ${Jsons.serialize(message2)}""".toByteArray(
ThreadUtils.getAllThreads()
.stream()
.filter(IntegrationRunner::filterOrphanedThread)
.collect(Collectors.toList())
.toList()
// a thread that refuses to be interrupted should remain
Assertions.assertEquals(1, runningThreads.size)
Assertions.assertEquals(1, caughtExceptions.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Assertions.assertEquals
Expand Down Expand Up @@ -574,7 +573,7 @@ class AsyncStreamConsumerTest {
),
)
}
.collect(Collectors.toList())
.toList()
assertEquals(expRecords, actualRecords)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.time.Instant
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Assertions
Expand Down Expand Up @@ -150,7 +149,7 @@ class BufferedStreamConsumerTest {
Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2)
.stream()
.flatMap { obj: List<AirbyteMessage> -> obj.stream() }
.collect(Collectors.toList())
.toList()
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords)

Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1)
Expand Down Expand Up @@ -241,7 +240,7 @@ class BufferedStreamConsumerTest {
.stream()
.map { `object`: AirbyteMessage -> Jsons.clone(`object`) }
.peek { m: AirbyteMessage -> m.record.withStream(STREAM_NAME2) }
.collect(Collectors.toList())
.toList()

consumer.start()
consumeRecords(consumer, expectedRecordsStream1)
Expand All @@ -266,7 +265,7 @@ class BufferedStreamConsumerTest {
.stream()
.map { `object`: AirbyteMessage -> Jsons.clone(`object`) }
.peek { m: AirbyteMessage -> m.record.withStream(STREAM_NAME2) }
.collect(Collectors.toList())
.toList()

consumer.start()
consumeRecords(consumer, expectedRecordsStream1)
Expand Down Expand Up @@ -310,7 +309,7 @@ class BufferedStreamConsumerTest {
STREAM_NAME,
SCHEMA_NAME,
Stream.concat(expectedRecordsStream1.stream(), expectedRecordsStream1Batch2.stream())
.collect(Collectors.toList())
.toList()
)
Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1)
}
Expand Down Expand Up @@ -345,7 +344,7 @@ class BufferedStreamConsumerTest {
STREAM_NAME,
SCHEMA_NAME,
Stream.concat(expectedRecordsStream1.stream(), expectedRecordsStream1Batch2.stream())
.collect(Collectors.toList())
.toList()
)
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1Batch3)
// expects two STATE messages returned since one will be flushed after periodic flushing
Expand Down Expand Up @@ -589,10 +588,7 @@ class BufferedStreamConsumerTest {
Mockito.verify(recordWriter)
.accept(
AirbyteStreamNameNamespacePair(streamName, namespace),
expectedRecords
.stream()
.map { obj: AirbyteMessage -> obj.record }
.collect(Collectors.toList())
expectedRecords.stream().map { obj: AirbyteMessage -> obj.record }.toList()
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import java.io.IOException
import java.sql.SQLException
import java.util.*
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.tuple.ImmutablePair
Expand Down Expand Up @@ -106,7 +105,7 @@ constructor(
.setType(StandardSQLTypeName.STRING)
.build()
}
.collect(Collectors.toList())
.toList()

return query(sql, parameterValueList)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import java.util.*
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Collectors
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -111,15 +110,13 @@ object JdbcBufferedConsumerFactory {
return if (parsedCatalog == null) {
catalog!!
.streams
.stream()
.map(toWriteConfig(namingResolver, config, schemaRequired))
.collect(Collectors.toList())
.map { toWriteConfig(namingResolver, config, schemaRequired).apply(it) }
.toList()
} else {
// we should switch this to kotlin-style list processing, but meh for now
parsedCatalog.streams
.stream()
.map(parsedStreamToWriteConfig(namingResolver))
.collect(Collectors.toList())
.map { parsedStreamToWriteConfig(namingResolver).apply(it) }
.toList()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.sql.Timestamp
import java.time.Instant
import java.util.Locale
import java.util.Optional
import java.util.stream.Collectors
import java.util.*
import kotlin.Any
import kotlin.Boolean
import kotlin.IllegalArgumentException
import kotlin.Int
import org.jooq.Condition
import org.jooq.DSLContext
Expand Down Expand Up @@ -176,14 +177,14 @@ constructor(
.map { metaColumn: Map.Entry<String?, DataType<*>?> ->
DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value)
}
.collect(Collectors.toList())
.toList()
val dataFields =
columns.entries
.stream()
.map { column: Map.Entry<ColumnId?, AirbyteType> ->
DSL.field(DSL.quotedName(column.key!!.name), toDialectType(column.value))
}
.collect(Collectors.toList())
.toList()
dataFields.addAll(fields)
return dataFields
}
Expand Down Expand Up @@ -227,7 +228,7 @@ constructor(
.map { metaColumn: Map.Entry<String?, DataType<*>?> ->
DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value)
}
.collect(Collectors.toList())
.toList()
// Use originalName with non-sanitized characters when extracting data from _airbyte_data
val dataFields = extractRawDataFields(columns, useExpensiveSaferCasting)
dataFields.addAll(fields)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.time.Instant
import java.util.UUID
import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Collectors
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -125,7 +124,7 @@ open class SerialStagingConsumerFactory {
return catalog.streams
.stream()
.map(toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns))
.collect(Collectors.toList())
.toList()
}

private fun toWriteConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import kotlin.test.assertNotNull
import org.junit.jupiter.api.*
Expand Down Expand Up @@ -486,7 +485,7 @@ abstract class DestinationAcceptanceTest {
else message.toString()
}
)
.collect(Collectors.toList())
.toList()

val config = getConfig()
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false)
Expand Down Expand Up @@ -861,7 +860,7 @@ abstract class DestinationAcceptanceTest {
}
message
}
.collect(Collectors.toList())
.toList()
assertSameMessages(expectedMessages, actualMessages, true)
}

Expand Down Expand Up @@ -1031,7 +1030,7 @@ abstract class DestinationAcceptanceTest {
it.record.data["NZD"].asText()
(it.record.emittedAt == latestMessagesOnly[key]!!.record.emittedAt)
}
.collect(Collectors.toList())
.toList()

val defaultSchema = getDefaultSchema(config)
retrieveRawRecordsAndAssertSameMessages(
Expand Down Expand Up @@ -1754,7 +1753,7 @@ abstract class DestinationAcceptanceTest {
if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage
}
.map { obj: AirbyteRecordMessage -> obj.data }
.collect(Collectors.toList())
.toList()

val actualProcessed =
actual
Expand All @@ -1763,7 +1762,7 @@ abstract class DestinationAcceptanceTest {
if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage
}
.map { obj: AirbyteRecordMessage -> obj.data }
.collect(Collectors.toList())
.toList()

_testDataComparator.assertSameData(expectedProcessed, actualProcessed)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ abstract class AbstractJdbcSource<Datatype>(
)
}
.map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) }
.collect(Collectors.toList())
.toList()
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected constructor(driverClassName: String) :
val iteratorList =
Stream.of(incrementalIterators, fullRefreshIterators)
.flatMap(Collection<AutoCloseableIterator<AirbyteMessage>>::stream)
.collect(Collectors.toList())
.toList()

return AutoCloseableIterators.appendOnClose(
AutoCloseableIterators.concatWithEagerClose(
Expand Down Expand Up @@ -318,7 +318,7 @@ protected constructor(driverClassName: String) :
.filter { table: TableInfo<CommonField<DataType>> ->
!systemNameSpaces.contains(table.nameSpace) && !systemViews.contains(table.name)
}
.collect(Collectors.toList()))
.toList())
}

protected fun getFullRefreshIterators(
Expand Down Expand Up @@ -434,7 +434,7 @@ protected constructor(driverClassName: String) :
.stream()
.map { obj: CommonField<DataType> -> obj.name }
.filter { o: String -> selectedFieldsInCatalog.contains(o) }
.collect(Collectors.toList())
.toList()

val iterator: AutoCloseableIterator<AirbyteMessage>
// checks for which sync mode we're using based on the configured airbytestream
Expand Down
Loading

0 comments on commit b0ab148

Please sign in to comment.