Skip to content

Commit

Permalink
Add more IT
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger committed Oct 11, 2024
1 parent 6622931 commit 03d4c47
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,43 +446,38 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
originalOptions: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Unit = {
val isAutoRefreshChanged = updatedOptions.autoRefresh() != originalOptions.autoRefresh()
val isSchedulerModeChanged =
updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()

// Prevent changing both auto_refresh and scheduler_mode simultaneously
if (isAutoRefreshChanged && isSchedulerModeChanged) {
throw new IllegalArgumentException(
"Cannot change both auto_refresh and scheduler_mode simultaneously")
}

val changedOptions = updatedOptions.options.filterNot { case (k, v) =>
originalOptions.options.get(k).contains(v)
}.keySet

if (changedOptions.isEmpty) {
throw new IllegalArgumentException("No options updated")
throw new IllegalArgumentException("No index option updated")
}

// Validate based on auto_refresh state and changes
(isAutoRefreshChanged, updatedOptions.autoRefresh()) match {
case (true, true) =>
// Changing from manual to auto refresh
if (updatedOptions.incrementalRefresh()) {
throw new IllegalArgumentException(
"Altering index to auto refresh while incremental refresh remains true")
}

val allowedOptions = Set(
AUTO_REFRESH,
INCREMENTAL_REFRESH,
SCHEDULER_MODE,
REFRESH_INTERVAL,
CHECKPOINT_LOCATION,
WATERMARK_DELAY)
validateChangedOptions(changedOptions, allowedOptions, "Changing to auto refresh")

validateChangedOptions(changedOptions, allowedOptions, s"Altering index to auto refresh")
case (true, false) =>
val allowedOptions = if (updatedOptions.incrementalRefresh()) {
// Changing from auto refresh to incremental refresh
Set(
AUTO_REFRESH,
INCREMENTAL_REFRESH,
SCHEDULER_MODE,
REFRESH_INTERVAL,
CHECKPOINT_LOCATION,
WATERMARK_DELAY)
Expand All @@ -493,11 +488,14 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
validateChangedOptions(
changedOptions,
allowedOptions,
"Changing to full/incremental refresh")
"Altering index to full/incremental refresh")

case (false, true) =>
// original refresh_mode is auto, only allow changing scheduler_mode
validateChangedOptions(changedOptions, Set(SCHEDULER_MODE), "Auto refresh remains true")
validateChangedOptions(
changedOptions,
Set(SCHEDULER_MODE),
"Altering index when auto_refresh remains true")

case (false, false) =>
// original refresh_mode is full/incremental, not allowed to change any options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ object FlintSparkIndexOptions {
.externalSchedulerIntervalThreshold())
case (false, _, Some("external")) =>
throw new IllegalArgumentException(
"External scheduler mode spark conf is not enabled but refresh interval is set to external scheduler mode")
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")
case _ =>
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import org.opensearch.index.reindex.DeleteByQueryRequest
import org.scalatest.matchers.must.Matchers._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {

/** Test table and index name */
Expand All @@ -32,6 +34,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
// Delete all test indices
deleteTestIndex(testIndex)
sql(s"DROP TABLE $testTable")
conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
}

test("update index with index options successfully") {
Expand Down Expand Up @@ -177,6 +180,121 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
}
}

// Negative update scenarios, run with the external scheduler conf switched on.
// Each entry is: (scenario name used in the test title,
//                 pairs of (options the index is created with, options passed to the update),
//                 text that must appear in the resulting IllegalArgumentException message).
Seq(
  (
    "update index without changing index option",
    Seq(
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "true")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("checkpoint_location" -> "s3a://test/")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"))),
    "No index option updated"),
  (
    "update index option when auto_refresh is false",
    Seq(
      (
        Map.empty[String, String],
        Map("auto_refresh" -> "false", "checkpoint_location" -> "s3a://test/")),
      (
        Map.empty[String, String],
        Map("incremental_refresh" -> "true", "checkpoint_location" -> "s3a://test/")),
      (Map.empty[String, String], Map("checkpoint_location" -> "s3a://test/"))),
    "No options can be updated when auto_refresh remains false"),
  (
    "update other index option besides scheduler_mode when auto_refresh is true",
    Seq(
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("watermark_delay" -> "1 Minute"))),
    "Altering index when auto_refresh remains true only allows changing: Set(scheduler_mode). Invalid options"),
  (
    "convert to full refresh with disallowed options",
    Seq(
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "false", "scheduler_mode" -> "internal")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "false", "refresh_interval" -> "5 Minute")),
      (
        Map("auto_refresh" -> "true", "checkpoint_location" -> "s3a://test/"),
        Map("auto_refresh" -> "false", "watermark_delay" -> "1 Minute"))),
    "Altering index to full/incremental refresh only allows changing"),
  (
    "convert to auto refresh with disallowed options",
    Seq(
      (
        Map.empty[String, String],
        Map(
          "auto_refresh" -> "true",
          "output_mode" -> "complete",
          "checkpoint_location" -> "s3a://test/"))),
    "Altering index to auto refresh only allows changing: Set(auto_refresh, watermark_delay, scheduler_mode, " +
      "refresh_interval, incremental_refresh, checkpoint_location). Invalid options: Set(output_mode)"),
  (
    "convert to invalid refresh mode",
    Seq(
      (
        Map.empty[String, String],
        Map(
          "auto_refresh" -> "true",
          "incremental_refresh" -> "true",
          "checkpoint_location" -> "s3a://test/"))),
    "Altering index to auto refresh while incremental refresh remains true"))
  .foreach { case (scenario, optionPairs, expectedMessage) =>
    test(s"should fail if $scenario and external scheduler enabled") {
      setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")
      for ((createOptions, alterOptions) <- optionPairs) {
        logInfo(s"initialOptionsMap: ${createOptions}")
        logInfo(s"updateOptionsMap: ${alterOptions}")

        withTempDir { checkpointDir =>
          // When a map carries a checkpoint_location entry, point it at this
          // iteration's temp directory; maps without the key are returned untouched.
          def withTempCheckpoint(opts: Map[String, String]): Map[String, String] =
            if (opts.contains("checkpoint_location")) {
              opts.updated("checkpoint_location", checkpointDir.getAbsolutePath)
            } else {
              opts
            }

          // Create and refresh the index with the starting options
          val resolvedCreateOptions = withTempCheckpoint(createOptions)
          flint
            .skippingIndex()
            .onTable(testTable)
            .addPartitions("year", "month")
            .options(FlintSparkIndexOptions(resolvedCreateOptions), testIndex)
            .create()
          flint.refreshIndex(testIndex)

          // Both building the updated index and applying it stay inside thrownBy,
          // so a rejection raised by either step is captured.
          val existingIndex = flint.describeIndex(testIndex).get
          val thrown = the[IllegalArgumentException] thrownBy {
            val resolvedAlterOptions = withTempCheckpoint(alterOptions)
            val alteredIndex = flint
              .skippingIndex()
              .copyWithUpdate(existingIndex, FlintSparkIndexOptions(resolvedAlterOptions))
            flint.updateIndex(alteredIndex)
          }

          thrown.getMessage should include(expectedMessage)

          // Remove the index so the next option pair starts from a clean state
          deleteTestIndex(testIndex)
        }
      }
    }
  }

// Test update options validation success
Seq(
(
Expand Down

0 comments on commit 03d4c47

Please sign in to comment.