Skip to content

Commit 094aa83

Browse files
committed
Delete index once it is expired
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent 4dd91a6 commit 094aa83

File tree

9 files changed

+326
-37
lines changed

9 files changed

+326
-37
lines changed

src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.io.IOException;
99
import java.net.URL;
1010
import java.security.InvalidParameterException;
11+
import java.time.Instant;
1112
import java.time.temporal.ChronoUnit;
1213
import java.util.List;
1314
import java.util.Locale;
@@ -26,6 +27,7 @@
2627
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
2728
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
2829
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
30+
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
2931
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
3032
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
3133
import org.opensearch.tasks.Task;
@@ -97,21 +99,16 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
9799
}, exception -> listener.onFailure(exception)));
98100
}
99101

100-
private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
102+
private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
101103
boolean isChanged = false;
102104
if (isEndpointChanged(request, datasource)) {
103105
datasource.setEndpoint(request.getEndpoint());
104106
isChanged = true;
105107
}
106-
107-
if (isUpdateIntervalChanged(request, datasource)) {
108-
datasource.setUserSchedule(
109-
new IntervalSchedule(
110-
datasource.getUserSchedule().getStartTime(),
111-
(int) request.getUpdateInterval().getDays(),
112-
ChronoUnit.DAYS
113-
)
114-
);
108+
if (isUpdateIntervalChanged(request)) {
109+
datasource.setUserSchedule(new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS));
110+
datasource.setSystemSchedule(datasource.getUserSchedule());
111+
datasource.setTask(DatasourceTask.ALL);
115112
isChanged = true;
116113
}
117114

@@ -138,6 +135,21 @@ private void updateIfChanged(final UpdateDatasourceRequest request, final Dataso
138135
private void validate(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
139136
validateFieldsCompatibility(request, datasource);
140137
validateUpdateIntervalIsLessThanValidForInDays(request, datasource);
138+
validateNextUpdateScheduleIsBeforeExpirationDay(request, datasource);
139+
}
140+
141+
private void validateNextUpdateScheduleIsBeforeExpirationDay(final UpdateDatasourceRequest request, final Datasource datasource) {
142+
if (request.getUpdateInterval() == null) {
143+
return;
144+
}
145+
146+
IntervalSchedule newSchedule = new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS);
147+
148+
if (newSchedule.getNextExecutionTime(Instant.now()).isAfter(datasource.expirationDay())) {
149+
throw new IllegalArgumentException(
150+
String.format(Locale.ROOT, "datasource will expire at %s with the update interval", datasource.expirationDay().toString())
151+
);
152+
}
141153
}
142154

143155
private void validateFieldsCompatibility(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
@@ -157,15 +169,15 @@ private void validateFieldsCompatibility(final UpdateDatasourceRequest request,
157169

158170
private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasourceRequest request, final Datasource datasource)
159171
throws IOException {
160-
if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request, datasource) == false) {
172+
if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request) == false) {
161173
return;
162174
}
163175

164176
long validForInDays = isEndpointChanged(request, datasource)
165177
? DatasourceManifest.Builder.build(new URL(request.getEndpoint())).getValidForInDays()
166178
: datasource.getDatabase().getValidForInDays();
167179

168-
long updateInterval = isUpdateIntervalChanged(request, datasource)
180+
long updateInterval = isUpdateIntervalChanged(request)
169181
? request.getUpdateInterval().days()
170182
: datasource.getUserSchedule().getInterval();
171183

@@ -180,8 +192,14 @@ private boolean isEndpointChanged(final UpdateDatasourceRequest request, final D
180192
return request.getEndpoint() != null && request.getEndpoint().equals(datasource.getEndpoint()) == false;
181193
}
182194

183-
private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
184-
return request.getUpdateInterval() != null
185-
&& (int) request.getUpdateInterval().days() != datasource.getUserSchedule().getInterval();
195+
/**
196+
* Update interval is changed as long as user provide one because
197+
* start time will get updated even if the update interval is same as current one.
198+
*
199+
* @param request the update datasource request
200+
* @return true if update interval is changed, and false otherwise
201+
*/
202+
private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request) {
203+
return request.getUpdateInterval() != null;
186204
}
187205
}

src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,38 @@ private String indexNameFor(final long suffix) {
374374
/**
375375
* Checks if datasource is expired or not
376376
*
377-
* @return true if datasource is expired false otherwise
377+
* @return true if datasource is expired, and false otherwise
378378
*/
379379
public boolean isExpired() {
380+
return willExpire(Instant.now());
381+
}
382+
383+
/**
384+
* Checks if datasource will expire at given time
385+
*
386+
* @return true if datasource will expired at given time, and false otherwise
387+
*/
388+
public boolean willExpire(Instant instant) {
380389
if (database.validForInDays == null) {
381390
return false;
382391
}
383392

393+
return instant.isAfter(expirationDay());
394+
}
395+
396+
/**
397+
* Day when datasource will expire
398+
*
399+
* @return Day when datasource will expire
400+
*/
401+
public Instant expirationDay() {
402+
if (database.validForInDays == null) {
403+
return Instant.MAX;
404+
}
405+
return lastCheckedAt().plus(database.validForInDays, ChronoUnit.DAYS);
406+
}
407+
408+
private Instant lastCheckedAt() {
384409
Instant lastCheckedAt;
385410
if (updateStats.lastSkippedAt == null) {
386411
lastCheckedAt = updateStats.lastSucceededAt;
@@ -389,7 +414,7 @@ public boolean isExpired() {
389414
? updateStats.lastSkippedAt
390415
: updateStats.lastSucceededAt;
391416
}
392-
return Instant.now().isAfter(lastCheckedAt.plus(database.validForInDays, ChronoUnit.DAYS));
417+
return lastCheckedAt;
393418
}
394419

395420
/**

src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.io.IOException;
99
import java.time.Instant;
10+
import java.time.temporal.ChronoUnit;
1011
import java.util.concurrent.atomic.AtomicReference;
1112

1213
import lombok.extern.log4j.Log4j2;
@@ -21,6 +22,7 @@
2122
import org.opensearch.jobscheduler.spi.JobExecutionContext;
2223
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
2324
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
25+
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
2426

2527
/**
2628
* Datasource update task
@@ -29,6 +31,8 @@
2931
*/
3032
@Log4j2
3133
public class DatasourceRunner implements ScheduledJobRunner {
34+
private static final int DELETE_INDEX_RETRY_IN_MIN = 15;
35+
private static final int DELETE_INDEX_DELAY_IN_MILLIS = 10000;
3236

3337
private static DatasourceRunner INSTANCE;
3438

@@ -141,12 +145,37 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter, final
141145

142146
try {
143147
datasourceUpdateService.deleteUnusedIndices(datasource);
144-
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
148+
if (DatasourceTask.DELETE_UNUSED_INDICES.equals(datasource.getTask()) == false) {
149+
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
150+
}
145151
datasourceUpdateService.deleteUnusedIndices(datasource);
146152
} catch (Exception e) {
147153
log.error("Failed to update datasource for {}", datasource.getName(), e);
148154
datasource.getUpdateStats().setLastFailedAt(Instant.now());
149155
datasourceFacade.updateDatasource(datasource);
156+
} finally {
157+
postProcessing(datasource);
158+
}
159+
}
160+
161+
private void postProcessing(final Datasource datasource) {
162+
if (datasource.isExpired()) {
163+
// Try to delete again as it could have just been expired
164+
datasourceUpdateService.deleteUnusedIndices(datasource);
165+
datasourceUpdateService.updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL);
166+
return;
167+
}
168+
169+
if (datasource.willExpire(datasource.getUserSchedule().getNextExecutionTime(Instant.now()))) {
170+
IntervalSchedule intervalSchedule = new IntervalSchedule(
171+
datasource.expirationDay(),
172+
DELETE_INDEX_RETRY_IN_MIN,
173+
ChronoUnit.MINUTES,
174+
DELETE_INDEX_DELAY_IN_MILLIS
175+
);
176+
datasourceUpdateService.updateDatasource(datasource, intervalSchedule, DatasourceTask.DELETE_UNUSED_INDICES);
177+
} else {
178+
datasourceUpdateService.updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL);
150179
}
151180
}
152181
}

src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
2727
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
2828
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
29+
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
2930

3031
@Log4j2
3132
public class DatasourceUpdateService {
@@ -117,23 +118,46 @@ public List<String> getHeaderFields(String manifestUrl) throws IOException {
117118
/**
118119
* Delete all indices except the one which are being used
119120
*
120-
* @param parameter
121+
* @param datasource
121122
*/
122-
public void deleteUnusedIndices(final Datasource parameter) {
123+
public void deleteUnusedIndices(final Datasource datasource) {
123124
try {
124-
List<String> indicesToDelete = parameter.getIndices()
125+
List<String> indicesToDelete = datasource.getIndices()
125126
.stream()
126-
.filter(index -> index.equals(parameter.currentIndexName()) == false)
127+
.filter(index -> index.equals(datasource.currentIndexName()) == false)
127128
.collect(Collectors.toList());
128129

129130
List<String> deletedIndices = deleteIndices(indicesToDelete);
130131

131132
if (deletedIndices.isEmpty() == false) {
132-
parameter.getIndices().removeAll(deletedIndices);
133-
datasourceFacade.updateDatasource(parameter);
133+
datasource.getIndices().removeAll(deletedIndices);
134+
datasourceFacade.updateDatasource(datasource);
134135
}
135136
} catch (Exception e) {
136-
log.error("Failed to delete old indices for {}", parameter.getName(), e);
137+
log.error("Failed to delete old indices for {}", datasource.getName(), e);
138+
}
139+
}
140+
141+
/**
142+
* Update datasource with given systemSchedule and task
143+
*
144+
* @param datasource datasource to update
145+
* @param systemSchedule new system schedule value
146+
* @param task new task value
147+
*/
148+
public void updateDatasource(final Datasource datasource, final IntervalSchedule systemSchedule, final DatasourceTask task) {
149+
boolean updated = false;
150+
if (datasource.getSystemSchedule().equals(systemSchedule) == false) {
151+
datasource.setSystemSchedule(systemSchedule);
152+
updated = true;
153+
}
154+
if (datasource.getTask().equals(task) == false) {
155+
datasource.setTask(task);
156+
updated = true;
157+
}
158+
159+
if (updated) {
160+
datasourceFacade.updateDatasource(datasource);
137161
}
138162
}
139163

src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,25 @@ protected long randomPositiveLong() {
186186
return value < 0 ? -value : value;
187187
}
188188

189-
protected Datasource randomDatasource() {
190-
int validForInDays = Randomness.get().nextInt(30);
189+
/**
190+
* Update interval should be > 0 and < validForInDays.
191+
* For an update test to work, there should be at least one eligible value other than current update interval.
192+
* Therefore, the smallest value for validForInDays is 2.
193+
* Update interval is random value from 1 to validForInDays - 2.
194+
* The new update value will be validForInDays - 1.
195+
*/
196+
protected Datasource randomDatasource(final Instant updateStartTime) {
197+
int validForInDays = 3 + Randomness.get().nextInt(30);
191198
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
192199
Datasource datasource = new Datasource();
193200
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
194-
datasource.setUserSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
201+
datasource.setUserSchedule(
202+
new IntervalSchedule(
203+
updateStartTime.truncatedTo(ChronoUnit.MILLIS),
204+
1 + Randomness.get().nextInt(validForInDays - 2),
205+
ChronoUnit.DAYS
206+
)
207+
);
195208
datasource.setSystemSchedule(datasource.getUserSchedule());
196209
datasource.setTask(randomTask());
197210
datasource.setState(randomState());
@@ -216,6 +229,10 @@ protected Datasource randomDatasource() {
216229
return datasource;
217230
}
218231

232+
protected Datasource randomDatasource() {
233+
return randomDatasource(Instant.now());
234+
}
235+
219236
protected LockModel randomLockModel() {
220237
LockModel lockModel = new LockModel(
221238
GeospatialTestHelper.randomLowerCaseString(),

0 commit comments

Comments
 (0)