Skip to content

Procrastinate queue setup #668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: deployments-webhook
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ ARG REDIS_ENABLED=true
ARG BACKEND_ENABLED=true
ARG FRONTEND_ENABLED=true
ARG CRON_ENABLED=true
ARG QUEUE_ENABLED=true
ARG DEFAULT_SYNC_DAYS=31
ARG BUILD_DATE
ARG MERGE_COMMIT_SHA
Expand Down Expand Up @@ -100,6 +101,8 @@ RUN apt-get update && \
&& touch /var/log/sync_server/sync_server.log \
&& mkdir -p /var/log/web-server \
&& touch /var/log/web-server/web-server.log \
&& mkdir -p /var/log/queue \
&& touch /var/log/queue/queue.log \
&& mkdir -p /var/log/cron \
&& touch /var/log/cron/cron.log \
&& chmod 0644 /etc/cron.d/cronjob \
Expand All @@ -122,6 +125,7 @@ ENV REDIS_ENABLED=true
ENV BACKEND_ENABLED=true
ENV FRONTEND_ENABLED=true
ENV CRON_ENABLED=true
ENV QUEUE_ENABLED=true
ENV BUILD_DATE=$BUILD_DATE
ENV MERGE_COMMIT_SHA=$MERGE_COMMIT_SHA

Expand Down
4 changes: 4 additions & 0 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ARG REDIS_ENABLED=true
ARG BACKEND_ENABLED=true
ARG FRONTEND_ENABLED=true
ARG CRON_ENABLED=true
ARG QUEUE_ENABLED=true
ARG DB_PORT=5434
ARG BUILD_DATE
ARG MERGE_COMMIT_SHA
Expand Down Expand Up @@ -57,6 +58,8 @@ RUN mkdir -p /var/log/sync_server
RUN touch /var/log/sync_server/sync_server.log
RUN mkdir -p /var/log/web-server
RUN touch /var/log/web-server/web-server.log
RUN mkdir -p /var/log/queue
RUN touch /var/log/queue/queue.log
RUN mkdir -p /var/log/cron
RUN touch /var/log/cron/cron.log
RUN chmod 0644 /etc/cron.d/cronjob
Expand All @@ -73,6 +76,7 @@ ENV REDIS_ENABLED=true
ENV BACKEND_ENABLED=true
ENV FRONTEND_ENABLED=true
ENV CRON_ENABLED=true
ENV QUEUE_ENABLED=true
ENV ENVIRONMENT=dev
ENV BUILD_DATE=$BUILD_DATE
ENV MERGE_COMMIT_SHA=$MERGE_COMMIT_SHA
Expand Down
35 changes: 35 additions & 0 deletions backend/analytics_server/procrastinate_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from procrastinate import App, PsycopgConnector
from os import getenv
from flask import Flask
import logging
from mhq.store import configure_db_with_app
from env import load_app_env

load_app_env()

# Configure logging
logging.basicConfig(
format='[%(asctime)s] [%(process)d] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.INFO
)

# Create Flask app
flask_app = Flask(__name__)
configure_db_with_app(flask_app)

# Create Procrastinate app
app = App(
connector=PsycopgConnector(
kwargs={
"host": getenv("DB_HOST"),
"port": getenv("DB_PORT"),
"user": getenv("DB_USER"),
"password": getenv("DB_PASS"),
"dbname": getenv("DB_NAME"),
}
),
# import_paths=["mhq.service.queue.procrastinate_webhook_queue"]
)

app.open()
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ psycopg2==2.9.3
python-dotenv==1.0.1
gunicorn==22.0.0
Flask-SQLAlchemy==3.1.1
procrastinate==3.1.0
20 changes: 20 additions & 0 deletions cli/source/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ const CliUi = () => {
dispatch(appSlice.actions.setLogSource(LogSource.All));
} else if (lowerCaseInput === 's') {
dispatch(appSlice.actions.setLogSource(LogSource.SyncServer));
} else if (lowerCaseInput === 'd') {
dispatch(appSlice.actions.setLogSource(LogSource.Queue));
}
}

Expand Down Expand Up @@ -543,6 +545,21 @@ const CliUi = () => {
<Text color="yellow">STARTING</Text>
)}
</Text>
<Text>
<Text bold color="green">
[d]
</Text>{' '}
<Text inverse={logSource === LogSource.Queue}>
{'queue logs'.padEnd(40, ' ')}
</Text>{' '}
{readyServices.includes(LogSource.Queue) ? (
<Text bold color="green">
READY
</Text>
) : (
<Text color="yellow">STARTING</Text>
)}
</Text>
<Newline />
<Text>
<Text bold color="yellow">
Expand Down Expand Up @@ -608,6 +625,9 @@ const CliUi = () => {
--
</Text>
<Text bold>{`http://localhost:${sync_server_port}`}</Text>
<Text bold color="grey">
--
</Text>
</Box>
)}
</Box>
Expand Down
2 changes: 2 additions & 0 deletions cli/source/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export enum LogSource {
Redis,
InitDb,
Postgres,
Queue,
Cron,
DockerWatch,
DockerWatchProcessIdLock
Expand Down Expand Up @@ -81,6 +82,7 @@ export const READY_MESSAGES = {
[LogSource.Postgres]: `database system is ready to accept connections`,
[LogSource.Redis]: `Ready to accept connections`,
[LogSource.InitDb]: [`exit 0`, `Writing: ./db/schema.sql`],
[LogSource.Queue]: `Starting worker`,
[LogSource.DockerWatch]: [
`Watch configuration for service`,
`Watch enabled`,
Expand Down
2 changes: 2 additions & 0 deletions cli/source/hooks/useLogs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export const useLogs = (
return ['postgres/postgres', 'pgs', '#ff70a6'];
case LogSource.Cron:
return ['cron/cron', 'cro', '#ffd166'];
case LogSource.Queue:
return ['queue/queue', 'que', '#3a86ff'];
default:
return def;
}
Expand Down
6 changes: 6 additions & 0 deletions cli/source/hooks/useLogsFromAllSources.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export const useLogsFromAllSources = () => {
const initDbLogs = useLogs(LogSource.InitDb, addLogs(LogSource.InitDb));
const pgLogs = useLogs(LogSource.Postgres, addLogs(LogSource.Postgres));
const cronLogs = useLogs(LogSource.Cron, addLogs(LogSource.Cron));
const queueLogs = useLogs(LogSource.Queue, addLogs(LogSource.Queue));

const webLogsRef = useRef(webLogs);
const apiLogsRef = useRef(apiLogs);
Expand All @@ -53,6 +54,7 @@ export const useLogsFromAllSources = () => {
const initDbLogsRef = useRef(initDbLogs);
const pgLogsRef = useRef(pgLogs);
const cronLogsRef = useRef(cronLogs);
const queueLogsRef = useRef(queueLogs);
const allLogsRef = useRef(allLogs);

webLogsRef.current = webLogs;
Expand All @@ -62,6 +64,7 @@ export const useLogsFromAllSources = () => {
initDbLogsRef.current = initDbLogs;
pgLogsRef.current = pgLogs;
cronLogsRef.current = cronLogs;
queueLogsRef.current = queueLogs;
allLogsRef.current = allLogs;

useEffect(() => {
Expand Down Expand Up @@ -90,6 +93,9 @@ export const useLogsFromAllSources = () => {
case LogSource.Cron:
newLogs.push(...cronLogsRef.current.slice(-HIST_LIMIT));
break;
case LogSource.Queue:
newLogs.push(...queueLogsRef.current.slice(-HIST_LIMIT));
break;
case LogSource.All:
default:
newLogs.push(...allLogsRef.current.slice(-HIST_LIMIT));
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
BACKEND_ENABLED: ${BACKEND_ENABLED:-true}
FRONTEND_ENABLED: ${FRONTEND_ENABLED:-true}
CRON_ENABLED: ${CRON_ENABLED:-true}
QUEUE_ENABLED: ${QUEUE_ENABLED:-true}
DB_PORT: ${DB_PORT:-5434}

env_file:
Expand Down
1 change: 1 addition & 0 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ REDIS_ENABLED=true
BACKEND_ENABLED=true
FRONTEND_ENABLED=true
CRON_ENABLED=true
QUEUE_ENABLED=true

DB_HOST=localhost
DB_NAME=mhq-oss
Expand Down
14 changes: 14 additions & 0 deletions setup_utils/init_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ DB_URL="postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_

/usr/local/bin/dbmate -u "$DB_URL" up

# Function to check whether to run procrastinate schema command
check_and_apply_procrastinate_schema() {
TABLE_EXISTS=$(su - postgres -c "psql -U postgres -d $POSTGRES_DB -tAc \
\"SELECT to_regclass('public.procrastinate_jobs') IS NOT NULL;\"")

if [ "$TABLE_EXISTS" = "t" ]; then
echo "Procrastinate schema already applied."
else
PYTHONPATH=/app/backend/analytics_server PROCRASTINATE_APP=procrastinate_worker.app procrastinate schema --apply
echo "Procrastinate schema applied successfully."
fi
}

check_and_apply_procrastinate_schema

MESSAGE="mhq-oss DB initialized"
TOPIC="db_init"
Expand Down
21 changes: 21 additions & 0 deletions setup_utils/start_procrastinate_worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash

TOPIC="db_init"
SUB_DIR="/tmp/pubsub"

# Function to wait for message on a topic
wait_for_message() {
while [ ! -f "$SUB_DIR/$TOPIC" ]; do
sleep 1
done
MESSAGE=$(cat "$SUB_DIR/$TOPIC")
echo "Received message: $MESSAGE"
}

# Wait for message on the specified topic
wait_for_message

cd /app/backend/analytics_server

# Start Procrastinate worker
PYTHONPATH=. PROCRASTINATE_APP=procrastinate_worker.app procrastinate worker
19 changes: 19 additions & 0 deletions setup_utils/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,22 @@ retry=3
retry_delay=5
environment=CRON_ENABLED=%(ENV_CRON_ENABLED)s
autostart=%(ENV_CRON_ENABLED)s

[program:procrastinate_worker]
priority=6
directory=/app/setup_utils
command=/bin/bash -c "chmod +x ./start_procrastinate_worker.sh && ./start_procrastinate_worker.sh"
startsecs=10
numprocs=3
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/queue/queue.log
stderr_logfile=/var/log/queue/queue.log
stdout_logfile_maxbytes=512KB
stderr_logfile_maxbytes=512KB
stdout_logfile_backups=0
stderr_logfile_backups=0
autorestart=true
retry=3
retry_delay=5
environment=QUEUE_ENABLED=%(ENV_QUEUE_ENABLED)s
autostart=%(ENV_QUEUE_ENABLED)s
14 changes: 13 additions & 1 deletion web-server/app/api/stream/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,24 @@ async function isPostgresUp(): Promise<boolean> {
}
}

async function isQueueUp(): Promise<boolean> {
try {
const response = await executeCommand(
`PYTHONPATH=../backend/analytics_server PROCRASTINATE_APP=procrastinate_worker.app procrastinate healthchecks`
);
return response.includes('Found procrastinate_jobs table: OK');
} catch {
return false;
}
}

async function checkServiceStatus(serviceName: ServiceNames): Promise<boolean> {
const statusCheckers = {
[ServiceNames.API_SERVER]: isApiServerUp,
[ServiceNames.SYNC_SERVER]: isSyncServerUp,
[ServiceNames.REDIS]: isRedisUp,
[ServiceNames.POSTGRES]: isPostgresUp
[ServiceNames.POSTGRES]: isPostgresUp,
[ServiceNames.QUEUE]: isQueueUp
};

const checker = statusCheckers[serviceName];
Expand Down
10 changes: 8 additions & 2 deletions web-server/src/components/Service/SystemLog/FormattedLog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,23 @@ export const FormattedLog = ({ log }: { log: ParsedLog; index: number }) => {
[theme]
);

const { timestamp, ip, logLevel, message } = log;
const { timestamp, ip, pid, logLevel, message } = log;
return (
<Line mono marginBottom={1}>
<Line component="span" color="info">
{timestamp}
</Line>{' '}
{ip && (
<Line component="span" color="primary">
{ip}{' '}
{ip}
</Line>
)}
{!ip && pid && (
<Line component="span" color="primary">
[{pid}]
</Line>
)}
{' '}
<Line component="span" color={getLevelColor(logLevel)}>
[{logLevel}]
</Line>{' '}
Expand Down
6 changes: 4 additions & 2 deletions web-server/src/components/Service/SystemStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ const serviceTitle: Record<ServiceNames, string> = {
[ServiceNames.API_SERVER]: 'Backend Server',
[ServiceNames.REDIS]: 'Redis Database',
[ServiceNames.POSTGRES]: 'Postgres Database',
[ServiceNames.SYNC_SERVER]: 'Sync Server'
[ServiceNames.SYNC_SERVER]: 'Sync Server',
[ServiceNames.QUEUE]: 'Queue Service'
};

const serviceColor: Record<ServiceNames, string> = {
[ServiceNames.API_SERVER]: '#06d6a0',
[ServiceNames.REDIS]: '#ef476f',
[ServiceNames.POSTGRES]: '#ff70a6',
[ServiceNames.SYNC_SERVER]: '#ab34eb'
[ServiceNames.SYNC_SERVER]: '#ab34eb',
[ServiceNames.QUEUE]: '#3a86ff'
};

export const SystemStatus: FC = () => {
Expand Down
3 changes: 2 additions & 1 deletion web-server/src/constants/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export enum ServiceNames {
API_SERVER = 'api-server-service',
REDIS = 'redis-service',
POSTGRES = 'postgres-service',
SYNC_SERVER = 'sync-server-service'
SYNC_SERVER = 'sync-server-service',
QUEUE = 'queue-service'
}
4 changes: 4 additions & 0 deletions web-server/src/constants/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const LOG_FILES: LogFile[] = [
{
path: '/var/log/postgres/postgres.log',
serviceName: ServiceNames.POSTGRES
},
{
path: '/var/log/queue/queue.log',
serviceName: ServiceNames.QUEUE
}
];

Expand Down
3 changes: 2 additions & 1 deletion web-server/src/slices/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const initialState: State = {
[ServiceNames.API_SERVER]: { isUp: false, logs: [] },
[ServiceNames.REDIS]: { isUp: false, logs: [] },
[ServiceNames.POSTGRES]: { isUp: false, logs: [] },
[ServiceNames.SYNC_SERVER]: { isUp: false, logs: [] }
[ServiceNames.SYNC_SERVER]: { isUp: false, logs: [] },
[ServiceNames.QUEUE]: { isUp: false, logs: [] },
},
loading: true
};
Expand Down
1 change: 1 addition & 0 deletions web-server/src/types/resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1060,4 +1060,5 @@ export interface ParsedLog {
message: string;
role?: string;
ip?: string;
pid?: string;
}
3 changes: 2 additions & 1 deletion web-server/src/utils/logFormatter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import {
export const parseLogLine = (rawLogLine: string): ParsedLog | null => {
const generalLogMatch = rawLogLine.match(generalLogRegex);
if (generalLogMatch) {
const [_fullLog, timestamp, _pid, logLevel, message] = generalLogMatch;
const [_fullLog, timestamp, pid, logLevel, message] = generalLogMatch;
return {
timestamp,
pid,
logLevel,
message
};
Expand Down