From 3f7b6e903bb2068ff080f278833c4263b35a963d Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Thu, 4 May 2023 12:08:29 -0700 Subject: [PATCH] Implement get datasource api (#279) Signed-off-by: Heemin Kim --- .../ip2geo/action/GetDatasourceAction.java | 29 +++++ .../ip2geo/action/GetDatasourceRequest.java | 69 ++++++++++++ .../ip2geo/action/GetDatasourceResponse.java | 91 ++++++++++++++++ .../action/GetDatasourceTransportAction.java | 76 +++++++++++++ .../action/RestGetDatasourceHandler.java | 49 +++++++++ .../action/RestPutDatasourceHandler.java | 6 +- .../ip2geo/common/DatasourceFacade.java | 89 +++++++++++++++ .../ip2geo/jobscheduler/Datasource.java | 96 ++++++++++++++--- .../geospatial/plugin/GeospatialPlugin.java | 13 ++- .../geospatial/ip2geo/Ip2GeoTestCase.java | 21 ++-- .../action/GetDatasourceRequestTests.java | 46 ++++++++ .../action/GetDatasourceResponseTests.java | 99 +++++++++++++++++ .../GetDatasourceTransportActionTests.java | 88 +++++++++++++++ .../action/PutDatasourceRequestTests.java | 2 +- .../action/RestGetDatasourceHandlerTests.java | 81 ++++++++++++++ .../ip2geo/common/DatasourceFacadeTests.java | 101 ++++++++++++++++-- .../DatasourceExtensionTests.java | 3 +- .../ip2geo/jobscheduler/DatasourceTests.java | 8 +- .../plugin/GeospatialPluginTests.java | 4 +- 19 files changed, 929 insertions(+), 42 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java new file mode 100644 index 00000000..09f90934 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import org.opensearch.action.ActionType; + +/** + * Ip2Geo datasource get action + */ +public class GetDatasourceAction extends ActionType { + /** + * Get datasource action instance + */ + public static final GetDatasourceAction INSTANCE = new GetDatasourceAction(); + /** + * Name of a get datasource action + */ + public static final String NAME = "cluster:admin/geospatial/datasource/get"; + + private GetDatasourceAction() { + super(NAME, GetDatasourceResponse::new); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java new file mode 100644 index 00000000..a3de669d --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; + +import lombok.Getter; +import lombok.Setter; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +/** + * Ip2Geo datasource get request + */ +@Getter +@Setter +public class GetDatasourceRequest extends ActionRequest { + /** + * @param names the datasource names + * @return the datasource names + */ + private String[] names; + + /** + * Constructs a new get datasource request with a list of datasources. + * + * If the list of datasources is empty or it contains a single element "_all", all registered datasources + * are returned. + * + * @param names list of datasource names + */ + public GetDatasourceRequest(final String[] names) { + this.names = names; + } + + /** + * Constructor with stream input + * @param in the stream input + * @throws IOException IOException + */ + public GetDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.names = in.readStringArray(); + } + + @Override + public ActionRequestValidationException validate() { + if (names == null) { + throw new OpenSearchException("names should not be null"); + } + return null; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(names); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java new file mode 100644 index 00000000..e4915fa8 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; + +/** + * Ip2Geo datasource get request + */ +@Getter +@Setter +@EqualsAndHashCode +public class GetDatasourceResponse extends ActionResponse implements ToXContentObject { + private static final ParseField FIELD_NAME_DATASOURCES = new ParseField("datasources"); + private static final ParseField FIELD_NAME_NAME = new ParseField("name"); + private static final ParseField FIELD_NAME_STATE = new ParseField("state"); + private static final ParseField FIELD_NAME_ENDPOINT = new ParseField("endpoint"); + private static final ParseField FIELD_NAME_UPDATE_INTERVAL = new ParseField("update_interval_in_days"); + private static final ParseField FIELD_NAME_NEXT_UPDATE_AT = new ParseField("next_update_at_in_epoch_millis"); + private static final ParseField FIELD_NAME_NEXT_UPDATE_AT_READABLE = new ParseField("next_update_at"); + private static final ParseField FIELD_NAME_DATABASE = new ParseField("database"); + private static final ParseField FIELD_NAME_UPDATE_STATS = new ParseField("update_stats"); + private List datasources; + + /** + * Default constructor + * + * @param datasources List of datasources + */ + public GetDatasourceResponse(final List datasources) { + this.datasources = datasources; + } + + /** + * Constructor with StreamInput + * + * @param in the stream input + */ + public GetDatasourceResponse(final StreamInput in) throws IOException { + datasources = in.readList(Datasource::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeList(datasources); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + builder.startArray(FIELD_NAME_DATASOURCES.getPreferredName()); + for (Datasource datasource : datasources) { + builder.startObject(); + builder.field(FIELD_NAME_NAME.getPreferredName(), datasource.getName()); + builder.field(FIELD_NAME_STATE.getPreferredName(), datasource.getState()); + builder.field(FIELD_NAME_ENDPOINT.getPreferredName(), datasource.getEndpoint()); + builder.field(FIELD_NAME_UPDATE_INTERVAL.getPreferredName(), datasource.getSchedule().getInterval()); + builder.timeField( + FIELD_NAME_NEXT_UPDATE_AT.getPreferredName(), + FIELD_NAME_NEXT_UPDATE_AT_READABLE.getPreferredName(), + datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli() + ); + builder.field(FIELD_NAME_DATABASE.getPreferredName(), datasource.getDatabase()); + builder.field(FIELD_NAME_UPDATE_STATS.getPreferredName(), datasource.getUpdateStats()); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java new file mode 100644 index 00000000..4bc69949 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.util.List; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * Transport action to get datasource + */ +public class GetDatasourceTransportAction extends HandledTransportAction { + private final DatasourceFacade datasourceFacade; + + /** + * Default constructor + * @param transportService the transport service + * @param actionFilters the action filters + * @param datasourceFacade the datasource facade + */ + @Inject + public GetDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final DatasourceFacade datasourceFacade + ) { + super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new); + this.datasourceFacade = datasourceFacade; + } + + @Override + protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener listener) { + if (shouldGetAllDatasource(request)) { + // We don't expect too many data sources. Therefore, querying all data sources without pagination should be fine. + datasourceFacade.getAllDatasources(newActionListener(listener)); + } else { + datasourceFacade.getDatasources(request.getNames(), newActionListener(listener)); + } + } + + private boolean shouldGetAllDatasource(final GetDatasourceRequest request) { + if (request.getNames() == null) { + throw new OpenSearchException("names in a request should not be null"); + } + + return request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0])); + } + + private ActionListener> newActionListener(final ActionListener listener) { + return new ActionListener<>() { + @Override + public void onResponse(final List datasources) { + listener.onResponse(new GetDatasourceResponse(datasources)); + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + }; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java new file mode 100644 index 00000000..0dc50530 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; +import static org.opensearch.rest.RestRequest.Method.GET; + +import java.util.List; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +/** + * Rest handler for Ip2Geo datasource get request + */ +public class RestGetDatasourceHandler extends BaseRestHandler { + private static final String ACTION_NAME = "ip2geo_datasource_get"; + + @Override + public String getName() { + return ACTION_NAME; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + final String[] names = request.paramAsStringArray("name", Strings.EMPTY_ARRAY); + final GetDatasourceRequest getDatasourceRequest = new GetDatasourceRequest(names); + + return channel -> client.executeLocally(GetDatasourceAction.INSTANCE, getDatasourceRequest, new RestToXContentListener<>(channel)); + } + + @Override + public List routes() { + return List.of( + new Route(GET, String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource")), + new Route(GET, String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}")) + ); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java index 3ccffa06..d35b7751 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java @@ -39,7 +39,7 @@ * */ public class RestPutDatasourceHandler extends BaseRestHandler { - private static final String ACTION_NAME = "ip2geo_datasource"; + private static final String ACTION_NAME = "ip2geo_datasource_put"; private final ClusterSettings clusterSettings; public RestPutDatasourceHandler(final ClusterSettings clusterSettings) { @@ -53,7 +53,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("id")); + final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name")); if (request.hasContentOrSourceParam()) { try (XContentParser parser = request.contentOrSourceParamParser()) { PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null); @@ -70,7 +70,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No @Override public List routes() { - String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{id}"); + String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}"); return List.of(new Route(PUT, path)); } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java index 4d964273..25d2f5af 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java @@ -10,16 +10,25 @@ import java.io.IOException; import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import lombok.extern.log4j.Log4j2; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; +import org.opensearch.action.get.MultiGetItemResponse; +import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentFactory; @@ -30,12 +39,15 @@ import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; /** * Facade class for datasource */ @Log4j2 public class DatasourceFacade { + private static final Integer MAX_SIZE = 1000; private final Client client; private final ClusterSettings clusterSettings; @@ -120,4 +132,81 @@ public void onFailure(final Exception e) { } }); } + + /** + * Get datasources from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param names the array of datasource names + * @param actionListener the action listener + */ + public void getDatasources(final String[] names, final ActionListener> actionListener) { + client.prepareMultiGet() + .add(DatasourceExtension.JOB_INDEX_NAME, names) + .execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener)); + } + + /** + * Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param actionListener the action listener + */ + public void getAllDatasources(final ActionListener> actionListener) { + client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME) + .setQuery(QueryBuilders.matchAllQuery()) + .setSize(MAX_SIZE) + .execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener)); + } + + private ActionListener createGetDataSourceQueryActionLister( + final Class response, + final ActionListener> actionListener + ) { + return new ActionListener() { + @Override + public void onResponse(final T response) { + try { + List bytesReferences = toBytesReferences(response); + List datasources = bytesReferences.stream() + .map(bytesRef -> toDatasource(bytesRef)) + .collect(Collectors.toList()); + actionListener.onResponse(datasources); + } catch (Exception e) { + actionListener.onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + actionListener.onFailure(e); + } + }; + } + + private List toBytesReferences(final Object response) { + if (response instanceof SearchResponse) { + SearchResponse searchResponse = (SearchResponse) response; + return Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getSourceRef).collect(Collectors.toList()); + } else if (response instanceof MultiGetResponse) { + MultiGetResponse multiGetResponse = (MultiGetResponse) response; + return Arrays.stream(multiGetResponse.getResponses()) + .map(MultiGetItemResponse::getResponse) + .filter(Objects::nonNull) + .filter(GetResponse::isExists) + .map(GetResponse::getSourceAsBytesRef) + .collect(Collectors.toList()); + } else { + throw new OpenSearchException("No supported instance type[{}] is provided", response.getClass()); + } + } + + private Datasource toDatasource(final BytesReference bytesReference) { + try { + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + bytesReference + ); + return Datasource.PARSER.parse(parser, null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 2f93cff5..7c7eae55 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -25,6 +25,9 @@ import lombok.Setter; import lombok.ToString; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.ConstructingObjectParser; import org.opensearch.core.xcontent.ToXContent; @@ -45,7 +48,7 @@ @ToString @EqualsAndHashCode @AllArgsConstructor -public class Datasource implements ScheduledJobParameter { +public class Datasource implements Writeable, ScheduledJobParameter { /** * Prefix of indices having Ip2Geo data */ @@ -142,7 +145,7 @@ public class Datasource implements ScheduledJobParameter { "datasource_metadata", true, args -> { - String id = (String) args[0]; + String name = (String) args[0]; Instant lastUpdateTime = Instant.ofEpochMilli((long) args[1]); Instant enabledTime = args[2] == null ? null : Instant.ofEpochMilli((long) args[2]); boolean isEnabled = (boolean) args[3]; @@ -153,7 +156,7 @@ public class Datasource implements ScheduledJobParameter { Database database = (Database) args[8]; UpdateStats updateStats = (UpdateStats) args[9]; Datasource parameter = new Datasource( - id, + name, lastUpdateTime, enabledTime, isEnabled, @@ -187,9 +190,9 @@ public Datasource() { this(null, null, null); } - public Datasource(final String id, final IntervalSchedule schedule, final String endpoint) { + public Datasource(final String name, final IntervalSchedule schedule, final String endpoint) { this( - id, + name, Instant.now().truncatedTo(ChronoUnit.MILLIS), null, false, @@ -202,6 +205,33 @@ public Datasource(final String id, final IntervalSchedule schedule, final String ); } + public Datasource(final StreamInput in) throws IOException { + name = in.readString(); + lastUpdateTime = toInstant(in.readVLong()); + enabledTime = toInstant(in.readOptionalVLong()); + isEnabled = in.readBoolean(); + schedule = new IntervalSchedule(in); + endpoint = in.readString(); + state = DatasourceState.valueOf(in.readString()); + indices = in.readStringList(); + database = new Database(in); + updateStats = new UpdateStats(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeString(name); + out.writeVLong(lastUpdateTime.toEpochMilli()); + out.writeOptionalVLong(enabledTime == null ? null : enabledTime.toEpochMilli()); + out.writeBoolean(isEnabled); + schedule.writeTo(out); + out.writeString(endpoint); + out.writeString(state.name()); + out.writeStringCollection(indices); + database.writeTo(out); + updateStats.writeTo(out); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); @@ -378,6 +408,10 @@ public boolean isCompatible(final List fields) { return true; } + private static Instant toInstant(final Long epochMilli) { + return epochMilli == null ? null : Instant.ofEpochMilli(epochMilli); + } + /** * Database of a datasource */ @@ -387,11 +421,11 @@ public boolean isCompatible(final List fields) { @EqualsAndHashCode @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) - public static class Database implements ToXContent { + public static class Database implements Writeable, ToXContent { private static final ParseField PROVIDER_FIELD = new ParseField("provider"); private static final ParseField SHA256_HASH_FIELD = new ParseField("sha256_hash"); - private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at"); - private static final ParseField UPDATED_AT_FIELD_READABLE = new ParseField("updated_at_field"); + private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at_in_epoch_millis"); + private static final ParseField UPDATED_AT_FIELD_READABLE = new ParseField("updated_at"); private static final ParseField FIELDS_FIELD = new ParseField("fields"); private static final ParseField VALID_FOR_IN_DAYS_FIELD = new ParseField("valid_for_in_days"); @@ -441,6 +475,23 @@ public static class Database implements ToXContent { PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), FIELDS_FIELD); } + public Database(final StreamInput in) throws IOException { + provider = in.readOptionalString(); + sha256Hash = in.readOptionalString(); + updatedAt = toInstant(in.readOptionalVLong()); + validForInDays = in.readOptionalVLong(); + fields = in.readOptionalStringList(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalString(provider); + out.writeOptionalString(sha256Hash); + out.writeOptionalVLong(updatedAt.toEpochMilli()); + out.writeOptionalVLong(validForInDays); + out.writeOptionalStringCollection(fields); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); @@ -481,14 +532,14 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa @EqualsAndHashCode @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) - public static class UpdateStats implements ToXContent { - private static final ParseField LAST_SUCCEEDED_AT_FIELD = new ParseField("last_succeeded_at"); - private static final ParseField LAST_SUCCEEDED_AT_FIELD_READABLE = new ParseField("last_succeeded_at_field"); + public static class UpdateStats implements Writeable, ToXContent { + private static final ParseField LAST_SUCCEEDED_AT_FIELD = new ParseField("last_succeeded_at_in_epoch_millis"); + private static final ParseField LAST_SUCCEEDED_AT_FIELD_READABLE = new ParseField("last_succeeded_at"); private static final ParseField LAST_PROCESSING_TIME_IN_MILLIS_FIELD = new ParseField("last_processing_time_in_millis"); - private static final ParseField LAST_FAILED_AT_FIELD = new ParseField("last_failed_at"); - private static final ParseField LAST_FAILED_AT_FIELD_READABLE = new ParseField("last_failed_at_field"); - private static final ParseField LAST_SKIPPED_AT = new ParseField("last_skipped_at"); - private static final ParseField LAST_SKIPPED_AT_READABLE = new ParseField("last_skipped_at_field"); + private static final ParseField LAST_FAILED_AT_FIELD = new ParseField("last_failed_at_in_epoch_millis"); + private static final ParseField LAST_FAILED_AT_FIELD_READABLE = new ParseField("last_failed_at"); + private static final ParseField LAST_SKIPPED_AT = new ParseField("last_skipped_at_in_epoch_millis"); + private static final ParseField LAST_SKIPPED_AT_READABLE = new ParseField("last_skipped_at"); /** * @param lastSucceededAt The last time when GeoIP data update was succeeded @@ -530,6 +581,21 @@ public static class UpdateStats implements ToXContent { PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_SKIPPED_AT); } + public UpdateStats(final StreamInput in) throws IOException { + lastSucceededAt = toInstant(in.readOptionalVLong()); + lastProcessingTimeInMillis = in.readOptionalVLong(); + lastFailedAt = toInstant(in.readOptionalVLong()); + lastSkippedAt = toInstant(in.readOptionalVLong()); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalVLong(lastSucceededAt.toEpochMilli()); + out.writeOptionalVLong(lastProcessingTimeInMillis); + out.writeOptionalVLong(lastFailedAt.toEpochMilli()); + out.writeOptionalVLong(lastSkippedAt.toEpochMilli()); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 66752fc5..fc29b165 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -36,8 +36,11 @@ import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper; import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldTypeParser; import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder; +import org.opensearch.geospatial.ip2geo.action.GetDatasourceAction; +import org.opensearch.geospatial.ip2geo.action.GetDatasourceTransportAction; import org.opensearch.geospatial.ip2geo.action.PutDatasourceAction; import org.opensearch.geospatial.ip2geo.action.PutDatasourceTransportAction; +import org.opensearch.geospatial.ip2geo.action.RestGetDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; @@ -153,7 +156,12 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - return List.of(new RestUploadStatsAction(), new RestUploadGeoJSONAction(), new RestPutDatasourceHandler(clusterSettings)); + return List.of( + new RestUploadStatsAction(), + new RestUploadGeoJSONAction(), + new RestPutDatasourceHandler(clusterSettings), + new RestGetDatasourceHandler() + ); } @Override @@ -161,7 +169,8 @@ public List getRestHandlers( return List.of( new ActionHandler<>(UploadGeoJSONAction.INSTANCE, UploadGeoJSONTransportAction.class), new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class), - new ActionHandler<>(PutDatasourceAction.INSTANCE, PutDatasourceTransportAction.class) + new ActionHandler<>(PutDatasourceAction.INSTANCE, PutDatasourceTransportAction.class), + new ActionHandler<>(GetDatasourceAction.INSTANCE, GetDatasourceTransportAction.class) ); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 1051693a..48451a9c 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -116,7 +116,7 @@ public void clean() throws Exception { verifyingClient.close(); } - public DatasourceState randomStateExcept(DatasourceState state) { + protected DatasourceState randomStateExcept(DatasourceState state) { assertNotNull(state); return Arrays.stream(DatasourceState.values()) .sequential() @@ -125,14 +125,14 @@ public DatasourceState randomStateExcept(DatasourceState state) { .get(Randomness.createSecure().nextInt(DatasourceState.values().length - 2)); } - public DatasourceState randomState() { + protected DatasourceState randomState() { return Arrays.stream(DatasourceState.values()) .sequential() .collect(Collectors.toList()) .get(Randomness.createSecure().nextInt(DatasourceState.values().length - 1)); } - public String randomIpAddress() { + protected String randomIpAddress() { return String.format( Locale.ROOT, "%d.%d.%d.%d", @@ -144,12 +144,12 @@ public String randomIpAddress() { } @SuppressForbidden(reason = "unit test") - public String sampleManifestUrl() throws Exception { + protected String sampleManifestUrl() throws Exception { return Paths.get(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").toURI()).toUri().toURL().toExternalForm(); } @SuppressForbidden(reason = "unit test") - public String sampleManifestUrlWithInvalidUrl() throws Exception { + protected String sampleManifestUrlWithInvalidUrl() throws Exception { return Paths.get(this.getClass().getClassLoader().getResource("ip2geo/manifest_invalid_url.json").toURI()) .toUri() .toURL() @@ -157,11 +157,16 @@ public String sampleManifestUrlWithInvalidUrl() throws Exception { } @SuppressForbidden(reason = "unit test") - public File sampleIp2GeoFile() { + protected File sampleIp2GeoFile() { return new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); } - public Datasource randomDatasource() { + protected long randomPositiveLong() { + long value = Randomness.get().nextLong(); + return value < 0 ? -value : value; + } + + protected Datasource randomDatasource() { Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); Datasource datasource = new Datasource(); datasource.setName(GeospatialTestHelper.randomLowerCaseString()); @@ -178,7 +183,7 @@ public Datasource randomDatasource() { datasource.getUpdateStats().setLastSkippedAt(now); datasource.getUpdateStats().setLastSucceededAt(now); datasource.getUpdateStats().setLastFailedAt(now); - datasource.getUpdateStats().setLastProcessingTimeInMillis(Randomness.get().nextLong()); + datasource.getUpdateStats().setLastProcessingTimeInMillis(randomPositiveLong()); datasource.setLastUpdateTime(now); if (Randomness.get().nextInt() % 2 == 0) { datasource.enable(); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java new file mode 100644 index 00000000..41dc2d97 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class GetDatasourceRequestTests extends Ip2GeoTestCase { + public void testStreamInOut_whenEmptyNames_thenSucceed() throws Exception { + String[] names = new String[0]; + GetDatasourceRequest request = new GetDatasourceRequest(names); + assertNull(request.validate()); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + GetDatasourceRequest copiedRequest = new GetDatasourceRequest(input); + + // Verify + assertArrayEquals(request.getNames(), copiedRequest.getNames()); + } + + public void testStreamInOut_whenNames_thenSucceed() throws Exception { + String[] names = { GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString() }; + GetDatasourceRequest request = new GetDatasourceRequest(names); + assertNull(request.validate()); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + GetDatasourceRequest copiedRequest = new GetDatasourceRequest(input); + + // Verify + assertArrayEquals(request.getNames(), copiedRequest.getNames()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java new file mode 100644 index 00000000..e07feaaa --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; + +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; + +public class GetDatasourceResponseTests extends Ip2GeoTestCase { + + public void testStreamInOut_whenValidInput_thenSucceed() throws Exception { + List datasourceList = Arrays.asList(randomDatasource(), randomDatasource()); + GetDatasourceResponse response = new GetDatasourceResponse(datasourceList); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + response.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + GetDatasourceResponse copiedResponse = new GetDatasourceResponse(input); + + // Verify + assertArrayEquals(response.getDatasources().toArray(), copiedResponse.getDatasources().toArray()); + } + + public void testToXContent_whenValidInput_thenSucceed() throws Exception { + List datasourceList = Arrays.asList(randomDatasource(), randomDatasource()); + GetDatasourceResponse response = new GetDatasourceResponse(datasourceList); + String json = Strings.toString(response.toXContent(JsonXContent.contentBuilder(), null)); + for (Datasource datasource : datasourceList) { + assertTrue(json.contains(String.format(Locale.ROOT, "\"name\":\"%s\"", datasource.getName()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"state\":\"%s\"", datasource.getState()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"endpoint\":\"%s\"", datasource.getEndpoint()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"update_interval_in_days\":%d", datasource.getSchedule().getInterval()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"next_update_at_in_epoch_millis\""))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"provider\":\"%s\"", datasource.getDatabase().getProvider()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"sha256_hash\":\"%s\"", datasource.getDatabase().getSha256Hash()))); + assertTrue( + json.contains( + String.format(Locale.ROOT, "\"updated_at_in_epoch_millis\":%d", datasource.getDatabase().getUpdatedAt().toEpochMilli()) + ) + ); + assertTrue(json.contains(String.format(Locale.ROOT, "\"valid_for_in_days\":%d", datasource.getDatabase().getValidForInDays()))); + for (String field : datasource.getDatabase().getFields()) { + assertTrue(json.contains(field)); + } + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_succeeded_at_in_epoch_millis\":%d", + datasource.getUpdateStats().getLastSucceededAt().toEpochMilli() + ) + ) + ); + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_processing_time_in_millis\":%d", + datasource.getUpdateStats().getLastProcessingTimeInMillis() + ) + ) + ); + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_failed_at_in_epoch_millis\":%d", + datasource.getUpdateStats().getLastFailedAt().toEpochMilli() + ) + ) + ); + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_skipped_at_in_epoch_millis\":%d", + datasource.getUpdateStats().getLastSkippedAt().toEpochMilli() + ) + ) + ); + + } + } + +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java new file mode 100644 index 00000000..bf8de2eb --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.List; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.action.ActionListener; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.tasks.Task; + +public class GetDatasourceTransportActionTests extends Ip2GeoTestCase { + private GetDatasourceTransportAction action; + + @Before + public void init() { + action = new GetDatasourceTransportAction(transportService, actionFilters, datasourceFacade); + } + + public void testDoExecute_whenAll_thenSucceed() throws Exception { + Task task = mock(Task.class); + GetDatasourceRequest request = new GetDatasourceRequest(new String[] { "_all" }); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor>> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).getAllDatasources(captor.capture()); + + // Run + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + captor.getValue().onResponse(datasources); + + // Verify + verify(listener).onResponse(new GetDatasourceResponse(datasources)); + + // Run + RuntimeException exception = new RuntimeException(); + captor.getValue().onFailure(exception); + + // Verify + verify(listener).onFailure(exception); + } + + public void testDoExecute_whenNames_thenSucceed() { + Task task = mock(Task.class); + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + String[] datasourceNames = datasources.stream().map(Datasource::getName).toArray(String[]::new); + + GetDatasourceRequest request = new GetDatasourceRequest(datasourceNames); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor>> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).getDatasources(eq(datasourceNames), captor.capture()); + + // Run + captor.getValue().onResponse(datasources); + + // Verify + verify(listener).onResponse(new GetDatasourceResponse(datasources)); + + // Run + RuntimeException exception = new RuntimeException(); + captor.getValue().onFailure(exception); + + // Verify + verify(listener).onFailure(exception); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java index 634d9581..01d3025c 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java @@ -105,7 +105,7 @@ public void testValidateDatasourceNames() throws Exception { String domain = GeospatialTestHelper.randomLowerCaseString(); PutDatasourceRequest request = new PutDatasourceRequest(validDatasourceName); request.setEndpoint(sampleManifestUrl()); - request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(30) + 1)); + request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(10) + 1)); // Run ActionRequestValidationException exception = request.validate(); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java new file mode 100644 index 00000000..5d4bef42 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; + +import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.StringUtils; +import org.junit.Before; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +public class RestGetDatasourceHandlerTests extends RestActionTestCase { + private String PATH_FOR_ALL = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource"); + private String path; + private RestGetDatasourceHandler action; + + @Before + public void setupAction() { + action = new RestGetDatasourceHandler(); + controller().registerHandler(action); + path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/%s"); + } + + public void testPrepareRequest_whenNames_thenSucceed() { + String dsName1 = GeospatialTestHelper.randomLowerCaseString(); + String dsName2 = GeospatialTestHelper.randomLowerCaseString(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath(String.format(Locale.ROOT, path, StringUtils.joinWith(",", dsName1, dsName2))) + .build(); + + AtomicBoolean isExecuted = new AtomicBoolean(false); + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { + // Verifying + assertTrue(actionRequest instanceof GetDatasourceRequest); + GetDatasourceRequest getDatasourceRequest = (GetDatasourceRequest) actionRequest; + assertArrayEquals(new String[] { dsName1, dsName2 }, getDatasourceRequest.getNames()); + isExecuted.set(true); + return null; + }); + + // Run + dispatchRequest(request); + + // Verify + assertTrue(isExecuted.get()); + } + + public void testPrepareRequest_whenAll_thenSucceed() { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath(PATH_FOR_ALL) + .build(); + + AtomicBoolean isExecuted = new AtomicBoolean(false); + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { + // Verifying + assertTrue(actionRequest instanceof GetDatasourceRequest); + GetDatasourceRequest getDatasourceRequest = (GetDatasourceRequest) actionRequest; + assertArrayEquals(new String[] {}, getDatasourceRequest.getNames()); + isExecuted.set(true); + return null; + }); + + // Run + dispatchRequest(request); + + // Verify + assertTrue(isExecuted.get()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java index 4a34dcb3..aacd940a 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java @@ -16,14 +16,24 @@ import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import org.apache.lucene.search.TotalHits; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; +import org.opensearch.action.get.MultiGetItemResponse; +import org.opensearch.action.get.MultiGetRequest; +import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.Randomness; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -33,7 +43,10 @@ import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; public class DatasourceFacadeTests extends Ip2GeoTestCase { private DatasourceFacade datasourceFacade; @@ -46,7 +59,7 @@ public void init() { ); } - public void testUpdateDatasource() throws Exception { + public void testUpdateDatasource_whenValidInput_thenSucceed() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Datasource datasource = new Datasource( datasourceName, @@ -69,29 +82,29 @@ public void testUpdateDatasource() throws Exception { assertTrue(previousTime.isBefore(datasource.getLastUpdateTime())); } - public void testGetDatasourceException() throws Exception { + public void testGetDatasource_whenException_thenNull() throws Exception { Datasource datasource = setupClientForGetRequest(true, new IndexNotFoundException(DatasourceExtension.JOB_INDEX_NAME)); assertNull(datasourceFacade.getDatasource(datasource.getName())); } - public void testGetDatasourceExist() throws Exception { + public void testGetDatasource_whenExist_thenReturnDatasource() throws Exception { Datasource datasource = setupClientForGetRequest(true, null); assertEquals(datasource, datasourceFacade.getDatasource(datasource.getName())); } - public void testGetDatasourceNotExist() throws Exception { + public void testGetDatasource_whenNotExist_thenNull() throws Exception { Datasource datasource = setupClientForGetRequest(false, null); assertNull(datasourceFacade.getDatasource(datasource.getName())); } - public void testGetDatasourceExistWithListener() { + public void testGetDatasource_whenExistWithListener_thenListenerIsCalledWithDatasource() { Datasource datasource = setupClientForGetRequest(true, null); ActionListener listener = mock(ActionListener.class); datasourceFacade.getDatasource(datasource.getName(), listener); verify(listener).onResponse(eq(datasource)); } - public void testGetDatasourceNotExistWithListener() { + public void testGetDatasource_whenExistWithListener_thenListenerIsCalledWithNull() { Datasource datasource = setupClientForGetRequest(false, null); ActionListener listener = mock(ActionListener.class); datasourceFacade.getDatasource(datasource.getName(), listener); @@ -115,6 +128,76 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime return datasource; } + public void testGetDatasources_whenValidInput_thenSucceed() { + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + String[] names = datasources.stream().map(Datasource::getName).toArray(String[]::new); + ActionListener> listener = mock(ActionListener.class); + MultiGetItemResponse[] multiGetItemResponses = datasources.stream().map(datasource -> { + GetResponse getResponse = getMockedGetResponse(datasource); + MultiGetItemResponse multiGetItemResponse = mock(MultiGetItemResponse.class); + when(multiGetItemResponse.getResponse()).thenReturn(getResponse); + return multiGetItemResponse; + }).toArray(MultiGetItemResponse[]::new); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + // Verify + assertTrue(actionRequest instanceof MultiGetRequest); + MultiGetRequest request = (MultiGetRequest) actionRequest; + assertEquals(2, request.getItems().size()); + for (MultiGetRequest.Item item : request.getItems()) { + assertEquals(DatasourceExtension.JOB_INDEX_NAME, item.index()); + assertTrue(datasources.stream().filter(datasource -> datasource.getName().equals(item.id())).findAny().isPresent()); + } + + MultiGetResponse response = mock(MultiGetResponse.class); + when(response.getResponses()).thenReturn(multiGetItemResponses); + return response; + }); + + // Run + datasourceFacade.getDatasources(names, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(listener).onResponse(captor.capture()); + assertEquals(datasources, captor.getValue()); + + } + + public void testGetAllDatasources_whenValidInput_thenSucceed() { + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + ActionListener> listener = mock(ActionListener.class); + SearchHits searchHits = getMockedSearchHits(datasources); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + // Verify + assertTrue(actionRequest instanceof SearchRequest); + SearchRequest request = (SearchRequest) actionRequest; + assertEquals(1, request.indices().length); + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]); + assertEquals(QueryBuilders.matchAllQuery(), request.source().query()); + assertEquals(1000, request.source().size()); + + SearchResponse response = mock(SearchResponse.class); + when(response.getHits()).thenReturn(searchHits); + return response; + }); + + // Run + datasourceFacade.getAllDatasources(listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(listener).onResponse(captor.capture()); + assertEquals(datasources, captor.getValue()); + } + + private SearchHits getMockedSearchHits(List datasources) { + SearchHit[] searchHitArray = datasources.stream().map(this::toBytesReference).map(this::toSearchHit).toArray(SearchHit[]::new); + + return new SearchHits(searchHitArray, new TotalHits(1l, TotalHits.Relation.EQUAL_TO), 1); + } + private GetResponse getMockedGetResponse(Datasource datasource) { GetResponse response = mock(GetResponse.class); when(response.isExists()).thenReturn(datasource != null); @@ -133,4 +216,10 @@ private BytesReference toBytesReference(Datasource datasource) { throw new RuntimeException(e); } } + + private SearchHit toSearchHit(BytesReference bytesReference) { + SearchHit searchHit = new SearchHit(Randomness.get().nextInt()); + searchHit.sourceRef(bytesReference); + return searchHit; + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java index 3632d9e9..18fc8628 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java @@ -13,7 +13,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; -import org.opensearch.common.Randomness; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; @@ -39,7 +38,7 @@ public void testParser() throws Exception { .parse( createParser(datasource.toXContent(XContentFactory.jsonBuilder(), null)), GeospatialTestHelper.randomLowerCaseString(), - new JobDocVersion(Randomness.get().nextLong(), Randomness.get().nextLong(), Randomness.get().nextLong()) + new JobDocVersion(randomPositiveLong(), randomPositiveLong(), randomPositiveLong()) ); assertTrue(datasource.equals(anotherDatasource)); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index d1a19c0c..1f547bf9 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -21,11 +21,11 @@ import org.opensearch.common.Randomness; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; -import org.opensearch.test.OpenSearchTestCase; -public class DatasourceTests extends OpenSearchTestCase { +public class DatasourceTests extends Ip2GeoTestCase { public void testParser() throws Exception { String id = GeospatialTestHelper.randomLowerCaseString(); @@ -38,7 +38,7 @@ public void testParser() throws Exception { datasource.getDatabase().setUpdatedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); datasource.getDatabase().setSha256Hash(GeospatialTestHelper.randomLowerCaseString()); datasource.getDatabase().setValidForInDays(1l); - datasource.getUpdateStats().setLastProcessingTimeInMillis(Randomness.get().nextLong()); + datasource.getUpdateStats().setLastProcessingTimeInMillis(randomPositiveLong()); datasource.getUpdateStats().setLastSucceededAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); datasource.getUpdateStats().setLastSkippedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); datasource.getUpdateStats().setLastFailedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); @@ -67,7 +67,7 @@ public void testCurrentIndexName() { } public void testGetIndexNameFor() { - long updatedAt = Randomness.get().nextLong(); + long updatedAt = randomPositiveLong(); DatasourceManifest manifest = mock(DatasourceManifest.class); when(manifest.getUpdatedAt()).thenReturn(updatedAt); diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index 6d9430bc..3917e7c7 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -32,6 +32,7 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; +import org.opensearch.geospatial.ip2geo.action.RestGetDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; @@ -59,7 +60,8 @@ public class GeospatialPluginTests extends OpenSearchTestCase { private final List SUPPORTED_REST_HANDLERS = List.of( new RestUploadGeoJSONAction(), new RestUploadStatsAction(), - new RestPutDatasourceHandler(clusterSettings) + new RestPutDatasourceHandler(clusterSettings), + new RestGetDatasourceHandler() ); private final Set SUPPORTED_SYSTEM_INDEX_PATTERN = Set.of(IP2GEO_DATA_INDEX_NAME_PREFIX);