Skip to content

Commit

Permalink
feat: refactor the cronjobext configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
apocelipes committed Jun 8, 2022
1 parent c086930 commit 3643ef5
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 44 deletions.
4 changes: 3 additions & 1 deletion docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Unreleased
# [Unreleased]

## Added:

- 添加用于执行定时任务的 CronJobExt

Expand Down
24 changes: 11 additions & 13 deletions docs/ext_cronjob_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,17 @@ CronJobExt 的主要特点:

首先,配置 config.yaml

```yaml
cronjob_concurrency: 10
cronjob_broker: "redis://redis:6379/8"
cronjob_default_queue: "gobay.task"
cronjob_result_backend: "redis://redis:6379/8"
cronjob_results_expire_in: 600
cronjob_redis: {}
cronjob_tz: "Asia/Tokyo" # 默认为 UTC
cronjob_health_check_port: 8080 # 默认为5000
```
如果想要复用 async task 的配置(比如需要像某个特定的任务队列发送定时任务):
cronjob必须要和asynctaskext一起使用,因此需要先创建asynctask的配置,然后使用`bind_to`,指定要使用的异步任务队列的配置:

```yaml
cronjob_reuse_other: "other_asynctask_" # 复用other_asynctask的配置,other_asynctask_为配置项的前缀
other_asynctask_concurrency: 10
other_asynctask_broker: "redis://127.0.0.1:6379/8"
other_asynctask_default_queue: "gobay.asynctask.queue"
other_asynctask_result_backend: "redis://127.0.0.1:6379/8"
other_asynctask_results_expire_in: 1
other_asynctask_redis: {}

cronjob_bind_to: "other_asynctask_" # 使用other_asynctask的配置,other_asynctask_为配置项的前缀
cronjob_health_check_port: 5001
```
Expand Down Expand Up @@ -139,3 +135,5 @@ CronJob.StartCronJob()
2. 如果不能设置专用队列,且无法接受执行延迟,可以使用 k8s 的 cronjob,但要注意 k8s 的 cronjob 只支持 crontab 表达式。

worker 需要自己处理任务超时和防止任务重叠运行,因为 machinery 的机制导致这些功能在 worker 上处理会比较方便,CronJobExt暂不支持。

**任务超时和重叠运行都不是cronjob的关注点,未来应该也不会支持。**
33 changes: 15 additions & 18 deletions extensions/cronjobext/cronjobext.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

// Config configuration of cronjobext
type Config struct {
*config.Config `mapstructure:",squash"`
ReuseOther string `yaml:"reuse_other"`
TimeZone string `yaml:"tz"`
HealthCheckPort int `yaml:"health_check_port"` // default is 5000
AsyncTaskConfig *config.Config `yaml:"-" ignored:"true"`
BindTo string `yaml:"bind_to"`
TimeZone string `yaml:"tz"`
HealthCheckPort int `yaml:"health_check_port"` // default is 5000
}

// TZ converts TimeZone to a pointer of time.Location
Expand Down Expand Up @@ -59,28 +59,25 @@ func (t *CronJobExt) Init(app *gobay.Application) error {
t.app = app
extCfg := app.Config()
extCfg = gobay.GetConfigByPrefix(extCfg, t.NS, true)
t.config = &Config{Config: &config.Config{}, TimeZone: "UTC", HealthCheckPort: 5000}
t.config = &Config{AsyncTaskConfig: &config.Config{}, TimeZone: "UTC", HealthCheckPort: 5000}
if err := extCfg.Unmarshal(t.config, func(config *mapstructure.DecoderConfig) {
config.TagName = "yaml"
config.Squash = true
}); err != nil {
return err
}

asyncTaskNS := t.NS
if t.config.ReuseOther != "" {
// reuse other AsyncTaskExt configurations
asyncTaskNS = t.config.ReuseOther
asyncExtCfg := app.Config()
asyncExtCfg = gobay.GetConfigByPrefix(asyncExtCfg, asyncTaskNS, true)
asyncConf := &config.Config{}
if err := asyncExtCfg.Unmarshal(asyncConf, func(config *mapstructure.DecoderConfig) {
config.TagName = "yaml"
}); err != nil {
return err
}
t.config.Config = asyncConf
// bind to other AsyncTaskExt configurations
asyncTaskNS := t.config.BindTo
asyncExtCfg := app.Config()
asyncExtCfg = gobay.GetConfigByPrefix(asyncExtCfg, asyncTaskNS, true)
asyncConf := &config.Config{}
if err := asyncExtCfg.Unmarshal(asyncConf, func(config *mapstructure.DecoderConfig) {
config.TagName = "yaml"
}); err != nil {
return err
}
t.config.AsyncTaskConfig = asyncConf
t.server = &asynctaskext.AsyncTaskExt{
NS: asyncTaskNS,
}
Expand Down
15 changes: 9 additions & 6 deletions extensions/cronjobext/cronjobext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ func TestCronJobExtTimeZone(t *testing.T) {
}

func TestReuseOtherConfig(t *testing.T) {
if cronjobOne.config.DefaultQueue != "gobay.task.one" {
t.Errorf("reuse error: want: %v got: %v", "gobay.task.one", cronjobOne.config.DefaultQueue)
if cronjobOne.config.BindTo != "one_asynctask_" {
t.Errorf("reuse error: want: %v got: %v", "one_asynctask_", cronjobOne.config.BindTo)
}
if cronjobOne.config.AsyncTaskConfig.DefaultQueue != "gobay.task.one" {
t.Errorf("reuse error: want: %v got: %v", "gobay.task.one", cronjobOne.config.AsyncTaskConfig.DefaultQueue)
}

if cronjobTwo.config.ReuseOther != "two_asynctask_" {
t.Errorf("reuse error: want: %v got: %v", "two_asynctask_", cronjobTwo.config.ReuseOther)
if cronjobTwo.config.BindTo != "two_asynctask_" {
t.Errorf("reuse error: want: %v got: %v", "two_asynctask_", cronjobTwo.config.BindTo)
}
if cronjobTwo.config.DefaultQueue != "gobay.task.two" {
t.Errorf("reuse error: want: %v got: %v", "gobay.task.two", cronjobTwo.config.DefaultQueue)
if cronjobTwo.config.AsyncTaskConfig.DefaultQueue != "gobay.task.two" {
t.Errorf("reuse error: want: %v got: %v", "gobay.task.two", cronjobTwo.config.AsyncTaskConfig.DefaultQueue)
}
}

Expand Down
8 changes: 2 additions & 6 deletions testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,11 @@ defaults: &defaults
two_asynctask_results_expire_in: 1
two_asynctask_redis: {}

one_cronjob_broker: "redis://127.0.0.1:6379/8"
one_cronjob_default_queue: "gobay.task.one"
one_cronjob_result_backend: "redis://127.0.0.1:6379/8"
one_cronjob_results_expire_in: 1
one_cronjob_redis: { }
one_cronjob_bind_to: "one_asynctask_"
one_cronjob_tz: "Asia/Shanghai"
one_cronjob_health_check_port: 5001

two_cronjob_reuse_other: "two_asynctask_"
two_cronjob_bind_to: "two_asynctask_"
two_cronjob_health_check_port: 5001

db_driver: sqlite3
Expand Down

0 comments on commit 3643ef5

Please sign in to comment.