|
12 | 12 | use Illuminate\Support\Facades\DB; |
13 | 13 | use Illuminate\Support\Facades\Log; |
14 | 14 | use Illuminate\Support\Facades\Schema; |
| 15 | +use Illuminate\Support\Str; |
15 | 16 | use PDOException; |
16 | 17 | use ProcessMaker\Facades\WorkflowManager; |
17 | 18 | use ProcessMaker\Jobs\StartEventConditional; |
@@ -39,6 +40,17 @@ class TaskSchedulerManager implements JobManagerInterface, EventBusInterface |
39 | 40 |
|
40 | 41 | protected $registerStartEvents = false; |
41 | 42 |
|
| 43 | + /** |
| 44 | + * Timeout in minutes for stale claimed tasks. |
| 45 | + * If a task has been claimed for longer than this, it will be released. |
| 46 | + */ |
| 47 | + private int $claimTimeoutMinutes; |
| 48 | + |
| 49 | + public function __construct() |
| 50 | + { |
| 51 | + $this->claimTimeoutMinutes = (int) config('app.scheduler.claim_timeout_minutes', 5); |
| 52 | + } |
| 53 | + |
42 | 54 | /** |
43 | 55 | * Removes from the process_request_lock table all locks that are active more |
44 | 56 | * time that the threshold configured with BPMN_ACTIONS_MAX_LOCK_TIME env. variable |
@@ -133,89 +145,191 @@ private function scheduleTask( |
133 | 145 | } |
134 | 146 |
|
135 | 147 | /** |
136 | | - * Checks the schedule_tasks table to execute jobs |
| 148 | + * Checks the schedule_tasks table to execute jobs. |
| 149 | + * Uses atomic claim per task to prevent duplicate executions while maintaining |
| 150 | + * the original selection logic (nextDate calculation). |
137 | 151 | */ |
138 | 152 | public function scheduleTasks() |
139 | 153 | { |
140 | 154 | $today = $this->today(); |
| 155 | + $todayFormatted = $today->format('Y-m-d H:i:s'); |
| 156 | + |
141 | 157 | try { |
142 | | - /** |
143 | | - * This validation is removed; the database schema should exist before |
144 | | - * any initiation of 'jobs' and 'schedule'. |
145 | | - * |
146 | | - * if (!Schema::hasTable('scheduled_tasks')) { |
147 | | - * return; |
148 | | - * } |
149 | | - */ |
150 | 158 | $this->removeExpiredLocks(); |
151 | 159 |
|
152 | | - $tasks = ScheduledTask::cursor(); |
| 160 | + // 1. Release stale claims (tasks that were claimed but never completed) |
| 161 | + $this->releaseStaleClaimedTasks(); |
| 162 | + |
| 163 | + // 2. Get candidate tasks using cursor() for memory efficiency |
| 164 | + // We filter by unclaimed tasks only, but evaluate nextDate for each |
| 165 | + $tasks = ScheduledTask::whereNull('claimed_by')->cursor(); |
153 | 166 |
|
154 | 167 | foreach ($tasks as $task) { |
155 | | - try { |
156 | | - $config = json_decode($task->configuration); |
157 | | - |
158 | | - $lastExecution = new DateTime($task->last_execution, new DateTimeZone('UTC')); |
159 | | - |
160 | | - if ($lastExecution === null) { |
161 | | - continue; |
162 | | - } |
163 | | - $owner = $task->processRequestToken ?: $task->processRequest ?: $task->process; |
164 | | - $ownerDateTime = $owner?->created_at; |
165 | | - $nextDate = $this->nextDate($today, $config, $lastExecution, $ownerDateTime); |
166 | | - |
167 | | - // if no execution date exists we go to the next task |
168 | | - if (empty($nextDate)) { |
169 | | - continue; |
170 | | - } |
171 | | - |
172 | | - // Since the task scheduler has a presition of 1 minute (crontab) |
173 | | - // the times must be rounded or trucated to the nearest HH:MM:00 before compare |
174 | | - $method = config('app.timer_events_seconds') . 'DateTime'; |
175 | | - $todayWithoutSeconds = $this->$method($today); |
176 | | - $nextDateWithoutSeconds = $this->$method($nextDate); |
177 | | - if ($nextDateWithoutSeconds <= $todayWithoutSeconds) { |
178 | | - switch ($task->type) { |
179 | | - case 'TIMER_START_EVENT': |
180 | | - $this->executeTimerStartEvent($task, $config); |
181 | | - $task->last_execution = $today->format('Y-m-d H:i:s'); |
182 | | - $task->save(); |
183 | | - break; |
184 | | - case 'INTERMEDIATE_TIMER_EVENT': |
185 | | - $executed = $this->executeIntermediateTimerEvent($task, $config); |
186 | | - $task->last_execution = $today->format('Y-m-d H:i:s'); |
187 | | - if ($executed) { |
188 | | - $task->save(); |
189 | | - } |
190 | | - break; |
191 | | - case 'BOUNDARY_TIMER_EVENT': |
192 | | - $executed = $this->executeBoundaryTimerEvent($task, $config); |
193 | | - $task->last_execution = $today->format('Y-m-d H:i:s'); |
194 | | - if ($executed) { |
195 | | - $task->save(); |
196 | | - } |
197 | | - break; |
198 | | - case 'SCHEDULED_JOB': |
199 | | - $this->executeScheduledJob($config); |
200 | | - $task->last_execution = $today->format('Y-m-d H:i:s'); |
201 | | - $task->save(); |
202 | | - break; |
203 | | - default: |
204 | | - throw new Exception('Unknown timer event: ' . $task->type); |
205 | | - } |
206 | | - } |
207 | | - } catch (\Throwable $ex) { |
208 | | - Log::Error('Failed Scheduled Task: ', [ |
209 | | - 'Task data' => print_r($task->getAttributes(), true), |
210 | | - 'Exception' => $ex->__toString(), |
211 | | - ]); |
212 | | - } |
| 168 | + $this->processTaskWithAtomicClaim($task, $today, $todayFormatted); |
213 | 169 | } |
214 | 170 | } catch (PDOException $e) { |
215 | 171 | Log::error('The connection to the database had problems (scheduleTasks): ' . $e->getMessage()); |
216 | 172 | } |
217 | 173 | } |
218 | 174 |
|
| 175 | + /** |
| 176 | + * Release tasks that have been claimed for too long (stale claims). |
| 177 | + * This handles cases where a process crashed after claiming tasks. |
| 178 | + */ |
| 179 | + private function releaseStaleClaimedTasks(): void |
| 180 | + { |
| 181 | + $staleThreshold = Carbon::now()->subMinutes($this->claimTimeoutMinutes); |
| 182 | + |
| 183 | + ScheduledTask::whereNotNull('claimed_by') |
| 184 | + ->where('claimed_at', '<', $staleThreshold) |
| 185 | + ->update([ |
| 186 | + 'claimed_by' => null, |
| 187 | + 'claimed_at' => null, |
| 188 | + ]); |
| 189 | + } |
| 190 | + |
| 191 | + /** |
| 192 | + * Process a task with atomic claim to prevent duplicate execution. |
| 193 | + * This maintains the original selection logic (nextDate calculation) while |
| 194 | + * adding protection against concurrent execution. |
| 195 | + * |
| 196 | + * @param ScheduledTask $task The task to evaluate and potentially execute |
| 197 | + * @param DateTime $today Current datetime |
| 198 | + * @param string $todayFormatted Formatted datetime string |
| 199 | + */ |
| 200 | + private function processTaskWithAtomicClaim(ScheduledTask $task, DateTime $today, string $todayFormatted): void |
| 201 | + { |
| 202 | + try { |
| 203 | + $config = json_decode($task->configuration); |
| 204 | + $lastExecution = new DateTime($task->last_execution, new DateTimeZone('UTC')); |
| 205 | + |
| 206 | + if ($lastExecution === null) { |
| 207 | + return; |
| 208 | + } |
| 209 | + |
| 210 | + $owner = $task->processRequestToken ?: $task->processRequest ?: $task->process; |
| 211 | + $ownerDateTime = $owner?->created_at; |
| 212 | + $nextDate = $this->nextDate($today, $config, $lastExecution, $ownerDateTime); |
| 213 | + |
| 214 | + // If no execution date exists, skip this task |
| 215 | + if (empty($nextDate)) { |
| 216 | + return; |
| 217 | + } |
| 218 | + |
| 219 | + // Since the task scheduler has a precision of 1 minute (crontab) |
| 220 | + // the times must be rounded or truncated to the nearest HH:MM:00 before compare |
| 221 | + $method = config('app.timer_events_seconds') . 'DateTime'; |
| 222 | + $todayWithoutSeconds = $this->$method($today); |
| 223 | + $nextDateWithoutSeconds = $this->$method($nextDate); |
| 224 | + |
| 225 | + // Only proceed if the task should execute now |
| 226 | + if ($nextDateWithoutSeconds > $todayWithoutSeconds) { |
| 227 | + return; |
| 228 | + } |
| 229 | + |
| 230 | + // Try to atomically claim this specific task |
| 231 | + $claimed = $this->claimTask($task->id, $todayFormatted); |
| 232 | + |
| 233 | + if (!$claimed) { |
| 234 | + // Another process already claimed this task, skip it |
| 235 | + return; |
| 236 | + } |
| 237 | + |
| 238 | + // Re-fetch the task to get fresh data after claiming |
| 239 | + $task = ScheduledTask::find($task->id); |
| 240 | + if (!$task) { |
| 241 | + return; |
| 242 | + } |
| 243 | + |
| 244 | + // Execute the task |
| 245 | + $this->executeTask($task, $config, $todayFormatted); |
| 246 | + } catch (\Throwable $ex) { |
| 247 | + Log::error('Failed Scheduled Task: ', [ |
| 248 | + 'Task data' => print_r($task->getAttributes(), true), |
| 249 | + 'Exception' => $ex->__toString(), |
| 250 | + ]); |
| 251 | + // Release task on error so it can be retried |
| 252 | + $this->releaseTask($task); |
| 253 | + } |
| 254 | + } |
| 255 | + |
| 256 | + /** |
| 257 | + * Atomically claim a single task for execution. |
| 258 | + * Uses UPDATE with WHERE to ensure only one process can claim it. |
| 259 | + * |
| 260 | + * @param int $taskId The task ID to claim |
| 261 | + * @param string $todayFormatted Current datetime formatted |
| 262 | + * @return bool True if successfully claimed, false if already claimed by another process |
| 263 | + */ |
| 264 | + private function claimTask(int $taskId, string $todayFormatted): bool |
| 265 | + { |
| 266 | + $claimId = Str::uuid()->toString(); |
| 267 | + |
| 268 | + $affected = DB::table('scheduled_tasks') |
| 269 | + ->where('id', $taskId) |
| 270 | + ->whereNull('claimed_by') |
| 271 | + ->update([ |
| 272 | + 'claimed_by' => $claimId, |
| 273 | + 'claimed_at' => $todayFormatted, |
| 274 | + ]); |
| 275 | + |
| 276 | + return $affected > 0; |
| 277 | + } |
| 278 | + |
| 279 | + /** |
| 280 | + * Execute a task based on its type. |
| 281 | + * |
| 282 | + * @param ScheduledTask $task The task to execute |
| 283 | + * @param object $config Task configuration |
| 284 | + * @param string $todayFormatted Formatted datetime for last_execution |
| 285 | + */ |
| 286 | + private function executeTask(ScheduledTask $task, object $config, string $todayFormatted): void |
| 287 | + { |
| 288 | + $executed = false; |
| 289 | + |
| 290 | + switch ($task->type) { |
| 291 | + case 'TIMER_START_EVENT': |
| 292 | + $this->executeTimerStartEvent($task, $config); |
| 293 | + $executed = true; |
| 294 | + break; |
| 295 | + case 'INTERMEDIATE_TIMER_EVENT': |
| 296 | + $executed = $this->executeIntermediateTimerEvent($task, $config); |
| 297 | + break; |
| 298 | + case 'BOUNDARY_TIMER_EVENT': |
| 299 | + $executed = $this->executeBoundaryTimerEvent($task, $config); |
| 300 | + break; |
| 301 | + case 'SCHEDULED_JOB': |
| 302 | + $this->executeScheduledJob($config); |
| 303 | + $executed = true; |
| 304 | + break; |
| 305 | + default: |
| 306 | + throw new Exception('Unknown timer event: ' . $task->type); |
| 307 | + } |
| 308 | + |
| 309 | + if ($executed) { |
| 310 | + // Update last_execution and release claim |
| 311 | + $task->last_execution = $todayFormatted; |
| 312 | + $task->claimed_by = null; |
| 313 | + $task->claimed_at = null; |
| 314 | + $task->save(); |
| 315 | + } else { |
| 316 | + // Release claim without updating last_execution |
| 317 | + $this->releaseTask($task); |
| 318 | + } |
| 319 | + } |
| 320 | + |
| 321 | + /** |
| 322 | + * Release a task claim without updating last_execution. |
| 323 | + * |
| 324 | + * @param ScheduledTask $task The task to release |
| 325 | + */ |
| 326 | + private function releaseTask(ScheduledTask $task): void |
| 327 | + { |
| 328 | + $task->claimed_by = null; |
| 329 | + $task->claimed_at = null; |
| 330 | + $task->save(); |
| 331 | + } |
| 332 | + |
219 | 333 | /** |
220 | 334 | * Create a scheduled job |
221 | 335 | * |
|
0 commit comments