Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres on Resumable full refresh #37112

Merged
merged 45 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
000d09e
poc pr
xiaohansong Mar 29, 2024
701f879
test
xiaohansong Apr 1, 2024
6961a94
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 1, 2024
3d7e013
merge to head change
xiaohansong Apr 1, 2024
41848ed
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 4, 2024
3907d9d
save work
xiaohansong Apr 4, 2024
9164bfd
save work
xiaohansong Apr 5, 2024
92b71b6
Merge remote-tracking branch 'origin/master' into xiaohan/poc-rfr
xiaohansong Apr 5, 2024
651f088
save work
xiaohansong Apr 5, 2024
8183126
Merge remote-tracking branch 'origin/master' into xiaohan/cdk-rfr-int…
xiaohansong Apr 5, 2024
566e344
cdk change for rfr
xiaohansong Apr 5, 2024
c81fa4c
some clean up
xiaohansong Apr 5, 2024
3b54956
postgres on rfr
xiaohansong Apr 12, 2024
9508924
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong Apr 23, 2024
3f49239
save work
xiaohansong Apr 23, 2024
8293df0
save work
xiaohansong Apr 24, 2024
2f519e5
some fixes
xiaohansong Apr 24, 2024
88ec937
save work
xiaohansong Apr 25, 2024
a1f2c38
save work
xiaohansong Apr 26, 2024
39e8ff5
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong Apr 26, 2024
56cb7ee
fix for cdc
xiaohansong Apr 26, 2024
3a19f32
run tests
xiaohansong Apr 26, 2024
63d1589
format and one test fix
xiaohansong Apr 26, 2024
dc8d314
veresion bump
xiaohansong Apr 30, 2024
00ad230
sanity cleanup
xiaohansong Apr 30, 2024
f4d30c9
fix a test
xiaohansong Apr 30, 2024
967ca66
fix last bug
xiaohansong May 1, 2024
0b5ab01
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 6, 2024
6e682aa
apply fix
xiaohansong May 6, 2024
7e9e1d7
save work
xiaohansong May 7, 2024
c790ebf
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 7, 2024
306b140
fix a bug in the test
xiaohansong May 7, 2024
42640a9
fix
xiaohansong May 7, 2024
46672ad
close db source
xiaohansong May 7, 2024
7f80227
Merge branch 'master' into xiaohan/postgres-rfr
xiaohansong May 7, 2024
370e49c
add a fix
xiaohansong May 8, 2024
8992124
add one more fix
xiaohansong May 8, 2024
610e7e8
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 8, 2024
5163db4
noncdc only for check vacuum
xiaohansong May 8, 2024
efb6173
update criteria for support rfr
xiaohansong May 9, 2024
55c5b71
format
xiaohansong May 9, 2024
ccd8a7f
Merge remote-tracking branch 'origin/master' into xiaohan/postgres-rfr
xiaohansong May 9, 2024
2ffbfde
bump version
xiaohansong May 9, 2024
224e9dd
merge error
xiaohansong May 9, 2024
52095bf
toggle cdk dependency
xiaohansong May 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
// sync, the
// data is replicated as expected.
@Throws(Exception::class)
fun testCdcAndNonResumableFullRefreshInSameSync() {
protected open fun testCdcAndNonResumableFullRefreshInSameSync() {
val configuredCatalog = Jsons.clone(configuredCatalog)

val MODEL_RECORDS_2: List<JsonNode> =
Expand All @@ -734,7 +734,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
createTableSqlFmt(),
modelsSchema(),
MODELS_STREAM_NAME_2,
columnClause(columns, Optional.of(COL_ID)),
columnClause(columns, Optional.empty()),
)

for (recordJson in MODEL_RECORDS_2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,10 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
setEmittedAtToNull(actualMessages)

val expectedMessages = airbyteMessagesReadOneColumn
Assertions.assertEquals(expectedMessages.size, actualMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
val actualRecordMessages = filterRecords(actualMessages)
Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages))
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
}

protected open val airbyteMessagesReadOneColumn: List<AirbyteMessage>
Expand Down Expand Up @@ -507,8 +508,6 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {

expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2))

System.out.println("catalog: " + catalog)

val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null))
val actualRecordMessages = filterRecords(actualMessages)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ java {
airbyteJavaConnector {
cdkVersionRequired = '0.31.5'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.32
dockerImageTag: 3.4.0
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector
this.lsn = null;
}

PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long lsn) {
public PostgresCdcConnectorMetadataInjector(final String transactionTimestamp, final Long lsn) {
this.transactionTimestamp = transactionTimestamp;
this.lsn = lsn;
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -34,7 +33,6 @@ public static CtidStreams streamsToSyncViaCtid(final CdcStateManager stateManage
return new CtidStreams(
fullCatalog.getStreams()
.stream()
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
.collect(Collectors.toList()),
new HashMap<>());
}
Expand Down Expand Up @@ -78,7 +76,6 @@ private static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Con
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
final Set<AirbyteStreamNameNamespacePair> newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySyncedStreams));
return catalog.getStreams().stream()
.filter(c -> c.getSyncMode() == SyncMode.INCREMENTAL)
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream()))).map(Jsons::clone)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState;
import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcCtidUtils.CtidStreams;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
Expand All @@ -33,29 +34,41 @@ public class CtidGlobalStateManager extends CtidStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(CtidGlobalStateManager.class);

private final CdcState cdcState;
private final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
private final StateManager stateManager;
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
private Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot;
private final boolean savedOffsetAfterReplicationSlotLSN;
private final CdcState defaultCdcState;

public CtidGlobalStateManager(final CtidStreams ctidStreams,
final FileNodeHandler fileNodeHandler,
final CdcState cdcState,
final ConfiguredAirbyteCatalog catalog) {
final StateManager stateManager,
final ConfiguredAirbyteCatalog catalog,
final boolean savedOffsetAfterReplicationSlotLSN,
final CdcState defaultCdcState) {
super(filterOutExpiredFileNodes(ctidStreams.pairToCtidStatus(), fileNodeHandler));
this.cdcState = cdcState;
this.streamsThatHaveCompletedSnapshot = initStreamsCompletedSnapshot(ctidStreams, catalog);
this.stateManager = stateManager;
this.savedOffsetAfterReplicationSlotLSN = savedOffsetAfterReplicationSlotLSN;
this.defaultCdcState = defaultCdcState;
initStream(ctidStreams, catalog);
this.fileNodeHandler = fileNodeHandler;
}

private static Set<AirbyteStreamNameNamespacePair> initStreamsCompletedSnapshot(final CtidStreams ctidStreams,
final ConfiguredAirbyteCatalog catalog) {
final Set<AirbyteStreamNameNamespacePair> streamsThatHaveCompletedSnapshot = new HashSet<>();
private void initStream(final CtidStreams ctidStreams,
final ConfiguredAirbyteCatalog catalog) {
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
this.resumableFullRefreshStreams = new HashSet<>();
catalog.getStreams().forEach(configuredAirbyteStream -> {
if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) || configuredAirbyteStream.getSyncMode() != SyncMode.INCREMENTAL) {
return;
if (!ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream) && configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
streamsThatHaveCompletedSnapshot.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
}
if (ctidStreams.streamsForCtidSync().contains(configuredAirbyteStream)
&& configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
this.resumableFullRefreshStreams.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
}
streamsThatHaveCompletedSnapshot.add(
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
});
return streamsThatHaveCompletedSnapshot;
}

private static Map<AirbyteStreamNameNamespacePair, CtidStatus> filterOutExpiredFileNodes(
Expand All @@ -79,37 +92,65 @@ private static Map<AirbyteStreamNameNamespacePair, CtidStatus> filterOutExpiredF
public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespacePair pair, final CtidStatus ctidStatus) {
pairToCtidStatus.put(pair, ctidStatus);
final List<AirbyteStreamState> streamStates = new ArrayList<>();

streamsThatHaveCompletedSnapshot.forEach(stream -> {
final DbStreamState state = getFinalState(stream);
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));

});
streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus))));
final AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setSharedState(Jsons.jsonNode(cdcState));
globalState.setStreamStates(streamStates);

resumableFullRefreshStreams.forEach(stream -> {
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(stream);
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(ctidStatusForFullRefreshStream))));
});

if (!resumableFullRefreshStreams.contains(pair)) {
streamStates.add(getAirbyteStreamState(pair, (Jsons.jsonNode(ctidStatus))));
}

return new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(globalState);
.withGlobal(generateGlobalState(streamStates));
}

public AirbyteGlobalState generateGlobalState(final List<AirbyteStreamState> streamStates) {
final CdcState stateToBeUsed = getCdcState();
final AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setSharedState(Jsons.jsonNode(stateToBeUsed));
globalState.setStreamStates(streamStates);
return globalState;

}

public CdcState getCdcState() {
final CdcState stateManagerCdcState = stateManager.getCdcStateManager().getCdcState();

return !savedOffsetAfterReplicationSlotLSN || stateManagerCdcState == null
|| stateManagerCdcState.getState() == null ? defaultCdcState
: stateManagerCdcState;

}

@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
streamsThatHaveCompletedSnapshot.add(pair);
// Only incremental streams can be transformed into the next phase.
if (!resumableFullRefreshStreams.contains(pair)) {
streamsThatHaveCompletedSnapshot.add(pair);
}
final List<AirbyteStreamState> streamStates = new ArrayList<>();
streamsThatHaveCompletedSnapshot.forEach(stream -> {
final DbStreamState state = getFinalState(stream);
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(state)));
});

final AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setSharedState(Jsons.jsonNode(cdcState));
globalState.setStreamStates(streamStates);
resumableFullRefreshStreams.forEach(stream -> {
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(pair);
streamStates.add(getAirbyteStreamState(pair, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
});

return new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(globalState);
.withGlobal(generateGlobalState(streamStates));
}

private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespacePair pair, final JsonNode stateData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public AirbyteStateMessage createCtidStateMessage(final AirbyteStreamNameNamespa

@Override
public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun) {
if (streamStateForIncrementalRun == null || streamStateForIncrementalRun.isEmpty()) {
// resumeable full refresh for cursor based stream.
var ctidStatus = generateCtidStatusForState(pair);
return createCtidStateMessage(pair, ctidStatus);
}
return XminStateManager.getAirbyteStateMessage(pair, Jsons.object(streamStateForIncrementalRun, XminStatus.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.integrations.source.postgres.internal.models.InternalModels.StateType;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
Expand All @@ -27,13 +28,14 @@ public abstract class CtidStateManager implements SourceStateMessageProducer<Air
public static final String STATE_TYPE_KEY = "state_type";

protected final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus;
private Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;
protected Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;

private String lastCtid;
private FileNodeHandler fileNodeHandler;
protected String lastCtid;
protected FileNodeHandler fileNodeHandler;

protected CtidStateManager(final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus) {
this.pairToCtidStatus = pairToCtidStatus;
this.streamStateForIncrementalRunSupplier = namespacePair -> Jsons.emptyObject();
}

public CtidStatus getCtidStatus(final AirbyteStreamNameNamespacePair pair) {
Expand All @@ -55,26 +57,39 @@ public static boolean validateRelationFileNode(final CtidStatus ctidstatus,

public abstract AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);

public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
FileNodeHandler fileNodeHandler) {
public void setStreamStateIteratorFields(Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier) {
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
}

public void setFileNodeHandler(final FileNodeHandler fileNodeHandler) {
this.fileNodeHandler = fileNodeHandler;
}

public FileNodeHandler getFileNodeHandler() {
return fileNodeHandler;
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());
final CtidStatus ctidStatus = generateCtidStatusForState(pair);
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
return createCtidStateMessage(pair, ctidStatus);
}

protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) {
final Long fileNode = fileNodeHandler.getFileNode(pair);
assert fileNode != null;
final CtidStatus ctidStatus = new CtidStatus()
// If the table is empty, lastCtid will be set to zero for the final state message.
final String lastCtidInState = (Objects.nonNull(lastCtid)
&& StringUtils.isNotBlank(lastCtid)) ? lastCtid : Ctid.ZERO.toString();
return new CtidStatus()
.withVersion(CTID_STATUS_VERSION)
.withStateType(StateType.CTID)
.withCtid(lastCtid)
.withCtid(lastCtidInState)
.withIncrementalState(getStreamState(pair))
.withRelationFilenode(fileNode);
LOGGER.info("Emitting ctid state for stream {}, state is {}", pair, ctidStatus);
return createCtidStateMessage(pair, ctidStatus);
}

/**
Expand Down Expand Up @@ -112,6 +127,7 @@ public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) {

private JsonNode getStreamState(final AirbyteStreamNameNamespacePair pair) {
final CtidStatus currentCtidStatus = getCtidStatus(pair);

return (currentCtidStatus == null || currentCtidStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
: currentCtidStatus.getIncrementalState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@

package io.airbyte.integrations.source.postgres.ctid;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils;
import io.airbyte.integrations.source.postgres.PostgresQueryUtils.TableBlockSize;
import io.airbyte.integrations.source.postgres.cdc.PostgresCdcConnectorMetadataInjector;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand All @@ -25,14 +31,12 @@ public class CtidUtils {
public static final int POSTGRESQL_VERSION_TID_RANGE_SCAN_CAPABLE = 14;

public static List<ConfiguredAirbyteStream> identifyNewlyAddedStreams(final ConfiguredAirbyteCatalog fullCatalog,
final Set<AirbyteStreamNameNamespacePair> alreadySeenStreams,
final SyncMode syncMode) {
final Set<AirbyteStreamNameNamespacePair> alreadySeenStreams) {
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(fullCatalog);

final Set<AirbyteStreamNameNamespacePair> newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySeenStreams));

return fullCatalog.getStreams().stream()
.filter(stream -> stream.getSyncMode() == syncMode)
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())))
.map(Jsons::clone)
.collect(Collectors.toList());
Expand Down Expand Up @@ -75,4 +79,32 @@ public static boolean isTidRangeScanCapableDBServer(final JdbcDatabase database)
return true;
}

public static PostgresCtidHandler createInitialLoader(final JdbcDatabase database,
final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid,
final FileNodeHandler fileNodeHandler,
final String quoteString,
final CtidStateManager ctidStateManager,
Optional<PostgresCdcConnectorMetadataInjector> optionalMetadataInjector) {
final JsonNode sourceConfig = database.getSourceConfig();

final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, TableBlockSize> tableBlockSizes =
PostgresQueryUtils.getTableBlockSizeForStreams(
database,
finalListOfStreamsToBeSyncedViaCtid,
quoteString);

final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, Integer> tablesMaxTuple =
CtidUtils.isTidRangeScanCapableDBServer(database) ? null
: PostgresQueryUtils.getTableMaxTupleForStreams(database, finalListOfStreamsToBeSyncedViaCtid, quoteString);

return new PostgresCtidHandler(sourceConfig,
database,
new CtidPostgresSourceOperations(optionalMetadataInjector),
quoteString,
fileNodeHandler,
tableBlockSizes,
tablesMaxTuple,
ctidStateManager);
}

}
Loading
Loading