Skip to content

feat: version 1.7.0 #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 80 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Standalone admin panel with all data stored in SQLite database
- [Previews](#previews)
- [Usage](#usage)
- [Docker Compose Example](#docker-compose-example)
- [Running without Docker](#running-without-docker)
- [Task States](#task-states)
- [Development](#development)

Expand All @@ -20,13 +21,18 @@ Tasks Page | Task Details Page
1) Add this middleware to your project:

```python
import asyncio
import logging
import aiohttp
from typing import Any
from urllib.parse import urljoin
from datetime import datetime, UTC

import httpx
from taskiq import TaskiqMiddleware, TaskiqResult, TaskiqMessage

logger = logging.getLogger(__name__)


class TaskiqAdminMiddleware(TaskiqMiddleware):
def __init__(
self,
Expand All @@ -38,63 +44,82 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
self.url = url
self.api_token = api_token
self.__ta_broker_name = taskiq_broker_name
self._pending: set[asyncio.Task[Any]] = set()
self._client: aiohttp.ClientSession | None = None

async def post_send(self, message):
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
async with httpx.AsyncClient() as client:
await client.post(
headers={"access-token": self.api_token},
url=urljoin(self.url, f"/api/tasks/{message.task_id}/queued"),
json={
"args": message.args,
"kwargs": message.kwargs,
"taskName": message.task_name,
"worker": self.__ta_broker_name,
"queuedAt": now,
},
@staticmethod
def _now_iso() -> str:
return datetime.now(UTC).replace(tzinfo=None).isoformat()

async def startup(self):
self._client = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=5),
)

async def shutdown(self):
if self._pending:
await asyncio.gather(*self._pending, return_exceptions=True)
if self._client is not None:
await self._client.close()

def _spawn_request(self, endpoint: str, payload: dict[str, Any]) -> None:
async def _send() -> None:
session = self._client or aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=5)
)

async with session.post(
urljoin(self.url, endpoint),
headers={"access-token": self.api_token},
json=payload,
) as resp:
resp.raise_for_status()
if not resp.ok:
logger.error(f"POST {endpoint} - {resp.status}")

task = asyncio.create_task(_send())
self._pending.add(task)
task.add_done_callback(self._pending.discard)

async def post_send(self, message):
self._spawn_request(
f"/api/tasks/{message.task_id}/queued",
{
"args": message.args,
"kwargs": message.kwargs,
"queuedAt": self._now_iso(),
"taskName": message.task_name,
"worker": self.__ta_broker_name,
},
)
return super().post_send(message)

async def pre_execute(self, message: TaskiqMessage):
""""""
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
async with httpx.AsyncClient() as client:
await client.post(
headers={"access-token": self.api_token},
url=urljoin(self.url, f"/api/tasks/{message.task_id}/started"),
json={
"startedAt": now,
"args": message.args,
"kwargs": message.kwargs,
"taskName": message.task_name,
"worker": self.__ta_broker_name,
},
)

self._spawn_request(
f"/api/tasks/{message.task_id}/started",
{
"args": message.args,
"kwargs": message.kwargs,
"startedAt": self._now_iso(),
"taskName": message.task_name,
"worker": self.__ta_broker_name,
},
)
return super().pre_execute(message)

async def post_execute(
self,
message: TaskiqMessage,
result: TaskiqResult[Any],
):
async def post_execute(self, message: TaskiqMessage, result: TaskiqResult[Any]):
""""""
now = datetime.now(UTC).replace(tzinfo=None).isoformat()
async with httpx.AsyncClient() as client:
await client.post(
headers={"access-token": self.api_token},
url=urljoin(
self.url,
f"/api/tasks/{message.task_id}/executed",
),
json={
"finishedAt": now,
"error": result.error
if result.error is None
else repr(result.error),
"executionTime": result.execution_time,
"returnValue": {"return_value": result.return_value},
},
)
self._spawn_request(
f"/api/tasks/{message.task_id}/executed",
{
"finishedAt": self._now_iso(),
"executionTime": result.execution_time,
"error": None if result.error is None else repr(result.error),
"returnValue": {"return_value": result.return_value},
},
)
return super().post_execute(message, result)
```

Expand All @@ -103,7 +128,7 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
```python
...
broker = (
ListQueueBroker(
RedisStreamBroker(
url=redis_url,
queue_name="my_lovely_queue",
)
Expand Down Expand Up @@ -164,6 +189,11 @@ volumes:
admin_data:
```

### Running without Docker
1) `cp env-example .env`, enter `.env` file and fill in all needed variables
2) run `make dev` to run it locally in dev mode
3) run `make prod` to run it locally in prod mode

### Task States
Let's assume we have a task 'do_smth', there are all states it can embrace:
1) `queued` - the task has been sent to the queue without an error
Expand Down
25 changes: 25 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
MODE ?= dev

ifneq (,$(wildcard ./.env))
include .env
export
endif

.PHONY: all dev prod install gen run build

all: $(MODE)

install:
pnpm install --frozen-lockfile

gen:
pnpm run generate:sql

dev: install gen
pnpm dev

build: install gen
pnpm build

prod: build
node .output/server/index.mjs
44 changes: 25 additions & 19 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,49 +1,55 @@
{
"name": "nuxt-app",
"name": "taskiq-admin",
"private": true,
"type": "module",
"version": "1.6.0",
"version": "1.7.0",
"scripts": {
"build": "nuxt build",
"dev": "nuxt dev",
"generate": "nuxt generate",
"preview": "nuxt preview",
"postinstall": "nuxt prepare",
"typecheck": "tsc --noEmit",
"test": "vitest --run",
"db:push": "drizzle-kit push",
"generate:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql",
"generate:future:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql; sed -i '1s/^/PRAGMA journal_mode = WAL; PRAGMA synchronous = normal; PRAGMA journal_size_limit = 6144000;\\n/' dbschema.sql"
"generate:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql; sed -i '1s/^/PRAGMA journal_mode = WAL; PRAGMA synchronous = normal; PRAGMA journal_size_limit = 6144000;\\n/' dbschema.sql",
"generate:deprecated:sql": "drizzle-kit export --sql | sed 's/CREATE TABLE/CREATE TABLE IF NOT EXISTS/g; s/CREATE INDEX/CREATE INDEX IF NOT EXISTS/g' > dbschema.sql"
},
"dependencies": {
"@internationalized/date": "^3.8.0",
"@internationalized/date": "^3.8.2",
"@tailwindcss/vite": "^4.1.3",
"@tanstack/vue-table": "^8.21.3",
"@vueuse/core": "^12.8.2",
"better-sqlite3": "^11.9.1",
"bootstrap": "^5.3.3",
"@vueuse/core": "^13.4.0",
"better-sqlite3": "^12.1.1",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"dayjs": "^1.11.13",
"dotenv": "^16.4.7",
"drizzle-orm": "^0.42.0",
"lucide-vue-next": "^0.487.0",
"nuxt": "^3.16.2",
"dotenv": "^16.6.0",
"drizzle-orm": "^0.44.2",
"lucide-vue-next": "^0.524.0",
"nuxt": "^3.17.5",
"reka-ui": "^2.2.0",
"tailwind-merge": "^3.2.0",
"tailwindcss": "^4.1.3",
"tw-animate-css": "^1.2.5",
"vue": "^3.5.13",
"vue-router": "^4.5.0",
"vue-sonner": "^1.3.0",
"zod": "^3.24.3"
"vue": "^3.5.17",
"vue-router": "^4.5.1",
"vue-sonner": "^2.0.1",
"zod": "^3.25.67"
},
"packageManager": "pnpm@8.7.6+sha1.a428b12202bc4f23b17e6dffe730734dae5728e2",
"devDependencies": {
"@iconify-json/radix-icons": "^1.2.2",
"@iconify/vue": "^4.3.0",
"@iconify/vue": "^5.0.0",
"@nuxt/test-utils": "^3.19.1",
"@types/better-sqlite3": "^7.6.12",
"drizzle-kit": "^0.31.0",
"@vue/test-utils": "^2.4.6",
"drizzle-kit": "^0.31.4",
"happy-dom": "^18.0.1",
"playwright-core": "^1.53.1",
"prettier": "^3.5.3",
"typescript": "^5.8.3"
"tsx": "^4.20.3",
"typescript": "^5.8.3",
"vitest": "^3.2.4"
}
}
Loading