From 614d27a114fe289b31abf675b748baae45537187 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Thu, 18 May 2023 15:44:57 -0700
Subject: [PATCH] Add scroll search based OpenSearchReader (#1633)

Signed-off-by: Peng Huo
---
 .github/workflows/flint-test-and-build-workflow.yml |  5 +-
 flint/build.sbt                                     |  7 +-
 .../opensearch/flint/core/FlintClient.java          | 10 +++
 .../flint/core/FlintClientBuilder.java              | 18 +++++
 .../opensearch/flint/core/FlintOptions.java         | 41 ++++++++++
 .../core/storage/FlintOpenSearchClient.java         | 50 +++++++++++-
 .../flint/core/storage/FlintReader.java             | 27 +++++++
 .../flint/core/storage/OpenSearchReader.java        | 81 +++++++++++++++++++
 .../core/storage/OpenSearchScrollReader.java        | 63 +++++++++++++++
 .../org/opensearch/io/OpenSearchOptions.java        | 35 --------
 .../org/opensearch/io/OpenSearchReader.java         | 76 -----------------
 ...pache.spark.sql.sources.DataSourceRegister       |  2 +-
 .../spark/sql/flint/FlintDataSourceV2.scala         | 58 +++++++++++++
 .../FlintPartitionReader.scala}                     | 23 ++----
 .../FlintPartitionReaderFactory.scala}              | 13 ++-
 .../FlintScan.scala}                                | 16 ++--
 .../FlintScanBuilder.scala}                         | 16 ++--
 .../FlintTable.scala}                               | 22 +++--
 .../sql/v2/OpenSearchJsonDataSourceV2.scala         | 50 ------------
 .../scala/org/apache/spark/FirstITSuit.scala        | 31 -------
 .../org/apache/spark/OpenSearchIndex.scala          | 62 --------------
 .../org/apache/spark/OpenSearchSuite.scala          | 56 -------------
 .../opensearch/io/OpenSearchOptionsTest.scala       | 56 -------------
 .../testcontainers/OpenSearchContainer.java         |  0
 .../spark/FlintDataSourceV2ITSuite.scala            | 56 +++++++++++++
 .../org/apache/spark/OpenSearchITSuite.scala        | 43 ----------
 .../opensearch/flint/OpenSearchSuite.scala          | 71 ++++++++++++++++
 .../core/FlintOpenSearchClientSuite.scala           | 25 ++++--
 flint/project/Dependencies.scala                    |  4 +-
 29 files changed, 544 insertions(+), 473 deletions(-)
 create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java
 create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
 create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintReader.java
 create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java
 create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java
 delete mode 100644 flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchOptions.java
 delete mode 100644 flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchReader.java
 create mode 100644 flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintDataSourceV2.scala
 rename flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/{v2/OpenSearchPartitionReader.scala => flint/FlintPartitionReader.scala} (75%)
 rename flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/{v2/OpenSearchPartitionReaderFactory.scala => flint/FlintPartitionReaderFactory.scala} (56%)
 rename flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/{v2/OpenSearchScan.scala => flint/FlintScan.scala} (62%)
 rename flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/{v2/OpenSearchScanBuilder.scala => flint/FlintScanBuilder.scala} (58%)
 rename flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/{v2/OpenSearchTable.scala => flint/FlintTable.scala} (69%)
 delete mode 100644 flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchJsonDataSourceV2.scala
 delete mode 100644 flint/flint-spark-integration/src/test/scala/org/apache/spark/FirstITSuit.scala
 delete mode 100644 flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchIndex.scala
 delete mode 100644 flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchSuite.scala
 delete mode 100644 flint/flint-spark-integration/src/test/scala/org/opensearch/io/OpenSearchOptionsTest.scala
 rename flint/{flint-spark-integration => integ-test}/src/test/java/org/opensearch/testcontainers/OpenSearchContainer.java (100%)
 create mode 100644 flint/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
 delete mode 100644 flint/integ-test/src/test/scala/org/apache/spark/OpenSearchITSuite.scala

diff --git a/.github/workflows/flint-test-and-build-workflow.yml b/.github/workflows/flint-test-and-build-workflow.yml
index 066b5bcfbb..7314ff9ad6 100644
--- a/.github/workflows/flint-test-and-build-workflow.yml
+++ b/.github/workflows/flint-test-and-build-workflow.yml
@@ -26,5 +26,6 @@ jobs:
         distribution: 'temurin'
         java-version: 11
 
-    - name: Build with Sbt
-      run: sbt test
+    - name: Integ Test
+      working-directory: ./flint
+      run: sbt integtest/test
diff --git a/flint/build.sbt b/flint/build.sbt
index ca4546e2e3..244e438697 100644
--- a/flint/build.sbt
+++ b/flint/build.sbt
@@ -59,8 +59,7 @@ lazy val flintCore = (project in file("flint-core"))
     scalaVersion := scala212,
     libraryDependencies ++= Seq(
       "org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
-      "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion)
-  )
+      "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion))
 
 lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
   .dependsOn(flintCore)
@@ -70,7 +69,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
     name := "flint-spark-integration",
     scalaVersion := scala212,
     libraryDependencies ++= Seq(
-      "org.testcontainers" % "testcontainers" % "1.18.0" % "test",
       "org.scalactic" %% "scalactic" % "3.2.15",
       "org.scalatest" %% "scalatest" % "3.2.15" % "test",
       "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
@@ -103,6 +101,7 @@ lazy val integtest = (project in file("integ-test"))
     libraryDependencies ++= Seq(
       "org.scalactic" %% "scalactic" % "3.2.15",
       "org.scalatest" %% "scalatest" % "3.2.15" % "test",
-      "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"),
+      "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
+      "org.testcontainers" % "testcontainers" % "1.18.0" % "test"),
     libraryDependencies ++= deps(sparkVersion),
     Test / fullClasspath += (flintSparkIntegration / assembly).value)
diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
index 0b780d0de9..537bfb0d22 100644
--- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
+++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
@@ -6,6 +6,7 @@
 package org.opensearch.flint.core;
 
 import org.opensearch.flint.core.metadata.FlintMetadata;
+import org.opensearch.flint.core.storage.FlintReader;
 
 /**
  * Flint index client that provides API for metadata and data operations
@@ -36,4 +37,13 @@ public interface FlintClient {
    * @return index metadata
    */
   FlintMetadata getIndexMetadata(String indexName);
+
+  /**
+   * Create {@link FlintReader}.
+   *
+   * @param indexName index name.
+   * @param query DSL query. A null query means match_all.
+   * @return {@link FlintReader}.
+   */
+  FlintReader createReader(String indexName, String query);
 }
diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java
new file mode 100644
index 0000000000..a0372a86f3
--- /dev/null
+++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClientBuilder.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core;
+
+import org.opensearch.flint.core.storage.FlintOpenSearchClient;
+
+/**
+ * {@link FlintClient} builder.
+ */
+public class FlintClientBuilder {
+
+  public static FlintClient build(FlintOptions options) {
+    return new FlintOpenSearchClient(options);
+  }
+}
diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
new file mode 100644
index 0000000000..6a275a5b83
--- /dev/null
+++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Flint options, covering all Flint-related configuration.
+ */
+public class FlintOptions implements Serializable {
+
+  private final Map<String, String> options;
+
+  public static final String HOST = "host";
+
+  public static final String PORT = "port";
+
+  /**
+   * Used by {@link org.opensearch.flint.core.storage.OpenSearchScrollReader}.
+   */
+  public static final String SCROLL_SIZE = "scroll_size";
+
+  public static final int DEFAULT_SCROLL_SIZE = 100;
+
+  public FlintOptions(Map<String, String> options) {
+    this.options = options;
+  }
+
+  public String getHost() {
+    return options.getOrDefault(HOST, "localhost");
+  }
+
+  public int getPort() {
+    return Integer.parseInt(options.getOrDefault(PORT, "9200"));
+  }
+
+  public int getScrollSize() {
+    return Integer.parseInt(options.getOrDefault(SCROLL_SIZE, String.valueOf(DEFAULT_SCROLL_SIZE)));
+  }
+}
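For reference, option keys map one-to-one onto the constants above, and every getter falls back to a default ("localhost", "9200", DEFAULT_SCROLL_SIZE). A minimal sketch of constructing the options from Scala, mirroring how FlintOpenSearchClientSuite below builds them (the literal values here are placeholders):

    import scala.collection.JavaConverters._
    import org.opensearch.flint.core.FlintOptions

    // Unset keys fall back to defaults: host=localhost, port=9200, scroll_size=100.
    val options = new FlintOptions(
      Map("host" -> "localhost", "port" -> "9200", "scroll_size" -> "10").asJava)
    assert(options.getScrollSize == 10)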
diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index ab366d36d5..dc4a6bda4c 100644
--- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -6,6 +6,8 @@
 package org.opensearch.flint.core.storage;
 
 import java.io.IOException;
+import java.util.ArrayList;
+
 import org.apache.http.HttpHost;
 import org.opensearch.client.RequestOptions;
 import org.opensearch.client.RestClient;
@@ -15,24 +17,46 @@
 import org.opensearch.client.indices.GetMappingsRequest;
 import org.opensearch.client.indices.GetMappingsResponse;
 import org.opensearch.cluster.metadata.MappingMetadata;
+import org.opensearch.common.Strings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.xcontent.NamedXContentRegistry;
+import org.opensearch.common.xcontent.XContentParser;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.flint.core.FlintClient;
+import org.opensearch.flint.core.FlintOptions;
 import org.opensearch.flint.core.metadata.FlintMetadata;
+import org.opensearch.index.query.AbstractQueryBuilder;
+import org.opensearch.index.query.MatchAllQueryBuilder;
+import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.search.SearchModule;
+import org.opensearch.search.builder.SearchSourceBuilder;
+
+import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
 
 /**
  * Flint client implementation for OpenSearch storage.
  */
 public class FlintOpenSearchClient implements FlintClient {
 
+  /**
+   * {@link NamedXContentRegistry} from {@link SearchModule} used to construct a {@link QueryBuilder} from a DSL query string.
+   */
+  private final static NamedXContentRegistry
+      xContentRegistry =
+      new NamedXContentRegistry(new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents());
+
   /** OpenSearch host name. */
   private final String host;
 
   /** OpenSearch port number. */
   private final int port;
 
-  public FlintOpenSearchClient(String host, int port) {
-    this.host = host;
-    this.port = port;
+  private final FlintOptions options;
+
+  public FlintOpenSearchClient(FlintOptions options) {
+    this.host = options.getHost();
+    this.port = options.getPort();
+    this.options = options;
   }
 
   @Override
@@ -71,6 +95,26 @@ public FlintMetadata getIndexMetadata(String indexName) {
     }
   }
 
+  /**
+   * Create {@link FlintReader}.
+   *
+   * @param indexName index name.
+   * @param query DSL query. A null query means match_all.
+   * @return {@link FlintReader}.
+   */
+  @Override public FlintReader createReader(String indexName, String query) {
+    try {
+      QueryBuilder queryBuilder = new MatchAllQueryBuilder();
+      if (!Strings.isNullOrEmpty(query)) {
+        XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry, IGNORE_DEPRECATIONS, query);
+        queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
+      }
+      return new OpenSearchScrollReader(createClient(), indexName, new SearchSourceBuilder().query(queryBuilder), options);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private RestHighLevelClient createClient() {
     return new RestHighLevelClient(
         RestClient.builder(new HttpHost(host, port, "http")));
diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintReader.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintReader.java
new file mode 100644
index 0000000000..5c90eee89c
--- /dev/null
+++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintReader.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.storage;
+
+/**
+ * Flint reader interface.
+ */
+public interface FlintReader {
+
+  /**
+   * Return true if the next doc exists.
+   */
+  boolean hasNext();
+
+  /**
+   * Return the next doc as a JSON string.
+   */
+  String next();
+
+  /**
+   * Close the reader and release any resources.
+   */
+  void close();
+}
diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java
new file mode 100644
index 0000000000..96ec74bc2a
--- /dev/null
+++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.storage;
+
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.search.SearchHit;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract OpenSearch reader.
+ */
+public abstract class OpenSearchReader implements FlintReader {
+
+  /** Search request. */
+  private final SearchRequest searchRequest;
+
+  protected final RestHighLevelClient client;
+
+  /**
+   * Iterator over the hits of one search round trip.
+   */
+  private Iterator<SearchHit> iterator = null;
+
+  public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest) {
+    this.client = client;
+    this.searchRequest = searchRequest;
+  }
+
+  @Override public boolean hasNext() {
+    try {
+      if (iterator == null || !iterator.hasNext()) {
+        SearchResponse response = search(searchRequest);
+        List<SearchHit> searchHits = Arrays.asList(response.getHits().getHits());
+        iterator = searchHits.iterator();
+      }
+      return iterator.hasNext();
+    } catch (IOException e) {
+      // todo. log error.
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override public String next() {
+    return iterator.next().getSourceAsString();
+  }
+
+  @Override public void close() {
+    try {
+      clean();
+    } catch (IOException e) {
+      // todo. log error.
+    } finally {
+      if (client != null) {
+        try {
+          client.close();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Run one search round trip.
+   */
+  abstract SearchResponse search(SearchRequest request) throws IOException;
+
+  /**
+   * Clean up any server-side search context.
+   */
+  abstract void clean() throws IOException;
+}
diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java
new file mode 100644
index 0000000000..489bb5d179
--- /dev/null
+++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.storage;
+
+import org.opensearch.action.search.ClearScrollRequest;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.search.SearchScrollRequest;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.common.Strings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.flint.core.FlintOptions;
+import org.opensearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+
+/**
+ * {@link OpenSearchReader} using scroll search. https://opensearch.org/docs/latest/api-reference/scroll/
+ */
+public class OpenSearchScrollReader extends OpenSearchReader {
+
+  /** Default scroll context timeout in minutes. */
+  public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5L);
+
+  private final FlintOptions options;
+
+  private String scrollId = null;
+
+  public OpenSearchScrollReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
+    super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize())));
+    this.options = options;
+  }
+
+  /**
+   * Search, opening a scroll context on the first call and paging through it on later calls.
+   */
+  SearchResponse search(SearchRequest request) throws IOException {
+    if (Strings.isNullOrEmpty(scrollId)) {
+      // Add a scroll timeout, which turns the request into a scroll search request.
+      request.scroll(DEFAULT_SCROLL_TIMEOUT);
+      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
+      scrollId = response.getScrollId();
+      return response;
+    } else {
+      return client.scroll(new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId), RequestOptions.DEFAULT);
+    }
+  }
+
+  /**
+   * Clean the scroll context, if one was opened.
+   */
+  void clean() throws IOException {
+    if (!Strings.isNullOrEmpty(scrollId)) {
+      ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+      clearScrollRequest.addScrollId(scrollId);
+      client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
+    }
+  }
+}
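Putting the new core pieces together: FlintClientBuilder wires FlintOptions into a FlintOpenSearchClient, and createReader returns a scroll-backed FlintReader. A hedged end-to-end sketch (index name and option values are placeholders; the hasNext/next/close protocol matches FlintOpenSearchClientSuite below):

    import scala.collection.JavaConverters._
    import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions}

    val client = FlintClientBuilder.build(
      new FlintOptions(Map("host" -> "localhost", "port" -> "9200").asJava))
    // A null query falls back to match_all.
    val reader = client.createReader("my-index", null)
    try {
      while (reader.hasNext) {
        println(reader.next()) // each doc's _source as a JSON string
      }
    } finally {
      reader.close() // clears the scroll context and closes the underlying client
    }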
diff --git a/flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchOptions.java b/flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchOptions.java
deleted file mode 100644
index da710d1643..0000000000
--- a/flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchOptions.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.io;
-
-import java.util.Map;
-
-public class OpenSearchOptions {
-
-  public static final String INDEX_NAME = "index";
-
-  public static final String HOST = "host";
-
-  public static final String PORT = "port";
-
-  private final Map<String, String> options;
-
-  public OpenSearchOptions(Map<String, String> options) {
-    this.options = options;
-  }
-
-  public String getIndexName() {
-    return options.get(INDEX_NAME);
-  }
-
-  public String getHost() {
-    return options.get(HOST);
-  }
-
-  public int getPort() {
-    return Integer.parseInt(options.get(PORT));
-  }
-}
diff --git a/flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchReader.java b/flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchReader.java
deleted file mode 100644
index 4db65b3067..0000000000
--- a/flint/flint-spark-integration/src/main/java/org/opensearch/io/OpenSearchReader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.io;
-
-import org.apache.http.HttpHost;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestHighLevelClient;
-import org.opensearch.search.SearchHit;
-import org.opensearch.search.builder.SearchSourceBuilder;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-
-/** OpenSearch Reader. Todo. add scroll support. */
-public class OpenSearchReader {
-
-  private final String indexName;
-
-  private final OpenSearchOptions options;
-
-  private RestHighLevelClient client;
-
-  private SearchSourceBuilder sourceBuilder;
-
-  private Iterator<SearchHit> iterator = null;
-
-  public OpenSearchReader(String indexName, OpenSearchOptions options) {
-    this.indexName = indexName;
-    this.options = options;
-    sourceBuilder = new SearchSourceBuilder();
-    sourceBuilder.from(0);
-    sourceBuilder.size(10);
-  }
-
-  public void open() {
-    client = new RestHighLevelClient(RestClient.builder(new HttpHost(options.getHost(), options.getPort(), "http")));
-  }
-
-  public boolean hasNext() {
-    try {
-      if (iterator == null) {
-        SearchResponse
-            response =
-            client.search(new SearchRequest().indices(indexName).source(sourceBuilder), RequestOptions.DEFAULT);
-        iterator = Arrays.asList(response.getHits().getHits()).iterator();
-      }
-      return iterator.hasNext();
-    } catch (IOException e) {
-      // todo, log.error
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** Return each hit doc. */
-  public String next() {
-    return iterator.next().getSourceAsString();
-  }
-
-  public void close() {
-    try {
-      if (client != null) {
-        client.close();
-      }
-    } catch (Exception e) {
-      // todo, log.error
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git a/flint/flint-spark-integration/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/flint/flint-spark-integration/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index f878911ce9..3aac35fc88 100644
--- a/flint/flint-spark-integration/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/flint/flint-spark-integration/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -3,4 +3,4 @@
 # SPDX-License-Identifier: Apache-2.0
 #
 
-org.apache.spark.sql.v2.OpenSearchJsonDataSourceV2
+org.apache.spark.sql.flint.FlintDataSourceV2
diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintDataSourceV2.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintDataSourceV2.scala
new file mode 100644
index 0000000000..db2d6740e6
--- /dev/null
+++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintDataSourceV2.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark.sql.flint
+
+import java.util
+import java.util.NoSuchElementException
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class FlintDataSourceV2 extends TableProvider with DataSourceRegister {
+
+  private var table: FlintTable = null
+
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    if (table == null) {
+      table = getFlintTable(Option.empty, options.asCaseSensitiveMap())
+    }
+    table.schema
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    if (table == null) {
+      getFlintTable(Some(schema), properties)
+    } else {
+      table
+    }
+  }
+
+  protected def getTableName(properties: util.Map[String, String]): String = {
+    if (properties.containsKey("path")) properties.get("path")
+    else if (properties.containsKey("index")) properties.get("index")
+    else throw new NoSuchElementException("index or path not found")
+  }
+
+  protected def getFlintTable(
+      schema: Option[StructType],
+      properties: util.Map[String, String]): FlintTable = {
+    FlintTable(getTableName(properties), SparkSession.active, schema)
+  }
+
+  /**
+   * Format name, used as `spark.read.format("flint")`.
+   */
+  override def shortName(): String = "flint"
+
+  override def supportsExternalMetadata(): Boolean = true
+}
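With the short name registered in META-INF above, the source is addressable directly from the Spark DataFrame API. A sketch of the read path, mirroring the FlintDataSourceV2ITSuite added later in this patch (host, port, index name, and schema are placeholders):

    // Schema inference is not implemented yet, so a user-specified schema is required.
    val df = spark.read
      .format("flint")
      .options(Map("host" -> "localhost", "port" -> "9200", "scroll_size" -> "100"))
      .schema("accountId STRING, eventName STRING, eventSource STRING")
      .load("my-index") // resolved via "path"; an explicit "index" option works too
    df.show()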
diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchPartitionReader.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReader.scala
similarity index 75%
rename from flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchPartitionReader.scala
rename to flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReader.scala
index 33575f3fe1..af2a7187d1 100644
--- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchPartitionReader.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReader.scala
@@ -3,14 +3,12 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.apache.spark.sql.v2
+package org.apache.spark.sql.flint
 
 import java.util.TimeZone
 
-import scala.collection.JavaConverters._
-
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.opensearch.io.{OpenSearchOptions, OpenSearchReader}
+import org.opensearch.flint.core.storage.FlintReader
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptionsInRead}
@@ -26,10 +24,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param schema
  *   schema
  */
-class OpenSearchPartitionReader(
-    tableName: String,
-    schema: StructType,
-    options: Map[String, String])
+class FlintPartitionReader(reader: FlintReader, schema: StructType)
     extends PartitionReader[InternalRow] {
 
   lazy val parser = new JacksonParser(
@@ -47,12 +42,6 @@ class OpenSearchPartitionReader(
       schema,
       parser.options.columnNameOfCorruptRecord)
 
-  lazy val openSearchReader = {
-    val reader = new OpenSearchReader(tableName, new OpenSearchOptions(options.asJava))
-    reader.open()
-    reader
-  }
-
   var rows: Iterator[InternalRow] = Iterator.empty
 
   /**
    * Return true if there is a next row, parsing the next doc from the reader when needed.
    */
   override def next: Boolean = {
     if (rows.hasNext) {
       true
-    } else if (openSearchReader.hasNext) {
-      rows = safeParser.parse(openSearchReader.next())
+    } else if (reader.hasNext) {
+      rows = safeParser.parse(reader.next())
       rows.hasNext
     } else {
       false
     }
   }
 
   override def get(): InternalRow = {
     rows.next()
   }
 
   override def close(): Unit = {
-    openSearchReader.close()
+    reader.close()
   }
 }
diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchPartitionReaderFactory.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala
similarity index 56%
rename from flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchPartitionReaderFactory.scala
rename to flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala
index ab6202f96c..6220b35b91 100644
--- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchPartitionReaderFactory.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionReaderFactory.scala
@@ -3,18 +3,23 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.apache.spark.sql.v2
+package org.apache.spark.sql.flint
+
+import java.util
+
+import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.types.StructType
 
-case class OpenSearchPartitionReaderFactory(
+case class FlintPartitionReaderFactory(
     tableName: String,
     schema: StructType,
-    options: Map[String, String])
+    properties: util.Map[String, String])
     extends PartitionReaderFactory {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    new OpenSearchPartitionReader(tableName, schema, options)
+    val flintClient = FlintClientBuilder.build(new FlintOptions(properties))
+    new FlintPartitionReader(flintClient.createReader(tableName, ""), schema)
   }
 }
diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchScan.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala
similarity index 62%
rename from flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchScan.scala
rename to flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala
index 09e5fb790f..4772029c9c 100644
--- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchScan.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScan.scala
@@ -3,18 +3,14 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.apache.spark.sql.v2
+package org.apache.spark.sql.flint
 
-import scala.collection.JavaConverters._
+import java.util
 
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-case class OpenSearchScan(
-    tableName: String,
-    schema: StructType,
-    options: CaseInsensitiveStringMap)
+case class FlintScan(tableName: String, schema: StructType, properties: util.Map[String, String])
     extends Scan
     with Batch {
 
@@ -25,13 +21,11 @@ case class OpenSearchScan(
   }
 
   override def createReaderFactory(): PartitionReaderFactory = {
-    OpenSearchPartitionReaderFactory(
-      tableName,
-      schema,
-      options.asCaseSensitiveMap().asScala.toMap)
+    FlintPartitionReaderFactory(tableName, schema, properties)
   }
 
   override def toBatch: Batch = this
 }
+
+// todo. add partition support.
 private[spark] case class OpenSearchInputPartition() extends InputPartition {}
diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchScanBuilder.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
similarity index 58%
rename from flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchScanBuilder.scala
rename to flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
index 48827ff36e..ec69cbb176 100644
--- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchScanBuilder.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintScanBuilder.scala
@@ -3,20 +3,24 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.apache.spark.sql.v2
+package org.apache.spark.sql.flint
+
+import java.util
+
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-case class OpenSearchScanBuilder(
+case class FlintScanBuilder(
     tableName: String,
     sparkSession: SparkSession,
     schema: StructType,
-    options: CaseInsensitiveStringMap)
-    extends ScanBuilder {
+    properties: util.Map[String, String])
+    extends ScanBuilder
+    with Logging {
+
   override def build(): Scan = {
-    OpenSearchScan(tableName, schema, options)
+    FlintScan(tableName, schema, properties)
   }
 }
diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchTable.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
similarity index 69%
rename from flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchTable.scala
rename to flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
index 6b8f49ac28..0b762687fd 100644
--- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchTable.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintTable.scala
@@ -3,7 +3,7 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package org.apache.spark.sql.v2
+package org.apache.spark.sql.flint
 
 import java.util
 
@@ -16,29 +16,35 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * OpenSearchTable represent an index in OpenSearch.
- * @param tableName
+ * @param name
  *   OpenSearch index name.
  * @param sparkSession
  *   sparkSession
- * @param options
- *   options.
  * @param userSpecifiedSchema
  *   userSpecifiedSchema
  */
-case class OpenSearchTable(
+case class FlintTable(
     name: String,
     sparkSession: SparkSession,
-    options: CaseInsensitiveStringMap,
     userSpecifiedSchema: Option[StructType])
     extends Table
     with SupportsRead {
 
-  override def schema(): StructType = userSpecifiedSchema.get
+  var schema: StructType = {
+    if (schema == null) {
+      schema = if (userSpecifiedSchema.isDefined) {
+        userSpecifiedSchema.get
+      } else {
+        throw new UnsupportedOperationException("infer schema not supported yet")
+      }
+    }
+    schema
+  }
 
   override def capabilities(): util.Set[TableCapability] =
     util.EnumSet.of(BATCH_READ)
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-    OpenSearchScanBuilder(name, sparkSession, schema(), options)
+    FlintScanBuilder(name, sparkSession, schema, options.asCaseSensitiveMap())
   }
 }
diff --git a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchJsonDataSourceV2.scala b/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchJsonDataSourceV2.scala
deleted file mode 100644
index 11c3212c05..0000000000
--- a/flint/flint-spark-integration/src/main/scala/org/apache/spark/sql/v2/OpenSearchJsonDataSourceV2.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.apache.spark.sql.v2
-
-import java.util
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
-import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-class OpenSearchJsonDataSourceV2 extends TableProvider with DataSourceRegister {
-
-  /**
-   * Todo. add inferSchema for read path
-   */
-  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
-    throw new UnsupportedOperationException("inferSchema is not supported")
-  }
-
-  override def getTable(
-      schema: StructType,
-      partitioning: Array[Transform],
-      properties: util.Map[String, String]): Table = {
-    OpenSearchTable(
-      getTableName(properties),
-      SparkSession.active,
-      new CaseInsensitiveStringMap(properties),
-      Some(schema))
-  }
-
-  /**
-   * format name. for instance, `sql.read.format("flint")`
-   */
-  override def shortName(): String = "flint"
-
-  override def supportsExternalMetadata(): Boolean = true
-
-  /**
-   * get from paths property.
-   */
-  private def getTableName(properties: util.Map[String, String]): String = {
-    properties.get("path")
-  }
-}
diff --git a/flint/flint-spark-integration/src/test/scala/org/apache/spark/FirstITSuit.scala b/flint/flint-spark-integration/src/test/scala/org/apache/spark/FirstITSuit.scala
deleted file mode 100644
index d4407a6ab9..0000000000
--- a/flint/flint-spark-integration/src/test/scala/org/apache/spark/FirstITSuit.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.apache.spark
-
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-
-class FirstITSuit extends QueryTest with FlintSuite with OpenSearchSuite with OpenSearchIndex {
-  test("basic flint read test") {
-    val indexName = "t0001"
-    withIndexName(indexName) {
-      simpleIndex(indexName)
-      val schema = StructType(
-        Array(
-          StructField("accountId", StringType, true),
-          StructField("eventName", StringType, true),
-          StructField("eventSource", StringType, true)))
-      val df = spark.sqlContext.read
-        .format("flint")
-        .options(openSearchOptions)
-        .schema(schema)
-        .load(indexName)
-
-      assert(df.count() == 1)
-      checkAnswer(df, Row("123", "event", "source"))
-    }
-  }
-}
diff --git a/flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchIndex.scala b/flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchIndex.scala
deleted file mode 100644
index 9b019d1d09..0000000000
--- a/flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchIndex.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.apache.spark
-
-import org.opensearch.action.bulk.BulkRequest
-import org.opensearch.action.index.IndexRequest
-import org.opensearch.client.RequestOptions
-import org.opensearch.client.indices.CreateIndexRequest
-import org.opensearch.common.xcontent.XContentType
-
-/**
- * Provide OpenSearch Index
- */
-trait OpenSearchIndex { self: OpenSearchSuite =>
-
-  val oneNodeSetting = """{
-    |  "number_of_shards": "1",
-    |  "number_of_replicas": "0"
-    |}""".stripMargin
-
-  def simpleIndex(indexName: String): Unit = {
-    val mappings = """{
-      |  "properties": {
-      |    "accountId": {
-      |      "type": "keyword"
-      |    },
-      |    "eventName": {
-      |      "type": "keyword"
-      |    },
-      |    "eventSource": {
-      |      "type": "keyword"
-      |    }
-      |  }
-      |}""".stripMargin
-    val docs = Seq("""{
-      |  "accountId": "123",
-      |  "eventName": "event",
-      |  "eventSource": "source"
-      |}""".stripMargin)
-    index(indexName, oneNodeSetting, mappings, docs)
-  }
-
-  def index(index: String, settings: String, mappings: String, doc: Seq[String]): Unit = {
-    openSearchClient.indices.create(
-      new CreateIndexRequest(index)
-        .settings(settings, XContentType.JSON)
-        .mapping(mappings, XContentType.JSON),
-      RequestOptions.DEFAULT)
-
-    val indexRequest = doc.foldLeft(new IndexRequest(index))((req, source) =>
-      req.source(source, XContentType.JSON))
-    val response =
-      openSearchClient.bulk(new BulkRequest().add(indexRequest), RequestOptions.DEFAULT)
-
-    assume(
-      !response.hasFailures,
-      s"bulk index docs to $index failed: ${response.buildFailureMessage()}")
-  }
-}
diff --git a/flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchSuite.scala b/flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchSuite.scala
deleted file mode 100644
index ea6286f89f..0000000000
--- a/flint/flint-spark-integration/src/test/scala/org/apache/spark/OpenSearchSuite.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.apache.spark
-
-import org.apache.http.HttpHost
-import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
-import org.opensearch.client.{RequestOptions, RestClient, RestHighLevelClient}
-import org.opensearch.testcontainers.OpenSearchContainer
-import org.scalatest.{BeforeAndAfterAll, Suite}
-
-import org.apache.spark.util.Utils
-
-/**
- * Test required OpenSearch domain should extend OpenSearchSuite.
- */
-trait OpenSearchSuite extends BeforeAndAfterAll {
-  self: Suite =>
-
-  protected lazy val container = new OpenSearchContainer()
-
-  protected lazy val openSearchPort: Int = container.port()
-
-  protected lazy val openSearchHost: String = container.getHost
-
-  protected lazy val openSearchClient = new RestHighLevelClient(
-    RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http")))
-
-  protected lazy val openSearchOptions =
-    Map("host" -> openSearchHost, "port" -> s"$openSearchPort")
-
-  override def beforeAll(): Unit = {
-    container.start()
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    container.close()
-    super.afterAll()
-  }
-
-  /**
-   * Delete index `indexNames` after calling `f`.
-   */
-  protected def withIndexName(indexNames: String*)(f: => Unit): Unit = {
-    Utils.tryWithSafeFinally(f) {
-      indexNames.foreach { indexName =>
-        openSearchClient
-          .indices()
-          .delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT)
-      }
-    }
-  }
-}
diff --git a/flint/flint-spark-integration/src/test/scala/org/opensearch/io/OpenSearchOptionsTest.scala b/flint/flint-spark-integration/src/test/scala/org/opensearch/io/OpenSearchOptionsTest.scala
deleted file mode 100644
index 03849a28ba..0000000000
--- a/flint/flint-spark-integration/src/test/scala/org/opensearch/io/OpenSearchOptionsTest.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.io
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.mockito.Mockito.when
-import org.scalatest.BeforeAndAfter
-import org.scalatest.matchers.must.Matchers
-import org.scalatestplus.mockito.MockitoSugar.mock
-
-import org.apache.spark.SparkFunSuite
-
-class OpenSearchOptionsTest extends SparkFunSuite with Matchers with BeforeAndAfter {
-  var options: mutable.Map[String, String] = _
-  var openSearchOptions: OpenSearchOptions = _
-
-  before {
-    options = mutable.Map(
-      OpenSearchOptions.INDEX_NAME -> "myindex",
-      OpenSearchOptions.HOST -> "localhost",
-      OpenSearchOptions.PORT -> "9200")
-    openSearchOptions = new OpenSearchOptions(options.asJava)
-  }
-
-  test("OpenSearchOptions should return the correct index name") {
-    assert(openSearchOptions.getIndexName == "myindex")
-  }
-
-  test("OpenSearchOptions should return the correct host") {
-    assert(openSearchOptions.getHost == "localhost")
-  }
-
-  test("OpenSearchOptions should return the correct port") {
-    assert(openSearchOptions.getPort == 9200)
-  }
-
-  test("OpenSearchOptions should throw a NumberFormatException when port number is invalid") {
-    options.put(OpenSearchOptions.PORT, "invalid-port-number")
-    openSearchOptions = new OpenSearchOptions(options.asJava)
-    assertThrows[NumberFormatException] {
-      openSearchOptions.getPort
-    }
-  }
-
-  test("OpenSearchOptions should return the mocked port number") {
-    val mockedOptions = mock[mutable.Map[String, String]]
-    when(mockedOptions.get(OpenSearchOptions.PORT)).thenReturn(Some("9300"))
-    openSearchOptions = new OpenSearchOptions(mockedOptions.asJava)
-    assert(openSearchOptions.getPort == 9300)
-  }
-}
diff --git a/flint/flint-spark-integration/src/test/java/org/opensearch/testcontainers/OpenSearchContainer.java b/flint/integ-test/src/test/java/org/opensearch/testcontainers/OpenSearchContainer.java
similarity index 100%
rename from flint/flint-spark-integration/src/test/java/org/opensearch/testcontainers/OpenSearchContainer.java
rename to flint/integ-test/src/test/java/org/opensearch/testcontainers/OpenSearchContainer.java
diff --git a/flint/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala b/flint/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
new file mode 100644
index 0000000000..ce0461487a
--- /dev/null
+++ b/flint/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.apache.spark
+
+import org.opensearch.flint.OpenSearchSuite
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.functions.asc
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+/**
+ * OpenSearch related integration test.
+ */
+class FlintDataSourceV2ITSuite extends QueryTest with FlintSuite with OpenSearchSuite {
+
+  test("create dataframe successfully from flint datasource") {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val schema = StructType(
+        Seq(
+          StructField("accountId", StringType, true),
+          StructField("eventName", StringType, true),
+          StructField("eventSource", StringType, true)))
+      val df = spark.sqlContext.read
+        .format("flint")
+        .options(openSearchOptions)
+        .schema(schema)
+        .load(indexName)
+
+      assert(df.count() == 1)
+      checkAnswer(df, Row("123", "event", "source"))
+    }
+  }
+
+  test("scroll api test, force scroll_size = 1") {
+    val indexName = "t0002"
+    withIndexName(indexName) {
+      multipleDocIndex(indexName, 5)
+      val schema = StructType(Seq(StructField("id", IntegerType, true)))
+
+      val df = spark.sqlContext.read
+        .format("flint")
+        .options(openSearchOptions + ("scroll_size" -> "1"))
+        .schema(schema)
+        .load(indexName)
+        .sort(asc("id"))
+
+      assert(df.count() == 5)
+      checkAnswer(df, (1 to 5).map(i => Row(i)))
+    }
+  }
+}
diff --git a/flint/integ-test/src/test/scala/org/apache/spark/OpenSearchITSuite.scala b/flint/integ-test/src/test/scala/org/apache/spark/OpenSearchITSuite.scala
deleted file mode 100644
index b913be1e89..0000000000
--- a/flint/integ-test/src/test/scala/org/apache/spark/OpenSearchITSuite.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.apache.spark
-
-import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-
-/**
- * OpenSearch related integration test.
- */
-class OpenSearchITSuite
-    extends QueryTest
-    with FlintSuite
-    with OpenSearchSuite
-    with OpenSearchIndex {
-
-  /**
-   * Todo. could be deleted in future. for demo purpose now.
-   */
-  test("basic flint read test") {
-    val indexName = "t0001"
-    withIndexName(indexName) {
-      simpleIndex(indexName)
-
-      val schema = StructType(
-        Array(
-          StructField("accountId", StringType, true),
-          StructField("eventName", StringType, true),
-          StructField("eventSource", StringType, true)))
-      val df = spark.sqlContext.read
-        .format("flint")
-        .options(openSearchOptions)
-        .schema(schema)
-        .load(indexName)
-
-      assert(df.count() == 1)
-      checkAnswer(df, Row("123", "event", "source"))
-    }
-  }
-}
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala
index f0e16ef562..1f9170c645 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala
@@ -7,7 +7,12 @@ package org.opensearch.flint
 
 import org.apache.http.HttpHost
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.action.support.WriteRequest.RefreshPolicy
 import org.opensearch.client.{RequestOptions, RestClient, RestHighLevelClient}
+import org.opensearch.client.indices.{CreateIndexRequest, GetIndexRequest}
+import org.opensearch.common.xcontent.XContentType
 import org.opensearch.testcontainers.OpenSearchContainer
 import org.scalatest.{BeforeAndAfterAll, Suite}
 
@@ -53,4 +58,70 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
       }
     }
   }
+
+  val oneNodeSetting = """{
+    |  "number_of_shards": "1",
+    |  "number_of_replicas": "0"
+    |}""".stripMargin
+
+  def simpleIndex(indexName: String): Unit = {
+    val mappings = """{
+      |  "properties": {
+      |    "accountId": {
+      |      "type": "keyword"
+      |    },
+      |    "eventName": {
+      |      "type": "keyword"
+      |    },
+      |    "eventSource": {
+      |      "type": "keyword"
+      |    }
+      |  }
+      |}""".stripMargin
+    val docs = Seq("""{
+      |  "accountId": "123",
+      |  "eventName": "event",
+      |  "eventSource": "source"
+      |}""".stripMargin)
+    index(indexName, oneNodeSetting, mappings, docs)
+  }
+
+  def multipleDocIndex(indexName: String, N: Int): Unit = {
+    val mappings = """{
+      |  "properties": {
+      |    "id": {
+      |      "type": "integer"
+      |    }
+      |  }
+      |}""".stripMargin
+
+    val docs = for (n <- 1 to N) yield s"""{"id": $n}""".stripMargin
+    index(indexName, oneNodeSetting, mappings, docs)
+  }
+
+  def index(index: String, settings: String, mappings: String, docs: Seq[String]): Unit = {
+    openSearchClient.indices.create(
+      new CreateIndexRequest(index)
+        .settings(settings, XContentType.JSON)
+        .mapping(mappings, XContentType.JSON),
+      RequestOptions.DEFAULT)
+
+    val getIndexResponse =
+      openSearchClient.indices().get(new GetIndexRequest(index), RequestOptions.DEFAULT)
+    assume(getIndexResponse.getIndices.contains(index), s"create index $index failed")
+
+    /**
+     * Use RefreshPolicy.WAIT_UNTIL so the bulk request returns only once the indexed docs are
+     * searchable.
+     */
+    val request = new BulkRequest().setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
+    for (doc <- docs) {
+      request.add(new IndexRequest(index).source(doc, XContentType.JSON))
+    }
+    val response =
+      openSearchClient.bulk(request, RequestOptions.DEFAULT)
+
+    assume(
+      !response.hasFailures,
+      s"bulk index docs to $index failed: ${response.buildFailureMessage()}")
+  }
 }
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
index b890b7d629..dbd488edee 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
@@ -5,6 +5,8 @@
 
 package org.opensearch.flint.core
 
+import scala.collection.JavaConverters._
+
 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.opensearch.flint.OpenSearchSuite
 import org.opensearch.flint.core.metadata.FlintMetadata
@@ -12,13 +14,10 @@ import org.opensearch.flint.core.storage.FlintOpenSearchClient
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
 
-class FlintOpenSearchClientSuite
-    extends AnyFlatSpec
-    with OpenSearchSuite
-    with Matchers {
+class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers {
 
   /** Lazy initialize after container started. */
-  lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort)
+  lazy val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))
 
   behavior of "Flint OpenSearch client"
 
@@ -39,10 +38,24 @@ class FlintOpenSearchClientSuite
     flintClient.createIndex(indexName, new FlintMetadata(content))
 
     flintClient.exists(indexName) shouldBe true
-    flintClient.getIndexMetadata(indexName).getContent should matchJson (content)
+    flintClient.getIndexMetadata(indexName).getContent should matchJson(content)
   }
 
   it should "return false if index not exist" in {
     flintClient.exists("non-exist-index") shouldBe false
   }
+
+  it should "read docs from index as string successfully " in {
+    val indexName = "t0001"
+    withIndexName(indexName) {
+      simpleIndex(indexName)
+      val match_all = null
+      val reader = flintClient.createReader(indexName, match_all)
+
+      reader.hasNext shouldBe true
+      reader.next shouldBe """{"accountId":"123","eventName":"event","eventSource":"source"}"""
+      reader.hasNext shouldBe false
+      reader.close()
+    }
+  }
 }
diff --git a/flint/project/Dependencies.scala b/flint/project/Dependencies.scala
index 5243d266ef..208b55c1c5 100644
--- a/flint/project/Dependencies.scala
+++ b/flint/project/Dependencies.scala
@@ -8,8 +8,8 @@ import sbt._
 object Dependencies {
   def deps(sparkVersion: String): Seq[ModuleID] = {
     Seq(
-      "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
-      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
+      "org.apache.spark" %% "spark-core" % sparkVersion % "provided" withSources (),
+      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" withSources (),
      "org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
      "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
      "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests")
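Since FlintPartitionReaderFactory currently passes an empty query (falling back to match_all), the DSL-query path of createReader is only reachable through the core API for now. A hedged sketch of pushing a filter down to OpenSearch that way (index, field, and value are placeholders; the string is parsed by AbstractQueryBuilder.parseInnerQueryBuilder, so it should be the inner query object rather than a full search body):

    // Only docs matching the term query are pulled through the scroll.
    val reader = flintClient.createReader("t0001", """{"term": {"accountId": "123"}}""")
    while (reader.hasNext) {
      println(reader.next())
    }
    reader.close()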