Skip to content

Commit a6a17d8

Browse files
fix(FeedUpdater): Prevent incorrect published status update on server startup.
1 parent 3f80a65 commit a6a17d8

File tree

2 files changed

+101
-27
lines changed

2 files changed

+101
-27
lines changed

src/main/java/com/conveyal/datatools/manager/jobs/FeedUpdater.java

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public Map<String, String> checkForUpdatedFeeds() {
158158
String keyName = objSummary.getKey();
159159
LOG.debug("{} etag = {}", keyName, eTag);
160160

161+
// Don't add object if it is a dir
161162
if (keyName.equals(bucketFolder)) continue;
162163
String filename = keyName.split("/")[1];
163164
String feedId = filename.replace(".zip", "");
@@ -166,16 +167,20 @@ public Map<String, String> checkForUpdatedFeeds() {
166167
LOG.error("No feed source found for feed ID {}", feedId);
167168
continue;
168169
}
170+
// Skip object if the filename is null
171+
if ("null".equals(feedId)) continue;
169172

170-
if (shouldMarkFeedAsProcessed(eTag, feedSource)) {
171-
// Don't add object if it is a dir
172-
// Skip object if the filename is null
173-
if ("null".equals(feedId)) continue;
173+
FeedVersion latestVersionSentForPublishing = getLatestVersionSentForPublishing(feedId, feedSource);
174+
if (shouldMarkFeedAsProcessed(eTag, latestVersionSentForPublishing)) {
174175
try {
175-
LOG.info("New version found for {} at s3://{}/{}. ETag = {}.", feedId, feedBucket, keyName, eTag);
176-
updatePublishedFeedVersion(feedId, feedSource);
177-
// TODO: Explore if MD5 checksum can be used to find matching feed version.
178-
// findMatchingFeedVersion(md5, feedId, feedSource);
176+
// Don't mark a feed version as published if previous published version is before sentToExternalPublisher.
177+
if (!objSummary.getLastModified().before(latestVersionSentForPublishing.sentToExternalPublisher)) {
178+
LOG.info("New version found for {} at s3://{}/{}. ETag = {}.", feedId, feedBucket, keyName, eTag);
179+
updatePublishedFeedVersion(feedId, latestVersionSentForPublishing);
180+
// TODO: Explore if MD5 checksum can be used to find matching feed version.
181+
// findMatchingFeedVersion(md5, feedId, feedSource);
182+
}
183+
179184
} catch (Exception e) {
180185
LOG.warn("Could not load feed " + keyName, e);
181186
} finally {
@@ -200,7 +205,7 @@ private FeedSource getFeedSource(String feedId) {
200205
and(eq("value", feedId), eq("name", AGENCY_ID_FIELDNAME))
201206
);
202207
if (properties.size() > 1) {
203-
LOG.warn("Found multiple feed sources for {}: {}",
208+
LOG.warn("Found multiple feed sources for {}: {}. The published status on some feed versions will be incorrect.",
204209
feedId,
205210
properties.stream().map(p -> p.feedSourceId).collect(Collectors.joining(",")));
206211
}
@@ -216,22 +221,20 @@ private FeedSource getFeedSource(String feedId) {
216221
/**
217222
* @return true if the feed with the corresponding etag should be mark as processed, false otherwise.
218223
*/
219-
private boolean shouldMarkFeedAsProcessed(String eTag, FeedSource feedSource) {
224+
private boolean shouldMarkFeedAsProcessed(String eTag, FeedVersion publishedVersion) {
220225
if (eTagForFeed.containsValue(eTag)) return false;
221-
222-
FeedVersion publishedVersion = getLatestPublishedVersion(feedSource);
223226
if (publishedVersion == null) return false;
227+
224228
return versionsToMarkAsProcessed.contains(publishedVersion.id);
225229
}
226230

227231
/**
228232
* Update the published feed version for the feed source.
229233
* @param feedId the unique ID used by MTC to identify a feed source
230-
* @param feedSource the feed source for which a newly published version should be registered
234+
* @param publishedVersion the feed version to be registered
231235
*/
232-
private void updatePublishedFeedVersion(String feedId, FeedSource feedSource) {
236+
private void updatePublishedFeedVersion(String feedId, FeedVersion publishedVersion) {
233237
try {
234-
FeedVersion publishedVersion = getLatestPublishedVersion(feedSource);
235238
if (publishedVersion != null) {
236239
if (publishedVersion.sentToExternalPublisher == null) {
237240
LOG.warn("Not updating published version for {} (version was never sent to external publisher)", feedId);
@@ -240,13 +243,18 @@ private void updatePublishedFeedVersion(String feedId, FeedSource feedSource) {
240243
// Set published namespace to the feed version and set the processedByExternalPublisher timestamp.
241244
LOG.info("Latest published version (sent at {}) for {} is {}", publishedVersion.sentToExternalPublisher, feedId, publishedVersion.id);
242245
Persistence.feedVersions.updateField(publishedVersion.id, PROCESSED_BY_EXTERNAL_PUBLISHER_FIELD, new Date());
243-
Persistence.feedSources.updateField(feedSource.id, "publishedVersionId", publishedVersion.namespace);
246+
Persistence.feedSources.updateField(publishedVersion.feedSourceId, "publishedVersionId", publishedVersion.namespace);
244247
} else {
245-
LOG.error("No published versions found for {} ({} id={})", feedId, feedSource.name, feedSource.id);
248+
LOG.error(
249+
"No published versions found for {} ({} id={})",
250+
feedId,
251+
publishedVersion.parentFeedSource().name,
252+
publishedVersion.feedSourceId
253+
);
246254
}
247255
} catch (Exception e) {
248256
e.printStackTrace();
249-
LOG.error("Error encountered while checking for latest published version for {}", feedId);
257+
LOG.error("Error encountered while updating the latest published version for {}", feedId);
250258
}
251259
}
252260

@@ -256,13 +264,19 @@ private void updatePublishedFeedVersion(String feedId, FeedSource feedSource) {
256264
* could be that more than one versions were recently "published" and the latest published version was a bad
257265
* feed that failed processing by RTD.
258266
*/
259-
private static FeedVersion getLatestPublishedVersion(FeedSource feedSource) {
260-
// Collect the feed versions for the feed source.
261-
Collection<FeedVersion> versions = feedSource.retrieveFeedVersions();
262-
Optional<FeedVersion> lastPublishedVersionCandidate = versions
263-
.stream()
264-
.min(Comparator.comparing(v -> v.sentToExternalPublisher, Comparator.nullsLast(Comparator.reverseOrder())));
265-
return lastPublishedVersionCandidate.orElse(null);
267+
private static FeedVersion getLatestVersionSentForPublishing(String feedId, FeedSource feedSource) {
268+
try {
269+
// Collect the feed versions for the feed source.
270+
Collection<FeedVersion> versions = feedSource.retrieveFeedVersions();
271+
Optional<FeedVersion> lastPublishedVersionCandidate = versions
272+
.stream()
273+
.min(Comparator.comparing(v -> v.sentToExternalPublisher, Comparator.nullsLast(Comparator.reverseOrder())));
274+
return lastPublishedVersionCandidate.orElse(null);
275+
} catch (Exception e) {
276+
e.printStackTrace();
277+
LOG.error("Error encountered while checking for latest published version for {}", feedId);
278+
return null;
279+
}
266280
}
267281

268282
/**

src/test/java/com/conveyal/datatools/manager/jobs/AutoPublishJobTest.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.google.common.collect.Lists;
1313
import org.junit.jupiter.api.AfterAll;
1414
import org.junit.jupiter.api.BeforeAll;
15+
import org.junit.jupiter.api.Test;
1516
import org.junit.jupiter.params.ParameterizedTest;
1617
import org.junit.jupiter.params.provider.Arguments;
1718
import org.junit.jupiter.params.provider.MethodSource;
@@ -163,7 +164,7 @@ void shouldUpdateFeedInfoAfterPublishComplete(String agencyId, boolean isUnknown
163164
assertTrue(etags.isEmpty());
164165

165166
// Simulate completion of feed publishing.
166-
completedFeedRetriever.isPublishingComplete = true;
167+
completedFeedRetriever.makePublished();
167168

168169
// The etags should contain the id of the agency.
169170
// If a feed has been republished since last check, it will have a new etag/file hash,
@@ -209,13 +210,62 @@ private static Stream<Arguments> createUpdateFeedInfoCases() {
209210
);
210211
}
211212

213+
/**
214+
* This test ensures that, upon server startup,
215+
* feeds that meet all these criteria should not be updated/marked as published:
216+
* - the feed has been sent to publisher (RTD),
217+
* - the publisher has not published the feed,
218+
* - a previous version of the feed was already published.
219+
*/
220+
@Test
221+
void shouldNotUpdateFromAPreviouslyPublishedVersionOnStartup() {
222+
final int TWO_DAYS_MILLIS = 48 * 3600000;
223+
224+
// Set up a test FeedUpdater instance that fakes an external published date in the past.
225+
TestCompletedFeedRetriever completedFeedRetriever = new TestCompletedFeedRetriever(TEST_AGENCY);
226+
FeedUpdater feedUpdater = FeedUpdater.createForTest(completedFeedRetriever);
227+
completedFeedRetriever.makePublished(new Date(System.currentTimeMillis() - TWO_DAYS_MILLIS));
228+
229+
// Add the version to the feed source, with
230+
// sentToExternalPublisher set to a date after a previous publish date.
231+
FeedVersion createdVersion = createFeedVersionFromGtfsZip(feedSource, "bart_new_lite.zip");
232+
createdVersion.sentToExternalPublisher = new Date();
233+
Persistence.feedVersions.replace(createdVersion.id, createdVersion);
234+
235+
// The list of feeds processed externally (completed) should contain an entry for the agency we want.
236+
Map<String, String> etags = feedUpdater.checkForUpdatedFeeds();
237+
assertNotNull(etags.get(TEST_AGENCY));
238+
239+
// Make sure that the feed remains unpublished.
240+
FeedVersion updatedFeedVersion = Persistence.feedVersions.getById(createdVersion.id);
241+
assertNull(updatedFeedVersion.processedByExternalPublisher);
242+
243+
// Now perform publishing.
244+
AutoPublishJob autoPublishJob = new AutoPublishJob(feedSource, user);
245+
autoPublishJob.run();
246+
assertFalse(autoPublishJob.status.error);
247+
248+
// Simulate another publishing process
249+
completedFeedRetriever.makePublished(new Date());
250+
251+
// The list of feeds processed externally (completed) should contain an entry for the agency we want.
252+
Map<String, String> etagsAfter = feedUpdater.checkForUpdatedFeeds();
253+
assertNotNull(etagsAfter.get(TEST_AGENCY));
254+
255+
// The feed should be published.
256+
FeedVersion publishedFeedVersion = Persistence.feedVersions.getById(createdVersion.id);
257+
assertNotNull(publishedFeedVersion.processedByExternalPublisher);
258+
259+
}
260+
212261
/**
213262
* Mocks the results of an {@link S3ObjectSummary} retrieval before/after the
214263
* external MTC publishing process is complete.
215264
*/
216265
private static class TestCompletedFeedRetriever implements FeedUpdater.CompletedFeedRetriever {
217266
private final String agencyId;
218-
public boolean isPublishingComplete;
267+
private boolean isPublishingComplete;
268+
private Date publishDate;
219269

220270
public TestCompletedFeedRetriever(String agencyId) {
221271
this.agencyId = agencyId;
@@ -229,8 +279,18 @@ public List<S3ObjectSummary> retrieveCompletedFeeds() {
229279
S3ObjectSummary objSummary = new S3ObjectSummary();
230280
objSummary.setETag("test-etag");
231281
objSummary.setKey(String.format("%s/%s", TEST_COMPLETED_FOLDER, agencyId));
282+
objSummary.setLastModified(publishDate);
232283
return Lists.newArrayList(objSummary);
233284
}
234285
}
286+
287+
public void makePublished() {
288+
makePublished(new Date());
289+
}
290+
291+
public void makePublished(Date publishDate) {
292+
isPublishingComplete = true;
293+
this.publishDate = publishDate;
294+
}
235295
}
236296
}

0 commit comments

Comments
 (0)