Skip to content

Commit

Permalink
Merge pull request #5264 from entur/limit_retry_attempts_in_siri_sx_u…
Browse files Browse the repository at this point in the history
…pdater

Limit retry attempts in SIRI-SX updater
  • Loading branch information
vpaturet authored Aug 15, 2023
2 parents cf2d3a6 + 7df26a6 commit 06d64b4
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 36 deletions.
101 changes: 65 additions & 36 deletions src/ext/java/org/opentripplanner/ext/siri/updater/SiriSXUpdater.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.opentripplanner.ext.siri.updater;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.UUID;
import org.opentripplanner.ext.siri.SiriAlertsUpdateHandler;
import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher;
import org.opentripplanner.framework.io.OtpHttpClientException;
import org.opentripplanner.framework.retry.OtpRetry;
import org.opentripplanner.framework.retry.OtpRetryBuilder;
import org.opentripplanner.routing.impl.TransitAlertServiceImpl;
import org.opentripplanner.routing.services.TransitAlertService;
import org.opentripplanner.transit.service.DefaultTransitService;
Expand All @@ -20,16 +23,23 @@
public class SiriSXUpdater extends PollingGraphUpdater implements TransitAlertProvider {

private static final Logger LOG = LoggerFactory.getLogger(SiriSXUpdater.class);
private static final long RETRY_INTERVAL_MILLIS = 5000;
private static final int RETRY_MAX_ATTEMPTS = 3;
private static final Duration RETRY_INITIAL_DELAY = Duration.ofSeconds(5);
private static final int RETRY_BACKOFF = 2;

private final String url;
private final String originalRequestorRef;
private final TransitAlertService transitAlertService;
private final SiriAlertsUpdateHandler updateHandler;
private WriteToGraphCallback saveResultOnGraph;
private ZonedDateTime lastTimestamp = ZonedDateTime.now().minusWeeks(1);
private String requestorRef;
/**
* Global retry counter used to create a new unique requestorRef after each retry.
*/
private int retryCount = 0;
private final SiriHttpLoader siriHttpLoader;
private final OtpRetry retry;

public SiriSXUpdater(SiriSXUpdaterParameters config, TransitModel transitModel) {
super(config);
Expand All @@ -55,6 +65,16 @@ public SiriSXUpdater(SiriSXUpdaterParameters config, TransitModel transitModel)
);
siriHttpLoader = new SiriHttpLoader(url, config.timeout(), config.requestHeaders());

retry =
new OtpRetryBuilder()
.withName("SIRI-SX Update")
.withMaxAttempts(RETRY_MAX_ATTEMPTS)
.withInitialRetryInterval(RETRY_INITIAL_DELAY)
.withBackoffMultiplier(RETRY_BACKOFF)
.withRetryableException(OtpHttpClientException.class::isInstance)
.withOnRetry(this::updateRequestorRef)
.build();

LOG.info(
"Creating real-time alert updater (SIRI SX) running every {} seconds : {}",
pollingPeriod(),
Expand All @@ -76,38 +96,30 @@ public String toString() {
}

@Override
protected void runPolling() throws InterruptedException {
try {
boolean moreData = false;
do {
Siri updates = getUpdates();
if (updates != null) {
ServiceDelivery serviceDelivery = updates.getServiceDelivery();
moreData = Boolean.TRUE.equals(serviceDelivery.isMoreData());
// Mark this updater as primed after last page of updates. Copy moreData into a final
// primitive, because the object moreData persists across iterations.
final boolean markPrimed = !moreData;
if (serviceDelivery.getSituationExchangeDeliveries() != null) {
saveResultOnGraph.execute((graph, transitModel) -> {
updateHandler.update(serviceDelivery);
if (markPrimed) primed = true;
});
}
}
} while (moreData);
} catch (OtpHttpClientException e) {
final long sleepTime = RETRY_INTERVAL_MILLIS + RETRY_INTERVAL_MILLIS * retryCount;

retryCount++;

LOG.info("Caught timeout - retry no. {} after {} millis", retryCount, sleepTime);

Thread.sleep(sleepTime);
protected void runPolling() {
retry.execute(this::updateSiri);
}

// Creating new requestorRef so all data is refreshed
requestorRef = originalRequestorRef + "-retry-" + retryCount;
runPolling();
}
private void updateSiri() {
boolean moreData = false;
do {
Siri updates = getUpdates();
if (updates != null) {
ServiceDelivery serviceDelivery = updates.getServiceDelivery();
moreData = Boolean.TRUE.equals(serviceDelivery.isMoreData());
// Mark this updater as primed after last page of updates. Copy moreData into a final
// primitive, because the object moreData persists across iterations.
final boolean markPrimed = !moreData;
if (serviceDelivery.getSituationExchangeDeliveries() != null) {
saveResultOnGraph.execute((graph, transitModel) -> {
updateHandler.update(serviceDelivery);
if (markPrimed) {
primed = true;
}
});
}
}
} while (moreData);
}

private Siri getUpdates() {
Expand All @@ -129,13 +141,30 @@ private Siri getUpdates() {
lastTimestamp = responseTimestamp;
return siri;
} catch (OtpHttpClientException e) {
LOG.info("Failed after {} ms", (System.currentTimeMillis() - t1));
LOG.error("Error reading SIRI feed from " + url, e);
LOG.info(
"Retryable exception while reading SIRI feed from {} after {} ms",
url,
(System.currentTimeMillis() - t1)
);
throw e;
} catch (Exception e) {
LOG.info("Failed after {} ms", (System.currentTimeMillis() - t1));
LOG.error("Error reading SIRI feed from " + url, e);
LOG.error(
"Non-retryable exception while reading SIRI feed from {} after {} ms",
url,
(System.currentTimeMillis() - t1)
);
}
return null;
}

/**
* Reset the session with the SIRI-SX server by creating a new unique requestorRef. This is
* required if a network error causes a request to fail and let the session in an undetermined
* state. Using a new requestorRef will force the SIRI-SX server to send again all available
* messages.
*/
private void updateRequestorRef() {
retryCount++;
requestorRef = originalRequestorRef + "-retry-" + retryCount;
}
}
72 changes: 72 additions & 0 deletions src/main/java/org/opentripplanner/framework/retry/OtpRetry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.opentripplanner.framework.retry;

import java.time.Duration;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Retry an operation with a configurable number of attempts.
*/
public class OtpRetry {

private static final Logger LOG = LoggerFactory.getLogger(OtpRetry.class);

private final String name;
private final int maxAttempts;
private final Duration initialRetryInterval;
private final int backoffMultiplier;
private final Runnable onRetry;
private final Predicate<Exception> retryableException;

OtpRetry(
String name,
int maxAttempts,
Duration initialRetryInterval,
int backoffMultiplier,
Predicate<Exception> retryableException,
Runnable onRetry
) {
this.name = name;
this.maxAttempts = maxAttempts;
this.initialRetryInterval = initialRetryInterval;
this.backoffMultiplier = backoffMultiplier;
this.retryableException = retryableException;
this.onRetry = onRetry;
}

public void execute(Runnable retryable) {
int attempts = 0;
long sleepTime = initialRetryInterval.toMillis();
while (true) {
try {
retryable.run();
return;
} catch (Exception e) {
if (!retryableException.test(e)) {
throw new OtpRetryException("Operation failed with non-retryable exception", e);
}
attempts++;
if (attempts > maxAttempts) {
throw new OtpRetryException("Operation failed after " + attempts + " attempts", e);
}
LOG.info(
"Operation {} failed with retryable exception: {}. Retrying {}/{} in {} millis",
name,
e.getMessage(),
attempts,
maxAttempts,
sleepTime
);
onRetry.run();
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new OtpRetryException(ex);
}
sleepTime = sleepTime * backoffMultiplier;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.opentripplanner.framework.retry;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Predicate;

public class OtpRetryBuilder {

public static final int DEFAULT_MAX_ATTEMPTS = 3;
public static final Duration DEFAULT_INITIAL_RETRYABLE_INTERVAL = Duration.of(
1,
ChronoUnit.SECONDS
);
/**
* Retry all exceptions by default.
*/
public static final Predicate<Exception> DEFAULT_RETRYABLE_EXCEPTION = e -> true;
public static final Runnable DEFAULT_ON_RETRY = () -> {};
private String name;
private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
private Duration initialRetryInterval = DEFAULT_INITIAL_RETRYABLE_INTERVAL;
private int backoffMultiplier;
private Predicate<Exception> retryableException = DEFAULT_RETRYABLE_EXCEPTION;
private Runnable onRetry = DEFAULT_ON_RETRY;

/**
* Name used in log messages to identify the retried operation.
*/
public OtpRetryBuilder withName(String name) {
this.name = name;
return this;
}

/**
* Maximum number of additional attempts after the initial failure.
* With maxAttempts=0 no retry is performed after the initial failure.
*/
public OtpRetryBuilder withMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}

/**
* Initial delay before the first retry.
*/
public OtpRetryBuilder withInitialRetryInterval(Duration initialRetryInterval) {
this.initialRetryInterval = initialRetryInterval;
return this;
}

/**
* Backoff multiplier applied to the initial delay.
*/
public OtpRetryBuilder withBackoffMultiplier(int backoffMultiplier) {
this.backoffMultiplier = backoffMultiplier;
return this;
}

/**
* Predicate identifying the exceptions that should be retried.
* Other exceptions are re-thrown.
*/
public OtpRetryBuilder withRetryableException(Predicate<Exception> retryableException) {
this.retryableException = retryableException;
return this;
}

/**
* Callback invoked before executing each retry.
*/
public OtpRetryBuilder withOnRetry(Runnable onRetry) {
this.onRetry = onRetry;
return this;
}

public OtpRetry build() {
return new OtpRetry(
name,
maxAttempts,
initialRetryInterval,
backoffMultiplier,
retryableException,
onRetry
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opentripplanner.framework.retry;

public class OtpRetryException extends RuntimeException {

public OtpRetryException(String message, Throwable cause) {
super(message, cause);
}

public OtpRetryException(Throwable cause) {
super(cause);
}
}
Loading

0 comments on commit 06d64b4

Please sign in to comment.