Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ plugins {

allprojects {
group = "io.flamingock"
version = "1.0.0-rc.1"
version = "1.0.0-rc.2"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,35 +72,98 @@ public CommunityExecutionPlanner(RunnerId instanceId,
this.configuration = coreConfiguration;
}

/**
* Gets the next execution plan using a two-phase audit read to prevent race conditions in concurrent executions.
*
* <p><b>How it works:</b></p>
*
* <ol>
* <li><b>Initial Audit Read (Optimistic)</b> - Reads the audit log without holding the lock to determine
* if any changes need execution. This avoids acquiring the lock when all changes are already executed.</li>
*
* <li><b>Early Exit Check</b> - If no changes require execution based on the initial read, returns immediately
* without acquiring the lock.</li>
*
* <li><b>Lock Acquisition</b> - Acquires the lock to ensure only one instance executes changes.
* If another instance holds the lock, this call blocks until the lock becomes available.</li>
*
* <li><b>Validated Audit Read (With Lock)</b> - Re-reads the audit log while holding the lock to get
* the authoritative state. This detects if another instance executed changes while we waited for the lock.</li>
*
* <li><b>Plan Validation</b> - Rebuilds the execution plan with the validated audit data and checks if
* execution is still needed. If another instance already executed the changes, releases the lock
* and returns without executing.</li>
*
* <li><b>Concurrent Execution Detection</b> - Logs any differences between the initial and validated plans
* to track when concurrent instances attempted to execute the same changes.</li>
*
* <li><b>Lock Refresh Daemon</b> - If enabled, starts a background daemon to periodically refresh the lock
* during long-running executions to prevent lock expiration.</li>
*
* <li><b>Execution</b> - Returns an execution plan containing the next stage to execute while holding the lock.
* The lock will be released after execution completes.</li>
* </ol>
*
* <p><b>Concurrent Execution Handling:</b></p>
* <pre>
* Instance A: Read audit → Get lock → Re-read → Execute → Release
* Instance B: Read audit → Wait for lock → Get lock → Re-read → Detect already executed → Release
* </pre>
*
* <p><b>Error Handling:</b> If any exception occurs after acquiring the lock, the lock is released
* in the catch block to prevent lock leaks.</p>
*
* @param loadedStages the list of loaded stages containing all defined changes
* @return ExecutionPlan containing either stages to execute (with lock held) or CONTINUE (no lock)
* @throws LockException if unable to acquire the distributed lock within the configured timeout
*/
@Override
public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) throws LockException {
Map<String, AuditEntry> auditSnapshot = auditReader.getAuditSnapshotByChangeId();
logger.debug("Pulled remote state:\n{}", auditSnapshot);

List<ExecutableStage> executableStages = loadedStages
.stream()
.map(loadedStage -> {
// Convert audit status to action plan using the new action-based architecture
ChangeActionMap changeActionMap = CommunityChangeActionBuilder.build(loadedStage.getTasks(), auditSnapshot);
return loadedStage.applyActions(changeActionMap);
})
.collect(Collectors.toList());
Map<String, AuditEntry> initialSnapshot = auditReader.getAuditSnapshotByChangeId();
logger.debug("Pulled initial remote state:\n{}", initialSnapshot);

Optional<ExecutableStage> nextStageOpt = executableStages.stream()
.filter(ExecutableStage::isExecutionRequired)
.findFirst();
List<ExecutableStage> initialStages = buildExecutableStages(loadedStages, initialSnapshot);

if (!hasExecutableStages(initialStages)) {
return ExecutionPlan.CONTINUE(initialStages);
}

Lock lock = acquireLock();

try {
Map<String, AuditEntry> validatedSnapshot = auditReader.getAuditSnapshotByChangeId();

List<ExecutableStage> validatedStages = buildExecutableStages(loadedStages, validatedSnapshot);

Optional<ExecutableStage> nextStageOpt = getFirstExecutableStage(validatedStages);

if (!nextStageOpt.isPresent()) {
logger.info(
"Execution plan invalidated after lock acquisition. " +
"All changes were executed by another instance during lock wait. " +
"Releasing lock and continuing."
);
lock.release();
return ExecutionPlan.CONTINUE(validatedStages);
}

logPlanChanges(initialStages, validatedStages);

if (nextStageOpt.isPresent()) {
Lock lock = acquireLock();
if (configuration.isEnableRefreshDaemon()) {
new LockRefreshDaemon(lock, TimeService.getDefault()).start();
}

String executionId = ExecutionId.getNewExecutionId();
return ExecutionPlan.newExecution(executionId, lock, Collections.singletonList(nextStageOpt.get()));
return ExecutionPlan.newExecution(
executionId,
lock,
Collections.singletonList(nextStageOpt.get())
);

} else {
return ExecutionPlan.CONTINUE(executableStages);
} catch (Exception e) {
logger.error("Error during execution planning - releasing lock", e);
lock.release();
throw e;
}
}

Expand All @@ -115,6 +178,86 @@ private Lock acquireLock() {
);
}

/**
 * Converts every loaded stage into an executable stage for the given audit snapshot.
 *
 * <p>For each stage, a {@code ChangeActionMap} is built from the stage's tasks and the
 * snapshot, and then applied to the stage. The result keeps the order of {@code loadedStages}.</p>
 *
 * @param loadedStages  the loaded stages to convert
 * @param auditSnapshot the audit entries, keyed by change id, used to build the actions
 * @return one executable stage per loaded stage
 */
private List<ExecutableStage> buildExecutableStages(
        List<AbstractLoadedStage> loadedStages,
        Map<String, AuditEntry> auditSnapshot) {

    return loadedStages
            .stream()
            .map(stage -> {
                ChangeActionMap actions =
                        CommunityChangeActionBuilder.build(stage.getTasks(), auditSnapshot);
                return stage.applyActions(actions);
            })
            .collect(Collectors.toList());
}

/**
 * Tells whether at least one of the given stages reports that execution is required.
 *
 * @param stages the executable stages to inspect
 * @return {@code true} as soon as one stage requires execution, {@code false} if none does
 *         (including when the list is empty)
 */
private boolean hasExecutableStages(List<ExecutableStage> stages) {
    for (ExecutableStage candidate : stages) {
        if (candidate.isExecutionRequired()) {
            return true;
        }
    }
    return false;
}

/**
 * Finds the first stage, in list order, that reports that execution is required.
 *
 * @param stages the executable stages to search
 * @return the first stage requiring execution, or an empty optional if there is none
 */
private Optional<ExecutableStage> getFirstExecutableStage(List<ExecutableStage> stages) {
    for (ExecutableStage candidate : stages) {
        if (candidate.isExecutionRequired()) {
            return Optional.of(candidate);
        }
    }
    return Optional.empty();
}

/**
 * Compares the plan computed before lock acquisition with the one computed after it and logs
 * the outcome.
 *
 * <ul>
 *   <li>Fewer executable tasks after the lock: logged as a warning, reporting how many tasks
 *       dropped out of the plan.</li>
 *   <li>More executable tasks after the lock: logged as a warning, reporting how many tasks
 *       were added. This case is reported separately so the log never states a negative number
 *       of tasks executed by another instance.</li>
 *   <li>Same count: logged at debug level.</li>
 * </ul>
 *
 * <p>NOTE(review): the counts come from {@code countExecutableTasks}, which sums the task count
 * of every stage that requires execution. Plans with equal counts are therefore reported as
 * unchanged even if their contents differ.</p>
 *
 * @param initialStages   the stages planned from the audit read done before acquiring the lock
 * @param validatedStages the stages planned from the audit read done while holding the lock
 */
private void logPlanChanges(List<ExecutableStage> initialStages, List<ExecutableStage> validatedStages) {
    long initialCount = countExecutableTasks(initialStages);
    long validatedCount = countExecutableTasks(validatedStages);

    if (validatedCount < initialCount) {
        logger.warn(
                "Execution plan changed during lock acquisition: {} -> {} executable tasks. " +
                        "This indicates concurrent execution - {} tasks were executed by another instance.",
                initialCount,
                validatedCount,
                initialCount - validatedCount
        );
    } else if (validatedCount > initialCount) {
        logger.warn(
                "Execution plan changed during lock acquisition: {} -> {} executable tasks. " +
                        "The audit state changed while waiting for the lock - {} additional tasks now require execution.",
                initialCount,
                validatedCount,
                validatedCount - initialCount
        );
    } else {
        logger.debug("Execution plan validated after lock acquisition: {} executable tasks", validatedCount);
    }
}

/**
 * Sums the number of tasks of every stage that reports that execution is required.
 *
 * <p>NOTE(review): a stage contributes all the tasks returned by {@code getTasks()}; whether
 * that list can include tasks that are already applied is not visible here - confirm before
 * relying on this as a per-task count.</p>
 *
 * @param stages the executable stages to inspect
 * @return the total task count across the stages that require execution
 */
private long countExecutableTasks(List<ExecutableStage> stages) {
    long total = 0L;
    for (ExecutableStage candidate : stages) {
        if (candidate.isExecutionRequired()) {
            total += candidate.getTasks().size();
        }
    }
    return total;
}


public static class Builder {
private RunnerId runnerId;
Expand Down
Loading