27787 fine tune sync parameters for ctid with large tables #27792

Draft
wants to merge 74 commits into
base: master
194a62c
initial ctid for testing
rodireich Jun 10, 2023
ec7198c
initial ctid for testing
rodireich Jun 11, 2023
1459acb
initial ctid for testing
rodireich Jun 13, 2023
8681c8c
Automated Commit - Format and Process Resources Changes
rodireich Jun 13, 2023
14371ea
add version and state type to xmin status
subodh1810 Jun 13, 2023
a4ecc7e
Merge remote-tracking branch 'origin/26486-initial-sync-using-ctid' i…
subodh1810 Jun 13, 2023
eb7255c
add logic to swtich between xmin and ctid sync
subodh1810 Jun 13, 2023
69e6989
Merge branch 'master' into 26486-initial-sync-using-ctid
subodh1810 Jun 13, 2023
a796a81
Merge branch '26486-initial-sync-using-ctid' into state-structure-cha…
subodh1810 Jun 13, 2023
2215f64
npe fixes
subodh1810 Jun 13, 2023
52d09a0
use enum
subodh1810 Jun 13, 2023
c6bebb5
refactor
subodh1810 Jun 15, 2023
286c5cd
add relation node logic + validation for vacuuming + more refactor
subodh1810 Jun 16, 2023
7950be8
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 18, 2023
7ef08e3
refine test + make PR ready for review
subodh1810 Jun 18, 2023
cba94db
remove un-wanted changes
subodh1810 Jun 18, 2023
378357a
missed this one
subodh1810 Jun 18, 2023
cb5bd39
remove irrelevant comments
subodh1810 Jun 18, 2023
0689f30
add more assertions
subodh1810 Jun 19, 2023
7ea3fdf
remove jdbc log
subodh1810 Jun 19, 2023
3ab062c
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 20, 2023
f7e9939
address review comments
subodh1810 Jun 20, 2023
06f3ea7
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 20, 2023
643aff7
skip streams under vacuum
subodh1810 Jun 20, 2023
0756579
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 20, 2023
134637c
update log message
subodh1810 Jun 20, 2023
1f67fac
Merge branch 'state-structure-change-xmin-ctid' of https://github.com…
subodh1810 Jun 20, 2023
1e57a69
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 20, 2023
f7dddd8
comment
rodireich Jun 20, 2023
06dcae4
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 21, 2023
854b414
latest round of review comments
subodh1810 Jun 21, 2023
65f4711
missed this file
subodh1810 Jun 21, 2023
63a14f7
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 21, 2023
52eb61d
🤖 Auto format source-postgres code [skip ci]
octavia-squidington-iii Jun 21, 2023
27fb1a2
Merge branch 'master' into state-structure-change-xmin-ctid
subodh1810 Jun 21, 2023
e7e89c0
initial drop for testing
rodireich Jun 24, 2023
0653433
Merge branch 'master' into 27558-break-a-long-running-ctid-query-into…
rodireich Jun 24, 2023
c5caac6
test
rodireich Jun 26, 2023
fa7db79
refactor query plan
rodireich Jun 27, 2023
6c18678
Merge branch 'master' into 27558-break-a-long-running-ctid-query-into…
rodireich Jun 27, 2023
3571e98
Add testing for query plan
rodireich Jun 27, 2023
0fa0da7
sanity
rodireich Jun 27, 2023
f203c63
sanity
rodireich Jun 27, 2023
43039fc
sanity
rodireich Jun 27, 2023
5d5052a
remove change in spec json
rodireich Jun 27, 2023
89c4ed0
sanity
rodireich Jun 27, 2023
d22e395
test
rodireich Jun 27, 2023
ac78f12
comment
rodireich Jun 27, 2023
2ffef6f
remove hardcoded testing value
rodireich Jun 27, 2023
3b6c7c8
add test case
rodireich Jun 27, 2023
a6e1ea8
revert json spec
rodireich Jun 27, 2023
53aff2f
test
rodireich Jun 28, 2023
2d4bca0
test
rodireich Jun 28, 2023
09b7a06
Merge branch 'master' into 27558-break-a-long-running-ctid-query-into…
alafanechere Jun 28, 2023
4cc5bea
Merge branch '27558-break-a-long-running-ctid-query-into-smaller-size…
alafanechere Jun 28, 2023
de85f62
test
rodireich Jun 28, 2023
a7939fa
test
rodireich Jun 28, 2023
689ced7
test
rodireich Jun 28, 2023
1b78b0f
test
rodireich Jun 28, 2023
bb37555
test
rodireich Jun 28, 2023
2c416a1
Automated Commit - Format and Process Resources Changes
rodireich Jun 28, 2023
3fa733e
test
rodireich Jun 28, 2023
d13ecff
Merge remote-tracking branch 'origin/27787-fine-tune-sync-parameters-…
rodireich Jun 28, 2023
f249110
test
rodireich Jun 29, 2023
403c865
small imrovements
rodireich Jun 30, 2023
622f4a1
Merge branch 'master' into 27558-break-a-long-running-ctid-query-into…
rodireich Jun 30, 2023
a4a6d00
typo
rodireich Jun 30, 2023
8d660f8
test
rodireich Jun 30, 2023
353192d
Merge branch '27558-break-a-long-running-ctid-query-into-smaller-size…
rodireich Jun 30, 2023
c96424f
fix test
rodireich Jun 30, 2023
75c933b
Merge branch '27558-break-a-long-running-ctid-query-into-smaller-size…
rodireich Jun 30, 2023
fe79713
Merge branch 'master' into 27787-fine-tune-sync-parameters-for-ctid-w…
rodireich Jun 30, 2023
0523484
test
rodireich Jul 2, 2023
78dd4e7
test
rodireich Jul 3, 2023
add logic to swtich between xmin and ctid sync
subodh1810 committed Jun 13, 2023
commit eb7255c2ff287d5049d95b9c65d6252849a6d12f
Original file line number Diff line number Diff line change
@@ -27,10 +27,12 @@
import static java.util.stream.Collectors.toSet;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import datadog.trace.api.Trace;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
@@ -70,11 +72,13 @@
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
@@ -85,7 +89,9 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -94,6 +100,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -455,17 +462,78 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
AutoCloseableIterators.lazyIterator(incrementalIteratorSupplier, null)));

} else if (PostgresUtils.isXmin(sourceConfig) && isIncrementalSyncMode(catalog)) {
final XminStateManager xminStateManager = new XminStateManager(stateManager.getRawStateMessages());
final PostgresXminHandler handler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);
return handler.getIncrementalIterators(catalog, tableNameToTable, emittedAt);
final List<AirbyteStateMessage> rawStateMessages = stateManager.getRawStateMessages();

final List<AirbyteStateMessage> statesFromCtidSync = new ArrayList<>();
final List<AirbyteStateMessage> statesFromXminSync = new ArrayList<>();

final Set<AirbyteStreamNameNamespacePair> alreadySeenStreams = new HashSet<>();
final Set<AirbyteStreamNameNamespacePair> streamsStillInCtidSync = new HashSet<>();
rawStateMessages.forEach(s -> {
final JsonNode streamState = s.getStream().getStreamState();
final StreamDescriptor streamDescriptor = s.getStream().getStreamDescriptor();
final AirbyteStateMessage clonedState = Jsons.clone(s);
if (streamState.has("type") && streamState.get("type").asText().equalsIgnoreCase("ctid")) {
statesFromCtidSync.add(clonedState);
streamsStillInCtidSync.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace()));
} else {
statesFromXminSync.add(clonedState);
}
alreadySeenStreams.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace()));
});

final List<ConfiguredAirbyteStream> newlyAddedStreams = identifyNewlyAddedStreams(catalog, alreadySeenStreams);

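// Collected into a mutable list because newly added streams are appended to it; Stream.toList() returns an unmodifiable list.
final List<ConfiguredAirbyteStream> configuredStreamsStillInCtidSync = catalog.getStreams().stream()
.filter(stream -> streamsStillInCtidSync.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())))
.map(Jsons::clone)
.collect(Collectors.toCollection(ArrayList::new));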

configuredStreamsStillInCtidSync.addAll(newlyAddedStreams);

final List<ConfiguredAirbyteStream> streamsInXminSync = catalog.getStreams().stream()
.filter(stream -> !configuredStreamsStillInCtidSync.contains(stream))
.map(Jsons::clone)
.toList();

final XminStateManager xminStateManager = new XminStateManager(statesFromXminSync);
final PostgresXminHandler xminHandler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);

final List<AutoCloseableIterator<AirbyteMessage>> xminIterator = xminHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(streamsInXminSync), tableNameToTable, emittedAt);

if (configuredStreamsStillInCtidSync.isEmpty()) {
return xminIterator;
}

final CtidStateManager ctidStateManager = new CtidStateManager(statesFromCtidSync);
final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(database, sourceOperations, getQuoteString(), ctidStateManager,
x -> new ObjectMapper().valueToTree(xminStatus),
(pair, jsonState) -> XminStateManager.getAirbyteStateMessage(pair, Jsons.object(jsonState, XminStatus.class)));
final List<AutoCloseableIterator<AirbyteMessage>> ctidIterator = ctidHandler.getIncrementalIterators(
new ConfiguredAirbyteCatalog().withStreams(configuredStreamsStillInCtidSync), tableNameToTable, emittedAt);
return Stream
.of(ctidIterator, xminIterator)
.flatMap(Collection::stream)
.collect(Collectors.toList());
} else {
final CtidStateManager ctidStateManager = new CtidStateManager(stateManager.getRawStateMessages());
final PostgresCtidHandler handler = new PostgresCtidHandler(database, sourceOperations, getQuoteString(), null, ctidStateManager);
return handler.getIncrementalIterators(catalog, tableNameToTable, emittedAt);
// return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
}
}

protected List<ConfiguredAirbyteStream> identifyNewlyAddedStreams(final ConfiguredAirbyteCatalog catalog, final Set<AirbyteStreamNameNamespacePair> alreadySeenStreams) {
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);

final Set<AirbyteStreamNameNamespacePair> newlyAddedStreams = new HashSet<>(Sets.difference(allStreams, alreadySeenStreams));

return catalog.getStreams().stream()
.filter(stream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.getStream())))
.map(Jsons::clone)
.collect(Collectors.toList());
}



@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
final String schema)
Expand Down
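The routing added in this hunk can be sketched in isolation. This is a minimal, dependency-free illustration, not the connector's code: `SyncRouter`, `needsCtid`, and `route` are hypothetical names, streams are plain `namespace.name` strings rather than `AirbyteStreamNameNamespacePair`, and each saved state is reduced to its `type` value. It assumes, as the diff suggests, that a stream goes to ctid sync when its saved state has type `ctid` or when it has no saved state at all, and to xmin sync otherwise.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: streams are identified by plain "namespace.name" strings
// instead of AirbyteStreamNameNamespacePair, and saved state is reduced to its "type".
final class SyncRouter {

  private SyncRouter() {}

  /** A stream needs ctid if it has no saved state yet or its saved state is of type "ctid". */
  static boolean needsCtid(final String stream, final Map<String, String> savedStateTypes) {
    final String type = savedStateTypes.get(stream);
    return type == null || "ctid".equalsIgnoreCase(type);
  }

  /** Splits the catalog: key true holds ctid streams, key false holds xmin streams. */
  static Map<Boolean, List<String>> route(final List<String> catalog,
                                          final Map<String, String> savedStateTypes) {
    return catalog.stream()
        .collect(Collectors.partitioningBy(stream -> needsCtid(stream, savedStateTypes)));
  }

  public static void main(final String[] args) {
    final Map<Boolean, List<String>> routing = route(
        List.of("public.users", "public.orders", "public.events"),
        Map.of("public.users", "ctid", "public.orders", "xmin"));
    System.out.println("ctid: " + routing.get(true));
    System.out.println("xmin: " + routing.get(false));
  }
}
```

In this sketch `public.events` has no saved state, so it is treated as newly added and lands in the ctid group together with `public.users`, while `public.orders` stays on xmin.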
Original file line number Diff line number Diff line change
@@ -3,12 +3,18 @@
import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.CTID_STATUS_TYPE;
import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.CTID_STATUS_VERSION;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.integrations.source.postgres.internal.models.CtidStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.CheckForNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -22,11 +28,18 @@ public class CtidStateIterator extends AbstractIterator<AirbyteMessage> implemen
private boolean hasEmittedFinalState;
private boolean hasCaughtException = false;
private String lastCtid;
private final JsonNode streamStateForIncrementalRun;
final BiFunction<AirbyteStreamNameNamespacePair, JsonNode, AirbyteStateMessage> finalStateMessageSupplier;
final AtomicLong recordCount = new AtomicLong();

public CtidStateIterator(final Iterator<AirbyteMessage> messageIterator,
final AirbyteStreamNameNamespacePair pair) {
final AirbyteStreamNameNamespacePair pair,
final JsonNode streamStateForIncrementalRun,
final BiFunction<AirbyteStreamNameNamespacePair, JsonNode, AirbyteStateMessage> finalStateMessageSupplier) {
this.messageIterator = messageIterator;
this.pair = pair;
this.streamStateForIncrementalRun = streamStateForIncrementalRun;
this.finalStateMessageSupplier = finalStateMessageSupplier;
}

@CheckForNull
@@ -49,7 +62,8 @@ protected AirbyteMessage computeNext() {
new CtidStatus()
.withVer(CTID_STATUS_VERSION)
.withType(CTID_STATUS_TYPE)
.withCtid(lastCtid));
.withCtid(lastCtid)
.withIncrementalState(streamStateForIncrementalRun));
}
// Use try-catch to catch Exception that could occur when connection to the database fails
try {
@@ -68,11 +82,9 @@ protected AirbyteMessage computeNext() {
}
} else if (!hasEmittedFinalState) {
hasEmittedFinalState = true;
return CtidStateManager.createStateMessage(pair,
new CtidStatus()
.withVer(CTID_STATUS_VERSION)
.withType(CTID_STATUS_TYPE)
.withCtid(lastCtid));
return new AirbyteMessage()
.withType(Type.STATE)
.withState(finalStateMessageSupplier.apply(pair, streamStateForIncrementalRun));
} else {
return endOfData();
}
Expand Down
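The change to the final state message can be illustrated with a simplified iterator. This sketch is an assumption-laden stand-in, not the actual `CtidStateIterator`: `FinalStateIterator` and `drain` are hypothetical names, messages and state are plain strings, it implements `java.util.Iterator` directly instead of extending Guava's `AbstractIterator`, and it leaves out the intermediate ctid checkpoints and the exception handling. It only shows the pattern the diff introduces: once the records are exhausted, exactly one final state is produced by an injected `BiFunction` from the stream identifier and the state to use for the following incremental run.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

// Hypothetical simplification: records and state messages are plain strings.
final class FinalStateIterator implements Iterator<String> {

  private final Iterator<String> records;
  private final String streamName;
  private final String incrementalState;
  private final BiFunction<String, String, String> finalStateSupplier;
  private boolean hasEmittedFinalState = false;

  FinalStateIterator(final Iterator<String> records,
                     final String streamName,
                     final String incrementalState,
                     final BiFunction<String, String, String> finalStateSupplier) {
    this.records = records;
    this.streamName = streamName;
    this.incrementalState = incrementalState;
    this.finalStateSupplier = finalStateSupplier;
  }

  @Override
  public boolean hasNext() {
    // One extra element (the final state) follows the last record.
    return records.hasNext() || !hasEmittedFinalState;
  }

  @Override
  public String next() {
    if (records.hasNext()) {
      return records.next();
    }
    if (!hasEmittedFinalState) {
      hasEmittedFinalState = true;
      // The caller decides what the final state looks like.
      return finalStateSupplier.apply(streamName, incrementalState);
    }
    throw new NoSuchElementException();
  }

  /** Drains an iterator into a list; a convenience for the demo below. */
  static List<String> drain(final Iterator<String> iterator) {
    final List<String> out = new ArrayList<>();
    iterator.forEachRemaining(out::add);
    return out;
  }

  public static void main(final String[] args) {
    final Iterator<String> iterator = new FinalStateIterator(
        List.of("record-1", "record-2").iterator(),
        "public.users",
        "{\"type\":\"xmin\"}",
        (stream, state) -> "STATE[" + stream + "]=" + state);
    drain(iterator).forEach(System.out::println);
  }
}
```

Injecting the function keeps the iterator unaware of which sync mode follows the snapshot; in the diff, the caller passes a lambda that builds an xmin state message.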
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -32,6 +33,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,18 +46,22 @@ public class PostgresCtidHandler {
private final JdbcDatabase database;
private final JdbcCompatibleSourceOperations<?> sourceOperations;
private final String quoteString;
private final CtidStatus ctidStatus;
private final CtidStateManager ctidStateManager;
private final Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;
private final BiFunction<AirbyteStreamNameNamespacePair, JsonNode, AirbyteStateMessage> finalStateMessageSupplier;

public PostgresCtidHandler(final JdbcDatabase database,
final JdbcCompatibleSourceOperations<?> sourceOperations,
final String quoteString,
final CtidStatus ctidStatus,
final CtidStateManager ctidStateManager) {
final CtidStateManager ctidStateManager,
final Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier,
final BiFunction<AirbyteStreamNameNamespacePair, JsonNode, AirbyteStateMessage> finalStateMessageSupplier) {
this.database = database;
this.sourceOperations = sourceOperations;
this.quoteString = quoteString;
this.ctidStatus = ctidStatus;
this.ctidStateManager = ctidStateManager;
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
this.finalStateMessageSupplier = finalStateMessageSupplier;
}
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final ConfiguredAirbyteCatalog catalog,
@@ -174,8 +182,13 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithLogs(final AutoCloseabl

private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseableIterator<AirbyteMessage> recordIterator,
final AirbyteStreamNameNamespacePair pair) {

final CtidStatus currentCtidStatus = ctidStateManager.getCtidStatus(pair);
final JsonNode incrementalState = currentCtidStatus.getIncrementalState() != null ? currentCtidStatus.getIncrementalState() : streamStateForIncrementalRunSupplier.apply(pair);


return AutoCloseableIterators.transform(
autoClosableIterator -> new CtidStateIterator(recordIterator, pair), recordIterator, pair);
autoClosableIterator -> new CtidStateIterator(recordIterator, pair, incrementalState, finalStateMessageSupplier), recordIterator, pair);
}

private AutoCloseableIterator<AirbyteMessage> swallowCtid(final AutoCloseableIterator<AirbyteMessage> iterator,
Expand Down
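The fallback in `augmentWithState` prefers the incremental state already stored in the ctid status and only asks the supplier when none was stored. A minimal sketch of that rule, with hypothetical names (`IncrementalStateResolver`, `resolve`) and strings in place of `JsonNode`:

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper: state is modelled as a String rather than a JsonNode.
final class IncrementalStateResolver {

  private IncrementalStateResolver() {}

  /**
   * Returns the persisted incremental state when present; otherwise computes a
   * fresh one for the given stream. The supplier is not invoked in the first case.
   */
  static String resolve(final String persistedIncrementalState,
                        final String streamName,
                        final Function<String, String> freshStateSupplier) {
    return Optional.ofNullable(persistedIncrementalState)
        .orElseGet(() -> freshStateSupplier.apply(streamName));
  }

  public static void main(final String[] args) {
    System.out.println(resolve("saved-xmin-state", "public.users", name -> "fresh-state-for-" + name));
    System.out.println(resolve(null, "public.users", name -> "fresh-state-for-" + name));
  }
}
```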
Original file line number Diff line number Diff line change
@@ -73,6 +73,14 @@ public XminStatus getXminStatus(final AirbyteStreamNameNamespacePair pair) {
* @return AirbyteMessage which includes information on state of records read so far
*/
public static AirbyteMessage createStateMessage(final AirbyteStreamNameNamespacePair pair, final XminStatus xminStatus) {
final AirbyteStateMessage stateMessage = getAirbyteStateMessage(pair, xminStatus);

return new AirbyteMessage()
.withType(Type.STATE)
.withState(stateMessage);
}

public static AirbyteStateMessage getAirbyteStateMessage(final AirbyteStreamNameNamespacePair pair, final XminStatus xminStatus) {
final AirbyteStreamState airbyteStreamState =
new AirbyteStreamState()
.withStreamDescriptor(
@@ -86,10 +94,7 @@ public static AirbyteMessage createStateMessage(final AirbyteStreamNameNamespace
new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(airbyteStreamState);

return new AirbyteMessage()
.withType(Type.STATE)
.withState(stateMessage);
return stateMessage;
}

}
Original file line number Diff line number Diff line change
@@ -19,3 +19,7 @@ definitions:
ctid:
description: ctid bookmark
type: string
incremental_state:
description: "State to switch to after completion of ctid snapshot"
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode