Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres on Resumable full refresh #37112

Merged
merged 45 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
000d09e
poc pr
xiaohansong Mar 29, 2024
701f879
test
xiaohansong Apr 1, 2024
6961a94
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 1, 2024
3d7e013
merge to head change
xiaohansong Apr 1, 2024
41848ed
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 4, 2024
3907d9d
save work
xiaohansong Apr 4, 2024
9164bfd
save work
xiaohansong Apr 5, 2024
92b71b6
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 5, 2024
651f088
save work
xiaohansong Apr 5, 2024
8183126
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 5, 2024
566e344
cdk change for rfr
xiaohansong Apr 5, 2024
c81fa4c
some clean up
xiaohansong Apr 5, 2024
3b54956
postgres on rfr
xiaohansong Apr 12, 2024
9508924
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong Apr 23, 2024
3f49239
save work
xiaohansong Apr 23, 2024
8293df0
save work
xiaohansong Apr 24, 2024
2f519e5
some fixes
xiaohansong Apr 24, 2024
88ec937
save work
xiaohansong Apr 25, 2024
a1f2c38
save work
xiaohansong Apr 26, 2024
39e8ff5
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong Apr 26, 2024
56cb7ee
fix for cdc
xiaohansong Apr 26, 2024
3a19f32
run tests
xiaohansong Apr 26, 2024
63d1589
format and one test fix
xiaohansong Apr 26, 2024
dc8d314
veresion bump
xiaohansong Apr 30, 2024
00ad230
sanity cleanup
xiaohansong Apr 30, 2024
f4d30c9
fix a test
xiaohansong Apr 30, 2024
967ca66
fix last bug
xiaohansong May 1, 2024
0b5ab01
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 6, 2024
6e682aa
apply fix
xiaohansong May 6, 2024
7e9e1d7
save work
xiaohansong May 7, 2024
c790ebf
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 7, 2024
306b140
fix a bug in the test
xiaohansong May 7, 2024
42640a9
fix
xiaohansong May 7, 2024
46672ad
close db source
xiaohansong May 7, 2024
7f80227
Merge branch 'master' into xiaohan/postgres-rfr
xiaohansong May 7, 2024
370e49c
add a fix
xiaohansong May 8, 2024
8992124
add one more fix
xiaohansong May 8, 2024
610e7e8
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 8, 2024
5163db4
noncdc only for check vacuum
xiaohansong May 8, 2024
efb6173
update criteria for support rfr
xiaohansong May 9, 2024
55c5b71
format
xiaohansong May 9, 2024
ccd8a7f
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 9, 2024
2ffbfde
bump version
xiaohansong May 9, 2024
224e9dd
merge error
xiaohansong May 9, 2024
52095bf
toggle cdk dependency
xiaohansong May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import io.airbyte.cdk.integrations.base.Source
import io.airbyte.cdk.integrations.source.jdbc.dto.JdbcPrivilegeDto
import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
Expand All @@ -54,6 +55,7 @@ import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.commons.util.AutoCloseableIterators
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
Expand All @@ -62,6 +64,7 @@ import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.time.Instant
import java.util.*
import java.util.function.Consumer
import java.util.function.Function
Expand All @@ -84,7 +87,7 @@ import org.slf4j.LoggerFactory
abstract class AbstractJdbcSource<Datatype>(
driverClass: String,
@JvmField val streamingQueryConfigProvider: Supplier<JdbcStreamingQueryConfig>,
sourceOperations: JdbcCompatibleSourceOperations<Datatype>
sourceOperations: JdbcCompatibleSourceOperations<Datatype>,
) : AbstractDbSource<Datatype, JdbcDatabase>(driverClass), Source {
@JvmField val sourceOperations: JdbcCompatibleSourceOperations<Datatype>

Expand All @@ -95,6 +98,61 @@ abstract class AbstractJdbcSource<Datatype>(
this.sourceOperations = sourceOperations
}

open fun supportResumableFullRefresh(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream
): Boolean {
return false
}

open fun getInitialLoadHandler(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
stateManager: StateManager?
): InitialLoadHandler<Datatype>? {
return null
}

override fun getFullRefreshStream(
database: JdbcDatabase,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
stateManager: StateManager?,
namespace: String,
selectedDatabaseFields: List<String>,
table: TableInfo<CommonField<Datatype>>,
emittedAt: Instant,
syncMode: SyncMode,
cursorField: Optional<String>
): AutoCloseableIterator<AirbyteMessage> {
if (
supportResumableFullRefresh(database, airbyteStream) &&
syncMode == SyncMode.FULL_REFRESH
) {
val initialLoadHandler =
getInitialLoadHandler(database, airbyteStream, catalog, stateManager)
?: throw IllegalStateException(
"Must provide initialLoadHandler for resumable full refresh."
)
return initialLoadHandler.getIteratorForStream(airbyteStream, table, Instant.now())
}

// If flag is off, fall back to legacy non-resumable refresh
return super.getFullRefreshStream(
database,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
emittedAt,
syncMode,
cursorField,
)
}

override fun queryTableFullRefresh(
database: JdbcDatabase,
columnNames: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ protected constructor(driverClassName: String) :
this.getAirbyteType(columnType)
}

initializeForStateManager(database, catalog, fullyQualifiedTableNameToInfo, stateManager)

val incrementalIterators =
getIncrementalIterators(
database,
Expand Down Expand Up @@ -188,6 +190,15 @@ protected constructor(driverClassName: String) :
}
}

// Optional - perform any initialization logic before read. For example, source connector
// can choose to load up state manager here.
protected open fun initializeForStateManager(
database: Database,
catalog: ConfiguredAirbyteCatalog,
tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
stateManager: StateManager
) {}

@Throws(SQLException::class)
protected fun validateCursorFieldForIncrementalTables(
tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
Expand Down Expand Up @@ -380,7 +391,14 @@ protected constructor(driverClassName: String) :

val table = tableNameToTable[fullyQualifiedTableName]!!
val tableReadIterator =
createReadIterator(database, airbyteStream, table, stateManager, emittedAt)
createReadIterator(
database,
airbyteStream,
catalog,
table,
stateManager,
emittedAt
)
iteratorList.add(tableReadIterator)
}
}
Expand All @@ -401,6 +419,7 @@ protected constructor(driverClassName: String) :
private fun createReadIterator(
database: Database,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
table: TableInfo<CommonField<DataType>>,
stateManager: StateManager?,
emittedAt: Instant
Expand Down Expand Up @@ -442,7 +461,9 @@ protected constructor(driverClassName: String) :
airbyteMessageIterator =
getFullRefreshStream(
database,
streamName,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
Expand Down Expand Up @@ -475,7 +496,9 @@ protected constructor(driverClassName: String) :
iterator =
getFullRefreshStream(
database,
streamName,
airbyteStream,
catalog,
stateManager,
namespace,
selectedDatabaseFields,
table,
Expand Down Expand Up @@ -560,18 +583,22 @@ protected constructor(driverClassName: String) :
* Creates a AirbyteMessageIterator that contains all records for a database source connection
*
* @param database Source Database
* @param streamName name of an individual stream in which a stream represents a source (e.g.
* @param airbyteStream name of an individual stream in which a stream represents a source (e.g.
* API endpoint or database table)
* @param catalog List of streams (e.g. database tables or API endpoints) with settings on sync
* @param stateManager tracking the state from previous sync; used for resumable full refresh.
* @param namespace Namespace of the database (e.g. public)
* @param selectedDatabaseFields List of all interested database column names
* @param table information in tabular format
* @param emittedAt Time when data was emitted from the Source database
* @param syncMode The sync mode that this full refresh stream should be associated with.
* @return AirbyteMessageIterator with all records for a database source
*/
private fun getFullRefreshStream(
protected open fun getFullRefreshStream(
database: Database,
streamName: String,
airbyteStream: ConfiguredAirbyteStream,
catalog: ConfiguredAirbyteCatalog?,
stateManager: StateManager?,
namespace: String,
selectedDatabaseFields: List<String>,
table: TableInfo<CommonField<DataType>>,
Expand All @@ -588,7 +615,12 @@ protected constructor(driverClassName: String) :
syncMode,
cursorField
)
return getMessageIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli())
return getMessageIterator(
queryStream,
airbyteStream.stream.name,
namespace,
emittedAt.toEpochMilli()
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.source.relationaldb

import io.airbyte.commons.util.AutoCloseableIterator
import io.airbyte.protocol.models.CommonField
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.time.Instant

interface InitialLoadHandler<T> {
fun getIteratorForStream(
airbyteStream: ConfiguredAirbyteStream,
table: TableInfo<CommonField<T>>,
emittedAt: Instant
): AutoCloseableIterator<AirbyteMessage>
}
Loading
Loading