Skip to content

Commit 82c4c98

Browse files
committed
Implement get datasource api
Signed-off-by: Heemin Kim <heemin@amazon.com>
1 parent 74c6903 commit 82c4c98

14 files changed

+849
-24
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.geospatial.ip2geo.action;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Ip2Geo datasource get action
15+
*/
16+
public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
17+
/**
18+
* Get datasource action instance
19+
*/
20+
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
21+
/**
22+
* Name of a get datasource action
23+
*/
24+
public static final String NAME = "cluster:admin/geospatial/datasource/get";
25+
26+
private GetDatasourceAction() {
27+
super(NAME, GetDatasourceResponse::new);
28+
}
29+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.geospatial.ip2geo.action;
10+
11+
import java.io.IOException;
12+
13+
import lombok.EqualsAndHashCode;
14+
import lombok.Getter;
15+
import lombok.Setter;
16+
import lombok.extern.log4j.Log4j2;
17+
18+
import org.opensearch.action.ActionRequest;
19+
import org.opensearch.action.ActionRequestValidationException;
20+
import org.opensearch.common.io.stream.StreamInput;
21+
import org.opensearch.common.io.stream.StreamOutput;
22+
23+
/**
24+
* Ip2Geo datasource get request
25+
*/
26+
@Getter
27+
@Setter
28+
@Log4j2
29+
@EqualsAndHashCode
30+
public class GetDatasourceRequest extends ActionRequest {
31+
/**
32+
* @param names the datasource names
33+
* @return the datasource names
34+
*/
35+
private String[] names;
36+
37+
/**
38+
* Constructs a new get datasource request with a list of datasources.
39+
*
40+
* If the list of datasources is empty or it contains a single element "_all", all registered datasources
41+
* are returned.
42+
*
43+
* @param names list of datasource names
44+
*/
45+
public GetDatasourceRequest(final String[] names) {
46+
this.names = names;
47+
}
48+
49+
/**
50+
* Constructor with stream input
51+
* @param in the stream input
52+
* @throws IOException IOException
53+
*/
54+
public GetDatasourceRequest(final StreamInput in) throws IOException {
55+
super(in);
56+
this.names = in.readStringArray();
57+
}
58+
59+
@Override
60+
public ActionRequestValidationException validate() {
61+
return null;
62+
}
63+
64+
@Override
65+
public void writeTo(final StreamOutput out) throws IOException {
66+
super.writeTo(out);
67+
out.writeStringArray(names);
68+
}
69+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.geospatial.ip2geo.action;
10+
11+
import java.io.IOException;
12+
import java.time.Instant;
13+
import java.util.List;
14+
15+
import lombok.EqualsAndHashCode;
16+
import lombok.Getter;
17+
import lombok.Setter;
18+
import lombok.extern.log4j.Log4j2;
19+
20+
import org.opensearch.action.ActionResponse;
21+
import org.opensearch.common.io.stream.StreamInput;
22+
import org.opensearch.common.io.stream.StreamOutput;
23+
import org.opensearch.core.xcontent.ToXContentObject;
24+
import org.opensearch.core.xcontent.XContentBuilder;
25+
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
26+
27+
/**
28+
* Ip2Geo datasource get request
29+
*/
30+
@Getter
31+
@Setter
32+
@Log4j2
33+
@EqualsAndHashCode
34+
public class GetDatasourceResponse extends ActionResponse implements ToXContentObject {
35+
private static final String FIELD_NAME_DATASOURCES = "datasources";
36+
private static final String FIELD_NAME_NAME = "name";
37+
private static final String FIELD_NAME_STATE = "state";
38+
private static final String FIELD_NAME_ENDPOINT = "endpoint";
39+
private static final String FIELD_NAME_UPDATE_INTERVAL = "update_interval_in_days";
40+
private static final String FIELD_NAME_NEXT_UPDATE_AT = "next_update_at_in_epoch_millis";
41+
private static final String FIELD_NAME_NEXT_UPDATE_AT_READABLE = "next_update_at";
42+
private static final String FIELD_NAME_DATABASE = "database";
43+
private static final String FIELD_NAME_UPDATE_STATS = "update_stats";
44+
private List<Datasource> datasources;
45+
46+
/**
47+
* Default constructor
48+
*
49+
* @param datasources List of datasources
50+
*/
51+
public GetDatasourceResponse(final List<Datasource> datasources) {
52+
this.datasources = datasources;
53+
}
54+
55+
/**
56+
* Constructor with StreamInput
57+
*
58+
* @param in the stream input
59+
*/
60+
public GetDatasourceResponse(final StreamInput in) throws IOException {
61+
datasources = in.readList(Datasource::new);
62+
}
63+
64+
@Override
65+
public void writeTo(final StreamOutput out) throws IOException {
66+
out.writeList(datasources);
67+
}
68+
69+
@Override
70+
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
71+
builder.startObject();
72+
builder.startArray(FIELD_NAME_DATASOURCES);
73+
for (Datasource datasource : datasources) {
74+
builder.startObject();
75+
builder.field(FIELD_NAME_NAME, datasource.getName());
76+
builder.field(FIELD_NAME_STATE, datasource.getState());
77+
builder.field(FIELD_NAME_ENDPOINT, datasource.getEndpoint());
78+
builder.field(FIELD_NAME_UPDATE_INTERVAL, datasource.getSchedule().getInterval());
79+
builder.timeField(
80+
FIELD_NAME_NEXT_UPDATE_AT,
81+
FIELD_NAME_NEXT_UPDATE_AT_READABLE,
82+
datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
83+
);
84+
builder.field(FIELD_NAME_DATABASE, datasource.getDatabase());
85+
builder.field(FIELD_NAME_UPDATE_STATS, datasource.getUpdateStats());
86+
builder.endObject();
87+
}
88+
builder.endArray();
89+
builder.endObject();
90+
return builder;
91+
}
92+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.geospatial.ip2geo.action;
10+
11+
import java.util.List;
12+
13+
import lombok.extern.log4j.Log4j2;
14+
15+
import org.opensearch.action.ActionListener;
16+
import org.opensearch.action.support.ActionFilters;
17+
import org.opensearch.action.support.HandledTransportAction;
18+
import org.opensearch.common.inject.Inject;
19+
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
20+
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
21+
import org.opensearch.tasks.Task;
22+
import org.opensearch.transport.TransportService;
23+
24+
/**
25+
* Transport action to get datasource
26+
*/
27+
@Log4j2
28+
public class GetDatasourceTransportAction extends HandledTransportAction<GetDatasourceRequest, GetDatasourceResponse> {
29+
private final DatasourceFacade datasourceFacade;
30+
31+
/**
32+
* Default constructor
33+
* @param transportService the transport service
34+
* @param actionFilters the action filters
35+
* @param datasourceFacade the datasource facade
36+
*/
37+
@Inject
38+
public GetDatasourceTransportAction(
39+
final TransportService transportService,
40+
final ActionFilters actionFilters,
41+
final DatasourceFacade datasourceFacade
42+
) {
43+
super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new);
44+
this.datasourceFacade = datasourceFacade;
45+
}
46+
47+
@Override
48+
protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener<GetDatasourceResponse> listener) {
49+
if (request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0]))) {
50+
// We don't expect too many data sources. Therefore, querying all data sources without pagination should be fine.
51+
datasourceFacade.getAllDatasources(new ActionListener<>() {
52+
@Override
53+
public void onResponse(final List<Datasource> datasources) {
54+
listener.onResponse(new GetDatasourceResponse(datasources));
55+
}
56+
57+
@Override
58+
public void onFailure(final Exception e) {
59+
listener.onFailure(e);
60+
}
61+
});
62+
} else {
63+
datasourceFacade.getDatasources(request.getNames(), new ActionListener<>() {
64+
@Override
65+
public void onResponse(final List<Datasource> datasources) {
66+
listener.onResponse(new GetDatasourceResponse(datasources));
67+
}
68+
69+
@Override
70+
public void onFailure(final Exception e) {
71+
listener.onFailure(e);
72+
}
73+
});
74+
}
75+
}
76+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.geospatial.ip2geo.action;
10+
11+
import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER;
12+
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix;
13+
import static org.opensearch.rest.RestRequest.Method.GET;
14+
15+
import java.util.List;
16+
17+
import org.opensearch.client.node.NodeClient;
18+
import org.opensearch.common.Strings;
19+
import org.opensearch.rest.BaseRestHandler;
20+
import org.opensearch.rest.RestRequest;
21+
import org.opensearch.rest.action.RestToXContentListener;
22+
23+
/**
24+
* Rest handler for Ip2Geo datasource get request
25+
*/
26+
public class RestGetDatasourceHandler extends BaseRestHandler {
27+
private static final String ACTION_NAME = "ip2geo_datasource_get";
28+
29+
@Override
30+
public String getName() {
31+
return ACTION_NAME;
32+
}
33+
34+
@Override
35+
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
36+
final String[] names = request.paramAsStringArray("name", Strings.EMPTY_ARRAY);
37+
final GetDatasourceRequest getDatasourceRequest = new GetDatasourceRequest(names);
38+
39+
return channel -> client.executeLocally(GetDatasourceAction.INSTANCE, getDatasourceRequest, new RestToXContentListener<>(channel));
40+
}
41+
42+
@Override
43+
public List<Route> routes() {
44+
String path1 = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource");
45+
String path2 = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}");
46+
return List.of(new Route(GET, path1), new Route(GET, path2));
47+
}
48+
}

src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*
4040
*/
4141
public class RestPutDatasourceHandler extends BaseRestHandler {
42-
private static final String ACTION_NAME = "ip2geo_datasource";
42+
private static final String ACTION_NAME = "ip2geo_datasource_put";
4343
private final ClusterSettings clusterSettings;
4444

4545
public RestPutDatasourceHandler(final ClusterSettings clusterSettings) {
@@ -53,7 +53,7 @@ public String getName() {
5353

5454
@Override
5555
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
56-
final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("id"));
56+
final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name"));
5757
if (request.hasContentOrSourceParam()) {
5858
try (XContentParser parser = request.contentOrSourceParamParser()) {
5959
PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null);
@@ -70,7 +70,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
7070

7171
@Override
7272
public List<Route> routes() {
73-
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{id}");
73+
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}");
7474
return List.of(new Route(PUT, path));
7575
}
7676
}

0 commit comments

Comments
 (0)