QueueCTL - Background Job Queue System

queuectl is a robust, CLI-first background job queue system built in Python, fulfilling all requirements for the Backend Developer Internship Assignment.

It manages job execution with persistent storage, concurrent workers, automatic retries, and a dead-letter queue. This implementation includes all core requirements and all optional bonus features, including a real-time web dashboard.


Live Demo

Watch the video demo to see all features in action.

https://drive.google.com/file/d/1ZJeCpVkPRXlGNXcVzCv_kM4JMbOazK7E/view?usp=drive_link


Features

A quick overview of all implemented features.

Core Features

Persistent Jobs: Uses SQLite to ensure jobs are not lost on restart.

Concurrent Workers: Uses multiprocessing to run multiple jobs in parallel.

Atomic Operations: Guarantees that a job is only ever processed by one worker.

Exponential Backoff: Automatically retries failed jobs with increasing delays.

Dead Letter Queue (DLQ): Moves permanently-failed jobs to a separate queue for inspection.

Full CLI: All operations are managed through a clean click-based CLI.

Bonus Features (All Implemented)

Priority Queues: Enqueue high-priority jobs to be run first (--priority).

Scheduled/Delayed Jobs: Enqueue jobs to be run at a future time (--delay).

Job Timeouts: Set a per-job timeout to kill long-running processes (--timeout).

Output Logging & Metrics: The inspect command shows detailed job info, duration, stdout, and stderr logs.

Minimal Web Dashboard: A live-updating web UI (built with Flask & HTMX) to monitor the queue.


Job Lifecycle & Logic

  1. Job Lifecycle: A job follows a strict state machine:

    • pending: The initial state when a job is enqueued (or scheduled for the future).
    • processing: A worker has atomically acquired the job and is executing it.
    • completed: The job's command finished with an exit code of 0.
    • (retry): If the job fails, it's moved back to pending with an increased attempts count and a new next_run_at time for exponential backoff.
    • dead: The job has failed all max_retries and is moved to the Dead Letter Queue (DLQ) for manual inspection.
  2. Data Persistence: All job data is stored in a single SQLite database file (~/.queuectl/jobs.db). This ensures that all enqueued jobs, even pending ones, survive a system restart.

  3. Worker Logic (Concurrency): Workers run in separate processes. To prevent two workers from grabbing the same job (a race condition), the system uses an atomic SQL UPDATE ... RETURNING query. This finds, locks (by setting state to processing), and returns the next available job in a single, non-interruptible operation.
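
The two mechanics above can be sketched briefly. The snippet below is illustrative only: the table and column names are assumptions (the real schema lives in queue_system/storage.py), and the UPDATE ... RETURNING form requires SQLite 3.35 or newer.

import sqlite3
from datetime import datetime, timezone

def claim_next_job(conn: sqlite3.Connection):
    """Atomically claim the next runnable job (illustrative column names)."""
    now = datetime.now(timezone.utc).isoformat()
    row = conn.execute(
        """
        UPDATE jobs
        SET state = 'processing', started_at = ?
        WHERE id = (
            SELECT id FROM jobs
            WHERE state = 'pending' AND next_run_at <= ?
            ORDER BY priority DESC, created_at ASC
            LIMIT 1
        )
        RETURNING *
        """,
        (now, now),
    ).fetchone()
    conn.commit()
    return row  # None when no job is ready

def backoff_delay(attempts: int, base: int = 2) -> int:
    """Illustrative exponential backoff: 2s, 4s, 8s, ... between retries."""
    return base ** attempts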

Architecture & Tech Stack

  • Language: Python 3
  • CLI: click (for a clean, composable command-line interface) & rich (for minimalist terminal output)
  • Database: sqlite3 (for file-based, transactional, and concurrent-safe persistence)
  • Concurrency: multiprocessing (to run workers in parallel and bypass the GIL)
  • Web Dashboard: Flask + htmx-flask (for a real-time, lightweight UI)
  • Utilities: tabulate (for pretty-printing tables in the CLI)

The system works by having enqueue commands write jobs to the SQLite database. Multiple worker processes poll this database, atomically acquiring the next available job (ordered by priority), and then executing it.
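
A rough sketch of that poll-and-execute loop is shown below. open_connection, record_result, and record_timeout are hypothetical placeholders rather than functions from this repository, and claim_next_job refers to the lifecycle sketch above.

import multiprocessing
import subprocess
import time

def worker_loop(db_path: str):
    conn = open_connection(db_path)            # hypothetical helper: one connection per worker
    while True:
        job = claim_next_job(conn)             # atomic claim (see the lifecycle sketch)
        if job is None:
            time.sleep(1)                      # nothing runnable yet; poll again
            continue
        try:
            result = subprocess.run(job["command"], shell=True, capture_output=True,
                                    text=True, timeout=job["timeout"])
            record_result(conn, job, result)   # hypothetical helper: completed / retry / dead
        except subprocess.TimeoutExpired:
            record_timeout(conn, job)          # hypothetical helper: count as a failed attempt

if __name__ == "__main__":
    workers = [multiprocessing.Process(target=worker_loop, args=("jobs.db",)) for _ in range(3)]
    for w in workers:
        w.start()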



Assumptions & Trade-offs

This project was built with specific design decisions to balance simplicity, robustness, and feature-completeness.

  • SQLite vs. Postgres/Redis: SQLite was chosen for its simplicity, ease of setup (no external server), and ability to fulfill the "file-based persistence" requirement. It provides robust, transactional, and concurrent-safe operations for this scale. The trade-off is that it's not ideal for a massively distributed system, where a dedicated server like Postgres or Redis would be superior.

  • multiprocessing vs. asyncio: multiprocessing was used for concurrency. Because jobs are external commands (subprocess.run), they are inherently blocking. multiprocessing allows these blocking tasks to run in true parallel by bypassing Python's Global Interpreter Lock (GIL). The trade-off is that multiprocessing has higher memory overhead than an asyncio approach, but asyncio would be more complex to manage with blocking subprocesses.

  • htmx vs. React/Vue: The web dashboard uses Flask and HTMX. This allows for a dynamic, real-time UI with zero custom JavaScript. HTMX leverages server-side-rendered HTML fragments. The trade-off is that it's not a full Single Page Application (SPA), but it is far simpler and faster to develop for this type of monitoring tool.

  • shell=True Security: The worker uses subprocess.run(..., shell=True) to allow users to enqueue complex commands (e.g., echo "hi" && exit 1). This is a known security trade-off. In a production system, this would be disabled, and commands would be parsed into a list (e.g., ['echo', 'hi']) to prevent shell injection attacks.
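
As a sketch of that safer variant (not how the project currently runs jobs), the command string could be split into an argument list before execution. Note that shlex.split cannot express shell operators such as && or |, which is exactly the flexibility shell=True provides.

import shlex
import subprocess

cmd = "echo 'hi'"
# No shell is involved, so injection via metacharacters is prevented,
# but chained commands like `echo "hi" && exit 1` are no longer possible.
subprocess.run(shlex.split(cmd), capture_output=True, text=True, timeout=30)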

Setup and Installation

1. Clone the repository

git clone https://github.com/DaSeeker67/ArcticQueue.git
cd ArcticQueue

2. Create a virtual environment

Windows (PowerShell):

python -m venv venv
.\venv\Scripts\Activate.ps1

Linux/Mac:

python3 -m venv venv
source venv/bin/activate

3. Install dependencies

Install the dependencies listed in requirements.txt (included in the repository):

pip install -r requirements.txt

4. Initialize the Database

This is a required first step. It creates the ~/.queuectl/jobs.db file and all tables.

python queuectl.py init
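
For orientation, the table created by init can be pictured along these lines. This is an illustrative schema inferred from the fields shown by list and inspect, not a copy of queue_system/storage.py.

import sqlite3

conn = sqlite3.connect("jobs.db")
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
    id          TEXT PRIMARY KEY,
    command     TEXT NOT NULL,
    state       TEXT NOT NULL DEFAULT 'pending',
    priority    INTEGER DEFAULT 0,
    attempts    INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3,
    timeout     INTEGER DEFAULT 30,
    next_run_at TEXT,
    created_at  TEXT,
    started_at  TEXT,
    finished_at TEXT,
    stdout      TEXT,
    stderr      TEXT
)
""")
conn.commit()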

CLI Usage

All commands are run through python queuectl.py.

1. start-workers

Start the worker processes. This is the "engine" of the queue.

# Run 3 workers in parallel
python queuectl.py start-workers --count 3

# Output:
# Starting 3 workers... (Press Ctrl+C to stop)
# worker(pid1234) starting...
# worker(pid1235) starting...
# worker(pid1236) starting...

2. enqueue

Add a new job. This is where you set all job options.

# Enqueue a simple job (Windows 'timeout' command; on Linux/Mac use e.g. "sleep 2")
python queuectl.py enqueue "timeout /t 2"

# Enqueue a high-priority, delayed job with a 5s timeout
python queuectl.py enqueue --priority 10 --delay 30 --timeout 5 "echo 'High priority!'"

Options:

  • --priority: Set job priority (higher numbers run first). Default: 0
  • --delay: Delay execution by N seconds. Default: 0
  • --timeout: Set maximum execution time in seconds. Default: 30

3. status

Get a high-level summary of all jobs by state.

python queuectl.py status

# Output:
# +-----------+---------+
# | state     |   count |
# +===========+=========+
# | pending   |       1 |
# | completed |      10 |
# | dead      |       2 |
# +-----------+---------+

4. list

List all jobs in a specific state.

python queuectl.py list --state pending

# Output:
# +----------+------------------+----------+---------------------+
# | id       | command          | attempts | updated_at          |
# +==========+==================+==========+=====================+
# | 12a4...  | echo 'High prio' | 0        | 2025-11-10T14:30:00 |
# +----------+------------------+----------+---------------------+

Available states: pending, processing, completed, dead

5. dlq

Manage the Dead Letter Queue (DLQ).

# List failed jobs
python queuectl.py dlq list

# Retry a failed job (moves it back to 'pending')
python queuectl.py dlq retry <job-id-from-list>

6. inspect

Get detailed logs and metrics for a single job.

python queuectl.py inspect <job-id>

# Output:
# --- Job Details: 12a4... ---
# State     completed
# Command   timeout /t 2
# Priority  10
# Attempts  1 / 3
# Timeout   30s
#
# --- Timestamps ---
# Created   2025-11-10T14:30:00
# Next Run  2025-11-10T14:30:00
# Started   2025-11-10T14:30:01
# Finished  2025-11-10T14:30:03
# Duration  2.12s
#
# --- STDOUT ---
# Waiting for 2 seconds, press a key to continue ...

7. dashboard

Start the (bonus) live web dashboard.

python queuectl.py dashboard

# Output:
# Starting web dashboard on http://127.0.0.1:5000/
# (Your browser will open automatically)

The dashboard provides:

  • Real-time job status updates
  • Visual queue statistics
  • Live job log streaming
  • One-click job retry from the UI
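
The live updates work by having the page poll a Flask endpoint with HTMX and swap in a server-rendered fragment. A hedged sketch follows; the route name, polling interval, and load_stats helper are illustrative rather than taken from dashboard.py.

from flask import Flask, render_template

app = Flask(__name__)

@app.route("/stats-fragment")
def stats_fragment():
    # Returns a small HTML fragment; HTMX swaps it into the page in place.
    return render_template("_stats.html", stats=load_stats())  # load_stats() is hypothetical

# In the page template, an element like this polls every 2 seconds (illustrative):
#   <div hx-get="/stats-fragment" hx-trigger="every 2s" hx-swap="innerHTML"></div>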

Testing

To validate all features, use the test2.ps1 script. This requires two terminals.

Terminal 1:

# Start the workers
python queuectl.py start-workers

Terminal 2:

# Run the automated test script
.\test2.ps1

What the test script does:

  1. Initializes the database
  2. Enqueues a mix of high/low priority jobs to test the priority queue
  3. Enqueues a delayed job to test the scheduler
  4. Enqueues a job that is guaranteed to time out
  5. Enqueues a job with a bad command to test the DLQ
  6. Waits 20 seconds for all jobs to process
  7. Prints the final status, completed, and DLQ lists for verification

After the script finishes, you can use python queuectl.py inspect <job-id> on any of the jobs to see their detailed logs.


Project Structure

QUEUECTL/
├── __pycache__/                  # Python cache files
├── .pytest_cache/                # Pytest cache files
├── queue_system/                 # Main package directory
│   ├── __pycache__/
│   ├── __init__.py              # Package initialization
│   ├── config.py                # Configuration settings
│   ├── storage.py               # Database operations
│   └── worker.py                # Worker process logic
├── scripts/                      # Utility scripts
├── templates/                    # HTML templates for dashboard
│   ├── jobs.html                # Job listing template
│   ├── _stats.html              # Statistics partial
│   └── index.html               # Main dashboard page
├── tests/                        # Test files
├── virt/                         # Virtual environment
├── .gitignore                    # Git ignore rules
├── dashboard.py                  # Flask web dashboard
├── queuectl.py                   # Main CLI entry point
├── test.ps1                      # PowerShell test script
├── test2.ps1                     # Additional test script
├── requirements.txt              # Python dependencies
├── README.md                     # This file
└── ~/.queuectl/                  # Database directory in the user's home (created by init)
    └── jobs.db                   # SQLite database

Troubleshooting

Database locked errors

If you see "database is locked" errors, ensure only one instance of workers is running.

Jobs not executing

  1. Make sure workers are running (if not, start them: python queuectl.py start-workers)
  2. Check job state: python queuectl.py list --state pending
  3. Inspect specific job: python queuectl.py inspect <job-id>

Dashboard not loading

Ensure Flask is installed: pip install flask htmx-flask


License

This project is submitted as part of the Backend Developer Internship Assignment.


Author

Amit Mishra


Acknowledgments

Built with Python, SQLite, Click, Flask, and HTMX.
