Skip to content

Commit eddd745

Browse files
refactor(MonitorableJobWithResourceLock): Extract resource lock mgmt code to separate class.
1 parent 22ad6ac commit eddd745

File tree

3 files changed

+260
-233
lines changed

3 files changed

+260
-233
lines changed
Lines changed: 145 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.conveyal.datatools.manager.jobs;
22

3-
import com.conveyal.datatools.common.status.MonitorableJob;
43
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
54
import com.conveyal.datatools.manager.models.Deployment;
65
import com.conveyal.datatools.manager.models.FeedVersion;
@@ -12,7 +11,6 @@
1211
import org.slf4j.LoggerFactory;
1312

1413
import java.util.Collection;
15-
import java.util.Collections;
1614
import java.util.Date;
1715
import java.util.HashSet;
1816
import java.util.LinkedList;
@@ -32,37 +30,38 @@
3230
* If there are related feed fetches in progress auto deploy is skipped but the deployment's feed versions are still
3331
* advanced to the latest versions.
3432
*/
35-
public class AutoDeployJob extends MonitorableJob {
33+
public class AutoDeployJob extends MonitorableJobWithResourceLock<Project> {
3634
public static final Logger LOG = LoggerFactory.getLogger(AutoDeployJob.class);
3735

3836
/**
3937
* Project to be considered for auto deployment.
4038
*/
4139
private final Project project;
4240

43-
/**
44-
* A set of projects which have been locked by a instance of {@link AutoDeployJob} to prevent repeat
45-
* deployments.
46-
*/
47-
private static final Set<String> lockedProjects = Collections.synchronizedSet(new HashSet<>());
41+
private final Deployment deployment;
4842

4943
/**
5044
* Auto deploy specific project.
5145
*/
5246
public AutoDeployJob(Project project, Auth0UserProfile owner) {
53-
super(owner, "Auto Deploy Feed", JobType.AUTO_DEPLOY_FEED_VERSION);
47+
super(
48+
owner,
49+
"Auto Deploy Feed",
50+
JobType.AUTO_DEPLOY_FEED_VERSION,
51+
project,
52+
project.name
53+
);
5454
this.project = project;
55+
deployment = Persistence.deployments.getById(project.pinnedDeploymentId);
5556
}
5657

5758
@Override
5859
public void jobLogic() {
59-
Deployment deployment = Persistence.deployments.getById(project.pinnedDeploymentId);
6060
// Define if project and deployment are candidates for auto deploy.
6161
if (
6262
project.pinnedDeploymentId == null ||
6363
deployment == null ||
64-
deployment.feedVersionIds.isEmpty() ||
65-
lockedProjects.contains(project.id)
64+
deployment.feedVersionIds.isEmpty()
6665
) {
6766
String message = String.format(
6867
"Project %s skipped for auto deployment as required criteria not met.",
@@ -89,171 +88,157 @@ public void jobLogic() {
8988
return;
9089
}
9190

92-
try {
93-
synchronized (lockedProjects) {
94-
if (!lockedProjects.contains(project.id)) {
95-
lockedProjects.add(project.id);
96-
LOG.info("Auto deploy lock added for project id: {}", project.id);
97-
} else {
98-
LOG.warn("Unable to acquire lock for project {}", project.name);
99-
status.fail(String.format("Project %s is locked for auto-deployments.", project.name));
100-
return;
101-
}
102-
}
103-
LOG.info("Auto deploy task running for project {}", project.name);
104-
105-
// Get the most recently used server.
106-
String latestServerId = deployment.latest().serverId;
107-
OtpServer server = Persistence.servers.getById(latestServerId);
108-
if (server == null) {
109-
String message = String.format(
110-
"Server with id %s no longer exists. Skipping deployment for project %s.",
111-
latestServerId,
112-
project.name
113-
);
114-
LOG.warn(message);
115-
status.fail(message);
116-
return;
117-
}
91+
// Super class handles lock management and will trigger innerJobLogic.
92+
super.jobLogic();
93+
}
11894

119-
// Analyze and update feed versions in deployment.
120-
Collection<String> updatedFeedVersionIds = new LinkedList<>();
121-
List<FeedVersion> latestVersionsWithCriticalErrors = new LinkedList<>();
122-
List<Deployment.SummarizedFeedVersion> previousFeedVersions = deployment.retrieveFeedVersions();
123-
boolean shouldWaitForNewFeedVersions = false;
95+
@Override
96+
protected void innerJobLogic() {
97+
LOG.info("Auto deploy task running for project {}", project.name);
12498

125-
// Production ready feed versions.
126-
List<Deployment.SummarizedFeedVersion> pinnedFeedVersions = deployment.retrievePinnedFeedVersions();
127-
Set<String> pinnedFeedSourceIds = new HashSet<>(
128-
pinnedFeedVersions
129-
.stream()
130-
.map(pinnedFeedVersion -> pinnedFeedVersion.feedSource.id)
131-
.collect(Collectors.toList())
99+
// Get the most recently used server.
100+
String latestServerId = deployment.latest().serverId;
101+
OtpServer server = Persistence.servers.getById(latestServerId);
102+
if (server == null) {
103+
String message = String.format(
104+
"Server with id %s no longer exists. Skipping deployment for project %s.",
105+
latestServerId,
106+
project.name
132107
);
108+
LOG.warn(message);
109+
status.fail(message);
110+
return;
111+
}
133112

134-
// Iterate through each feed version for deployment.
135-
for (
136-
Deployment.SummarizedFeedVersion currentDeploymentFeedVersion : previousFeedVersions
137-
) {
138-
// Retrieve the latest feed version associated with the feed source of the current
139-
// feed version set for the deployment.
140-
FeedVersion latestFeedVersion = currentDeploymentFeedVersion.feedSource.retrieveLatest();
141-
// Make sure the latest feed version is not going to supersede a pinned feed version.
142-
if (pinnedFeedSourceIds.contains(latestFeedVersion.feedSourceId)) {
143-
continue;
144-
}
145-
146-
// Update to the latest feed version.
147-
updatedFeedVersionIds.add(latestFeedVersion.id);
148-
149-
// Throttle this auto-deployment if needed. For projects that haven't yet been auto-deployed, don't
150-
// wait and go ahead with the auto-deployment. But if the project has been auto-deployed before and
151-
// if the latest feed version was created before the last time the project was auto-deployed and
152-
// there are currently-active jobs that could result in an updated feed version being created, then
153-
// this auto deployment should be throttled.
154-
if (
155-
project.lastAutoDeploy != null &&
156-
latestFeedVersion.dateCreated.before(project.lastAutoDeploy) &&
157-
currentDeploymentFeedVersion.feedSource.hasJobsInProgress()
158-
) {
159-
// Another job exists that should result in the creation of a new feed version which should then
160-
// trigger an additional AutoDeploy job.
161-
LOG.warn(
162-
"Feed source {} contains an active job that should result in the creation of a new feed version. Auto deployment will be skipped until that version has fully processed.",
163-
currentDeploymentFeedVersion.feedSource.name
164-
);
165-
shouldWaitForNewFeedVersions = true;
166-
}
167-
168-
// Make sure the latest feed version has no critical errors.
169-
if (latestFeedVersion.hasCriticalErrors()) {
170-
latestVersionsWithCriticalErrors.add(latestFeedVersion);
171-
}
113+
// Analyze and update feed versions in deployment.
114+
Collection<String> updatedFeedVersionIds = new LinkedList<>();
115+
List<FeedVersion> latestVersionsWithCriticalErrors = new LinkedList<>();
116+
List<Deployment.SummarizedFeedVersion> previousFeedVersions = deployment.retrieveFeedVersions();
117+
boolean shouldWaitForNewFeedVersions = false;
118+
119+
// Production ready feed versions.
120+
List<Deployment.SummarizedFeedVersion> pinnedFeedVersions = deployment.retrievePinnedFeedVersions();
121+
Set<String> pinnedFeedSourceIds = new HashSet<>(
122+
pinnedFeedVersions
123+
.stream()
124+
.map(pinnedFeedVersion -> pinnedFeedVersion.feedSource.id)
125+
.collect(Collectors.toList())
126+
);
127+
128+
// Iterate through each feed version for deployment.
129+
for (
130+
Deployment.SummarizedFeedVersion currentDeploymentFeedVersion : previousFeedVersions
131+
) {
132+
// Retrieve the latest feed version associated with the feed source of the current
133+
// feed version set for the deployment.
134+
FeedVersion latestFeedVersion = currentDeploymentFeedVersion.feedSource.retrieveLatest();
135+
// Make sure the latest feed version is not going to supersede a pinned feed version.
136+
if (pinnedFeedSourceIds.contains(latestFeedVersion.feedSourceId)) {
137+
continue;
172138
}
173139

174-
// Skip auto-deployment for this project if Data Tools should wait for a job that should create a new
175-
// feed version to complete.
176-
if (shouldWaitForNewFeedVersions) {
177-
status.completeSuccessfully("Auto-Deployment will wait for new feed versions to be created from jobs in-progress");
178-
return;
179-
}
140+
// Update to the latest feed version.
141+
updatedFeedVersionIds.add(latestFeedVersion.id);
180142

181-
// Skip auto-deployment for this project if any of the feed versions contained critical errors.
182-
if (latestVersionsWithCriticalErrors.size() > 0) {
183-
StringBuilder errorMessageBuilder = new StringBuilder(
184-
String.format("Auto deployment for project %s has %s feed(s) with critical errors:",
185-
project.name,
186-
latestVersionsWithCriticalErrors.size())
143+
// Throttle this auto-deployment if needed. For projects that haven't yet been auto-deployed, don't
144+
// wait and go ahead with the auto-deployment. But if the project has been auto-deployed before and
145+
// if the latest feed version was created before the last time the project was auto-deployed and
146+
// there are currently-active jobs that could result in an updated feed version being created, then
147+
// this auto deployment should be throttled.
148+
if (
149+
project.lastAutoDeploy != null &&
150+
latestFeedVersion.dateCreated.before(project.lastAutoDeploy) &&
151+
currentDeploymentFeedVersion.feedSource.hasJobsInProgress()
152+
) {
153+
// Another job exists that should result in the creation of a new feed version which should then
154+
// trigger an additional AutoDeploy job.
155+
LOG.warn(
156+
"Feed source {} contains an active job that should result in the creation of a new feed version. Auto deployment will be skipped until that version has fully processed.",
157+
currentDeploymentFeedVersion.feedSource.name
187158
);
188-
for (FeedVersion version : latestVersionsWithCriticalErrors) {
189-
errorMessageBuilder.append(
190-
String.format(
191-
"%s (version %s), ",
192-
version.parentFeedSource().name,
193-
version.id
194-
)
195-
);
196-
}
197-
String message = errorMessageBuilder.toString();
198-
LOG.warn(message);
199-
if (!project.autoDeployWithCriticalErrors) {
200-
NotifyUsersForSubscriptionJob.createNotification(
201-
"deployment-updated",
202-
project.id,
203-
message
204-
);
205-
status.fail(message);
206-
return;
207-
}
159+
shouldWaitForNewFeedVersions = true;
208160
}
209161

210-
// Add all pinned feed versions to the list of feed versions to be deployed so that they aren't lost as part
211-
// of this update.
212-
for (Deployment.SummarizedFeedVersion pinnedFeedVersion : pinnedFeedVersions) {
213-
updatedFeedVersionIds.add(pinnedFeedVersion.id);
162+
// Make sure the latest feed version has no critical errors.
163+
if (latestFeedVersion.hasCriticalErrors()) {
164+
latestVersionsWithCriticalErrors.add(latestFeedVersion);
214165
}
166+
}
167+
168+
// Skip auto-deployment for this project if Data Tools should wait for a job that should create a new
169+
// feed version to complete.
170+
if (shouldWaitForNewFeedVersions) {
171+
status.completeSuccessfully("Auto-Deployment will wait for new feed versions to be created from jobs in-progress");
172+
return;
173+
}
215174

216-
// Check if the updated feed versions have any difference between the previous ones. If not, and if not
217-
// doing a regularly scheduled update with street data, then don't bother starting a deploy job.
218-
// TODO: add logic for street data update
219-
Set<String> previousFeedVersionIds = new HashSet<>(
220-
previousFeedVersions.stream().map(feedVersion -> feedVersion.id).collect(Collectors.toList())
175+
// Skip auto-deployment for this project if any of the feed versions contained critical errors.
176+
if (latestVersionsWithCriticalErrors.size() > 0) {
177+
StringBuilder errorMessageBuilder = new StringBuilder(
178+
String.format("Auto deployment for project %s has %s feed(s) with critical errors:",
179+
project.name,
180+
latestVersionsWithCriticalErrors.size())
221181
);
222-
if (
223-
!updatedFeedVersionIds.stream()
224-
.anyMatch(feedVersionId -> !previousFeedVersionIds.contains(feedVersionId))
225-
) {
226-
LOG.info("No updated feed versions to deploy for project {}.", project.name);
227-
status.completeSuccessfully("No updated feed versions to deploy.");
228-
return;
182+
for (FeedVersion version : latestVersionsWithCriticalErrors) {
183+
errorMessageBuilder.append(
184+
String.format(
185+
"%s (version %s), ",
186+
version.parentFeedSource().name,
187+
version.id
188+
)
189+
);
229190
}
230-
231-
// Queue up the deploy job.
232-
if (JobUtils.queueDeployJob(deployment, owner, server) != null) {
233-
LOG.info("Last auto deploy date updated for project {}.", project.name);
234-
// Update the deployment's feed version IDs with the latest (and pinned) feed versions.
235-
deployment.feedVersionIds = updatedFeedVersionIds;
236-
project.lastAutoDeploy = new Date();
237-
Persistence.deployments.replace(deployment.id, deployment);
238-
Persistence.projects.replace(project.id, project);
239-
status.completeSuccessfully("Auto deploy started new deploy job.");
240-
} else {
241-
String message = String.format(
242-
"Auto-deployment to %s should occur after active deployment for project %s completes.",
243-
server.name,
244-
project.name
191+
String message = errorMessageBuilder.toString();
192+
LOG.warn(message);
193+
if (!project.autoDeployWithCriticalErrors) {
194+
NotifyUsersForSubscriptionJob.createNotification(
195+
"deployment-updated",
196+
project.id,
197+
message
245198
);
246-
LOG.info(message);
247-
status.completeSuccessfully(message);
199+
status.fail(message);
200+
return;
248201
}
249-
} catch (Exception e) {
250-
status.fail(
251-
String.format("Could not auto-deploy project %s!", project.name),
252-
e
202+
}
203+
204+
// Add all pinned feed versions to the list of feed versions to be deployed so that they aren't lost as part
205+
// of this update.
206+
for (Deployment.SummarizedFeedVersion pinnedFeedVersion : pinnedFeedVersions) {
207+
updatedFeedVersionIds.add(pinnedFeedVersion.id);
208+
}
209+
210+
// Check if the updated feed versions have any difference between the previous ones. If not, and if not
211+
// doing a regularly scheduled update with street data, then don't bother starting a deploy job.
212+
// TODO: add logic for street data update
213+
Set<String> previousFeedVersionIds = new HashSet<>(
214+
previousFeedVersions.stream().map(feedVersion -> feedVersion.id).collect(Collectors.toList())
215+
);
216+
if (
217+
!updatedFeedVersionIds.stream()
218+
.anyMatch(feedVersionId -> !previousFeedVersionIds.contains(feedVersionId))
219+
) {
220+
LOG.info("No updated feed versions to deploy for project {}.", project.name);
221+
status.completeSuccessfully("No updated feed versions to deploy.");
222+
return;
223+
}
224+
225+
// Queue up the deploy job.
226+
if (JobUtils.queueDeployJob(deployment, owner, server) != null) {
227+
LOG.info("Last auto deploy date updated for project {}.", project.name);
228+
// Update the deployment's feed version IDs with the latest (and pinned) feed versions.
229+
deployment.feedVersionIds = updatedFeedVersionIds;
230+
project.lastAutoDeploy = new Date();
231+
Persistence.deployments.replace(deployment.id, deployment);
232+
Persistence.projects.replace(project.id, project);
233+
status.completeSuccessfully("Auto deploy started new deploy job.");
234+
} else {
235+
String message = String.format(
236+
"Auto-deployment to %s should occur after active deployment for project %s completes.",
237+
server.name,
238+
project.name
253239
);
254-
} finally {
255-
lockedProjects.remove(project.id);
256-
LOG.info("Auto deploy lock removed for project id: {}", project.id);
240+
LOG.info(message);
241+
status.completeSuccessfully(message);
257242
}
258243
}
259244
}

0 commit comments

Comments
 (0)