Skip to content

Commit 59e4370

Browse files
Adding new code for tubes (#1)
1 parent e04eaab commit 59e4370

33 files changed

+711
-308
lines changed

arend/__init__.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +0,0 @@
1-
from arend.queue.consumer import consumer
2-
from multiprocessing import Pool
3-
from typing import List
4-
5-
import click
6-
import logging
7-
8-
logger = logging.getLogger(__name__)
9-
10-
11-
@click.command(name="Start Processor")
12-
@click.option('--queues', type=List[str], help='Queue names')
13-
@click.option('--concurrency', type=int, default=1, help='Queue concurrency')
14-
@click.option('--verbose', is_flag=True, default=False, help='Verbose')
15-
def pool_processor(queues, concurrency, verbose):
16-
click.echo("Getting the Pool...")
17-
p = Pool()
18-
try:
19-
click.echo("Starting workers...")
20-
p.map(consumer, [(queue, verbose) for queue in queues] * concurrency)
21-
p.close()
22-
except KeyboardInterrupt:
23-
click.echo("Keyboard Exception, terminating pool...")
24-
p.terminate()
25-
click.echo("Pool terminated...")
26-
except Exception as e:
27-
click.echo(f"Exception '{e}', terminating pool...")
28-
p.terminate()
29-
click.echo("Pool terminated...")
30-
finally:
31-
click.echo("Joining processes...")
32-
p.join()
33-
click.echo("Processes joined...")
34-
35-
36-
if __name__ == "__main__":
37-
pool_processor()

arend/async_task.py

Lines changed: 0 additions & 66 deletions
This file was deleted.

arend/backend/mongo.py

Lines changed: 0 additions & 16 deletions
This file was deleted.

arend/backends/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from .mongo import MongoBackend
2+
from .redis import RedisBackend
3+
from .sql import SqlBackend
4+
from arend.settings import settings
5+
from functools import lru_cache
6+
7+
8+
@lru_cache
9+
def get_queue_backend():
10+
backends = {
11+
"redis": RedisBackend,
12+
"mongo": MongoBackend,
13+
"sql": SqlBackend,
14+
}
15+
return backends.get(settings.broker)
16+
17+
18+
QueueBroker = get_queue_backend()

arend/backends/base.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# from arend.settings import settings
2+
# from pymongo import MongoClient
3+
# from pymongo.collection import Collection
4+
5+
import logging
6+
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class TasksBackend:
12+
def __init__(self):
13+
pass
14+
15+
def __enter__(self):
16+
return self
17+
18+
def __exit__(self, exc_type, exc_val, exc_tb):
19+
self.close()
20+
21+
def close(self):
22+
pass
23+
24+
def get_(self):
25+
pass

arend/backends/mongo.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from arend.settings import settings
2+
from pymongo import MongoClient
3+
from pymongo.collection import Collection
4+
5+
import logging
6+
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class MongoBackend:
12+
def __init__(self):
13+
self.db: MongoClient = MongoClient(settings.mongodb_string)
14+
collection = settings.mongodb_notifier_task_results
15+
self.tasks_collection: Collection = self.db[collection]
16+
17+
def __enter__(self):
18+
return self
19+
20+
def __exit__(self, exc_type, exc_val, exc_tb):
21+
self.db.close()
22+
23+
def find_one(self):
24+
pass
25+
26+
def update_one(self):
27+
pass

arend/backends/redis.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from arend.settings import settings
2+
from pymongo import MongoClient
3+
from pymongo.collection import Collection
4+
5+
import logging
6+
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class RedisBackend:
12+
def __init__(self):
13+
self.db: MongoClient = MongoClient(settings.mongodb_string)
14+
collection = settings.mongodb_notifier_task_results
15+
self.tasks_collection: Collection = self.db[collection]
16+
17+
def __enter__(self):
18+
return self
19+
20+
def __exit__(self, exc_type, exc_val, exc_tb):
21+
self.db.close()
22+
23+
def find_one(self):
24+
pass
25+
26+
def update_one(self):
27+
pass

arend/backends/sql.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from arend.settings import settings
2+
from pymongo import MongoClient
3+
from pymongo.collection import Collection
4+
5+
import logging
6+
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class SqlBackend:
12+
def __init__(self):
13+
self.db: MongoClient = MongoClient(settings.mongodb_string)
14+
collection = settings.mongodb_notifier_task_results
15+
self.tasks_collection: Collection = self.db[collection]
16+
17+
def __enter__(self):
18+
return self
19+
20+
def __exit__(self, exc_type, exc_val, exc_tb):
21+
self.db.close()
22+
23+
def find_one(self):
24+
pass
25+
26+
def update_one(self):
27+
pass

arend/brokers/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from .beanstalkd import BeanstalkdBroker
2+
from .redis import RedisBroker
3+
from .sqs import SQSBroker
4+
from arend.settings import settings
5+
from functools import lru_cache
6+
7+
8+
@lru_cache
9+
def get_queue_broker():
10+
brokers = {
11+
"beanstalk": BeanstalkdBroker,
12+
"redis": RedisBroker,
13+
"sqs": SQSBroker,
14+
}
15+
return brokers.get(settings.broker)
16+
17+
18+
QueueBroker = get_queue_broker()

arend/brokers/base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import logging
2+
import pystalkd
3+
import uuid
4+
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
class BaseBroker:
10+
def __enter__(self):
11+
return self
12+
13+
def __exit__(self, exc_type, exc_val, exc_tb):
14+
raise NotImplementedError
15+
16+
def add_to_queue(self, task_uuid: uuid.UUID):
17+
raise NotImplementedError
18+
19+
def reserve(self, timeout: int = None) -> pystalkd.Job:
20+
raise NotImplementedError
21+
22+
def delete(self, job):
23+
raise NotImplementedError

0 commit comments

Comments
 (0)