Skip to content

Commit c535196

Browse files
committed
Reduce lock duration and renew the lock during update (opensearch-project#299)
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent 8bcc523 commit c535196

15 files changed

+401
-88
lines changed

src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55

66
package org.opensearch.geospatial.ip2geo.action;
77

8-
import java.io.IOException;
8+
import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;
9+
910
import java.time.Instant;
11+
import java.util.concurrent.atomic.AtomicReference;
1012

1113
import lombok.extern.log4j.Log4j2;
1214

15+
import org.opensearch.OpenSearchException;
1316
import org.opensearch.ResourceAlreadyExistsException;
1417
import org.opensearch.action.ActionListener;
1518
import org.opensearch.action.StepListener;
@@ -21,9 +24,11 @@
2124
import org.opensearch.geospatial.annotation.VisibleForTesting;
2225
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
2326
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
27+
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
2428
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
2529
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
2630
import org.opensearch.index.engine.VersionConflictEngineException;
31+
import org.opensearch.jobscheduler.spi.LockModel;
2732
import org.opensearch.tasks.Task;
2833
import org.opensearch.threadpool.ThreadPool;
2934
import org.opensearch.transport.TransportService;
@@ -36,6 +41,7 @@ public class PutDatasourceTransportAction extends HandledTransportAction<PutData
3641
private final ThreadPool threadPool;
3742
private final DatasourceFacade datasourceFacade;
3843
private final DatasourceUpdateService datasourceUpdateService;
44+
private final Ip2GeoLockService lockService;
3945

4046
/**
4147
* Default constructor
@@ -44,50 +50,73 @@ public class PutDatasourceTransportAction extends HandledTransportAction<PutData
4450
* @param threadPool the thread pool
4551
* @param datasourceFacade the datasource facade
4652
* @param datasourceUpdateService the datasource update service
53+
* @param lockService the lock service
4754
*/
4855
@Inject
4956
public PutDatasourceTransportAction(
5057
final TransportService transportService,
5158
final ActionFilters actionFilters,
5259
final ThreadPool threadPool,
5360
final DatasourceFacade datasourceFacade,
54-
final DatasourceUpdateService datasourceUpdateService
61+
final DatasourceUpdateService datasourceUpdateService,
62+
final Ip2GeoLockService lockService
5563
) {
5664
super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new);
5765
this.threadPool = threadPool;
5866
this.datasourceFacade = datasourceFacade;
5967
this.datasourceUpdateService = datasourceUpdateService;
68+
this.lockService = lockService;
6069
}
6170

6271
@Override
6372
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
64-
try {
65-
StepListener<Void> createIndexStep = new StepListener<>();
66-
datasourceFacade.createIndexIfNotExists(createIndexStep);
67-
createIndexStep.whenComplete(v -> putDatasource(request, listener), exception -> listener.onFailure(exception));
68-
} catch (Exception e) {
69-
listener.onFailure(e);
70-
}
73+
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
74+
if (lock == null) {
75+
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
76+
return;
77+
}
78+
try {
79+
internalDoExecute(request, lock, listener);
80+
} catch (Exception e) {
81+
listener.onFailure(e);
82+
} finally {
83+
lockService.releaseLock(
84+
lock,
85+
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
86+
);
87+
}
88+
}, exception -> { listener.onFailure(exception); }));
7189
}
7290

7391
@VisibleForTesting
74-
protected void putDatasource(final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener)
75-
throws IOException {
76-
Datasource datasource = Datasource.Builder.build(request);
77-
datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, listener));
92+
protected void internalDoExecute(
93+
final PutDatasourceRequest request,
94+
final LockModel lock,
95+
final ActionListener<AcknowledgedResponse> listener
96+
) {
97+
StepListener<Void> createIndexStep = new StepListener<>();
98+
datasourceFacade.createIndexIfNotExists(createIndexStep);
99+
createIndexStep.whenComplete(v -> {
100+
Datasource datasource = Datasource.Builder.build(request);
101+
datasourceFacade.putDatasource(
102+
datasource,
103+
getIndexResponseListener(datasource, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), listener)
104+
);
105+
}, exception -> listener.onFailure(exception));
78106
}
79107

80108
@VisibleForTesting
81109
protected ActionListener<IndexResponse> getIndexResponseListener(
82110
final Datasource datasource,
111+
final Runnable renewLock,
83112
final ActionListener<AcknowledgedResponse> listener
84113
) {
85114
return new ActionListener<>() {
86115
@Override
87116
public void onResponse(final IndexResponse indexResponse) {
88117
// This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread
89118
// pool.
90-
threadPool.generic().submit(() -> { createDatasource(datasource); });
119+
threadPool.generic().submit(() -> { createDatasource(datasource, renewLock); });
91120
listener.onResponse(new AcknowledgedResponse(true));
92121
}
93122

@@ -103,15 +132,15 @@ public void onFailure(final Exception e) {
103132
}
104133

105134
@VisibleForTesting
106-
protected void createDatasource(final Datasource datasource) {
135+
protected void createDatasource(final Datasource datasource, final Runnable renewLock) {
107136
if (DatasourceState.CREATING.equals(datasource.getState()) == false) {
108137
log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.CREATING, datasource.getState());
109138
markDatasourceAsCreateFailed(datasource);
110139
return;
111140
}
112141

113142
try {
114-
datasourceUpdateService.updateOrCreateGeoIpData(datasource);
143+
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
115144
} catch (Exception e) {
116145
log.error("Failed to create datasource for {}", datasource.getName(), e);
117146
markDatasourceAsCreateFailed(datasource);

src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.zip.ZipEntry;
2626
import java.util.zip.ZipInputStream;
2727

28+
import lombok.NonNull;
2829
import lombok.extern.log4j.Log4j2;
2930

3031
import org.apache.commons.csv.CSVFormat;
@@ -332,8 +333,15 @@ public void onFailure(final Exception e) {
332333
* @param fields Field name matching with data in CSVRecord in order
333334
* @param iterator GeoIP data to insert
334335
* @param bulkSize Bulk size of data to process
336+
* @param renewLock Runnable to renew lock
335337
*/
336-
public void putGeoIpData(final String indexName, final String[] fields, final Iterator<CSVRecord> iterator, final int bulkSize) {
338+
public void putGeoIpData(
339+
@NonNull final String indexName,
340+
@NonNull final String[] fields,
341+
@NonNull final Iterator<CSVRecord> iterator,
342+
final int bulkSize,
343+
@NonNull final Runnable renewLock
344+
) {
337345
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
338346
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
339347
while (iterator.hasNext()) {
@@ -352,6 +360,7 @@ public void putGeoIpData(final String indexName, final String[] fields, final It
352360
}
353361
bulkRequest.requests().clear();
354362
}
363+
renewLock.run();
355364
}
356365
StashedThreadContext.run(client, () -> {
357366
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);

src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoLockService.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,25 @@
77

88
import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME;
99

10+
import java.time.Instant;
1011
import java.util.concurrent.CountDownLatch;
1112
import java.util.concurrent.TimeUnit;
1213
import java.util.concurrent.atomic.AtomicReference;
1314

15+
import org.opensearch.OpenSearchException;
1416
import org.opensearch.action.ActionListener;
1517
import org.opensearch.client.Client;
1618
import org.opensearch.cluster.service.ClusterService;
17-
import org.opensearch.common.inject.Inject;
18-
import org.opensearch.common.unit.TimeValue;
1919
import org.opensearch.jobscheduler.spi.LockModel;
2020
import org.opensearch.jobscheduler.spi.utils.LockService;
2121

2222
/**
2323
* A wrapper of job scheduler's lock service for datasource
2424
*/
2525
public class Ip2GeoLockService {
26+
public static final long LOCK_DURATION_IN_SECONDS = 300l;
27+
public static final long RENEW_AFTER_IN_SECONDS = 120l;
2628
private final ClusterService clusterService;
27-
private final Client client;
2829
private final LockService lockService;
2930

3031
/**
@@ -33,10 +34,8 @@ public class Ip2GeoLockService {
3334
* @param clusterService the cluster service
3435
* @param client the client
3536
*/
36-
@Inject
3737
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
3838
this.clusterService = clusterService;
39-
this.client = client;
4039
this.lockService = new LockService(client, clusterService);
4140
}
4241

@@ -68,10 +67,9 @@ public void releaseLock(final LockModel lockModel, final ActionListener<Boolean>
6867
* Synchronous method of LockService#renewLock
6968
*
7069
* @param lockModel lock to renew
71-
* @param timeout timeout in milliseconds precise
7270
* @return renewed lock if renew succeed and null otherwise
7371
*/
74-
public LockModel renewLock(final LockModel lockModel, final TimeValue timeout) {
72+
public LockModel renewLock(final LockModel lockModel) {
7573
AtomicReference<LockModel> lockReference = new AtomicReference();
7674
CountDownLatch countDownLatch = new CountDownLatch(1);
7775
lockService.renewLock(lockModel, new ActionListener<>() {
@@ -89,10 +87,34 @@ public void onFailure(final Exception e) {
8987
});
9088

9189
try {
92-
countDownLatch.await(timeout.getMillis(), TimeUnit.MILLISECONDS);
90+
countDownLatch.await(clusterService.getClusterSettings().get(Ip2GeoSettings.TIMEOUT).getSeconds(), TimeUnit.SECONDS);
9391
return lockReference.get();
9492
} catch (InterruptedException e) {
9593
return null;
9694
}
9795
}
96+
97+
/**
98+
* Return a runnable which can renew the given lock model
99+
*
100+
* The runnable renews the lock and store the renewed lock in the AtomicReference.
101+
* It only renews the lock when it passed {@code RENEW_AFTER_IN_SECONDS} since
102+
* the last time the lock was renewed to avoid resource abuse.
103+
*
104+
* @param lockModel lock model to renew
105+
* @return runnable which can renew the given lock for every call
106+
*/
107+
public Runnable getRenewLockRunnable(final AtomicReference<LockModel> lockModel) {
108+
return () -> {
109+
LockModel preLock = lockModel.get();
110+
if (Instant.now().isBefore(preLock.getLockTime().plusSeconds(RENEW_AFTER_IN_SECONDS))) {
111+
return;
112+
}
113+
114+
lockModel.set(renewLock(lockModel.get()));
115+
if (lockModel.get() == null) {
116+
new OpenSearchException("failed to renew a lock [{}]", preLock);
117+
}
118+
};
119+
}
98120
}

src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest;
3434
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
3535
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
36+
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
3637
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
3738
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
3839
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
@@ -50,7 +51,6 @@ public class Datasource implements Writeable, ScheduledJobParameter {
5051
* Prefix of indices having Ip2Geo data
5152
*/
5253
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data";
53-
private static final long LOCK_DURATION_IN_SECONDS = 60 * 60;
5454
private static final long MAX_JITTER_IN_MINUTES = 5;
5555
private static final long ONE_DAY_IN_HOURS = 24;
5656
private static final long ONE_HOUR_IN_MINUTES = 60;
@@ -282,7 +282,7 @@ public boolean isEnabled() {
282282

283283
@Override
284284
public Long getLockDurationSeconds() {
285-
return LOCK_DURATION_IN_SECONDS;
285+
return Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;
286286
}
287287

288288
/**

src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.io.IOException;
99
import java.time.Instant;
10+
import java.util.concurrent.atomic.AtomicReference;
1011

1112
import lombok.extern.log4j.Log4j2;
1213

@@ -16,10 +17,10 @@
1617
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
1718
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
1819
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
20+
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
1921
import org.opensearch.jobscheduler.spi.JobExecutionContext;
2022
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
2123
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
22-
import org.opensearch.jobscheduler.spi.utils.LockService;
2324

2425
/**
2526
* Datasource update task
@@ -52,6 +53,7 @@ public static DatasourceRunner getJobRunnerInstance() {
5253
private DatasourceUpdateService datasourceUpdateService;
5354
private Ip2GeoExecutor ip2GeoExecutor;
5455
private DatasourceFacade datasourceFacade;
56+
private Ip2GeoLockService ip2GeoLockService;
5557
private boolean initialized;
5658

5759
private DatasourceRunner() {
@@ -65,12 +67,14 @@ public void initialize(
6567
final ClusterService clusterService,
6668
final DatasourceUpdateService datasourceUpdateService,
6769
final Ip2GeoExecutor ip2GeoExecutor,
68-
final DatasourceFacade datasourceFacade
70+
final DatasourceFacade datasourceFacade,
71+
final Ip2GeoLockService ip2GeoLockService
6972
) {
7073
this.clusterService = clusterService;
7174
this.datasourceUpdateService = datasourceUpdateService;
7275
this.ip2GeoExecutor = ip2GeoExecutor;
7376
this.datasourceFacade = datasourceFacade;
77+
this.ip2GeoLockService = ip2GeoLockService;
7478
this.initialized = true;
7579
}
7680

@@ -87,30 +91,27 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC
8791
);
8892
}
8993

90-
ip2GeoExecutor.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter, context));
94+
ip2GeoExecutor.forDatasourceUpdate().submit(updateDatasourceRunner(jobParameter));
9195
}
9296

9397
/**
9498
* Update GeoIP data
9599
*
96100
* Lock is used so that only one of nodes run this task.
97-
* Lock duration is 1 hour to avoid refreshing. This is okay because update interval is 1 day minimum.
98101
*
99102
* @param jobParameter job parameter
100-
* @param context context
101103
*/
102104
@VisibleForTesting
103-
protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter, final JobExecutionContext context) {
104-
final LockService lockService = context.getLockService();
105+
protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter) {
105106
return () -> {
106-
lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> {
107+
ip2GeoLockService.acquireLock(jobParameter.getName(), Ip2GeoLockService.LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
107108
if (lock == null) {
108109
return;
109110
}
110111
try {
111-
updateDatasource(jobParameter);
112+
updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock)));
112113
} finally {
113-
lockService.release(
114+
ip2GeoLockService.releaseLock(
114115
lock,
115116
ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock, exception); })
116117
);
@@ -120,7 +121,7 @@ protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParamet
120121
}
121122

122123
@VisibleForTesting
123-
protected void updateDatasource(final ScheduledJobParameter jobParameter) throws IOException {
124+
protected void updateDatasource(final ScheduledJobParameter jobParameter, final Runnable renewLock) throws IOException {
124125
Datasource datasource = datasourceFacade.getDatasource(jobParameter.getName());
125126
/**
126127
* If delete request comes while update task is waiting on a queue for other update tasks to complete,
@@ -143,7 +144,7 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter) throws
143144

144145
try {
145146
datasourceUpdateService.deleteUnusedIndices(datasource);
146-
datasourceUpdateService.updateOrCreateGeoIpData(datasource);
147+
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
147148
datasourceUpdateService.deleteUnusedIndices(datasource);
148149
} catch (Exception e) {
149150
log.error("Failed to update datasource for {}", datasource.getName(), e);

0 commit comments

Comments
 (0)