-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathDefaultJobCreator.java
148 lines (136 loc) · 7.69 KB
/
DefaultJobCreator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.persistence.job;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.version.Version;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.protocol.models.SyncMode;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DefaultJobCreator implements JobCreator {
private final JobPersistence jobPersistence;
private final ResourceRequirements workerResourceRequirements;
public DefaultJobCreator(final JobPersistence jobPersistence,
final ResourceRequirements workerResourceRequirements) {
this.jobPersistence = jobPersistence;
this.workerResourceRequirements = workerResourceRequirements;
}
@Override
public Optional<Long> createSyncJob(final SourceConnection source,
final DestinationConnection destination,
final StandardSync standardSync,
final String sourceDockerImageName,
final Version sourceProtocolVersion,
final String destinationDockerImageName,
final Version destinationProtocolVersion,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final JsonNode webhookOperationConfigs,
final StandardSourceDefinition sourceDefinition,
final StandardDestinationDefinition destinationDefinition,
final UUID workspaceId)
throws IOException {
// reusing this isn't going to quite work.
final ResourceRequirements mergedOrchestratorResourceReq = ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
workerResourceRequirements);
final ResourceRequirements mergedSrcResourceReq = ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
sourceDefinition.getResourceRequirements(),
workerResourceRequirements,
JobType.SYNC);
final ResourceRequirements mergedDstResourceReq = ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
destinationDefinition.getResourceRequirements(),
workerResourceRequirements,
JobType.SYNC);
final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withNamespaceDefinition(standardSync.getNamespaceDefinition())
.withNamespaceFormat(standardSync.getNamespaceFormat())
.withPrefix(standardSync.getPrefix())
.withSourceDockerImage(sourceDockerImageName)
.withSourceProtocolVersion(sourceProtocolVersion)
.withDestinationDockerImage(destinationDockerImageName)
.withDestinationProtocolVersion(destinationProtocolVersion)
.withOperationSequence(standardSyncOperations)
.withWebhookOperationConfigs(webhookOperationConfigs)
.withConfiguredAirbyteCatalog(standardSync.getCatalog())
.withResourceRequirements(mergedOrchestratorResourceReq)
.withSourceResourceRequirements(mergedSrcResourceReq)
.withDestinationResourceRequirements(mergedDstResourceReq)
.withIsSourceCustomConnector(sourceDefinition.getCustom())
.withIsDestinationCustomConnector(destinationDefinition.getCustom())
.withWorkspaceId(workspaceId);
final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.SYNC)
.withSync(jobSyncConfig);
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}
@Override
public Optional<Long> createResetConnectionJob(final DestinationConnection destination,
final StandardSync standardSync,
final String destinationDockerImage,
final Version destinationProtocolVersion,
final boolean isDestinationCustomConnector,
final List<StandardSyncOperation> standardSyncOperations,
final List<StreamDescriptor> streamsToReset)
throws IOException {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = standardSync.getCatalog();
configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> {
final StreamDescriptor streamDescriptor = CatalogHelpers.extractDescriptor(configuredAirbyteStream);
if (streamsToReset.contains(streamDescriptor)) {
// The Reset Source will emit no record messages for any streams, so setting the destination sync
// mode to OVERWRITE will empty out this stream in the destination.
// Note: streams in streamsToReset that are NOT in this configured catalog (i.e. deleted streams)
// will still have their state reset by the Reset Source, but will not be modified in the
// destination since they are not present in the catalog that is sent to the destination.
configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH);
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
} else {
// Set streams that are not being reset to APPEND so that they are not modified in the destination
if (configuredAirbyteStream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) {
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.APPEND);
}
}
});
final JobResetConnectionConfig resetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(standardSync.getNamespaceDefinition())
.withNamespaceFormat(standardSync.getNamespaceFormat())
.withPrefix(standardSync.getPrefix())
.withDestinationDockerImage(destinationDockerImage)
.withDestinationProtocolVersion(destinationProtocolVersion)
.withOperationSequence(standardSyncOperations)
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog)
.withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
workerResourceRequirements))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset))
.withIsSourceCustomConnector(false)
.withIsDestinationCustomConnector(isDestinationCustomConnector);
final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(resetConnectionConfig);
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}
}