Hi, I'm having a hard time fully understanding what you mean, but it sounds like you want to create one QueueManager (QM) per virtual environment. Make sure the entrypoint names are unique per QM, because a QM only dequeues jobs for the entrypoints it knows about. Here's an approach that could align more closely with your requirements:
For example:

    from pgqueuer import QueueManager

    async def qm1_factory(...):
        qm = QueueManager(...)

        @qm.entrypoint("qm1_entrypoint")
        def qm1_entrypoint(...):
            ...

        return qm

    async def qm2_factory(...):
        qm = QueueManager(...)

        @qm.entrypoint("qm2_entrypoint")
        def qm2_entrypoint(...):
            ...

        return qm

Now you can create two or more virtual environments, each with its own version of the required libraries, and invoke them using:

    python -m pgqueuer run <path-to-factory-function>

This approach lets you isolate different projects by using unique QueueManagers and virtual environments, achieving both separation and the ability to run one job at a time per machine. Let me know if this makes sense or if you'd like more details about the setup!
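If you want to start each consumer from a small launcher script, here is a minimal sketch of one way to do it, assuming each project has its own virtual environment. The venv location and the factory path below are hypothetical placeholders (the exact form of the factory path depends on your project), and the helper names are made up for illustration; only the `python -m pgqueuer run` invocation comes from the command above.

```python
from pathlib import Path
import subprocess


def consumer_command(venv_dir: Path, factory_path: str) -> list[str]:
    """Build the command that starts one consumer with a specific venv's interpreter."""
    # Assumes the POSIX venv layout (<venv>/bin/python); on Windows the
    # interpreter lives under <venv>/Scripts/ instead.
    python = venv_dir / "bin" / "python"
    return [str(python), "-m", "pgqueuer", "run", factory_path]


def launch_consumer(venv_dir: Path, factory_path: str) -> subprocess.Popen:
    """Start the consumer as a child process; the caller should wait on it."""
    return subprocess.Popen(consumer_command(venv_dir, factory_path))


# Hypothetical venv location and factory path, for illustration only.
command = consumer_command(Path("/opt/venvs/project_a"), "project_a.consumer:qm1_factory")
print(command)
```

Calling the venv's interpreter directly means the child process resolves its imports from that environment, so no activation step is needed.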
In your example, you register different entrypoints in one consumer. If I split the two tasks into two consumer files and run each consumer separately, can it still run only one task at a time? Do I need to add concurrency_limit = 1? I also just found this
Is this implementation correct? consumer_1.py consumer_2.py But I'm still confused about how to enqueue task_1 and task_2 when they are triggered simultaneously by different users, and what is the difference between concurrency_limit and serialized_dispatch?
To run one job at a time on a machine across multiple consumers (consumer_1.py, consumer_2.py), set serialized_dispatch=True on your entrypoints. This ensures that, globally, at most one job with a given entrypoint name runs at a time. Because the guarantee is tied to the entrypoint name, both consumers need to register the same name. If you run consumer_1 and consumer_2 at the same time, only one of them will pick up a job and run it until it's completed; the other stays idle for the duration of the processing, because serialized_dispatch=True enforces strict serialization of job execution. However, this does not guarantee that jobs will be distributed round-robin across the consumers.
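To make the behaviour concrete, here is a toy, in-process simulation of what you should observe. It is not how pgqueuer implements serialized_dispatch (that coordination happens through the database, across processes); the `SerializedGate` class and the `consumer` coroutine are hypothetical stand-ins that only model the rule "at most one running job per entrypoint name".

```python
import asyncio


class SerializedGate:
    """Toy stand-in for the global rule: one running job per entrypoint name."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = {}
        self.running = 0
        self.max_running = 0

    def lock_for(self, entrypoint: str) -> asyncio.Lock:
        return self._locks.setdefault(entrypoint, asyncio.Lock())


async def consumer(name: str, gate: SerializedGate, queue: asyncio.Queue, log: list) -> None:
    while True:
        # A consumer only takes a job once no other "shared_job" is running.
        async with gate.lock_for("shared_job"):
            if queue.empty():
                return
            payload = queue.get_nowait()
            gate.running += 1
            gate.max_running = max(gate.max_running, gate.running)
            await asyncio.sleep(0.01)  # simulated work
            log.append((name, payload))
            gate.running -= 1


async def simulate() -> tuple:
    gate = SerializedGate()
    queue: asyncio.Queue = asyncio.Queue()
    for payload in (b"task_1_payload", b"task_2_payload", b"task_3_payload"):
        queue.put_nowait(payload)
    log: list = []
    await asyncio.gather(
        consumer("consumer_1", gate, queue, log),
        consumer("consumer_2", gate, queue, log),
    )
    return gate.max_running, log


max_running, log = asyncio.run(simulate())
print(max_running, log)
```

Every job completes, but never more than one at a time. Which consumer handles which job is deliberately left unchecked, matching the point above that the distribution is not guaranteed to be round-robin.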
In your 'producer.py', you can use batch-insert to enqueue multiple jobs at once:

    import asyncio

    import asyncpg
    from pgqueuer.db import AsyncpgDriver
    from pgqueuer.queries import Queries


    # Enqueue multiple jobs using the enqueue function
    async def enqueue_jobs():
        connection = await asyncpg.connect()
        driver = AsyncpgDriver(connection)
        queries = Queries(driver)
        await queries.enqueue(
            entrypoint=["shared_job", "shared_job"],
            payload=[b"task_1_payload", b"task_2_payload"],
            priority=[0, 0],
        )


    # Run the enqueue function
    asyncio.run(enqueue_jobs())
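The batch call above takes three parallel lists, which are easy to get out of step as the number of jobs grows. A small helper can build them from one list of job records. This is only a sketch: `PendingJob` and `to_enqueue_kwargs` are hypothetical names, not part of pgqueuer, and the keyword names simply mirror the enqueue call shown above.

```python
from typing import NamedTuple, Optional


class PendingJob(NamedTuple):
    """One job waiting to be enqueued (hypothetical helper type)."""

    entrypoint: str
    payload: Optional[bytes]
    priority: int = 0


def to_enqueue_kwargs(jobs: list) -> dict:
    """Turn job records into the parallel lists used by the batch enqueue call."""
    return {
        "entrypoint": [job.entrypoint for job in jobs],
        "payload": [job.payload for job in jobs],
        "priority": [job.priority for job in jobs],
    }


jobs = [
    PendingJob("shared_job", b"task_1_payload"),
    PendingJob("shared_job", b"task_2_payload"),
]
kwargs = to_enqueue_kwargs(jobs)
print(kwargs)
```

Inside `enqueue_jobs()` this would presumably be used as `await queries.enqueue(**to_enqueue_kwargs(jobs))`. If task_1 and task_2 are triggered by different users at different moments, each request can just make its own enqueue call; batching only helps when the jobs are known together.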
Local vs Global Limits
Hi,
Is there an example of how to register multiple jobs in one worker?
My use case: I want to queue multiple jobs in one worker so that only one job can run at a time on one machine. If possible, I also want to keep the worker and each job in separate projects, each with its own virtual environment containing different library versions. I plan to run each job in a subprocess so that its virtual environment stays isolated.