一个基于Redis的轻量级任务调度系统,支持任务注册、重试机制、可视化界面和命令行管理。
- 轻量级设计: 极简架构,代码简洁高效
- 任务注册: 支持装饰器方式注册任务处理器
- 重试机制: 支持任务级别重试配置
- 可视化界面: 实时仪表盘、任务管理、分组统计
- 命令行工具: 完整的CLI管理界面
- 命名空间: 支持多环境隔离
- 任务查询: 多条件组合查询和清理
# 从源码安装
git clone <repository>
cd qtask
pip install -e .
# 或直接安装依赖
pip install redis fastapi uvicorn click loguru
# 默认启动 (127.0.0.1:8000)
qtask server
# 指定主机和端口
qtask server --host 0.0.0.0 --port 8080
# 连接远程Redis
qtask server --redis-host redis.example.com --redis-port 6379
# 开发模式(代码变更自动重载)
qtask server --reload
# 查看默认namespace状态
qtask status
# 查看指定namespace状态
qtask status --namespace production
# 创建2个示例任务
qtask demo --count 5
# 在指定namespace创建示例
qtask demo --namespace test --count 10
from qtask.core.task_worker import TaskWorker
from qtask.core.task_publisher import TaskPublisher
from qtask.core.task_storage import TaskStorage
import time
# 创建任务处理器
worker = TaskWorker()
# 注册任务处理器
@worker.register('email', max_retries=3)
def send_email(data):
"""发送邮件任务"""
email_to = data.get('to')
subject = data.get('subject')
content = data.get('content')
# 模拟发送邮件
time.sleep(1)
if email_to and subject:
return 'DONE', {'sent': True, 'to': email_to}, '邮件发送成功'
else:
raise Exception('邮件参数不完整')
# return 'ERROR', None, '邮件参数不完整'
@worker.register('backup', max_retries=2)
def backup_data(data):
"""数据备份任务"""
backup_path = data.get('path')
# 模拟备份操作
time.sleep(2)
if backup_path:
return 'DONE', {'backup_path': backup_path}, '备份完成'
else:
return 'NULL', None, '跳过备份:路径未指定'
# 创建任务发布器
publisher = TaskPublisher()
# 发布任务
task_data = {
'type': 'email',
'to': 'user@example.com',
'subject': '测试邮件',
'content': '这是一封测试邮件'
}
task_id = publisher.publish(task_data)
print(f"任务已发布: {task_id}")
# 处理任务
task = worker.get_next_task()
if task:
result = worker.process_task(task)
print(f"任务处理结果: {result}")
from qtask.core.config import QTaskConfig
from qtask.core.factory import TaskStorageFactory
# 创建配置
config = QTaskConfig()
config.redis_host = 'localhost'
config.redis_port = 6379
config.default_namespace = 'production'
# 创建工厂
factory = TaskStorageFactory(config)
# 获取不同命名空间的存储
prod_storage = factory.get_storage('production')
test_storage = factory.get_storage('test')
# 发布任务到不同命名空间
publisher_prod = factory.get_publisher('production')
publisher_test = factory.get_publisher('test')
# 生产环境任务
prod_task_id = publisher_prod.publish({
'type': 'backup',
'path': '/data/production'
})
# 测试环境任务
test_task_id = publisher_test.publish({
'type': 'email',
'to': 'test@example.com',
'subject': '测试'
})
from qtask.core.task_storage import TaskStorage
# 创建存储实例
storage = TaskStorage()
# 查询错误任务
error_tasks = storage.find_tasks(status='ERROR')
print(f"错误任务数量: {len(error_tasks)}")
# 查询特定分组的任务
email_tasks = storage.find_tasks(group='email')
print(f"邮件任务数量: {len(email_tasks)}")
# 重新排队错误任务
for task_id in error_tasks[:5]: # 重新排队前5个错误任务
if storage.requeue_task(task_id):
print(f"任务 {task_id} 已重新排队")
# 查看系统状态
qtask status --namespace production
# 查询所有错误任务
qtask query --error --format table
# 查询最近24小时的任务
qtask query --recent --format json
# 清理7天前的已完成任务
qtask clean --done --older-than "7d" --dry-run
# 重新排队错误任务
qtask query --error --format ids | xargs -I {} qtask requeue --task-ids {}
# 批量清理所有namespace的旧任务
for ns in $(qtask namespaces --format ids); do
echo "清理 $ns 命名空间..."
qtask clean --namespace $ns --older-than "30d" --dry-run
done
启动服务器后,访问 http://localhost:8000
可以看到:
- 仪表盘: 实时任务统计、最近任务、状态分布图表
- 任务列表: 查看所有任务,支持按namespace、类型、状态筛选
- 分组管理: 按分组查看任务统计
- 创建任务: 通过Web界面创建新任务
- 查询页面: 高级查询和清理功能
#!/bin/bash
# 监控脚本示例
# 检查错误任务数量
error_count=$(qtask query --error --count-only)
if [ $error_count -gt 10 ]; then
echo "警告: 错误任务数量过多 ($error_count)"
# 发送告警通知
fi
# 检查待处理任务积压
todo_count=$(qtask query --todo --count-only)
if [ $todo_count -gt 100 ]; then
echo "警告: 待处理任务积压 ($todo_count)"
fi
# 检查处理时间过长的任务
qtask query --status PROCESSING --older-than "1h" --format table
启动QTask的Web界面和API服务。
# 基本用法
qtask server
# 高级配置
qtask server \
--host 0.0.0.0 \
--port 8080 \
--redis-host redis.example.com \
--redis-port 6379 \
--redis-db 1 \
--namespace production \
--reload
参数说明:
--host
: 服务器主机地址 (默认: 127.0.0.1)--port
: 服务器端口 (默认: 8000)--redis-host
: Redis主机地址 (默认: localhost)--redis-port
: Redis端口 (默认: 6379)--redis-db
: Redis数据库编号 (默认: 0)--namespace
: 默认命名空间 (默认: default)--reload
: 开发模式,代码变更自动重载
显示指定命名空间的系统统计信息。
# 查看默认namespace状态
qtask status
# 查看指定namespace状态
qtask status --namespace production
# 连接远程Redis查看状态
qtask status --redis-host redis.example.com --redis-port 6379
输出示例:
QTask System Status
====================
Namespace: default
Redis: localhost:6379 (db: 0)
TODO tasks: 15
DONE tasks: 128
SKIP tasks: 3
ERROR tasks: 2
Task Groups: email, report, backup
显示系统中所有可用的命名空间。
# 基本列表
qtask namespaces
# 显示详细统计信息
qtask namespaces --show-stats
# JSON格式输出
qtask namespaces --format json
# 表格格式输出(默认)
qtask namespaces --format table
输出示例:
Available Namespaces
====================
┌─────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│ Namespace │ TODO │ DONE │ ERROR │ SKIP │ Total │
├─────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ default │ 15 │ 128 │ 2 │ 3 │ 148 │
│ production │ 8 │ 256 │ 1 │ 0 │ 265 │
│ test │ 3 │ 45 │ 0 │ 1 │ 49 │
└─────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘
强大的任务查询功能,支持多种过滤条件和输出格式。
# 基本查询 - 所有任务
qtask query
# 查询特定状态的任务
qtask query --status ERROR
qtask query --status DONE
# 快速查询标志
qtask query --todo # 所有待处理任务
qtask query --done # 所有已完成任务
qtask query --error # 所有错误任务
qtask query --recent # 最近24小时的任务
# 按分组查询
qtask query --group email
# 按任务类型查询
qtask query --task-type backup
# 按名称模糊查询
qtask query --name-contains "report"
# 时间范围查询
qtask query --after "2024-01-01"
qtask query --before "2024-01-31"
qtask query --older-than "7d" # 7天前
qtask query --newer-than "24h" # 24小时内
# 组合查询
qtask query --status ERROR --older-than "7d" --group backup
# 输出格式控制
qtask query --format json # JSON格式
qtask query --format csv # CSV格式
qtask query --format ids # 仅任务ID
qtask query --format table # 表格格式(默认)
# 排序和限制
qtask query --sort created_time --desc --limit 10
qtask query --sort name --limit 50
# 详细输出
qtask query --verbose --show-data
qtask query --count-only # 仅显示数量
常用查询示例:
# 查询所有错误任务
qtask query --error
# 查询最近7天的已完成任务
qtask query --done --newer-than "7d"
# 查询特定分组的待处理任务
qtask query --todo --group email
# 查询超过30天的旧任务
qtask query --older-than "30d"
# 查询包含"backup"的任务
qtask query --name-contains backup
# 查询特定时间范围的任务
qtask query --after "2024-01-01 00:00:00" --before "2024-01-31 23:59:59"
根据条件清理任务,支持预览模式。
# 清理所有已完成任务
qtask clean --done
# 清理所有错误任务
qtask clean --error
# 清理所有非待处理任务
qtask clean --all-completed
# 按时间清理
qtask clean --older-than "30d"
qtask clean --before "2024-01-01"
# 按分组清理
qtask clean --group test --status DONE
# 按任务类型清理
qtask clean --task-type backup --status ERROR
# 预览模式(不实际删除)
qtask clean --done --dry-run
# 强制删除(跳过确认)
qtask clean --error --force
常用清理示例:
# 预览要清理的已完成任务
qtask clean --done --dry-run
# 清理7天前的错误任务
qtask clean --error --older-than "7d"
# 清理测试分组的已完成任务
qtask clean --group test --status DONE
# 清理所有非待处理任务(谨慎使用)
qtask clean --all-completed --dry-run
一键清空整个命名空间的所有任务。
# 清空默认namespace
qtask clear
# 清空指定namespace
qtask clear --namespace test
# 预览模式
qtask clear --namespace production --dry-run
# 强制清空(跳过确认)
qtask clear --namespace test --force
将错误任务重新放回待处理队列。
# 重新排队单个任务
qtask requeue --task-ids "abc123-def456"
# 重新排队多个任务
qtask requeue --task-ids "task1,task2,task3"
# 在指定namespace重新排队
qtask requeue --namespace production --task-ids "task1,task2"
使用场景:
# 1. 先查询错误任务
qtask query --error --format ids
# 2. 重新排队特定错误任务
qtask requeue --task-ids "abc123-def456,xyz789-uvw012"
创建示例任务用于测试和演示。
# 创建2个示例任务(默认)
qtask demo
# 创建10个示例任务
qtask demo --count 10
# 在指定namespace创建示例
qtask demo --namespace test --count 5
# 开发环境
qtask server --namespace dev --redis-db 0
# 测试环境
qtask server --namespace test --redis-db 1
# 生产环境
qtask server --namespace production --redis-db 2
# 批量查询所有namespace的状态
for ns in $(qtask namespaces --format ids); do
echo "=== $ns ==="
qtask status --namespace $ns
done
# 批量清理所有namespace的旧任务
for ns in $(qtask namespaces --format ids); do
echo "Cleaning $ns..."
qtask clean --namespace $ns --older-than "30d" --dry-run
done
#!/bin/bash
# 监控脚本示例
# 检查错误任务数量
error_count=$(qtask query --error --count-only)
if [ $error_count -gt 10 ]; then
echo "警告: 错误任务数量过多 ($error_count)"
fi
# 检查待处理任务积压
todo_count=$(qtask query --todo --count-only)
if [ $todo_count -gt 100 ]; then
echo "警告: 待处理任务积压 ($todo_count)"
fi
QTask使用Redis作为存储后端,支持以下配置:
- 主机:
--redis-host
(默认: localhost) - 端口:
--redis-port
(默认: 6379) - 数据库:
--redis-db
(默认: 0) - 命名空间:
--namespace
(默认: default)
支持通过环境变量配置:
export QTASK_REDIS_HOST=redis.example.com
export QTASK_REDIS_PORT=6379
export QTASK_REDIS_DB=1
export QTASK_NAMESPACE=production
-
连接Redis失败
# 检查Redis服务状态 redis-cli ping # 使用不同数据库 qtask server --redis-db 1
-
权限问题
# 确保有Redis访问权限 redis-cli -h your-redis-host -p 6379 ping
-
端口占用
# 使用不同端口 qtask server --port 8080
# 启用详细日志
export QTASK_LOG_LEVEL=DEBUG
qtask server --reload
- 新增: 手动放回失败的任务
- 优化: dashboard UI区分namespace
- 修复: 任务状态显示问题
- 新增: 可用版本(UI部分没有很好区分namespace)
- 新增: namespace和命令行清理
- 新增: 任务查询和删除
- 新增: 任务级别重试设置
- 新增: 更丰富的任务状态和返回值 {status, data, message}
- 初始版本: 基本的任务调度系统
- 支持可视化界面
- 支持命令行界面
欢迎提交Issue和Pull Request来改进QTask。
MIT License