Skip to content

Commit

Permalink
Resolution for previous concurrency issue (#802)
Browse files Browse the repository at this point in the history
* Resolution for previous concurrency issue

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* resolve comments

Signed-off-by: Louis Chu <clingzhi@amazon.com>

---------

Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger authored Oct 24, 2024
1 parent 34fdcab commit 54f4fa7
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val internalSchedulingService =
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
val externalSchedulingService =
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ object FlintSparkIndexOptions {
.externalSchedulerIntervalThreshold())
case (false, _, Some("external")) =>
throw new IllegalArgumentException(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")
case _ =>
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
*/
class FlintSparkJobInternalSchedulingService(
spark: SparkSession,
flintSparkConf: FlintSparkConf,
flintIndexMonitor: FlintSparkIndexMonitor)
extends FlintSparkJobSchedulingService
with Logging {
Expand All @@ -55,12 +56,9 @@ class FlintSparkJobInternalSchedulingService(
index: FlintSparkIndex,
action: AsyncQuerySchedulerAction): Option[String] = {
val indexName = index.name()

action match {
case AsyncQuerySchedulerAction.SCHEDULE => None // No-op
case AsyncQuerySchedulerAction.UPDATE =>
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
startRefreshingJob(index)
case AsyncQuerySchedulerAction.UNSCHEDULE =>
logInfo("Stopping index state monitor")
Expand All @@ -81,7 +79,17 @@ class FlintSparkJobInternalSchedulingService(
private def startRefreshingJob(index: FlintSparkIndex): Option[String] = {
logInfo(s"Starting refreshing job for index ${index.name()}")
val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index)
indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava))
val jobId = indexRefresh.start(spark, flintSparkConf)

// NOTE: Resolution for previous concurrency issue
// This code addresses a previously identified concurrency issue with recoverIndex
// where scheduled FlintSparkIndexMonitorTask couldn't detect the active Spark streaming job ID. The issue
// was caused by starting the FlintSparkIndexMonitor before the Spark streaming job was fully
// initialized. In this fixed version, we start the monitor after the streaming job has been
// initiated, ensuring that the job ID is available for detection.
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(index.name())
jobId
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService {
if (isExternalSchedulerEnabled(index)) {
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)
} else {
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class FlintSparkIndexBuilderSuite
None,
None,
Some(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")),
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")),
(
"set external mode when interval above threshold and no mode specified",
true,
Expand Down

0 comments on commit 54f4fa7

Please sign in to comment.