Skip to content

Commit

Permalink
Add scroll search based OpenSearchReader (#1633)
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo authored May 18, 2023
1 parent 84ccf88 commit 614d27a
Show file tree
Hide file tree
Showing 29 changed files with 544 additions and 473 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/flint-test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ jobs:
distribution: 'temurin'
java-version: 11

- name: Build with Sbt
run: sbt test
- name: Integ Test
working-directory: ./flint
run: sbt integtest/test
7 changes: 3 additions & 4 deletions flint/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ lazy val flintCore = (project in file("flint-core"))
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion)
)
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion))

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
Expand All @@ -70,7 +69,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
name := "flint-spark-integration",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.scalactic" %% "scalactic" % "3.2.15",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
Expand Down Expand Up @@ -103,6 +101,7 @@ lazy val integtest = (project in file("integ-test"))
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.15",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"),
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test"),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath += (flintSparkIntegration / assembly).value)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.flint.core;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintReader;

/**
* Flint index client that provides API for metadata and data operations
Expand Down Expand Up @@ -36,4 +37,13 @@ public interface FlintClient {
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Create a {@link FlintReader} over the documents of the given index.
*
* @param indexName index name.
* @param query DSL query as a string. A null query means match_all.
* @return {@link FlintReader} for the matching documents. The reader declares a
* {@code close()} method; presumably the caller is expected to invoke it when done.
*/
FlintReader createReader(String indexName, String query);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import org.opensearch.flint.core.storage.FlintOpenSearchClient;

/**
 * Factory for {@link FlintClient} instances.
 */
public class FlintClientBuilder {

  /**
   * Builds a {@link FlintClient} from the supplied options.
   *
   * @param options Flint configuration handed to the client being created.
   * @return a new {@link FlintOpenSearchClient} constructed from {@code options}.
   */
  public static FlintClient build(FlintOptions options) {
    final FlintClient client = new FlintOpenSearchClient(options);
    return client;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Flint Options include all the flint related configuration.
 *
 * <p>The option map handed to the constructor is copied, so later changes to the caller's map
 * do not affect an existing instance.
 */
public class FlintOptions implements Serializable {

  /** Option key for the host name. */
  public static final String HOST = "host";

  /** Option key for the port number. */
  public static final String PORT = "port";

  /**
   * Used by {@link org.opensearch.flint.core.storage.OpenSearchScrollReader}
   */
  public static final String SCROLL_SIZE = "scroll_size";

  /** Scroll size used when {@link #SCROLL_SIZE} is not configured. */
  public static final int DEFAULT_SCROLL_SIZE = 100;

  /** Host used when {@link #HOST} is not configured. */
  private static final String DEFAULT_HOST = "localhost";

  /** Port used when {@link #PORT} is not configured. */
  private static final int DEFAULT_PORT = 9200;

  /** Private snapshot of the options supplied at construction time. */
  private final Map<String, String> options;

  /**
   * Creates the options from a key/value map.
   *
   * @param options option map; must not be null. The map is copied.
   * @throws NullPointerException if {@code options} is null.
   */
  public FlintOptions(Map<String, String> options) {
    Objects.requireNonNull(options, "options");
    // Copy so that the instance does not alias a map the caller may keep mutating.
    this.options = new HashMap<>(options);
  }

  /**
   * @return configured host, or {@code "localhost"} when {@link #HOST} is absent.
   */
  public String getHost() {
    return options.getOrDefault(HOST, DEFAULT_HOST);
  }

  /**
   * @return configured port, or {@code 9200} when {@link #PORT} is absent.
   * @throws NumberFormatException if the configured value is not an integer.
   */
  public int getPort() {
    return getInt(PORT, DEFAULT_PORT);
  }

  /**
   * @return configured scroll size, or {@link #DEFAULT_SCROLL_SIZE} when {@link #SCROLL_SIZE}
   *     is absent.
   * @throws NumberFormatException if the configured value is not an integer.
   */
  public int getScrollSize() {
    return getInt(SCROLL_SIZE, DEFAULT_SCROLL_SIZE);
  }

  /** Reads an integer option, falling back to {@code defaultValue} when it has no value. */
  private int getInt(String key, int defaultValue) {
    final String raw = options.get(key);
    if (raw == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      // Same exception type as before, but the message now names the offending option.
      final NumberFormatException detailed =
          new NumberFormatException("Invalid integer value for Flint option '" + key + "': " + raw);
      detailed.initCause(e);
      throw detailed;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.flint.core.storage;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.http.HttpHost;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
Expand All @@ -15,24 +17,46 @@
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;

import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;

/**
* Flint client implementation for OpenSearch storage.
*/
public class FlintOpenSearchClient implements FlintClient {

/**
* {@link NamedXContentRegistry} built from the named XContents of a {@link SearchModule}
* (created with empty settings and no plugins). It is used to construct a {@link QueryBuilder}
* from a DSL query string.
*/
private final static NamedXContentRegistry
xContentRegistry =
new NamedXContentRegistry(new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents());

/** OpenSearch host name. */
private final String host;

/** OpenSearch port number. */
private final int port;

public FlintOpenSearchClient(String host, int port) {
this.host = host;
this.port = port;
private final FlintOptions options;

public FlintOpenSearchClient(FlintOptions options) {
this.host = options.getHost();
this.port = options.getPort();
this.options = options;
}

@Override
Expand Down Expand Up @@ -71,6 +95,26 @@ public FlintMetadata getIndexMetadata(String indexName) {
}
}

/**
 * Create {@link FlintReader}.
 *
 * <p>A null or empty query falls back to a match_all query. Otherwise the DSL string is parsed
 * as JSON into a {@link QueryBuilder}; the parser is closed as soon as parsing finishes.
 *
 * @param indexName index name.
 * @param query DSL query. DSL query is null means match_all.
 * @return {@link FlintReader}.
 * @throws RuntimeException wrapping the {@link IOException} if the query cannot be parsed.
 */
@Override
public FlintReader createReader(String indexName, String query) {
  try {
    QueryBuilder queryBuilder = new MatchAllQueryBuilder();
    if (!Strings.isNullOrEmpty(query)) {
      // XContentParser is Closeable; try-with-resources releases it even when parsing fails.
      try (XContentParser parser =
          XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query)) {
        queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
      }
    }
    return new OpenSearchScrollReader(
        createClient(), indexName, new SearchSourceBuilder().query(queryBuilder), options);
  } catch (IOException e) {
    throw new RuntimeException("Failed to create reader for index " + indexName, e);
  }
}

private RestHighLevelClient createClient() {
return new RestHighLevelClient(
RestClient.builder(new HttpHost(host, port, "http")));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

/**
 * Flint Reader Interface.
 *
 * <p>Iterates over documents, returning each one as a String. The interface extends
 * {@link AutoCloseable} so that a reader can be managed with try-with-resources.
 */
public interface FlintReader extends AutoCloseable {

  /**
   * @return true if next doc exist.
   */
  boolean hasNext();

  /**
   * Return next doc in String.
   *
   * @return the next document.
   */
  String next();

  /**
   * close. Declared without checked exceptions, narrowing {@link AutoCloseable#close()}.
   */
  @Override
  void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.SearchHit;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Abstract OpenSearch Reader.
 *
 * <p>Documents are fetched in pages through {@link #search(SearchRequest)}. Whenever the current
 * page is exhausted, {@link #hasNext()} asks the subclass for another page; an empty page means
 * there is nothing more to read.
 */
public abstract class OpenSearchReader implements FlintReader {

  /** Search request passed to {@link #search(SearchRequest)} on every fetch. */
  private final SearchRequest searchRequest;

  /** Client used by subclasses to talk to OpenSearch; closed by {@link #close()}. */
  protected final RestHighLevelClient client;

  /**
   * iterator of one-shot search result. Null until the first page has been fetched.
   */
  private Iterator<SearchHit> iterator = null;

  /**
   * @param client client used for search requests; closed when this reader is closed.
   * @param searchRequest request handed to {@link #search(SearchRequest)}.
   */
  public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest) {
    this.client = client;
    this.searchRequest = searchRequest;
  }

  /**
   * Fetches another page when the current one is missing or exhausted.
   *
   * @return true if a document is available for {@link #next()}.
   * @throws RuntimeException wrapping the {@link IOException} raised by the search.
   */
  @Override
  public boolean hasNext() {
    try {
      if (iterator == null || !iterator.hasNext()) {
        SearchResponse response = search(searchRequest);
        List<SearchHit> searchHits = Arrays.asList(response.getHits().getHits());
        iterator = searchHits.iterator();
      }
      return iterator.hasNext();
    } catch (IOException e) {
      // todo. log error.
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns the source of the next document as a String.
   *
   * @return next document source.
   * @throws java.util.NoSuchElementException if no document is left.
   */
  @Override
  public String next() {
    // Going through hasNext() makes next() safe to call first (the iterator is still null then)
    // and lets it roll over to the following page on its own.
    if (!hasNext()) {
      throw new java.util.NoSuchElementException("No more documents to read");
    }
    return iterator.next().getSourceAsString();
  }

  /**
   * Releases subclass resources via {@link #clean()} and then closes the client.
   *
   * @throws RuntimeException wrapping the {@link IOException} raised while closing the client.
   */
  @Override
  public void close() {
    try {
      clean();
    } catch (IOException ignored) {
      // Best effort: a failed cleanup must not prevent the client from being closed.
      // todo. log error.
    } finally {
      if (client != null) {
        try {
          client.close();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  /**
   * search.
   *
   * @param request the search request given to the constructor.
   * @return response holding the next page of hits.
   */
  abstract SearchResponse search(SearchRequest request) throws IOException;

  /**
   * clean. Called once from {@link #close()} before the client is closed.
   */
  abstract void clean() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

/**
* {@link OpenSearchReader} using scroll search. https://opensearch.org/docs/latest/api-reference/scroll/
*/
public class OpenSearchScrollReader extends OpenSearchReader {

/** Default scroll context timeout in minutes. */
public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L);

private final FlintOptions options;

private String scrollId = null;

public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize())));
this.options = options;
}

/**
* search.
*/
SearchResponse search(SearchRequest request) throws IOException {
if (Strings.isNullOrEmpty(scrollId)) {
// add scroll timeout making the request as scroll search request.
request.scroll(DEFAULT_SCROLL_TIMEOUT);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
scrollId = response.getScrollId();
return response;
} else {
return client.scroll(new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId), RequestOptions.DEFAULT);
}
}

/**
* clean the scroll context.
*/
void clean() throws IOException {
if (Strings.isNullOrEmpty(scrollId)) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}
}
}
Loading

0 comments on commit 614d27a

Please sign in to comment.