-
Notifications
You must be signed in to change notification settings - Fork 23
feat(#25): add job manager and jobs. use scheduledtask as templates #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,8 +4,10 @@ | |
import 'dart:async'; | ||
|
||
import 'package:clock/clock.dart'; | ||
import 'package:cron/src/job.dart'; | ||
|
||
import 'src/constraint_parser.dart'; | ||
import 'src/job_manager.dart'; | ||
|
||
export 'src/constraint_parser.dart' show ScheduleParseException; | ||
|
||
|
@@ -25,6 +27,12 @@ abstract class Cron { | |
|
||
/// Closes the cron instance and doesn't accept new tasks anymore. | ||
Future close(); | ||
|
||
/// Returns `true` if the task with the specified [taskId] is running. | ||
bool isRunning(String taskId); | ||
|
||
/// Returns the count of the jobs running with the specified [taskId]. | ||
int count(String taskId); | ||
} | ||
|
||
/// The cron schedule. | ||
|
@@ -153,19 +161,26 @@ class Schedule { | |
} | ||
|
||
abstract class ScheduledTask { | ||
|
||
String get id; | ||
|
||
Schedule get schedule; | ||
|
||
bool get isRunning; | ||
Task get task; | ||
|
||
Future cancel(); | ||
} | ||
|
||
const int _millisecondsPerSecond = 1000; | ||
|
||
class _Cron implements Cron { | ||
|
||
_Cron() : _jobManager = JobManager(); | ||
|
||
bool _closed = false; | ||
Timer? _timer; | ||
final _schedules = <_ScheduledTask>[]; | ||
final JobManager _jobManager; | ||
|
||
@override | ||
ScheduledTask schedule(Schedule schedule, Task task) { | ||
|
@@ -176,6 +191,16 @@ class _Cron implements Cron { | |
return st; | ||
} | ||
|
||
@override | ||
bool isRunning(String taskId) { | ||
return _jobManager.isRunning(taskId); | ||
} | ||
|
||
@override | ||
int count(String taskId) { | ||
return _jobManager.count(taskId); | ||
} | ||
|
||
@override | ||
Future close() async { | ||
_closed = true; | ||
|
@@ -201,67 +226,67 @@ class _Cron implements Cron { | |
_timer = null; | ||
final now = clock.now(); | ||
for (final schedule in _schedules) { | ||
schedule.tick(now); | ||
final job = schedule.tick(now); | ||
if (job != null) { | ||
_jobManager.start(job, schedule.task); | ||
} | ||
} | ||
_scheduleNextTick(); | ||
} | ||
} | ||
|
||
class _ScheduledTask implements ScheduledTask { | ||
|
||
@override | ||
final Schedule schedule; | ||
final Task _task; | ||
String get id => '${task.hashCode}'; | ||
|
||
bool _closed = false; | ||
Future? _running; | ||
bool _overrun = false; | ||
@override | ||
final Schedule schedule; | ||
|
||
@override | ||
bool get isRunning => _running != null; | ||
final Task task; | ||
|
||
bool _closed = false; | ||
|
||
/// The datetime a Task last run. | ||
DateTime lastTime = DateTime(0, 0, 0, 0, 0, 0, 0); | ||
|
||
_ScheduledTask(this.schedule, this._task); | ||
_ScheduledTask(this.schedule, this.task); | ||
|
||
void tick(DateTime now) { | ||
if (_closed) return; | ||
if (!schedule.shouldRunAt(now)) return; | ||
Job? tick(DateTime now) { | ||
if (_closed) return null; | ||
if (!schedule.shouldRunAt(now)) return null; | ||
if ((schedule.seconds == null || lastTime.second == now.second) && | ||
(schedule.minutes == null || lastTime.minute == now.minute) && | ||
(schedule.hours == null || lastTime.hour == now.hour) && | ||
(schedule.days == null || lastTime.day == now.day) && | ||
(schedule.months == null || lastTime.month == now.month) && | ||
(schedule.weekdays == null || lastTime.weekday == now.weekday)) { | ||
return; | ||
return null; | ||
} | ||
lastTime = now; | ||
_run(); | ||
return Job(taskId: id, id: '$id-${now.millisecondsSinceEpoch}'); | ||
} | ||
|
||
void _run() { | ||
if (_closed) return; | ||
if (_running != null) { | ||
_overrun = true; | ||
return; | ||
} | ||
_running = | ||
Future.microtask(() => _task()).then((_) => null, onError: (_) => null); | ||
_running!.whenComplete(() { | ||
_running = null; | ||
if (_overrun) { | ||
_overrun = false; | ||
_run(); | ||
} | ||
}); | ||
} | ||
// void _run() { | ||
// if (_closed) return; | ||
// if (_running != null) { | ||
// _overrun = true; | ||
// return; | ||
// } | ||
// _running = | ||
// Future.microtask(() => _task()).then((_) => null, onError: (_) => null); | ||
// _running!.whenComplete(() { | ||
// _running = null; | ||
// if (_overrun) { | ||
// _overrun = false; | ||
// _run(); | ||
// } | ||
// }); | ||
// } | ||
|
||
@override | ||
Future<void> cancel() async { | ||
_closed = true; | ||
_overrun = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure: why are these removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The _overrun is not used anymore since the ScheduledTask is just a template for the Job There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm more interested in the why: why is not needed to track it anymore? Related: what if the scheduler wants the next execution to wait until the previous completes its running? Shouldn't that be a configuration option and this way we don't break the runtime behavior for existing clients? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought it wasn't needed anymore since in the proposed implementation there is not a way to wait for a job to finish. But you are totally right this is a breaking change and should be added an option to manage the jobs like before. We can, however, prevent the breaking change making the option enabled by default. |
||
if (_running != null) { | ||
await _running; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
class Job { | ||
|
||
final String taskId; | ||
|
||
final String id; | ||
|
||
const Job({ | ||
required this.taskId, | ||
required this.id, | ||
}); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import 'package:cron/cron.dart'; | ||
import 'package:cron/src/job.dart'; | ||
|
||
class JobManager { | ||
|
||
final Map<String, List<Job>> _jobs = {}; | ||
|
||
JobManager(); | ||
|
||
void start(Job job, Task task) { | ||
final jobs = _jobs[job.taskId]; | ||
if (jobs != null) { | ||
jobs.add(job); | ||
} else { | ||
_jobs[job.taskId] = [job]; | ||
} | ||
Future.microtask(() => task()).then((_) { | ||
_jobs[job.taskId]?.removeWhere((element) => element.id == job.id); | ||
}, onError: (_) => _); | ||
} | ||
|
||
bool isRunning(String taskId) { | ||
return _jobs[taskId]?.isNotEmpty ?? false; | ||
} | ||
|
||
int count(String taskId) { | ||
return _jobs[taskId]?.length ?? 0; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing
isRunning
is a breaking change for clients of this package.