-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathJobPersistence.java
370 lines (310 loc) · 13.6 KB
/
JobPersistence.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.persistence.job;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.persistence.job.models.AttemptWithJobInfo;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobStatus;
import io.airbyte.persistence.job.models.JobWithStatusAndTimestamp;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
/**
 * General interface methods for persistence to the Jobs database. This database is separate from
 * the config database because job-related tables have an order of magnitude higher load and scale
 * differently from the config tables.
 */
public interface JobPersistence {

  //
  // SIMPLE GETTERS
  //

  /**
   * Convenience POJO for various stats data structures.
   *
   * @param combinedStats combined stats of an attempt
   * @param perStreamStats per-stream stats of an attempt
   */
  record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStats) {}

  /**
   * Key identifying a single attempt of a job, as used by {@link #getAttemptStats(List)}.
   *
   * @param id id of the job the attempt belongs to
   * @param attemptNumber number of the attempt within that job
   */
  record JobAttemptPair(long id, int attemptNumber) {}

  /**
   * Retrieve the combined and per stream stats for a single attempt.
   *
   * @param jobId id of the job the attempt belongs to
   * @param attemptNumber number of the attempt within the job
   * @return {@link AttemptStats} of the attempt
   * @throws IOException exception due to interaction with persistence
   */
  AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException;

  /**
   * Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to
   * avoid overloading the database with too many queries.
   *
   * <p>
   * Implementations are intended to utilise complex joins under the hood to avoid the N+1 query
   * pattern that calling {@link #getAttemptStats(long, int)} once per attempt would produce.
   *
   * @param jobIds ids of the jobs whose attempts' stats should be retrieved
   * @return stats of each attempt, keyed by (job id, attempt number)
   * @throws IOException exception due to interaction with persistence
   */
  Map<JobAttemptPair, AttemptStats> getAttemptStats(List<Long> jobIds) throws IOException;

  /**
   * Retrieve the normalization summaries recorded for a single attempt.
   *
   * @param jobId id of the job the attempt belongs to
   * @param attemptNumber number of the attempt within the job
   * @return normalization summaries of the attempt
   * @throws IOException exception due to interaction with persistence
   */
  List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;

  /**
   * Retrieve a job by its id.
   *
   * @param jobId id of the job
   * @return the job
   * @throws IOException exception due to interaction with persistence
   */
  Job getJob(long jobId) throws IOException;

  //
  // JOB LIFECYCLE
  //

  /**
   * Enqueue a new job. Its initial status will be pending.
   *
   * @param scope key that will be used to determine if two jobs should not be run at the same time;
   *        it is the primary id of the standard sync (StandardSync#connectionId)
   * @param jobConfig configuration for the job
   * @return id of the enqueued job, or empty if no job was enqueued
   * @throws IOException exception due to interaction with persistence
   */
  Optional<Long> enqueueJob(String scope, JobConfig jobConfig) throws IOException;

  /**
   * Set job status from current status to PENDING. Throws {@link IllegalStateException} if the job is
   * in a terminal state.
   *
   * @param jobId job to reset
   * @throws IOException exception due to interaction with persistence
   */
  void resetJob(long jobId) throws IOException;

  /**
   * Set job status from current status to CANCELLED. If already in a terminal status, no op.
   *
   * @param jobId job to cancel
   * @throws IOException exception due to interaction with persistence
   */
  void cancelJob(long jobId) throws IOException;

  /**
   * Set job status from current status to FAILED. If already in a terminal status, no op.
   *
   * @param jobId job to fail
   * @throws IOException exception due to interaction with persistence
   */
  void failJob(long jobId) throws IOException;

  //
  // ATTEMPT LIFECYCLE
  //

  /**
   * Create a new attempt for a job and return its attempt number. Throws
   * {@link IllegalStateException} if the job is already in a terminal state.
   *
   * @param jobId job for which an attempt will be created
   * @param logPath path where logs should be written for the attempt
   * @return The attempt number of the created attempt (see {@link DefaultJobPersistence})
   * @throws IOException exception due to interaction with persistence
   */
  int createAttempt(long jobId, Path logPath) throws IOException;

  /**
   * Sets an attempt to FAILED. Also attempts to set the parent job to INCOMPLETE. The job's status
   * will not be changed if it is already in a terminal state.
   *
   * @param jobId job id
   * @param attemptNumber attempt number within the job
   * @throws IOException exception due to interaction with persistence
   */
  void failAttempt(long jobId, int attemptNumber) throws IOException;

  /**
   * Sets an attempt to SUCCEEDED. Also attempts to set the parent job to SUCCEEDED. The job's status
   * is changed regardless of what state it is in.
   *
   * @param jobId job id
   * @param attemptNumber attempt number within the job
   * @throws IOException exception due to interaction with persistence
   */
  void succeedAttempt(long jobId, int attemptNumber) throws IOException;

  //
  // END OF LIFECYCLE
  //

  /**
   * Sets an attempt's temporal workflow id. Later used to cancel the workflow.
   *
   * @param jobId job id
   * @param attemptNumber attempt number within the job
   * @param temporalWorkflowId id of the temporal workflow associated with the attempt
   * @param processingTaskQueue name of the task queue associated with the attempt
   * @throws IOException exception due to interaction with persistence
   */
  void setAttemptTemporalWorkflowInfo(long jobId, int attemptNumber, String temporalWorkflowId, String processingTaskQueue) throws IOException;

  /**
   * Retrieves an attempt's temporal workflow id. Used to cancel the workflow.
   *
   * @param jobId job id
   * @param attemptNumber attempt number within the job
   * @return the attempt's temporal workflow id, if present
   * @throws IOException exception due to interaction with persistence
   */
  Optional<String> getAttemptTemporalWorkflowId(long jobId, int attemptNumber) throws IOException;

  /**
   * Writes the output of an attempt.
   *
   * <p>
   * When the output is a StandardSyncOutput, the caller of this method should persist
   * StandardSyncOutput#state in the configs database by calling
   * ConfigRepository#updateConnectionState, which takes care of persisting the connection state.
   *
   * @param jobId job id
   * @param attemptNumber attempt number within the job
   * @param output output of the attempt
   * @throws IOException exception due to interaction with persistence
   */
  void writeOutput(long jobId, int attemptNumber, JobOutput output) throws IOException;

  /**
   * Writes the stats of an attempt.
   *
   * @param jobId job id
   * @param attemptNumber attempt number within the job
   * @param estimatedRecords estimated number of records
   * @param estimatedBytes estimated number of bytes
   * @param recordsEmitted number of records emitted
   * @param bytesEmitted number of bytes emitted
   * @param streamStats per-stream stats of the attempt
   * @throws IOException exception due to interaction with persistence
   */
  void writeStats(long jobId,
                  int attemptNumber,
                  long estimatedRecords,
                  long estimatedBytes,
                  long recordsEmitted,
                  long bytesEmitted,
                  List<StreamSyncStats> streamStats)
      throws IOException;

  /**
   * Writes a summary of all failures that occurred during the attempt.
   *
   * @param jobId job id
   * @param attemptNumber attempt number
   * @param failureSummary summary containing failure metadata and ordered list of failures
   * @throws IOException exception due to interaction with persistence
   */
  void writeAttemptFailureSummary(long jobId, int attemptNumber, AttemptFailureSummary failureSummary) throws IOException;

  /**
   * Writes the attempt-specific configuration used to build the sync input during the attempt.
   *
   * @param jobId job id
   * @param attemptNumber attempt number
   * @param attemptSyncConfig attempt-specific configuration used to build the sync input for this
   *        attempt
   * @throws IOException exception due to interaction with persistence
   */
  void writeAttemptSyncConfig(long jobId, int attemptNumber, AttemptSyncConfig attemptSyncConfig) throws IOException;

  /**
   * Count the jobs of a connection.
   *
   * @param configTypes the types of config, e.g. sync
   * @param connectionId ID of the connection for which the job count should be retrieved
   * @return count of jobs belonging to the specified connection
   * @throws IOException exception due to interaction with persistence
   */
  Long getJobCount(Set<ConfigType> configTypes, String connectionId) throws IOException;

  /**
   * List a page of jobs for a config.
   *
   * @param configTypes type of config, e.g. sync
   * @param configId id of that config
   * @param limit maximum number of jobs to return
   * @param offset number of jobs to skip
   * @return list of jobs in descending order by created_at
   * @throws IOException exception due to interaction with persistence
   */
  List<Job> listJobs(Set<JobConfig.ConfigType> configTypes, String configId, int limit, int offset) throws IOException;

  /**
   * List jobs of a given type that have attempts after a timestamp.
   *
   * @param configType the type of job
   * @param attemptEndedAtTimestamp the timestamp after which you want the jobs
   * @return list of jobs that have attempts after the provided timestamp
   * @throws IOException exception due to interaction with persistence
   */
  List<Job> listJobs(ConfigType configType, Instant attemptEndedAtTimestamp) throws IOException;

  /**
   * Single-config-type variant of {@link #listJobs(Set, String, int, int)}.
   *
   * @param configType type of config, e.g. sync
   * @param configId id of that config
   * @param limit maximum number of jobs to return
   * @param offset number of jobs to skip
   * @return list of jobs
   * @throws IOException exception due to interaction with persistence
   */
  List<Job> listJobs(JobConfig.ConfigType configType, String configId, int limit, int offset) throws IOException;

  /**
   * List the jobs of a connection, paging far enough to include a specific job.
   *
   * @param configTypes type of config, e.g. sync
   * @param connectionId id of the connection for which jobs should be retrieved
   * @param includingJobId id of the job that should be included in the list, if it exists in the
   *        connection
   * @param pagesize the page size that should be used when building the list (response may include
   *        multiple pages)
   * @return list of jobs in descending created_at order including the specified job. Will include
   *         multiple pages of jobs if required to include the specified job. If the specified job
   *         does not exist in the connection, the returned list will be empty.
   * @throws IOException exception due to interaction with persistence
   */
  List<Job> listJobsIncludingId(Set<JobConfig.ConfigType> configTypes, String connectionId, long includingJobId, int pagesize) throws IOException;

  List<Job> listJobsWithStatus(JobStatus status) throws IOException;

  List<Job> listJobsWithStatus(Set<JobConfig.ConfigType> configTypes, JobStatus status) throws IOException;

  List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus status) throws IOException;

  List<Job> listJobsForConnectionWithStatuses(UUID connectionId, Set<JobConfig.ConfigType> configTypes, Set<JobStatus> statuses) throws IOException;

  /**
   * List lightweight job records (id, status, timestamps) of a connection.
   *
   * @param connectionId the ID of the connection
   * @param configTypes the types of jobs
   * @param jobCreatedAtTimestamp the timestamp after which you want the jobs
   * @return list of jobs that only include information regarding id, status, timestamps from a
   *         specific connection that have attempts after the provided timestamp, sorted by jobs'
   *         createdAt in descending order. NOTE(review): the parameter name suggests the filter is
   *         on job creation time rather than attempt time — confirm against the implementation.
   * @throws IOException exception due to interaction with persistence
   */
  List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID connectionId,
                                                                          Set<JobConfig.ConfigType> configTypes,
                                                                          Instant jobCreatedAtTimestamp)
      throws IOException;

  Optional<Job> getLastReplicationJob(UUID connectionId) throws IOException;

  Optional<Job> getLastSyncJob(UUID connectionId) throws IOException;

  List<Job> getLastSyncJobForConnections(List<UUID> connectionIds) throws IOException;

  List<Job> getRunningSyncJobForConnections(List<UUID> connectionIds) throws IOException;

  Optional<Job> getFirstReplicationJob(UUID connectionId) throws IOException;

  Optional<Job> getNextJob() throws IOException;

  /**
   * List attempts, with their job attached, that ended after a timestamp.
   *
   * @param configType the type of job
   * @param attemptEndedAtTimestamp the timestamp after which you want the attempts
   * @return list of attempts (with job attached) that ended after the provided timestamp, sorted by
   *         attempts' endedAt in ascending order
   * @throws IOException exception due to interaction with persistence
   */
  List<AttemptWithJobInfo> listAttemptsWithJobInfo(ConfigType configType, Instant attemptEndedAtTimestamp) throws IOException;

  //
  // ARCHIVE
  //

  /**
   * Returns the Airbyte version.
   */
  Optional<String> getVersion() throws IOException;

  /**
   * Set the Airbyte version.
   */
  void setVersion(String airbyteVersion) throws IOException;

  /**
   * Get the max supported Airbyte Protocol Version.
   */
  Optional<Version> getAirbyteProtocolVersionMax() throws IOException;

  /**
   * Set the max supported Airbyte Protocol Version.
   */
  void setAirbyteProtocolVersionMax(Version version) throws IOException;

  /**
   * Get the min supported Airbyte Protocol Version.
   */
  Optional<Version> getAirbyteProtocolVersionMin() throws IOException;

  /**
   * Set the min supported Airbyte Protocol Version.
   */
  void setAirbyteProtocolVersionMin(Version version) throws IOException;

  /**
   * Get the current Airbyte Protocol Version range if defined.
   */
  Optional<AirbyteProtocolVersionRange> getCurrentProtocolVersionRange() throws IOException;

  /**
   * Returns a deployment UUID.
   */
  Optional<UUID> getDeployment() throws IOException;

  // A deployment references a setup of Airbyte. It is created the first time the docker compose or
  // K8s is ready.
  /**
   * Set deployment id. If one is already set, the new value is ignored.
   */
  void setDeployment(UUID uuid) throws IOException;

  /**
   * Export all SQL tables into streams of JsonNode objects. This returns a Map of table schemas to
   * the associated streams of records that are being exported.
   */
  Map<JobsDatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOException;

  /**
   * Import all SQL tables from streams of JsonNode objects.
   *
   * @param airbyteVersion the version of the files to be imported; should match the Airbyte version
   *        in the Database.
   * @param data a Map of table schemas to the associated streams of records to import.
   */
  void importDatabase(String airbyteVersion, Map<JobsDatabaseSchema, Stream<JsonNode>> data) throws IOException;

  /**
   * Purges job history while ensuring that the latest saved-state information is maintained.
   */
  void purgeJobHistory();

  /**
   * Check if the secrets have been migrated from plain text values to a new secret store.
   *
   * @return true if the secret migration has been performed
   */
  boolean isSecretMigrated() throws IOException;

  /**
   * Record that the secret migration has been performed.
   */
  void setSecretMigrationDone() throws IOException;

  /**
   * Retrieve the normalization status of the attempts of a job.
   *
   * @param jobId id of the job
   * @return normalization statuses of the job's attempts
   * @throws IOException exception due to interaction with persistence
   */
  List<AttemptNormalizationStatus> getAttemptNormalizationStatusesForJob(Long jobId) throws IOException;

}