Skip to content

Commit 521487b

Browse files
authored
Add ConcurrentModificationException (#308)
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent dc7fb09 commit 521487b

File tree

6 files changed

+92
-6
lines changed

6 files changed

+92
-6
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.exceptions;
7+
8+
import java.io.IOException;
9+
10+
import org.opensearch.OpenSearchException;
11+
import org.opensearch.common.io.stream.StreamInput;
12+
import org.opensearch.rest.RestStatus;
13+
14+
/**
15+
* General ConcurrentModificationException corresponding to the {@link RestStatus#BAD_REQUEST} status code
16+
*
17+
* The exception is thrown when multiple mutation API is called for a same resource at the same time
18+
*/
19+
public class ConcurrentModificationException extends OpenSearchException {
20+
21+
public ConcurrentModificationException(String msg, Object... args) {
22+
super(msg, args);
23+
}
24+
25+
public ConcurrentModificationException(String msg, Throwable cause, Object... args) {
26+
super(msg, cause, args);
27+
}
28+
29+
public ConcurrentModificationException(StreamInput in) throws IOException {
30+
super(in);
31+
}
32+
33+
@Override
34+
public final RestStatus status() {
35+
return RestStatus.BAD_REQUEST;
36+
}
37+
}

src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99

1010
import lombok.extern.log4j.Log4j2;
1111

12-
import org.opensearch.OpenSearchException;
1312
import org.opensearch.ResourceNotFoundException;
1413
import org.opensearch.action.ActionListener;
1514
import org.opensearch.action.support.ActionFilters;
1615
import org.opensearch.action.support.HandledTransportAction;
1716
import org.opensearch.action.support.master.AcknowledgedResponse;
1817
import org.opensearch.common.inject.Inject;
1918
import org.opensearch.geospatial.annotation.VisibleForTesting;
19+
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
2020
import org.opensearch.geospatial.exceptions.ResourceInUseException;
2121
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
2222
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
@@ -73,7 +73,9 @@ public DeleteDatasourceTransportAction(
7373
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
7474
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
7575
if (lock == null) {
76-
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
76+
listener.onFailure(
77+
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
78+
);
7779
return;
7880
}
7981
try {

src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import lombok.extern.log4j.Log4j2;
1414

15-
import org.opensearch.OpenSearchException;
1615
import org.opensearch.ResourceAlreadyExistsException;
1716
import org.opensearch.action.ActionListener;
1817
import org.opensearch.action.StepListener;
@@ -22,6 +21,7 @@
2221
import org.opensearch.action.support.master.AcknowledgedResponse;
2322
import org.opensearch.common.inject.Inject;
2423
import org.opensearch.geospatial.annotation.VisibleForTesting;
24+
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
2525
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
2626
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
2727
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
@@ -72,7 +72,9 @@ public PutDatasourceTransportAction(
7272
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
7373
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
7474
if (lock == null) {
75-
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
75+
listener.onFailure(
76+
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
77+
);
7678
return;
7779
}
7880
try {

src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.action.support.HandledTransportAction;
2222
import org.opensearch.action.support.master.AcknowledgedResponse;
2323
import org.opensearch.common.inject.Inject;
24+
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
2425
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
2526
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
2627
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
@@ -74,7 +75,9 @@ public UpdateDatasourceTransportAction(
7475
protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
7576
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
7677
if (lock == null) {
77-
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
78+
listener.onFailure(
79+
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
80+
);
7881
return;
7982
}
8083
try {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.geospatial.exceptions;
7+
8+
import lombok.SneakyThrows;
9+
10+
import org.opensearch.common.io.stream.BytesStreamInput;
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.rest.RestStatus;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
public class ConcurrentModificationExceptionTests extends OpenSearchTestCase {
16+
public void testConstructor_whenCreated_thenSucceed() {
17+
ConcurrentModificationException exception = new ConcurrentModificationException("Resource is being modified by another processor");
18+
assertEquals(RestStatus.BAD_REQUEST, exception.status());
19+
}
20+
21+
public void testConstructor_whenCreatedWithRootCause_thenSucceed() {
22+
ConcurrentModificationException exception = new ConcurrentModificationException(
23+
"Resource is being modified by another processor",
24+
new RuntimeException()
25+
);
26+
assertEquals(RestStatus.BAD_REQUEST, exception.status());
27+
}
28+
29+
@SneakyThrows
30+
public void testConstructor_whenCreatedWithStream_thenSucceed() {
31+
ConcurrentModificationException exception = new ConcurrentModificationException(
32+
"New datasource is not compatible with existing datasource"
33+
);
34+
35+
BytesStreamOutput output = new BytesStreamOutput();
36+
exception.writeTo(output);
37+
BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes);
38+
ConcurrentModificationException copiedException = new ConcurrentModificationException(input);
39+
assertEquals(exception.getMessage(), copiedException.getMessage());
40+
assertEquals(exception.status(), copiedException.status());
41+
}
42+
}

src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ protected Datasource randomDatasource() {
183183
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
184184
Datasource datasource = new Datasource();
185185
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
186-
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS));
186+
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
187187
datasource.setState(randomState());
188188
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
189189
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));

0 commit comments

Comments
 (0)