Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace all java Collectors.toList with kotlin construct #37537

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import io.airbyte.commons.json.Jsons
import io.airbyte.commons.string.Strings
import io.airbyte.commons.text.Names
import io.airbyte.commons.util.MoreIterators
import java.util.stream.Collectors

open class StandardNameTransformer : NamingConventionTransformer {
override fun getIdentifier(name: String): String {
Expand Down Expand Up @@ -77,7 +76,7 @@ open class StandardNameTransformer : NamingConventionTransformer {
MoreIterators.toList(root.elements())
.stream()
.map { r: JsonNode -> formatJsonPath(r) }
.collect(Collectors.toList())
.toList()
)
} else {
return root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ internal constructor(
}
.thenComparing { s: StreamDescriptor -> s.namespace + s.name },
)
.collect(Collectors.toList())
.toList()
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import java.util.*
import java.util.concurrent.*
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import java.util.function.Consumer
import java.util.stream.Collectors
import kotlin.math.min
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -85,7 +84,7 @@ class ConcurrentStreamConsumer(
.map { runnable: ConcurrentStreamRunnable ->
CompletableFuture.runAsync(runnable, executorService)
}
.collect(Collectors.toList())
.toList()

/*
* Wait for the submitted streams to complete before returning. This uses the join() method to allow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import io.airbyte.commons.string.Strings
import io.airbyte.protocol.models.JsonSchemaType
import java.math.BigDecimal
import java.sql.*
import java.util.stream.Collectors
import javax.sql.DataSource
import org.bouncycastle.util.encoders.Base64
import org.junit.jupiter.api.Assertions
Expand Down Expand Up @@ -122,7 +121,7 @@ internal class TestJdbcUtils {
JdbcDatabase.toUnsafeStream(rs) { queryContext: ResultSet ->
sourceOperations.rowToJson(queryContext)
}
.collect(Collectors.toList())
.toList()
Assertions.assertEquals(RECORDS_AS_JSON, actual)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import java.util.stream.Collectors
import org.apache.commons.lang3.ThreadUtils
import org.assertj.core.api.AssertionsForClassTypes
import org.junit.jupiter.api.Assertions
Expand Down Expand Up @@ -474,7 +473,7 @@ ${Jsons.serialize(message2)}""".toByteArray(
ThreadUtils.getAllThreads()
.stream()
.filter(IntegrationRunner::filterOrphanedThread)
.collect(Collectors.toList())
.toList()
// all threads should be interrupted
Assertions.assertEquals(listOf<Any>(), runningThreads)
Assertions.assertEquals(1, caughtExceptions.size)
Expand Down Expand Up @@ -502,7 +501,7 @@ ${Jsons.serialize(message2)}""".toByteArray(
ThreadUtils.getAllThreads()
.stream()
.filter(IntegrationRunner::filterOrphanedThread)
.collect(Collectors.toList())
.toList()
// a thread that refuses to be interrupted should remain
Assertions.assertEquals(1, runningThreads.size)
Assertions.assertEquals(1, caughtExceptions.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Assertions.assertEquals
Expand Down Expand Up @@ -574,7 +573,7 @@ class AsyncStreamConsumerTest {
),
)
}
.collect(Collectors.toList())
.toList()
assertEquals(expRecords, actualRecords)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.time.Instant
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Assertions
Expand Down Expand Up @@ -150,7 +149,7 @@ class BufferedStreamConsumerTest {
Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2)
.stream()
.flatMap { obj: List<AirbyteMessage> -> obj.stream() }
.collect(Collectors.toList())
.toList()
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords)

Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1)
Expand Down Expand Up @@ -241,7 +240,7 @@ class BufferedStreamConsumerTest {
.stream()
.map { `object`: AirbyteMessage -> Jsons.clone(`object`) }
.peek { m: AirbyteMessage -> m.record.withStream(STREAM_NAME2) }
.collect(Collectors.toList())
.toList()

consumer.start()
consumeRecords(consumer, expectedRecordsStream1)
Expand All @@ -266,7 +265,7 @@ class BufferedStreamConsumerTest {
.stream()
.map { `object`: AirbyteMessage -> Jsons.clone(`object`) }
.peek { m: AirbyteMessage -> m.record.withStream(STREAM_NAME2) }
.collect(Collectors.toList())
.toList()

consumer.start()
consumeRecords(consumer, expectedRecordsStream1)
Expand Down Expand Up @@ -310,7 +309,7 @@ class BufferedStreamConsumerTest {
STREAM_NAME,
SCHEMA_NAME,
Stream.concat(expectedRecordsStream1.stream(), expectedRecordsStream1Batch2.stream())
.collect(Collectors.toList())
.toList()
)
Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1)
}
Expand Down Expand Up @@ -345,7 +344,7 @@ class BufferedStreamConsumerTest {
STREAM_NAME,
SCHEMA_NAME,
Stream.concat(expectedRecordsStream1.stream(), expectedRecordsStream1Batch2.stream())
.collect(Collectors.toList())
.toList()
)
verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecordsStream1Batch3)
// expects two STATE messages returned since one will be flushed after periodic flushing
Expand Down Expand Up @@ -589,10 +588,7 @@ class BufferedStreamConsumerTest {
Mockito.verify(recordWriter)
.accept(
AirbyteStreamNameNamespacePair(streamName, namespace),
expectedRecords
.stream()
.map { obj: AirbyteMessage -> obj.record }
.collect(Collectors.toList())
expectedRecords.stream().map { obj: AirbyteMessage -> obj.record }.toList()
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import java.io.IOException
import java.sql.SQLException
import java.util.*
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.tuple.ImmutablePair
Expand Down Expand Up @@ -106,7 +105,7 @@ constructor(
.setType(StandardSQLTypeName.STRING)
.build()
}
.collect(Collectors.toList())
.toList()

return query(sql, parameterValueList)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import java.util.*
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Collectors
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -111,15 +110,13 @@ object JdbcBufferedConsumerFactory {
return if (parsedCatalog == null) {
catalog!!
.streams
.stream()
.map(toWriteConfig(namingResolver, config, schemaRequired))
.collect(Collectors.toList())
.map { toWriteConfig(namingResolver, config, schemaRequired).apply(it) }
.toList()
} else {
// we should switch this to kotlin-style list processing, but meh for now
parsedCatalog.streams
.stream()
.map(parsedStreamToWriteConfig(namingResolver))
.collect(Collectors.toList())
.map { parsedStreamToWriteConfig(namingResolver).apply(it) }
.toList()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.sql.Timestamp
import java.time.Instant
import java.util.Locale
import java.util.Optional
import java.util.stream.Collectors
import java.util.*
import kotlin.Any
import kotlin.Boolean
import kotlin.IllegalArgumentException
import kotlin.Int
import org.jooq.Condition
import org.jooq.DSLContext
Expand Down Expand Up @@ -176,14 +177,14 @@ constructor(
.map { metaColumn: Map.Entry<String?, DataType<*>?> ->
DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value)
}
.collect(Collectors.toList())
.toList()
val dataFields =
columns.entries
.stream()
.map { column: Map.Entry<ColumnId?, AirbyteType> ->
DSL.field(DSL.quotedName(column.key!!.name), toDialectType(column.value))
}
.collect(Collectors.toList())
.toList()
dataFields.addAll(fields)
return dataFields
}
Expand Down Expand Up @@ -227,7 +228,7 @@ constructor(
.map { metaColumn: Map.Entry<String?, DataType<*>?> ->
DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value)
}
.collect(Collectors.toList())
.toList()
// Use originalName with non-sanitized characters when extracting data from _airbyte_data
val dataFields = extractRawDataFields(columns, useExpensiveSaferCasting)
dataFields.addAll(fields)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.time.Instant
import java.util.UUID
import java.util.function.Consumer
import java.util.function.Function
import java.util.stream.Collectors
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -125,7 +124,7 @@ open class SerialStagingConsumerFactory {
return catalog.streams
.stream()
.map(toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns))
.collect(Collectors.toList())
.toList()
}

private fun toWriteConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import kotlin.test.assertNotNull
import org.junit.jupiter.api.*
Expand Down Expand Up @@ -486,7 +485,7 @@ abstract class DestinationAcceptanceTest {
else message.toString()
}
)
.collect(Collectors.toList())
.toList()

val config = getConfig()
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false)
Expand Down Expand Up @@ -861,7 +860,7 @@ abstract class DestinationAcceptanceTest {
}
message
}
.collect(Collectors.toList())
.toList()
assertSameMessages(expectedMessages, actualMessages, true)
}

Expand Down Expand Up @@ -1031,7 +1030,7 @@ abstract class DestinationAcceptanceTest {
it.record.data["NZD"].asText()
(it.record.emittedAt == latestMessagesOnly[key]!!.record.emittedAt)
}
.collect(Collectors.toList())
.toList()

val defaultSchema = getDefaultSchema(config)
retrieveRawRecordsAndAssertSameMessages(
Expand Down Expand Up @@ -1754,7 +1753,7 @@ abstract class DestinationAcceptanceTest {
if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage
}
.map { obj: AirbyteRecordMessage -> obj.data }
.collect(Collectors.toList())
.toList()

val actualProcessed =
actual
Expand All @@ -1763,7 +1762,7 @@ abstract class DestinationAcceptanceTest {
if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage
}
.map { obj: AirbyteRecordMessage -> obj.data }
.collect(Collectors.toList())
.toList()

_testDataComparator.assertSameData(expectedProcessed, actualProcessed)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ abstract class AbstractJdbcSource<Datatype>(
)
}
.map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) }
.collect(Collectors.toList())
.toList()
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected constructor(driverClassName: String) :
val iteratorList =
Stream.of(incrementalIterators, fullRefreshIterators)
.flatMap(Collection<AutoCloseableIterator<AirbyteMessage>>::stream)
.collect(Collectors.toList())
.toList()

return AutoCloseableIterators.appendOnClose(
AutoCloseableIterators.concatWithEagerClose(
Expand Down Expand Up @@ -318,7 +318,7 @@ protected constructor(driverClassName: String) :
.filter { table: TableInfo<CommonField<DataType>> ->
!systemNameSpaces.contains(table.nameSpace) && !systemViews.contains(table.name)
}
.collect(Collectors.toList()))
.toList())
}

protected fun getFullRefreshIterators(
Expand Down Expand Up @@ -434,7 +434,7 @@ protected constructor(driverClassName: String) :
.stream()
.map { obj: CommonField<DataType> -> obj.name }
.filter { o: String -> selectedFieldsInCatalog.contains(o) }
.collect(Collectors.toList())
.toList()

val iterator: AutoCloseableIterator<AirbyteMessage>
// checks for which sync mode we're using based on the configured airbytestream
Expand Down
Loading
Loading