Skip to content

feat(#25): add job manager and jobs. use scheduledtask as templates #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.6.2

- Add `JobManager` and `Job` making the `ScheduledTask` a template for the jobs. [#54](https://github.com/agilord/cron/pull/54) by [francescovallone](https://github.com/francescovallone)

## 0.6.1

- Add `isRunning` property to `ScheduledTask` to check if the task is running. [#53](https://github.com/agilord/cron/pull/53) by [francescovallone](https://github.com/francescovallone)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ void main() {
```

## Cron parser

You can easily create and parse [cron format](https://en.wikipedia.org/wiki/Cron):

```dart
Expand Down
93 changes: 59 additions & 34 deletions lib/cron.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import 'dart:async';

import 'package:clock/clock.dart';
import 'package:cron/src/job.dart';

import 'src/constraint_parser.dart';
import 'src/job_manager.dart';

export 'src/constraint_parser.dart' show ScheduleParseException;

Expand All @@ -25,6 +27,12 @@ abstract class Cron {

/// Closes the cron instance and doesn't accept new tasks anymore.
Future close();

/// Returns `true` if the task with the specified [taskId] is running.
bool isRunning(String taskId);

/// Returns the count of the jobs running with the specified [taskId].
int count(String taskId);
}

/// The cron schedule.
Expand Down Expand Up @@ -153,19 +161,26 @@ class Schedule {
}

abstract class ScheduledTask {

String get id;

Schedule get schedule;

bool get isRunning;
Task get task;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing isRunning is a breaking change for clients of this package.


Future cancel();
}

const int _millisecondsPerSecond = 1000;

class _Cron implements Cron {

_Cron() : _jobManager = JobManager();

bool _closed = false;
Timer? _timer;
final _schedules = <_ScheduledTask>[];
final JobManager _jobManager;

@override
ScheduledTask schedule(Schedule schedule, Task task) {
Expand All @@ -176,6 +191,16 @@ class _Cron implements Cron {
return st;
}

@override
bool isRunning(String taskId) {
return _jobManager.isRunning(taskId);
}

@override
int count(String taskId) {
return _jobManager.count(taskId);
}

@override
Future close() async {
_closed = true;
Expand All @@ -201,67 +226,67 @@ class _Cron implements Cron {
_timer = null;
final now = clock.now();
for (final schedule in _schedules) {
schedule.tick(now);
final job = schedule.tick(now);
if (job != null) {
_jobManager.start(job, schedule.task);
}
}
_scheduleNextTick();
}
}

class _ScheduledTask implements ScheduledTask {

@override
final Schedule schedule;
final Task _task;
String get id => '${task.hashCode}';

bool _closed = false;
Future? _running;
bool _overrun = false;
@override
final Schedule schedule;

@override
bool get isRunning => _running != null;
final Task task;

bool _closed = false;

/// The datetime a Task last run.
DateTime lastTime = DateTime(0, 0, 0, 0, 0, 0, 0);

_ScheduledTask(this.schedule, this._task);
_ScheduledTask(this.schedule, this.task);

void tick(DateTime now) {
if (_closed) return;
if (!schedule.shouldRunAt(now)) return;
Job? tick(DateTime now) {
if (_closed) return null;
if (!schedule.shouldRunAt(now)) return null;
if ((schedule.seconds == null || lastTime.second == now.second) &&
(schedule.minutes == null || lastTime.minute == now.minute) &&
(schedule.hours == null || lastTime.hour == now.hour) &&
(schedule.days == null || lastTime.day == now.day) &&
(schedule.months == null || lastTime.month == now.month) &&
(schedule.weekdays == null || lastTime.weekday == now.weekday)) {
return;
return null;
}
lastTime = now;
_run();
return Job(taskId: id, id: '$id-${now.millisecondsSinceEpoch}');
}

void _run() {
if (_closed) return;
if (_running != null) {
_overrun = true;
return;
}
_running =
Future.microtask(() => _task()).then((_) => null, onError: (_) => null);
_running!.whenComplete(() {
_running = null;
if (_overrun) {
_overrun = false;
_run();
}
});
}
// void _run() {
// if (_closed) return;
// if (_running != null) {
// _overrun = true;
// return;
// }
// _running =
// Future.microtask(() => _task()).then((_) => null, onError: (_) => null);
// _running!.whenComplete(() {
// _running = null;
// if (_overrun) {
// _overrun = false;
// _run();
// }
// });
// }

@override
Future<void> cancel() async {
_closed = true;
_overrun = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure: why are these removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _overrun is not used anymore since the ScheduledTask is just a template for the Job

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm more interested in the why: why is not needed to track it anymore? Related: what if the scheduler wants the next execution to wait until the previous completes its running? Shouldn't that be a configuration option and this way we don't break the runtime behavior for existing clients?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it wasn't needed anymore since in the proposed implementation there is not a way to wait for a job to finish. But you are totally right this is a breaking change and should be added an option to manage the jobs like before. We can, however, prevent the breaking change making the option enabled by default.

if (_running != null) {
await _running;
}
}
}
12 changes: 12 additions & 0 deletions lib/src/job.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class Job {

final String taskId;

final String id;

const Job({
required this.taskId,
required this.id,
});

}
30 changes: 30 additions & 0 deletions lib/src/job_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'package:cron/cron.dart';
import 'package:cron/src/job.dart';

class JobManager {

final Map<String, List<Job>> _jobs = {};

JobManager();

void start(Job job, Task task) {
final jobs = _jobs[job.taskId];
if (jobs != null) {
jobs.add(job);
} else {
_jobs[job.taskId] = [job];
}
Future.microtask(() => task()).then((_) {
_jobs[job.taskId]?.removeWhere((element) => element.id == job.id);
}, onError: (_) => _);
}

bool isRunning(String taskId) {
return _jobs[taskId]?.isNotEmpty ?? false;
}

int count(String taskId) {
return _jobs[taskId]?.length ?? 0;
}

}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: cron
description: A time-based job scheduler similar to cron. Run tasks periodically at fixed times or intervals.
version: 0.6.1
version: 0.6.2
repository: https://github.com/agilord/cron

topics:
Expand Down
3 changes: 1 addition & 2 deletions test/cron_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ void main() {

async.elapse(Duration(minutes: 10));

expect(schedule.isRunning, true);

expect(cron.isRunning(schedule.id), true);
async.elapse(Duration(seconds: 10));

expect(count, 10);
Expand Down