Bug fix on lock management and a few performance improvements
* Release lock before responding to the caller for the update/delete APIs
* Release lock in a background task for the creation API
* Change index settings to improve indexing performance

Signed-off-by: Heemin Kim <heemin@amazon.com>
heemin32 committed May 16, 2023
1 parent c77c5af commit 68cf11d
Showing 12 changed files with 111 additions and 86 deletions.
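
As a minimal, hedged sketch of the lock-handling change described in the commit message (not part of the commit itself): the update/delete path now releases the lock and only then responds to the caller, while the create path responds immediately and hands the lock to the background task that builds the datasource, renewing it while the task runs and releasing it in a finally block. Types such as Ip2GeoLockService, LockModel, ActionListener, AcknowledgedResponse, and ThreadPool come from the diff below; the wrapper class, method names, and the Runnable/Consumer placeholders are illustrative assumptions, not the plugin's exact code.

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.threadpool.ThreadPool;

// Sketch only: illustrates the new lock-release ordering, not the plugin's exact code.
class LockHandlingSketch {

    // Update/delete path: release the lock first, then answer the caller.
    static void updateOrDeletePath(Ip2GeoLockService lockService, LockModel lock,
                                   Runnable updateDatasource, ActionListener<AcknowledgedResponse> listener) {
        try {
            updateDatasource.run();
            lockService.releaseLock(lock);                        // release before responding
            listener.onResponse(new AcknowledgedResponse(true));
        } catch (Exception e) {
            lockService.releaseLock(lock);                        // also release on failure
            listener.onFailure(e);
        }
    }

    // Create path: answer the caller right away; the background task keeps renewing
    // the lock while it builds the datasource and releases it when the task finishes.
    static void createPath(Ip2GeoLockService lockService, LockModel lock, ThreadPool threadPool,
                           Consumer<Runnable> createDatasource, ActionListener<AcknowledgedResponse> listener) {
        threadPool.generic().submit(() -> {
            AtomicReference<LockModel> lockRef = new AtomicReference<>(lock);
            try {
                createDatasource.accept(lockService.getRenewLockRunnable(lockRef));
            } finally {
                lockService.releaseLock(lockRef.get());           // released by the background task
            }
        });
        listener.onResponse(new AcknowledgedResponse(true));
    }
}

Presumably the point of releasing before responding is that a caller's immediate follow-up request no longer races a lock that is still held, while the create path must keep the lock alive past the response because the datasource work continues in the background.
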
@@ -80,17 +80,11 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request,
}
try {
deleteDatasource(request.getName());
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
exception -> { log.error("Failed to release the lock", exception); }
)
);
}
}, exception -> { listener.onFailure(exception); }));
}
@@ -80,16 +80,16 @@ protected void doExecute(final Task task, final PutDatasourceRequest request, fi
try {
internalDoExecute(request, lock, listener);
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
);
}
}, exception -> { listener.onFailure(exception); }));
}

/**
* This method takes the lock as a parameter and is responsible for releasing it
* unless an exception is thrown
*/
@VisibleForTesting
protected void internalDoExecute(
final PutDatasourceRequest request,
@@ -100,30 +100,42 @@ protected void internalDoExecute(
datasourceFacade.createIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> {
Datasource datasource = Datasource.Builder.build(request);
datasourceFacade.putDatasource(
datasource,
getIndexResponseListener(datasource, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), listener)
);
}, exception -> listener.onFailure(exception));
datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, lock, listener));
}, exception -> {
lockService.releaseLock(lock);
listener.onFailure(exception);
});
}

/**
* This method takes the lock as a parameter and is responsible for releasing it
* unless an exception is thrown
*/
@VisibleForTesting
protected ActionListener<IndexResponse> getIndexResponseListener(
final Datasource datasource,
final Runnable renewLock,
final LockModel lock,
final ActionListener<AcknowledgedResponse> listener
) {
return new ActionListener<>() {
@Override
public void onResponse(final IndexResponse indexResponse) {
// This is a user-initiated request. Therefore, we want to handle the first datasource update task in a generic thread
// pool.
threadPool.generic().submit(() -> { createDatasource(datasource, renewLock); });
threadPool.generic().submit(() -> {
AtomicReference<LockModel> lockReference = new AtomicReference<>(lock);
try {
createDatasource(datasource, lockService.getRenewLockRunnable(lockReference));
} finally {
lockService.releaseLock(lockReference.get());
}
});
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(final Exception e) {
lockService.releaseLock(lock);
if (e instanceof VersionConflictEngineException) {
listener.onFailure(new ResourceAlreadyExistsException("datasource [{}] already exists", datasource.getName()));
} else {
@@ -14,15 +14,14 @@

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
@@ -81,27 +80,21 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
);
return;
}

try {
Datasource datasource = datasourceFacade.getDatasource(request.getName());
if (datasource == null) {
listener.onFailure(new ResourceNotFoundException("no such datasource exist"));
return;
throw new ResourceNotFoundException("no such datasource exist");
}
validate(request, datasource);
updateIfChanged(request, datasource);
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
exception -> { log.error("Failed to release the lock", exception); }
)
);
}
}, exception -> { listener.onFailure(exception); }));
}, exception -> listener.onFailure(exception)));
}

private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
@@ -43,7 +43,6 @@
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
@@ -67,6 +66,8 @@ public class GeoIpDataFacade {
private static final String IP_RANGE_FIELD_NAME = "_cidr";
private static final String DATA_FIELD_NAME = "_data";
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1);
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_REPLICAS = new Tuple<>("index.number_of_replicas", 0);
private static final Tuple<String, Integer> INDEX_SETTING_REFRESH_INTERVAL = new Tuple<>("index.refresh_interval", -1);
private static final Tuple<String, String> INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all");
private static final Tuple<String, Boolean> INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true);
private static final Tuple<String, Boolean> INDEX_SETTING_READ_ONLY_ALLOW_DELETE = new Tuple<>(
@@ -84,7 +85,12 @@ public GeoIpDataFacade(final ClusterService clusterService, final Client client)
}

/**
* Create an index of single shard with auto expand replicas to all nodes
* Create an index for GeoIP data
*
* Index settings start with a single shard, zero replicas, refresh disabled, and the index hidden.
* Once the GeoIP data is indexed, refresh and force merge the index.
* Then, change the index settings to expand replicas to all nodes and to set read-only-allow-delete.
* See {@link #freezeIndex}
*
* @param indexName index name
*/
@@ -94,7 +100,8 @@ public void createIndexIfNotExists(final String indexName) {
}
final Map<String, Object> indexSettings = new HashMap<>();
indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_REFRESH_INTERVAL.v1(), INDEX_SETTING_REFRESH_INTERVAL.v2());
indexSettings.put(INDEX_SETTING_NUM_OF_REPLICAS.v1(), INDEX_SETTING_NUM_OF_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping());
StashedThreadContext.run(
@@ -103,6 +110,24 @@ public void createIndexIfNotExists(final String indexName) {
);
}

private void freezeIndex(final String indexName) {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
Map<String, Object> settings = new HashMap<>();
settings.put(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2());
settings.put(INDEX_SETTING_NUM_OF_REPLICAS.v1(), null);
settings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(settings)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
});
}

/**
* Generate XContentBuilder representing datasource database index mapping
*
@@ -349,7 +374,7 @@ public void putGeoIpData(
@NonNull final Runnable renewLock
) {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
final BulkRequest bulkRequest = new BulkRequest();
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
String document = createDocument(fields, record.values());
@@ -368,16 +393,7 @@
}
renewLock.run();
}
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
.execute()
.actionGet(timeout);
});
freezeIndex(indexName);
}

public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
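
For reference, the index-setting changes in this file follow a common OpenSearch bulk-load recipe: create the index with replication and refresh disabled, bulk-index without a per-request refresh policy, then refresh once, force-merge to a single segment, and only then re-enable replication (via auto-expand) and mark the index read-only-allow-delete. A hedged end-to-end sketch of that sequence follows; the client calls mirror createIndexIfNotExists/freezeIndex above, while the wrapper class, the explicit setting keys, and the timeout handling are assumptions for illustration.

import java.util.HashMap;
import java.util.Map;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;

// Sketch only: the create/load/freeze sequence behind the index-setting changes in this commit.
class GeoIpIndexLoadSketch {
    static void loadThenFreeze(Client client, String indexName, TimeValue timeout) {
        // 1. Create the index tuned for write throughput: one shard, no replicas,
        //    background refresh disabled, hidden from wildcard expansion.
        Map<String, Object> createSettings = new HashMap<>();
        createSettings.put("index.number_of_shards", 1);
        createSettings.put("index.number_of_replicas", 0);
        createSettings.put("index.refresh_interval", -1);
        createSettings.put("index.hidden", true);
        client.admin().indices().create(new CreateIndexRequest(indexName).settings(createSettings)).actionGet(timeout);

        // 2. ... bulk-index the GeoIP data here (plain BulkRequest, no WAIT_UNTIL refresh policy) ...

        // 3. Make the data searchable and durable: refresh once, force-merge to a single
        //    segment, then restore replication and block further writes.
        client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
        client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);

        Map<String, Object> freezeSettings = new HashMap<>();
        freezeSettings.put("index.blocks.read_only_allow_delete", true);   // key assumed for INDEX_SETTING_READ_ONLY_ALLOW_DELETE
        freezeSettings.put("index.number_of_replicas", null);              // clear the explicit replica count
        freezeSettings.put("index.auto_expand_replicas", "0-all");         // let replicas expand to all nodes
        client.admin().indices().prepareUpdateSettings(indexName).setSettings(freezeSettings).execute().actionGet(timeout);
    }
}

Disabling refresh and replicas while loading avoids building segments and replica copies that the force merge would discard anyway; expanding replicas only afterwards ships a single, already-merged segment to every node.
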
@@ -12,6 +12,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.client.Client;
@@ -22,6 +24,7 @@
/**
* A wrapper of job scheduler's lock service for datasource
*/
@Log4j2
public class Ip2GeoLockService {
public static final long LOCK_DURATION_IN_SECONDS = 300l;
public static final long RENEW_AFTER_IN_SECONDS = 120l;
@@ -57,10 +60,12 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco
* Wrapper method of LockService#release
*
* @param lockModel the lock model
* @param listener the listener
*/
public void releaseLock(final LockModel lockModel, final ActionListener<Boolean> listener) {
lockService.release(lockModel, listener);
public void releaseLock(final LockModel lockModel) {
lockService.release(
lockModel,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
);
}

/**
@@ -110,7 +115,6 @@ public Runnable getRenewLockRunnable(final AtomicReference<LockModel> lockModel)
if (Instant.now().isBefore(preLock.getLockTime().plusSeconds(RENEW_AFTER_IN_SECONDS))) {
return;
}

lockModel.set(renewLock(lockModel.get()));
if (lockModel.get() == null) {
new OpenSearchException("failed to renew a lock [{}]", preLock);
@@ -111,10 +111,7 @@ protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParamet
try {
updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock)));
} finally {
ip2GeoLockService.releaseLock(
lock,
ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock, exception); })
);
ip2GeoLockService.releaseLock(lock);
}
}, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName(), exception); }));
};
@@ -90,7 +90,7 @@ private void validateDoExecute(final LockModel lockModel, final Exception except
verify(listener).onFailure(any(OpenSearchException.class));
} else {
verify(listener).onResponse(new AcknowledgedResponse(true));
verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class));
verify(ip2GeoLockService).releaseLock(eq(lockModel));
}
} else {
// Run