From ea1fe540ea3c3d731c426d071f520f58594f31d7 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Thu, 31 Oct 2024 16:06:41 -0700 Subject: [PATCH] Extract source table names from mv query (#854) * add sourceTables to MV index metadata properties Signed-off-by: Sean Kao * parse source tables from mv query Signed-off-by: Sean Kao * test cases for parse source tables from mv query Signed-off-by: Sean Kao * use constant for metadata cache version Signed-off-by: Sean Kao * write source tables to metadata cache Signed-off-by: Sean Kao * address comment Signed-off-by: Sean Kao * generate source tables for old mv without new prop Signed-off-by: Sean Kao * syntax fix Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- .../opensearch/flint/spark/FlintSpark.scala | 6 +- .../flint/spark/FlintSparkIndexBuilder.scala | 2 +- .../flint/spark/FlintSparkIndexFactory.scala | 37 ++++- .../spark/FlintSparkValidationHelper.scala | 11 +- .../metadatacache/FlintMetadataCache.scala | 17 ++- .../spark/mv/FlintSparkMaterializedView.scala | 27 +++- .../spark/FlintSparkIndexFactorySuite.scala | 50 +++++++ .../FlintMetadataCacheSuite.scala | 89 ++++++++++-- .../mv/FlintSparkMaterializedViewSuite.scala | 43 +++--- .../FlintSparkMaterializedViewITSuite.scala | 48 ++++++- ...OpenSearchMetadataCacheWriterITSuite.scala | 136 ++++++++++++------ 11 files changed, 369 insertions(+), 97 deletions(-) create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexFactorySuite.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index b4412a3d4..360bba720 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -183,7 +183,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w attachLatestLogEntry(indexName, metadata) } .toList - .flatMap(FlintSparkIndexFactory.create) + .flatMap(metadata => FlintSparkIndexFactory.create(spark, metadata)) } else { Seq.empty } @@ -202,7 +202,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w if (flintClient.exists(indexName)) { val metadata = flintIndexMetadataService.getIndexMetadata(indexName) val metadataWithEntry = attachLatestLogEntry(indexName, metadata) - FlintSparkIndexFactory.create(metadataWithEntry) + FlintSparkIndexFactory.create(spark, metadataWithEntry) } else { Option.empty } @@ -327,7 +327,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w val index = describeIndex(indexName) if (index.exists(_.options.autoRefresh())) { - val updatedIndex = FlintSparkIndexFactory.createWithDefaultOptions(index.get).get + val updatedIndex = FlintSparkIndexFactory.createWithDefaultOptions(spark, index.get).get FlintSparkIndexRefresh .create(updatedIndex.name(), updatedIndex) .validate(spark) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 0391741cf..2ff2883a9 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -92,7 +92,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { 
val updatedMetadata = index .metadata() .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava) - validateIndex(FlintSparkIndexFactory.create(updatedMetadata).get) + validateIndex(FlintSparkIndexFactory.create(flint.spark, updatedMetadata).get) } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index 78636d992..ca659550d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -25,6 +25,7 @@ import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession /** * Flint Spark index factory that encapsulates specific Flint index instance creation. This is for @@ -35,14 +36,16 @@ object FlintSparkIndexFactory extends Logging { /** * Creates Flint index from generic Flint metadata. * + * @param spark + * Spark session * @param metadata * Flint metadata * @return * Flint index instance, or None if any error during creation */ - def create(metadata: FlintMetadata): Option[FlintSparkIndex] = { + def create(spark: SparkSession, metadata: FlintMetadata): Option[FlintSparkIndex] = { try { - Some(doCreate(metadata)) + Some(doCreate(spark, metadata)) } catch { case e: Exception => logWarning(s"Failed to create Flint index from metadata $metadata", e) @@ -53,24 +56,26 @@ object FlintSparkIndexFactory extends Logging { /** * Creates Flint index with default options. 
* + * @param spark + * Spark session * @param index * Flint index - * @param metadata - * Flint metadata * @return * Flint index with default options */ - def createWithDefaultOptions(index: FlintSparkIndex): Option[FlintSparkIndex] = { + def createWithDefaultOptions( + spark: SparkSession, + index: FlintSparkIndex): Option[FlintSparkIndex] = { val originalOptions = index.options val updatedOptions = FlintSparkIndexOptions.updateOptionsWithDefaults(index.name(), originalOptions) val updatedMetadata = index .metadata() .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava) - this.create(updatedMetadata) + this.create(spark, updatedMetadata) } - private def doCreate(metadata: FlintMetadata): FlintSparkIndex = { + private def doCreate(spark: SparkSession, metadata: FlintMetadata): FlintSparkIndex = { val indexOptions = FlintSparkIndexOptions( metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap) val latestLogEntry = metadata.latestLogEntry @@ -118,6 +123,7 @@ object FlintSparkIndexFactory extends Logging { FlintSparkMaterializedView( metadata.name, metadata.source, + getMvSourceTables(spark, metadata), metadata.indexedColumns.map { colInfo => getString(colInfo, "columnName") -> getString(colInfo, "columnType") }.toMap, @@ -134,6 +140,15 @@ object FlintSparkIndexFactory extends Logging { .toMap } + private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = { + val sourceTables = getArrayString(metadata.properties, "sourceTables") + if (sourceTables.isEmpty) { + FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source) + } else { + sourceTables + } + } + private def getString(map: java.util.Map[String, AnyRef], key: String): String = { map.get(key).asInstanceOf[String] } @@ -146,4 +161,12 @@ object FlintSparkIndexFactory extends Logging { Some(value.asInstanceOf[String]) } } + + private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = { + map.get(key) match { + case list: java.util.ArrayList[_] => + list.toArray.map(_.toString) + case _ => Array.empty[String] + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index 1aaa85075..7e9922655 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -11,9 +11,8 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} +import org.apache.spark.sql.flint.{loadTable, parseTableName} /** * Flint Spark validation helper. 
@@ -31,16 +30,10 @@ trait FlintSparkValidationHelper extends Logging { * true if all non Hive, otherwise false */ def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = { - // Extract source table name (possibly more than one for MV query) val tableNames = index match { case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName) case covering: FlintSparkCoveringIndex => Seq(covering.tableName) - case mv: FlintSparkMaterializedView => - spark.sessionState.sqlParser - .parsePlan(mv.query) - .collect { case relation: UnresolvedRelation => - qualifyTableName(spark, relation.tableName) - } + case mv: FlintSparkMaterializedView => mv.sourceTables.toSeq } // Validate if any source table is not supported (currently Hive only) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala index e8a91e1be..e1c0f318c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala @@ -10,6 +10,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark.FlintSparkIndexOptions +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser /** @@ -46,9 +47,7 @@ case class FlintMetadataCache( object FlintMetadataCache { - // TODO: constant for version - val mockTableName = - "dataSourceName.default.logGroups(logGroupIdentifier:['arn:aws:logs:us-east-1:123456:test-llt-xa', 'arn:aws:logs:us-east-1:123456:sample-lg-1'])" + val metadataCacheVersion = "1.0" def apply(metadata: FlintMetadata): FlintMetadataCache = { val indexOptions = FlintSparkIndexOptions( @@ -61,6 +60,15 @@ object FlintMetadataCache { } else { None } + val sourceTables = metadata.kind match { + case MV_INDEX_TYPE => + metadata.properties.get("sourceTables") match { + case list: java.util.ArrayList[_] => + list.toArray.map(_.toString) + case _ => Array.empty[String] + } + case _ => Array(metadata.source) + } val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry => entry.lastRefreshCompleteTime match { case FlintMetadataLogEntry.EMPTY_TIMESTAMP => None @@ -68,7 +76,6 @@ object FlintMetadataCache { } } - // TODO: get source tables from metadata - FlintMetadataCache("1.0", refreshInterval, Array(mockTableName), lastRefreshTime) + FlintMetadataCache(metadataCacheVersion, refreshInterval, sourceTables, lastRefreshTime) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index caa75be75..aecfc99df 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -34,6 +34,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * MV name * @param query * source query that generates MV data + * @param sourceTables + * source table names * @param outputSchema * output schema * @param 
options @@ -44,6 +46,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap case class FlintSparkMaterializedView( mvName: String, query: String, + sourceTables: Array[String], outputSchema: Map[String, String], override val options: FlintSparkIndexOptions = empty, override val latestLogEntry: Option[FlintMetadataLogEntry] = None) @@ -64,6 +67,7 @@ case class FlintSparkMaterializedView( metadataBuilder(this) .name(mvName) .source(query) + .addProperty("sourceTables", sourceTables) .indexedColumns(indexColumnMaps) .schema(schema) .build() @@ -165,10 +169,30 @@ object FlintSparkMaterializedView { flintIndexNamePrefix(mvName) } + /** + * Extract source table names (possibly more than one) from the query. + * + * @param spark + * Spark session + * @param query + * source query that generates MV data + * @return + * source table names + */ + def extractSourceTableNames(spark: SparkSession, query: String): Array[String] = { + spark.sessionState.sqlParser + .parsePlan(query) + .collect { case relation: UnresolvedRelation => + qualifyTableName(spark, relation.tableName) + } + .toArray + } + /** Builder class for MV build */ class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { private var mvName: String = "" private var query: String = "" + private var sourceTables: Array[String] = Array.empty[String] /** * Set MV name. @@ -193,6 +217,7 @@ object FlintSparkMaterializedView { */ def query(query: String): Builder = { this.query = query + this.sourceTables = extractSourceTableNames(flint.spark, query) this } @@ -221,7 +246,7 @@ object FlintSparkMaterializedView { field.name -> field.dataType.simpleString } .toMap - FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions) + FlintSparkMaterializedView(mvName, query, sourceTables, outputSchema, indexOptions) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexFactorySuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexFactorySuite.scala new file mode 100644 index 000000000..07720ff24 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexFactorySuite.scala @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE +import org.scalatest.matchers.should.Matchers._ + +import org.apache.spark.FlintSuite + +class FlintSparkIndexFactorySuite extends FlintSuite { + + test("create mv should generate source tables if missing in metadata") { + val testTable = "spark_catalog.default.mv_build_test" + val testMvName = "spark_catalog.default.mv" + val testQuery = s"SELECT * FROM $testTable" + + val content = + s""" { + | "_meta": { + | "kind": "$MV_INDEX_TYPE", + | "indexedColumns": [ + | { + | "columnType": "int", + | "columnName": "age" + | } + | ], + | "name": "$testMvName", + | "source": "$testQuery" + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + + val metadata = FlintOpenSearchIndexMetadataService.deserialize(content) + val index = FlintSparkIndexFactory.create(spark, metadata) + index shouldBe defined + index.get + .asInstanceOf[FlintSparkMaterializedView] + .sourceTables should contain theSameElementsAs Array(testTable) + } +} diff 
--git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala index c6d2cf12a..6ec6cf696 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala @@ -7,6 +7,9 @@ package org.opensearch.flint.spark.metadatacache import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -21,11 +24,12 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { "", Map.empty[String, Any]) - it should "construct from FlintMetadata" in { + it should "construct from skipping index FlintMetadata" in { val content = - """ { + s""" { | "_meta": { - | "kind": "test_kind", + | "kind": "$SKIPPING_INDEX_TYPE", + | "source": "spark_catalog.default.test_table", | "options": { | "auto_refresh": "true", | "refresh_interval": "10 Minutes" @@ -43,18 +47,85 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { .copy(latestLogEntry = Some(flintMetadataLogEntry)) val metadataCache = FlintMetadataCache(metadata) - metadataCache.metadataCacheVersion shouldBe "1.0" + metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion metadataCache.refreshInterval.get shouldBe 600 - metadataCache.sourceTables shouldBe Array(FlintMetadataCache.mockTableName) + metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table") + metadataCache.lastRefreshTime.get shouldBe 1234567890123L + } + + it should "construct from covering index FlintMetadata" in { + val content = + s""" { + | "_meta": { + | "kind": "$COVERING_INDEX_TYPE", + | "source": "spark_catalog.default.test_table", + | "options": { + | "auto_refresh": "true", + | "refresh_interval": "10 Minutes" + | } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + val metadata = FlintOpenSearchIndexMetadataService + .deserialize(content) + .copy(latestLogEntry = Some(flintMetadataLogEntry)) + + val metadataCache = FlintMetadataCache(metadata) + metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion + metadataCache.refreshInterval.get shouldBe 600 + metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table") + metadataCache.lastRefreshTime.get shouldBe 1234567890123L + } + + it should "construct from materialized view FlintMetadata" in { + val content = + s""" { + | "_meta": { + | "kind": "$MV_INDEX_TYPE", + | "source": "spark_catalog.default.wrong_table", + | "options": { + | "auto_refresh": "true", + | "refresh_interval": "10 Minutes" + | }, + | "properties": { + | "sourceTables": [ + | "spark_catalog.default.test_table", + | "spark_catalog.default.another_table" + | ] + | } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + val metadata = FlintOpenSearchIndexMetadataService + .deserialize(content) + .copy(latestLogEntry 
= Some(flintMetadataLogEntry)) + + val metadataCache = FlintMetadataCache(metadata) + metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion + metadataCache.refreshInterval.get shouldBe 600 + metadataCache.sourceTables shouldBe Array( + "spark_catalog.default.test_table", + "spark_catalog.default.another_table") metadataCache.lastRefreshTime.get shouldBe 1234567890123L } it should "construct from FlintMetadata excluding invalid fields" in { // Set auto_refresh = false and lastRefreshCompleteTime = 0 val content = - """ { + s""" { | "_meta": { - | "kind": "test_kind", + | "kind": "$SKIPPING_INDEX_TYPE", + | "source": "spark_catalog.default.test_table", | "options": { | "refresh_interval": "10 Minutes" | } @@ -71,9 +142,9 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { .copy(latestLogEntry = Some(flintMetadataLogEntry.copy(lastRefreshCompleteTime = 0L))) val metadataCache = FlintMetadataCache(metadata) - metadataCache.metadataCacheVersion shouldBe "1.0" + metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion metadataCache.refreshInterval shouldBe empty - metadataCache.sourceTables shouldBe Array(FlintMetadataCache.mockTableName) + metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table") metadataCache.lastRefreshTime shouldBe empty } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index c1df42883..1c9a9e83c 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -10,7 +10,7 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv import org.opensearch.flint.spark.FlintSparkIndexOptions import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE import org.opensearch.flint.spark.mv.FlintSparkMaterializedViewSuite.{streamingRelation, StreamingDslLogicalPlan} -import org.scalatest.matchers.should.Matchers.{contain, convertToAnyShouldWrapper, the} +import org.scalatest.matchers.should.Matchers._ import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite @@ -37,31 +37,34 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val testQuery = "SELECT 1" test("get mv name") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(testMvName, testQuery, Array.empty, Map.empty) mv.name() shouldBe "flint_spark_catalog_default_mv" } test("get mv name with dots") { val testMvNameDots = "spark_catalog.default.mv.2023.10" - val mv = FlintSparkMaterializedView(testMvNameDots, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(testMvNameDots, testQuery, Array.empty, Map.empty) mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10" } test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("mv", testQuery, Map.empty).name() + FlintSparkMaterializedView("mv", testQuery, Array.empty, Map.empty).name() the[IllegalArgumentException] thrownBy - FlintSparkMaterializedView("default.mv", testQuery, Map.empty).name() + FlintSparkMaterializedView("default.mv", testQuery, Array.empty, Map.empty).name() } test("get metadata") { - val mv = 
FlintSparkMaterializedView(testMvName, testQuery, Map("test_col" -> "integer")) + val mv = + FlintSparkMaterializedView(testMvName, testQuery, Array.empty, Map("test_col" -> "integer")) val metadata = mv.metadata() metadata.name shouldBe mv.mvName metadata.kind shouldBe MV_INDEX_TYPE metadata.source shouldBe "SELECT 1" + metadata.properties should contain key "sourceTables" + metadata.properties.get("sourceTables").asInstanceOf[Array[String]] should have size 0 metadata.indexedColumns shouldBe Array( Map("columnName" -> "test_col", "columnType" -> "integer").asJava) metadata.schema shouldBe Map("test_col" -> Map("type" -> "integer").asJava).asJava @@ -74,6 +77,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val mv = FlintSparkMaterializedView( testMvName, testQuery, + Array.empty, Map("test_col" -> "integer"), indexOptions) @@ -83,12 +87,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { } test("build batch data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(testMvName, testQuery, Array.empty, Map.empty) mv.build(spark, None).collect() shouldBe Array(Row(1)) } test("should fail if build given other source data frame") { - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView(testMvName, testQuery, Array.empty, Map.empty) the[IllegalArgumentException] thrownBy mv.build(spark, Some(mock[DataFrame])) } @@ -103,7 +107,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { |""".stripMargin val options = Map("watermark_delay" -> "30 Seconds") - withAggregateMaterializedView(testQuery, options) { actualPlan => + withAggregateMaterializedView(testQuery, Array(testTable), options) { actualPlan => comparePlans( actualPlan, streamingRelation(testTable) @@ -128,7 +132,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { |""".stripMargin val options = Map("watermark_delay" -> "30 Seconds") - withAggregateMaterializedView(testQuery, options) { actualPlan => + withAggregateMaterializedView(testQuery, Array(testTable), options) { actualPlan => comparePlans( actualPlan, streamingRelation(testTable) @@ -144,7 +148,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { test("build stream with non-aggregate query") { val testQuery = s"SELECT name, age FROM $testTable WHERE age > 30" - withAggregateMaterializedView(testQuery, Map.empty) { actualPlan => + withAggregateMaterializedView(testQuery, Array(testTable), Map.empty) { actualPlan => comparePlans( actualPlan, streamingRelation(testTable) @@ -158,7 +162,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val testQuery = s"SELECT name, age FROM $testTable" val options = Map("extra_options" -> s"""{"$testTable": {"maxFilesPerTrigger": "1"}}""") - withAggregateMaterializedView(testQuery, options) { actualPlan => + withAggregateMaterializedView(testQuery, Array(testTable), options) { actualPlan => comparePlans( actualPlan, streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1")) @@ -175,6 +179,7 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val mv = FlintSparkMaterializedView( testMvName, s"SELECT name, COUNT(*) AS count FROM $testTable GROUP BY name", + Array(testTable), Map.empty) the[IllegalStateException] thrownBy @@ -182,14 +187,20 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { } } - private def withAggregateMaterializedView(query: String, options: Map[String, String])( - codeBlock: LogicalPlan => Unit): Unit = { + private 
def withAggregateMaterializedView( + query: String, + sourceTables: Array[String], + options: Map[String, String])(codeBlock: LogicalPlan => Unit): Unit = { withTable(testTable) { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") - val mv = - FlintSparkMaterializedView(testMvName, query, Map.empty, FlintSparkIndexOptions(options)) + FlintSparkMaterializedView( + testMvName, + query, + sourceTables, + Map.empty, + FlintSparkIndexOptions(options)) val actualPlan = mv.buildStream(spark).queryExecution.logical codeBlock(actualPlan) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 14d41c2bb..fc77faaea 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -17,9 +17,10 @@ import org.opensearch.flint.common.FlintVersion.current import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTableNames, getFlintIndexName} import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler -import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.must.Matchers._ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.sql.{DataFrame, Row} @@ -51,6 +52,29 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { deleteTestIndex(testFlintIndex) } + test("extract source table names from materialized view source query successfully") { + val testComplexQuery = s""" + | SELECT * + | FROM ( + | SELECT 1 + | FROM table1 + | LEFT JOIN `table2` + | ) + | UNION ALL + | SELECT 1 + | FROM spark_catalog.default.`table/3` + | INNER JOIN spark_catalog.default.`table.4` + |""".stripMargin + extractSourceTableNames(flint.spark, testComplexQuery) should contain theSameElementsAs + Array( + "spark_catalog.default.table1", + "spark_catalog.default.table2", + "spark_catalog.default.`table/3`", + "spark_catalog.default.`table.4`") + + extractSourceTableNames(flint.spark, "SELECT 1") should have size 0 + } + test("create materialized view with metadata successfully") { withTempDir { checkpointDir => val indexOptions = @@ -91,7 +115,9 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | "scheduler_mode":"internal" | }, | "latestId": "$testLatestId", - | "properties": {} + | "properties": { + | "sourceTables": ["$testTable"] + | } | }, | "properties": { | "startTime": { @@ -107,6 +133,22 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } } + test("create materialized view should parse source tables successfully") { + val indexOptions = FlintSparkIndexOptions(Map.empty) + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .options(indexOptions, testFlintIndex) + .create() + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + index.get + .asInstanceOf[FlintSparkMaterializedView] + .sourceTables should contain theSameElementsAs 
Array(testTable) + } + test("create materialized view with default checkpoint location successfully") { withTempDir { checkpointDir => setFlintSparkConf( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala index c04209f06..c0d253fd3 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala @@ -17,7 +17,9 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite} -import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} import org.scalatest.Entry import org.scalatest.matchers.should.Matchers @@ -78,9 +80,9 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat | { | "_meta": { | "version": "${current()}", - | "name": "${testFlintIndex}", + | "name": "$testFlintIndex", | "kind": "test_kind", - | "source": "test_source_table", + | "source": "$testTable", | "indexedColumns": [ | { | "test_field": "spark_type" @@ -90,12 +92,12 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat | "refresh_interval": "10 Minutes" | }, | "properties": { - | "metadataCacheVersion": "1.0", + | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", | "refreshInterval": 600, - | "sourceTables": ["${FlintMetadataCache.mockTableName}"], - | "lastRefreshTime": ${testLastRefreshCompleteTime} + | "sourceTables": ["$testTable"], + | "lastRefreshTime": $testLastRefreshCompleteTime | }, - | "latestId": "${testLatestId}" + | "latestId": "$testLatestId" | }, | "properties": { | "test_field": { @@ -107,7 +109,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat val builder = new FlintMetadata.Builder builder.name(testFlintIndex) builder.kind("test_kind") - builder.source("test_source_table") + builder.source(testTable) builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) builder.options( Map("auto_refresh" -> "true", "refresh_interval" -> "10 Minutes") @@ -129,12 +131,71 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties properties should have size 3 - properties should contain allOf (Entry("metadataCacheVersion", "1.0"), + properties should contain allOf (Entry( + "metadataCacheVersion", + FlintMetadataCache.metadataCacheVersion), Entry("lastRefreshTime", testLastRefreshCompleteTime)) + } + + Seq(SKIPPING_INDEX_TYPE, COVERING_INDEX_TYPE).foreach { case kind => + test(s"write metadata cache to $kind index mappings with source tables") { + val content = + s""" { + | "_meta": { + | "kind": "$kind", + | "source": "$testTable" + | }, + 
| "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + val metadata = FlintOpenSearchIndexMetadataService + .deserialize(content) + .copy(latestLogEntry = Some(flintMetadataLogEntry)) + flintClient.createIndex(testFlintIndex, metadata) + flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) + + val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties + properties + .get("sourceTables") + .asInstanceOf[List[String]] + .toArray should contain theSameElementsAs Array(testTable) + } + } + + test(s"write metadata cache to materialized view index mappings with source tables") { + val testTable2 = "spark_catalog.default.metadatacache_test2" + val content = + s""" { + | "_meta": { + | "kind": "$MV_INDEX_TYPE", + | "properties": { + | "sourceTables": [ + | "$testTable", "$testTable2" + | ] + | } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + val metadata = FlintOpenSearchIndexMetadataService + .deserialize(content) + .copy(latestLogEntry = Some(flintMetadataLogEntry)) + flintClient.createIndex(testFlintIndex, metadata) + flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) + + val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties properties .get("sourceTables") .asInstanceOf[List[String]] - .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName) + .toArray should contain theSameElementsAs Array(testTable, testTable2) } test("write metadata cache to index mappings with refresh interval") { @@ -162,13 +223,11 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties properties should have size 4 - properties should contain allOf (Entry("metadataCacheVersion", "1.0"), + properties should contain allOf (Entry( + "metadataCacheVersion", + FlintMetadataCache.metadataCacheVersion), Entry("refreshInterval", 600), Entry("lastRefreshTime", testLastRefreshCompleteTime)) - properties - .get("sourceTables") - .asInstanceOf[List[String]] - .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName) } test("exclude refresh interval in metadata cache when auto refresh is false") { @@ -195,12 +254,10 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties properties should have size 3 - properties should contain allOf (Entry("metadataCacheVersion", "1.0"), + properties should contain allOf (Entry( + "metadataCacheVersion", + FlintMetadataCache.metadataCacheVersion), Entry("lastRefreshTime", testLastRefreshCompleteTime)) - properties - .get("sourceTables") - .asInstanceOf[List[String]] - .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName) } test("exclude last refresh time in metadata cache when index has not been refreshed") { @@ -212,11 +269,8 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties properties should have size 2 - properties should contain(Entry("metadataCacheVersion", "1.0")) - properties - .get("sourceTables") - .asInstanceOf[List[String]] - .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName) + properties should contain( + Entry("metadataCacheVersion", 
FlintMetadataCache.metadataCacheVersion)) } test("write metadata cache to index mappings and preserve other index metadata") { @@ -246,12 +300,10 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 var properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties properties should have size 3 - properties should contain allOf (Entry("metadataCacheVersion", "1.0"), + properties should contain allOf (Entry( + "metadataCacheVersion", + FlintMetadataCache.metadataCacheVersion), Entry("lastRefreshTime", testLastRefreshCompleteTime)) - properties - .get("sourceTables") - .asInstanceOf[List[String]] - .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName) val newContent = """ { @@ -278,12 +330,10 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties properties should have size 3 - properties should contain allOf (Entry("metadataCacheVersion", "1.0"), + properties should contain allOf (Entry( + "metadataCacheVersion", + FlintMetadataCache.metadataCacheVersion), Entry("lastRefreshTime", testLastRefreshCompleteTime)) - properties - .get("sourceTables") - .asInstanceOf[List[String]] - .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName) } Seq( @@ -296,9 +346,9 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat "checkpoint_location" -> "s3a://test/"), s""" | { - | "metadataCacheVersion": "1.0", + | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", | "refreshInterval": 600, - | "sourceTables": ["${FlintMetadataCache.mockTableName}"] + | "sourceTables": ["$testTable"] | } |""".stripMargin), ( @@ -306,8 +356,8 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat Map.empty[String, String], s""" | { - | "metadataCacheVersion": "1.0", - | "sourceTables": ["${FlintMetadataCache.mockTableName}"] + | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", + | "sourceTables": ["$testTable"] | } |""".stripMargin), ( @@ -315,8 +365,8 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat Map("incremental_refresh" -> "true", "checkpoint_location" -> "s3a://test/"), s""" | { - | "metadataCacheVersion": "1.0", - | "sourceTables": ["${FlintMetadataCache.mockTableName}"] + | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", + | "sourceTables": ["$testTable"] | } |""".stripMargin)).foreach { case (refreshMode, optionsMap, expectedJson) => test(s"write metadata cache for $refreshMode") { @@ -389,9 +439,9 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat flintMetadataCacheWriter.serialize(index.get.metadata())) \ "_meta" \ "properties")) propertiesJson should matchJson(s""" | { - | "metadataCacheVersion": "1.0", + | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", | "refreshInterval": 600, - | "sourceTables": ["${FlintMetadataCache.mockTableName}"] + | "sourceTables": ["$testTable"] | } |""".stripMargin)
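
For illustration only, here is a minimal Scala sketch of how the pieces introduced by this patch fit together: extractSourceTableNames parses the MV query and qualifies every referenced relation, and the resulting array is carried on FlintSparkMaterializedView.sourceTables, which metadata() writes into the index properties under "sourceTables" (the field FlintMetadataCache reads back). The session setup, table name, and MV name below are hypothetical and not taken from the patch; this is a sketch assuming the flint-spark-integration classes above are on the classpath, not part of the change itself.

import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.extractSourceTableNames

object SourceTablesSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; any SparkSession with a session catalog works.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("mv-source-tables")
      .getOrCreate()

    // Hypothetical MV query over a single source table.
    val query = "SELECT name, COUNT(*) AS cnt FROM spark_catalog.default.events GROUP BY name"

    // Helper added by this patch: collects UnresolvedRelation names from the
    // parsed plan and qualifies them (e.g. "spark_catalog.default.events").
    val sourceTables = extractSourceTableNames(spark, query)
    sourceTables.foreach(println)

    // The MV case class now takes sourceTables; metadata() stores it under
    // the "sourceTables" property that the metadata cache writer picks up.
    val mv = FlintSparkMaterializedView(
      "spark_catalog.default.events_mv",
      query,
      sourceTables,
      Map("name" -> "string", "cnt" -> "bigint"))
    println(mv.metadata().properties.get("sourceTables"))

    spark.stop()
  }
}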