Skip to content

Commit d00f8e7

Browse files
committed
Add wrapper class of job scheduler lock service
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent be4aa03 commit d00f8e7

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.geospatial.ip2geo.common;
10+
11+
import static org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension.JOB_INDEX_NAME;
12+
13+
import java.util.concurrent.CountDownLatch;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicReference;
16+
17+
import org.opensearch.action.ActionListener;
18+
import org.opensearch.client.Client;
19+
import org.opensearch.cluster.service.ClusterService;
20+
import org.opensearch.common.inject.Inject;
21+
import org.opensearch.common.unit.TimeValue;
22+
import org.opensearch.jobscheduler.spi.LockModel;
23+
import org.opensearch.jobscheduler.spi.utils.LockService;
24+
25+
/**
26+
* A wrapper of job scheduler's lock service for datasource
27+
*/
28+
public class Ip2GeoLockService {
29+
private final ClusterService clusterService;
30+
private final Client client;
31+
private final LockService lockService;
32+
33+
/**
34+
* Default constructor
35+
*
36+
* @param clusterService the cluster service
37+
* @param client the client
38+
*/
39+
@Inject
40+
public Ip2GeoLockService(final ClusterService clusterService, final Client client) {
41+
this.clusterService = clusterService;
42+
this.client = client;
43+
this.lockService = new LockService(client, clusterService);
44+
}
45+
46+
/**
47+
* Wrapper method of LockService#acquireLockWithId
48+
*
49+
* Datasource use its name as doc id in job scheduler. Therefore, we can use datasource name to acquire
50+
* a lock on a datasource.
51+
*
52+
* @param datasourceName datasourceName to acquire lock on
53+
* @param lockDurationSeconds the lock duration in seconds
54+
* @param listener the listener
55+
*/
56+
public void acquireLock(final String datasourceName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
57+
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, datasourceName, listener);
58+
}
59+
60+
/**
61+
* Wrapper method of LockService#release
62+
*
63+
* @param lockModel the lock model
64+
* @param listener the listener
65+
*/
66+
public void releaseLock(final LockModel lockModel, final ActionListener<Boolean> listener) {
67+
lockService.release(lockModel, listener);
68+
}
69+
70+
/**
71+
* Synchronous method of LockService#renewLock
72+
*
73+
* @param lockModel lock to renew
74+
* @param timeout timeout in milliseconds precise
75+
* @return renewed lock if renew succeed and null otherwise
76+
*/
77+
public LockModel renewLock(final LockModel lockModel, final TimeValue timeout) {
78+
AtomicReference<LockModel> lockReference = new AtomicReference();
79+
CountDownLatch countDownLatch = new CountDownLatch(1);
80+
lockService.renewLock(lockModel, new ActionListener<>() {
81+
@Override
82+
public void onResponse(final LockModel lockModel) {
83+
lockReference.set(lockModel);
84+
countDownLatch.countDown();
85+
}
86+
87+
@Override
88+
public void onFailure(final Exception e) {
89+
lockReference.set(null);
90+
countDownLatch.countDown();
91+
}
92+
});
93+
94+
try {
95+
countDownLatch.await(timeout.getMillis(), TimeUnit.MILLISECONDS);
96+
return lockReference.get();
97+
} catch (InterruptedException e) {
98+
return null;
99+
}
100+
}
101+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.geospatial.ip2geo.common;
10+
11+
import static org.mockito.Mockito.mock;
12+
13+
import java.time.Instant;
14+
15+
import org.junit.Before;
16+
import org.opensearch.action.ActionListener;
17+
import org.opensearch.common.unit.TimeValue;
18+
import org.opensearch.geospatial.GeospatialTestHelper;
19+
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
20+
21+
public class Ip2GeoLockServiceTests extends Ip2GeoTestCase {
22+
private Ip2GeoLockService ip2GeoLockService;
23+
24+
@Before
25+
public void init() {
26+
ip2GeoLockService = new Ip2GeoLockService(clusterService, client);
27+
}
28+
29+
public void testAcquireLock_whenValidInput_thenSucceed() {
30+
// Cannot test because LockService is final class
31+
// Simply calling method to increase coverage
32+
ip2GeoLockService.acquireLock(GeospatialTestHelper.randomLowerCaseString(), randomPositiveLong(), mock(ActionListener.class));
33+
}
34+
35+
public void testReleaseLock_whenValidInput_thenSucceed() {
36+
// Cannot test because LockService is final class
37+
// Simply calling method to increase coverage
38+
ip2GeoLockService.releaseLock(null, mock(ActionListener.class));
39+
}
40+
41+
public void testRenewLock_whenCalled_thenNotBlocked() {
42+
long timeoutInMillis = 10000;
43+
long expectedDurationInMillis = 1000;
44+
Instant before = Instant.now();
45+
assertNull(ip2GeoLockService.renewLock(null, TimeValue.timeValueMillis(timeoutInMillis)));
46+
Instant after = Instant.now();
47+
assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis);
48+
}
49+
}

0 commit comments

Comments
 (0)