Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.net.URL;
import java.security.InvalidParameterException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -97,21 +99,16 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
}, exception -> listener.onFailure(exception)));
}

private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
boolean isChanged = false;
if (isEndpointChanged(request, datasource)) {
datasource.setEndpoint(request.getEndpoint());
isChanged = true;
}

if (isUpdateIntervalChanged(request, datasource)) {
datasource.setUserSchedule(
new IntervalSchedule(
datasource.getUserSchedule().getStartTime(),
(int) request.getUpdateInterval().getDays(),
ChronoUnit.DAYS
)
);
if (isUpdateIntervalChanged(request)) {
datasource.setUserSchedule(new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS));
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(DatasourceTask.ALL);
isChanged = true;
}

Expand All @@ -138,6 +135,21 @@ private void updateIfChanged(final UpdateDatasourceRequest request, final Dataso
private void validate(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
validateFieldsCompatibility(request, datasource);
validateUpdateIntervalIsLessThanValidForInDays(request, datasource);
validateNextUpdateScheduleIsBeforeExpirationDay(request, datasource);
}

private void validateNextUpdateScheduleIsBeforeExpirationDay(final UpdateDatasourceRequest request, final Datasource datasource) {
if (request.getUpdateInterval() == null) {
return;
}

IntervalSchedule newSchedule = new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS);

if (newSchedule.getNextExecutionTime(Instant.now()).isAfter(datasource.expirationDay())) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "datasource will expire at %s with the update interval", datasource.expirationDay().toString())
);
}
}

private void validateFieldsCompatibility(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
Expand All @@ -157,15 +169,15 @@ private void validateFieldsCompatibility(final UpdateDatasourceRequest request,

private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasourceRequest request, final Datasource datasource)
throws IOException {
if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request, datasource) == false) {
if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request) == false) {
return;
}

long validForInDays = isEndpointChanged(request, datasource)
? DatasourceManifest.Builder.build(new URL(request.getEndpoint())).getValidForInDays()
: datasource.getDatabase().getValidForInDays();

long updateInterval = isUpdateIntervalChanged(request, datasource)
long updateInterval = isUpdateIntervalChanged(request)
? request.getUpdateInterval().days()
: datasource.getUserSchedule().getInterval();

Expand All @@ -180,8 +192,14 @@ private boolean isEndpointChanged(final UpdateDatasourceRequest request, final D
return request.getEndpoint() != null && request.getEndpoint().equals(datasource.getEndpoint()) == false;
}

private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
return request.getUpdateInterval() != null
&& (int) request.getUpdateInterval().days() != datasource.getUserSchedule().getInterval();
/**
* Update interval is changed as long as user provide one because
* start time will get updated even if the update interval is same as current one.
*
* @param request the update datasource request
* @return true if update interval is changed, and false otherwise
*/
private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request) {
return request.getUpdateInterval() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,38 @@ private String indexNameFor(final long suffix) {
/**
* Checks if datasource is expired or not
*
* @return true if datasource is expired false otherwise
* @return true if datasource is expired, and false otherwise
*/
public boolean isExpired() {
return willExpire(Instant.now());
}

/**
* Checks if datasource will expire at given time
*
* @return true if datasource will expired at given time, and false otherwise
*/
public boolean willExpire(Instant instant) {
if (database.validForInDays == null) {
return false;
}

return instant.isAfter(expirationDay());
}

/**
* Day when datasource will expire
*
* @return Day when datasource will expire
*/
public Instant expirationDay() {
if (database.validForInDays == null) {
return Instant.MAX;
}
return lastCheckedAt().plus(database.validForInDays, ChronoUnit.DAYS);
}

private Instant lastCheckedAt() {
Instant lastCheckedAt;
if (updateStats.lastSkippedAt == null) {
lastCheckedAt = updateStats.lastSucceededAt;
Expand All @@ -389,7 +414,7 @@ public boolean isExpired() {
? updateStats.lastSkippedAt
: updateStats.lastSucceededAt;
}
return Instant.now().isAfter(lastCheckedAt.plus(database.validForInDays, ChronoUnit.DAYS));
return lastCheckedAt;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.log4j.Log4j2;
Expand All @@ -21,6 +22,7 @@
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

/**
* Datasource update task
Expand All @@ -29,6 +31,8 @@
*/
@Log4j2
public class DatasourceRunner implements ScheduledJobRunner {
private static final int DELETE_INDEX_RETRY_IN_MIN = 15;
private static final int DELETE_INDEX_DELAY_IN_MILLIS = 10000;

private static DatasourceRunner INSTANCE;

Expand Down Expand Up @@ -141,12 +145,37 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter, final

try {
datasourceUpdateService.deleteUnusedIndices(datasource);
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
if (DatasourceTask.DELETE_UNUSED_INDICES.equals(datasource.getTask()) == false) {
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
}
datasourceUpdateService.deleteUnusedIndices(datasource);
} catch (Exception e) {
log.error("Failed to update datasource for {}", datasource.getName(), e);
datasource.getUpdateStats().setLastFailedAt(Instant.now());
datasourceFacade.updateDatasource(datasource);
} finally {
postProcessing(datasource);
}
}

private void postProcessing(final Datasource datasource) {
if (datasource.isExpired()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this a redundant check? datasource.willExpire that is a next call should handle this case, isn't it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If datasource is expired already, we don't want to schedule another task to delete expired datasource. Without this check, next call will trigger another task after 10 seconds and it will repeat again and again.

// Try to delete again as it could have just been expired
datasourceUpdateService.deleteUnusedIndices(datasource);
datasourceUpdateService.updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL);
return;
}

if (datasource.willExpire(datasource.getUserSchedule().getNextExecutionTime(Instant.now()))) {
IntervalSchedule intervalSchedule = new IntervalSchedule(
datasource.expirationDay(),
DELETE_INDEX_RETRY_IN_MIN,
ChronoUnit.MINUTES,
DELETE_INDEX_DELAY_IN_MILLIS
);
datasourceUpdateService.updateDatasource(datasource, intervalSchedule, DatasourceTask.DELETE_UNUSED_INDICES);
} else {
datasourceUpdateService.updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

@Log4j2
public class DatasourceUpdateService {
Expand Down Expand Up @@ -117,23 +118,46 @@ public List<String> getHeaderFields(String manifestUrl) throws IOException {
/**
* Delete all indices except the one which are being used
*
* @param parameter
* @param datasource
*/
public void deleteUnusedIndices(final Datasource parameter) {
public void deleteUnusedIndices(final Datasource datasource) {
try {
List<String> indicesToDelete = parameter.getIndices()
List<String> indicesToDelete = datasource.getIndices()
.stream()
.filter(index -> index.equals(parameter.currentIndexName()) == false)
.filter(index -> index.equals(datasource.currentIndexName()) == false)
.collect(Collectors.toList());

List<String> deletedIndices = deleteIndices(indicesToDelete);

if (deletedIndices.isEmpty() == false) {
parameter.getIndices().removeAll(deletedIndices);
datasourceFacade.updateDatasource(parameter);
datasource.getIndices().removeAll(deletedIndices);
datasourceFacade.updateDatasource(datasource);
}
} catch (Exception e) {
log.error("Failed to delete old indices for {}", parameter.getName(), e);
log.error("Failed to delete old indices for {}", datasource.getName(), e);
}
}

/**
* Update datasource with given systemSchedule and task
*
* @param datasource datasource to update
* @param systemSchedule new system schedule value
* @param task new task value
*/
public void updateDatasource(final Datasource datasource, final IntervalSchedule systemSchedule, final DatasourceTask task) {
boolean updated = false;
if (datasource.getSystemSchedule().equals(systemSchedule) == false) {
datasource.setSystemSchedule(systemSchedule);
updated = true;
}
if (datasource.getTask().equals(task) == false) {
datasource.setTask(task);
updated = true;
}

if (updated) {
datasourceFacade.updateDatasource(datasource);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,25 @@ protected long randomPositiveLong() {
return value < 0 ? -value : value;
}

protected Datasource randomDatasource() {
int validForInDays = Randomness.get().nextInt(30);
/**
* Update interval should be > 0 and < validForInDays.
* For an update test to work, there should be at least one eligible value other than current update interval.
* Therefore, the smallest value for validForInDays is 2.
* Update interval is random value from 1 to validForInDays - 2.
* The new update value will be validForInDays - 1.
*/
protected Datasource randomDatasource(final Instant updateStartTime) {
int validForInDays = 3 + Randomness.get().nextInt(30);
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setUserSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setUserSchedule(
new IntervalSchedule(
updateStartTime.truncatedTo(ChronoUnit.MILLIS),
1 + Randomness.get().nextInt(validForInDays - 2),
ChronoUnit.DAYS
)
);
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
Expand All @@ -216,6 +229,10 @@ protected Datasource randomDatasource() {
return datasource;
}

protected Datasource randomDatasource() {
return randomDatasource(Instant.now());
}

protected LockModel randomLockModel() {
LockModel lockModel = new LockModel(
GeospatialTestHelper.randomLowerCaseString(),
Expand Down
Loading