Skip to content

Commit

Permalink
remove log4j from java cdk (#38583)
Browse files Browse the repository at this point in the history
## What
<!--
* Describe what the change is solving. Link all GitHub issues related to this change.
-->

## How
<!--
* Describe how code changes achieve the solution.
-->

## Review guide
<!--
1. `x.py`
2. `y.py`
-->

## User Impact
<!--
* What is the end result perceived by the user?
* If there are negative side effects, please list them. 
-->

## Can this PR be safely reverted and rolled back?
<!--
* If unsure, leave it blank.
-->
- [ ] YES 💚
- [ ] NO ❌
  • Loading branch information
stephane-airbyte authored May 23, 2024
1 parent d82639c commit f74f5d9
Show file tree
Hide file tree
Showing 147 changed files with 1,103 additions and 1,534 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.*
import java.nio.charset.StandardCharsets
import java.sql.SQLException
Expand All @@ -26,8 +27,8 @@ import java.util.*
import java.util.function.Consumer
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}

abstract class AzureBlobStorageStreamCopier(
protected val stagingFolder: String,
Expand Down Expand Up @@ -104,38 +105,32 @@ abstract class AzureBlobStorageStreamCopier(

@Throws(Exception::class)
override fun closeStagingUploader(hasFailed: Boolean) {
LOGGER.info("Uploading remaining data for {} stream.", streamName)
LOGGER.info { "Uploading remaining data for $streamName stream." }
for (csvPrinter in csvPrinters.values) {
csvPrinter.close()
}
LOGGER.info("All data for {} stream uploaded.", streamName)
LOGGER.info { "All data for $streamName stream uploaded." }
}

@Throws(Exception::class)
override fun createDestinationSchema() {
LOGGER.info("Creating schema in destination if it doesn't exist: {}", schemaName)
LOGGER.info { "Creating schema in destination if it doesn't exist: $schemaName" }
sqlOperations.createSchemaIfNotExists(db, schemaName)
}

@Throws(Exception::class)
override fun createTemporaryTable() {
LOGGER.info(
"Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.",
streamName,
schemaName,
tmpTableName
)
LOGGER.info {
"Preparing tmp table in destination for stream: $streamName, schema: $schemaName, tmp table name: $tmpTableName."
}
sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName)
}

@Throws(Exception::class)
override fun copyStagingFileToTemporaryTable() {
LOGGER.info(
"Starting copy to tmp table: {} in destination for stream: {}, schema: {}.",
tmpTableName,
streamName,
schemaName
)
LOGGER.info {
"Starting copy to tmp table: $tmpTableName in destination for stream: $streamName, schema: $schemaName."
}
for (azureStagingFile in azureStagingFiles) {
copyAzureBlobCsvFileIntoTable(
db,
Expand All @@ -145,11 +140,9 @@ abstract class AzureBlobStorageStreamCopier(
azureBlobConfig
)
}
LOGGER.info(
"Copy to tmp table {} in destination for stream {} complete.",
tmpTableName,
streamName
)
LOGGER.info {
"Copy to tmp table $tmpTableName in destination for stream $streamName complete."
}
}

private fun getFullAzurePath(azureStagingFile: String?): String {
Expand All @@ -166,50 +159,45 @@ abstract class AzureBlobStorageStreamCopier(
@Throws(Exception::class)
override fun createDestinationTable(): String? {
@Suppress("DEPRECATION") val destTableName = nameTransformer.getRawTableName(streamName)
LOGGER.info("Preparing table {} in destination.", destTableName)
LOGGER.info { "Preparing table $destTableName in destination." }
sqlOperations.createTableIfNotExists(db, schemaName, destTableName)
LOGGER.info("Table {} in destination prepared.", tmpTableName)
LOGGER.info { "Table $tmpTableName in destination prepared." }

return destTableName
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String {
LOGGER.info(
"Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.",
tmpTableName,
destTableName,
schemaName
)
LOGGER.info {
"Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination."
}
val queries = StringBuilder()
if (destSyncMode == DestinationSyncMode.OVERWRITE) {
queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName))
LOGGER.info(
"Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.",
destTableName,
schemaName
)
LOGGER.info {
"Destination OVERWRITE mode detected. Dest table: $destTableName, schema: $schemaName, truncated."
}
}
queries.append(sqlOperations.insertTableQuery(db, schemaName, tmpTableName, destTableName))
return queries.toString()
}

@Throws(Exception::class)
override fun removeFileAndDropTmpTable() {
LOGGER.info("Begin cleaning azure blob staging files.")
LOGGER.info { "Begin cleaning azure blob staging files." }
for (appendBlobClient in blobClients.values) {
appendBlobClient.delete()
}
LOGGER.info("Azure Blob staging files cleaned.")
LOGGER.info { "Azure Blob staging files cleaned." }

LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName)
LOGGER.info { "Begin cleaning $tmpTableName tmp table in destination." }
sqlOperations.dropTableIfExists(db, schemaName, tmpTableName)
LOGGER.info("{} tmp table in destination cleaned.", tmpTableName)
LOGGER.info { "$tmpTableName tmp table in destination cleaned." }
}

@Throws(Exception::class)
override fun closeNonCurrentStagingFileWriters() {
LOGGER.info("Begin closing non current file writers")
LOGGER.info { "Begin closing non current file writers" }
val removedKeys: MutableSet<String> = HashSet()
for (key in activeStagingWriterFileNames) {
if (key != currentFile) {
Expand All @@ -231,8 +219,7 @@ abstract class AzureBlobStorageStreamCopier(
)

companion object {
private val LOGGER: Logger =
LoggerFactory.getLogger(AzureBlobStorageStreamCopier::class.java)

fun attemptAzureBlobWriteAndDelete(config: AzureBlobStorageConfig) {
var appendBlobClient: AppendBlobClient? = null
try {
Expand All @@ -249,7 +236,7 @@ abstract class AzureBlobStorageStreamCopier(
listCreatedBlob(containerClient)
} finally {
if (appendBlobClient != null && appendBlobClient.exists()) {
LOGGER.info("Deleting blob: " + appendBlobClient.blobName)
LOGGER.info { "Deleting blob: ${appendBlobClient.blobName}" }
appendBlobClient.delete()
}
}
Expand All @@ -260,16 +247,14 @@ abstract class AzureBlobStorageStreamCopier(
.listBlobs()
.forEach(
Consumer { blobItem: BlobItem ->
LOGGER.info(
"Blob name: " + blobItem.name + "Snapshot: " + blobItem.snapshot
)
LOGGER.info { "Blob name: ${blobItem.name} Snapshot: ${blobItem.snapshot}" }
}
)
}

private fun writeTestDataIntoBlob(appendBlobClient: AppendBlobClient?) {
val test = "test_data"
LOGGER.info("Writing test data to Azure Blob storage: $test")
LOGGER.info { "Writing test data to Azure Blob storage: $test" }
val dataStream: InputStream =
ByteArrayInputStream(test.toByteArray(StandardCharsets.UTF_8))

Expand All @@ -278,7 +263,7 @@ abstract class AzureBlobStorageStreamCopier(
.appendBlock(dataStream, test.length.toLong())
.blobCommittedBlockCount

LOGGER.info("blobCommittedBlockCount: $blobCommittedBlockCount")
LOGGER.info { "blobCommittedBlockCount: $blobCommittedBlockCount" }
}

private fun getBlobContainerClient(
Expand All @@ -291,9 +276,9 @@ abstract class AzureBlobStorageStreamCopier(

if (!appendBlobClient.exists()) {
appendBlobClient.create()
LOGGER.info("blobContainerClient created")
LOGGER.info { "blobContainerClient created" }
} else {
LOGGER.info("blobContainerClient already exists")
LOGGER.info { "blobContainerClient already exists" }
}
return containerClient
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.github.oshai.kotlinlogging.KotlinLogging
import java.math.BigDecimal
import java.sql.*
import java.sql.Date
Expand All @@ -21,16 +22,12 @@ import java.time.*
import java.time.chrono.IsoEra
import java.time.format.DateTimeParseException
import java.util.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}
/** Source operation skeleton for JDBC compatible databases. */
abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
JdbcCompatibleSourceOperations<Datatype> {

private val LOGGER: Logger =
LoggerFactory.getLogger(AbstractJdbcCompatibleSourceOperations::class.java)

@Throws(SQLException::class)
override fun convertDatabaseRowToAirbyteRecordData(queryContext: ResultSet): AirbyteRecordData {
// the first call communicates with the database. after that the result is cached.
Expand All @@ -47,12 +44,9 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
jsonNode.putNull(columnName)
LOGGER.info(
"Failed to serialize column: {}, of type {}, with error {}",
columnName,
columnTypeName,
e.message
)
LOGGER.info {
"Failed to serialize column: $columnName, of type $columnTypeName, with error ${e.message}"
}
AirbyteTraceMessageUtility.emitAnalyticsTrace(dataTypesSerializationErrorMessage())
metaChanges.add(
AirbyteRecordMessageMetaChange()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ package io.airbyte.cdk.db.jdbc

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.DataTypeUtils
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.*
import java.time.*
import java.time.format.DateTimeFormatter
import java.util.concurrent.*
import kotlin.math.abs
import kotlin.math.min
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}

object DateTimeConverter {
private val LOGGER: Logger = LoggerFactory.getLogger(DateTimeConverter::class.java)

val TIME_WITH_TIMEZONE_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]"
Expand All @@ -34,7 +35,7 @@ object DateTimeConverter {
else time.toString()
} else {
if (!loggedUnknownTimeWithTimeZoneClass) {
LOGGER.info("Unknown class for Time with timezone data type" + time.javaClass)
LOGGER.info { "Unknown class for Time with timezone data type ${time.javaClass}" }
loggedUnknownTimeWithTimeZoneClass = true
}
val timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER)
Expand Down Expand Up @@ -78,9 +79,9 @@ object DateTimeConverter {
return AbstractJdbcCompatibleSourceOperations.Companion.resolveEra(localDate, value)
} else {
if (!loggedUnknownTimestampWithTimeZoneClass) {
LOGGER.info(
"Unknown class for Timestamp with time zone data type" + timestamp.javaClass
)
LOGGER.info {
"Unknown class for Timestamp with time zone data type ${timestamp.javaClass}"
}
loggedUnknownTimestampWithTimeZoneClass = true
}
val instant = Instant.parse(timestamp.toString())
Expand Down Expand Up @@ -123,7 +124,7 @@ object DateTimeConverter {
)
} else {
if (!loggedUnknownTimestampClass) {
LOGGER.info("Unknown class for Timestamp data type" + timestamp.javaClass)
LOGGER.info { "Unknown class for Timestamp data type ${timestamp.javaClass}" }
loggedUnknownTimestampClass = true
}
val localDateTime = LocalDateTime.parse(timestamp.toString())
Expand Down Expand Up @@ -158,7 +159,7 @@ object DateTimeConverter {
return LocalDate.ofEpochDay(date.toLong()).format(DataTypeUtils.DATE_FORMATTER)
} else {
if (!loggedUnknownDateClass) {
LOGGER.info("Unknown class for Date data type" + date.javaClass)
LOGGER.info { "Unknown class for Date data type${date.javaClass}" }
loggedUnknownDateClass = true
}
val localDate = LocalDate.parse(date.toString())
Expand All @@ -182,22 +183,22 @@ object DateTimeConverter {
} else {
val updatedValue =
min(abs(value.toDouble()), LocalTime.MAX.toNanoOfDay().toDouble()).toLong()
LOGGER.debug(
"Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its {}, converting to {} ",
value,
updatedValue
)
LOGGER.debug {
"Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its $value, converting to $updatedValue "
}
return formatTime(LocalTime.ofNanoOfDay(updatedValue))
}
} else {
if (!loggedUnknownTimeClass) {
LOGGER.info("Unknown class for Time data type" + time.javaClass)
LOGGER.info { "Unknown class for Time data type ${time.javaClass}" }
loggedUnknownTimeClass = true
}

val valueAsString = time.toString()
if (valueAsString.startsWith("24")) {
LOGGER.debug("Time value {} is above range, converting to 23:59:59", valueAsString)
LOGGER.debug {
"Time value ${valueAsString} is above range, converting to 23:59:59"
}
return LocalTime.MAX.toString()
}
return formatTime(LocalTime.parse(valueAsString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import io.airbyte.cdk.db.JdbcCompatibleSourceOperations
import io.airbyte.commons.exceptions.ConnectionErrorException
import io.airbyte.commons.functional.CheckedConsumer
import io.airbyte.commons.functional.CheckedFunction
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.*
import java.util.*
import java.util.function.Function
import java.util.stream.Stream
import javax.sql.DataSource
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val LOGGER = KotlinLogging.logger {}
/**
* Database object for interacting with a JDBC connection. Can be used for any JDBC compliant db.
*/
Expand Down Expand Up @@ -50,15 +50,13 @@ constructor(
): Stream<T> {
val connection = dataSource.connection
return JdbcDatabase.Companion.toUnsafeStream<T>(query.apply(connection), recordTransform)
.onClose(
Runnable {
try {
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
}
.onClose {
try {
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
}
)
}
}

@get:Throws(SQLException::class)
Expand Down Expand Up @@ -125,16 +123,12 @@ constructor(
.onClose(
Runnable {
try {
LOGGER.info("closing connection")
LOGGER.info { "closing connection" }
connection.close()
} catch (e: SQLException) {
throw RuntimeException(e)
}
}
)
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(DefaultJdbcDatabase::class.java)
}
}
Loading

0 comments on commit f74f5d9

Please sign in to comment.