Skip to content

Commit

Permalink
Bulk load CDK: switch to passing path around in integration tests (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Oct 22, 2024
1 parent 9cc083d commit 32691ad
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
- name: Fetch last commit id from remote branch [PULL REQUESTS]
if: github.event_name == 'pull_request'
id: fetch_last_commit_id_pr
run: echo "commit_id=$(git ls-remote --heads origin ${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
run: echo "commit_id=$(git ls-remote --heads origin refs/heads/${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
- name: Fetch last commit id from remote branch [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch'
id: fetch_last_commit_id_wd
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cdk_connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
- name: Fetch last commit id from remote branch [PULL REQUESTS]
if: github.event_name == 'pull_request'
id: fetch_last_commit_id_pr
run: echo "commit_id=$(git ls-remote --heads origin ${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
run: echo "commit_id=$(git ls-remote --heads origin refs/heads/${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
- name: Fetch last commit id from remote branch [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch'
id: fetch_last_commit_id_wd
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
- name: Fetch last commit id from remote branch [PULL REQUESTS]
if: github.event_name == 'pull_request'
id: fetch_last_commit_id_pr
run: echo "commit_id=$(git ls-remote --heads origin ${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
run: echo "commit_id=$(git ls-remote --heads origin refs/heads/${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
- name: Fetch last commit id from remote branch [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch'
id: fetch_last_commit_id_wd
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/connectors_version_increment_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
- name: Fetch last commit id from remote branch [PULL REQUESTS]
if: github.event_name == 'pull_request'
id: fetch_last_commit_id_pr
run: echo "commit_id=$(git ls-remote --heads origin ${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
run: echo "commit_id=$(git ls-remote --heads origin refs/heads/${{ github.head_ref }} | cut -f 1)" >> $GITHUB_OUTPUT
- name: Test connectors [PULL REQUESTS]
if: github.event_name == 'pull_request'
uses: ./.github/actions/run-airbyte-ci
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ data object CliRunner {
/** Same as [source] but for destinations. */
fun destination(
op: String,
config: ConfigurationSpecification? = null,
configContents: String? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
inputStream: InputStream,
Expand All @@ -60,8 +60,9 @@ data object CliRunner {
.singleton(true)
.build()
val out = CliRunnerOutputStream()
val configPath: Path? = inputFileFromString(configContents)
val runnable: Runnable =
makeRunnable(op, config, catalog, state) { args: Array<String> ->
makeRunnable(op, configPath, catalog, state) { args: Array<String> ->
AirbyteDestinationRunner(
args,
featureFlags.systemEnv,
Expand All @@ -75,7 +76,7 @@ data object CliRunner {
/** Same as the other [destination] but with [AirbyteMessage] input. */
fun destination(
op: String,
config: ConfigurationSpecification? = null,
configContents: String? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
featureFlags: Set<FeatureFlag> = setOf(),
Expand All @@ -90,7 +91,14 @@ data object CliRunner {
baos.toByteArray()
}
val inputStream: InputStream = ByteArrayInputStream(inputJsonBytes)
return destination(op, config, catalog, state, inputStream, *featureFlags.toTypedArray())
return destination(
op,
configContents,
catalog,
state,
inputStream,
*featureFlags.toTypedArray()
)
}

private fun makeRunnable(
Expand All @@ -101,6 +109,16 @@ data object CliRunner {
connectorRunnerConstructor: (Array<String>) -> AirbyteConnectorRunner,
): Runnable {
val configFile: Path? = inputFile(config)
return makeRunnable(op, configFile, catalog, state, connectorRunnerConstructor)
}

private fun makeRunnable(
op: String,
configFile: Path?,
catalog: ConfiguredAirbyteCatalog?,
state: List<AirbyteStateMessage>?,
connectorRunnerConstructor: (Array<String>) -> AirbyteConnectorRunner,
): Runnable {
val catalogFile: Path? = inputFile(catalog)
val stateFile: Path? = inputFile(state)
val args: List<String> =
Expand All @@ -126,9 +144,10 @@ data object CliRunner {
get() = toSet().map { it.envVar.name to it.requiredEnvVarValue }.toMap()

private fun inputFile(contents: Any?): Path? =
contents?.let { inputFileFromString(Jsons.writeValueAsString(contents)) }

private fun inputFileFromString(contents: String?): Path? =
contents?.let {
Files.createTempFile(null, null).also { file ->
Files.writeString(file, Jsons.writeValueAsString(contents))
}
Files.createTempFile(null, null).also { file -> Files.writeString(file, contents) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import org.junit.jupiter.api.Test

class MockBasicFunctionalityIntegrationTest :
BasicFunctionalityIntegrationTest(
MockDestinationSpecification(),
MockDestinationSpecification.CONFIG,
MockDestinationSpecification::class.java,
MockDestinationDataDumper,
NoopDestinationCleaner,
NoopExpectedRecordMapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ class MockDestinationConfiguration : DestinationConfiguration() {
override val recordBatchSizeBytes = 10 * 1024L
}

@Singleton class MockDestinationSpecification : ConfigurationSpecification()
@Singleton
class MockDestinationSpecification : ConfigurationSpecification() {
companion object {
const val CONFIG: String = "{}"
}
}

@Singleton
class MockDestinationConfigurationFactory :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package io.airbyte.cdk.load.check

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
Expand All @@ -23,7 +22,7 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll

data class CheckTestConfig(val configPath: String, val featureFlags: Set<FeatureFlag> = emptySet())
data class CheckTestConfig(val configPath: Path, val featureFlags: Set<FeatureFlag> = emptySet())

open class CheckIntegrationTest<T : ConfigurationSpecification>(
val configurationClass: Class<T>,
Expand All @@ -38,12 +37,11 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
@Test
open fun testSuccessConfigs() {
for ((path, featureFlags) in successConfigFilenames) {
val fileContents = Files.readString(Path.of(path), StandardCharsets.UTF_8)
val config = ValidatedJsonUtils.parseOne(configurationClass, fileContents)
val config = Files.readString(path, StandardCharsets.UTF_8)
val process =
destinationProcessFactory.createDestinationProcess(
"check",
config = config,
configContents = config,
featureFlags = featureFlags.toTypedArray(),
)
runBlocking { process.run() }
Expand All @@ -67,12 +65,11 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
open fun testFailConfigs() {
for ((checkTestConfig, failurePattern) in failConfigFilenamesAndFailureReasons) {
val (path, featureFlags) = checkTestConfig
val fileContents = Files.readString(Path.of(path))
val config = ValidatedJsonUtils.parseOne(configurationClass, fileContents)
val config = Files.readString(path)
val process =
destinationProcessFactory.createDestinationProcess(
"check",
config = config,
configContents = config,
featureFlags = featureFlags.toTypedArray(),
)
runBlocking { process.run() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ abstract class IntegrationTest(

/** Convenience wrapper for [runSync] using a single stream. */
fun runSync(
config: ConfigurationSpecification,
configContents: String,
stream: DestinationStream,
messages: List<DestinationMessage>,
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
): List<AirbyteMessage> =
runSync(config, DestinationCatalog(listOf(stream)), messages, streamStatus)
runSync(configContents, DestinationCatalog(listOf(stream)), messages, streamStatus)

/**
* Run a sync with the given config+stream+messages, sending a trace message at the end of the
Expand All @@ -129,15 +129,15 @@ abstract class IntegrationTest(
* want to send multiple stream status messages).
*/
fun runSync(
config: ConfigurationSpecification,
configContents: String,
catalog: DestinationCatalog,
messages: List<DestinationMessage>,
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
): List<AirbyteMessage> {
val destination =
destinationProcessFactory.createDestinationProcess(
"write",
config,
configContents,
catalog.asProtocolObject(),
)
return runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.cdk.load.test.util.destination_process

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.protocol.models.v0.AirbyteMessage
Expand Down Expand Up @@ -57,7 +56,7 @@ abstract class DestinationProcessFactory {

abstract fun createDestinationProcess(
command: String,
config: ConfigurationSpecification? = null,
configContents: String? = null,
catalog: ConfiguredAirbyteCatalog? = null,
vararg featureFlags: FeatureFlag,
): DestinationProcess
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.cdk.load.test.util.destination_process

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.util.Jsons
Expand Down Expand Up @@ -34,7 +33,7 @@ private val logger = KotlinLogging.logger {}
class DockerizedDestination(
imageTag: String,
command: String,
config: ConfigurationSpecification?,
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
private val testName: String,
vararg featureFlags: FeatureFlag,
Expand Down Expand Up @@ -118,16 +117,16 @@ class DockerizedDestination(
))
.toMutableList()

fun addInput(paramName: String, fileContents: Any) {
fun addInput(paramName: String, fileContents: ByteArray) {
Files.write(
jobRoot.resolve("destination_$paramName.json"),
Jsons.writeValueAsBytes(fileContents),
fileContents,
)
cmd.add("--$paramName")
cmd.add("destination_$paramName.json")
}
config?.let { addInput("config", it) }
catalog?.let { addInput("catalog", it) }
configContents?.let { addInput("config", it.toByteArray(Charsets.UTF_8)) }
catalog?.let { addInput("catalog", Jsons.writeValueAsBytes(catalog)) }

logger.info { "Executing command: ${cmd.joinToString(" ")}" }
process = ProcessBuilder(cmd).start()
Expand Down Expand Up @@ -231,14 +230,14 @@ class DockerizedDestinationFactory(
) : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,
config: ConfigurationSpecification?,
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
vararg featureFlags: FeatureFlag,
): DestinationProcess {
return DockerizedDestination(
"$imageName:$imageVersion",
command,
config,
configContents,
catalog,
testName,
*featureFlags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package io.airbyte.cdk.load.test.util.destination_process
import io.airbyte.cdk.ConnectorUncleanExitException
import io.airbyte.cdk.command.CliRunnable
import io.airbyte.cdk.command.CliRunner
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.protocol.models.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
Expand All @@ -21,7 +20,7 @@ import kotlinx.coroutines.CompletableDeferred

class NonDockerizedDestination(
command: String,
config: ConfigurationSpecification?,
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
vararg featureFlags: FeatureFlag,
) : DestinationProcess {
Expand All @@ -42,7 +41,7 @@ class NonDockerizedDestination(
destination =
CliRunner.destination(
command,
config = config,
configContents = configContents,
catalog = catalog,
inputStream = destinationStdin,
featureFlags = featureFlags,
Expand Down Expand Up @@ -78,11 +77,11 @@ class NonDockerizedDestination(
class NonDockerizedDestinationFactory : DestinationProcessFactory() {
override fun createDestinationProcess(
command: String,
config: ConfigurationSpecification?,
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
vararg featureFlags: FeatureFlag,
): DestinationProcess {
// TODO pass test name into the destination process
return NonDockerizedDestination(command, config, catalog, *featureFlags)
return NonDockerizedDestination(command, configContents, catalog, *featureFlags)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.write

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
Expand Down Expand Up @@ -35,7 +36,9 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll

abstract class BasicFunctionalityIntegrationTest(
val config: ConfigurationSpecification,
/** The config to pass into the connector, as a serialized JSON blob */
val configContents: String,
val configSpecClass: Class<out ConfigurationSpecification>,
dataDumper: DestinationDataDumper,
destinationCleaner: DestinationCleaner,
recordMangler: ExpectedRecordMapper = NoopExpectedRecordMapper,
Expand All @@ -46,6 +49,8 @@ abstract class BasicFunctionalityIntegrationTest(
*/
val verifyDataWriting: Boolean = true,
) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper) {
val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configContents)

@Test
open fun testBasicWrite() {
val stream =
Expand All @@ -59,7 +64,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val messages =
runSync(
config,
configContents,
stream,
listOf(
DestinationRecord(
Expand Down Expand Up @@ -110,7 +115,7 @@ abstract class BasicFunctionalityIntegrationTest(
{
if (verifyDataWriting) {
dumpAndDiffRecords(
config,
ValidatedJsonUtils.parseOne(configSpecClass, configContents),
listOf(
OutputRecord(
extractedAt = 1234,
Expand Down Expand Up @@ -158,7 +163,7 @@ abstract class BasicFunctionalityIntegrationTest(
val destination =
destinationProcessFactory.createDestinationProcess(
"write",
config,
configContents,
DestinationCatalog(
listOf(
makeStream("test_stream1"),
Expand Down Expand Up @@ -267,7 +272,7 @@ abstract class BasicFunctionalityIntegrationTest(
}
)
if (verifyDataWriting) {
val records = dataDumper.dumpRecords(config, makeStream(streamName))
val records = dataDumper.dumpRecords(parsedConfig, makeStream(streamName))
val expectedId =
when (streamName) {
"test_stream1" -> 12
Expand Down
Loading

0 comments on commit 32691ad

Please sign in to comment.