Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions sergeant/connector/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,73 @@ def queue_pop_bulk(
return regular_items + delayed_items


class OrderedSetQueueRedis(
QueueRedis,
):
def queue_pop(
self,
queue_name: str,
) -> typing.Optional[typing.Any]:
result = self.zpopmin(
name=f'{queue_name}',
count=1,
)
if result:
item, item_score = result[0]

return item

result = self.zpopmin(
name=f'{queue_name}.delayed',
count=1,
)
if not result:
return None
else:
delayed_item, delayed_item_score = result[0]

if delayed_item_score <= int(time.time()):
return delayed_item
else:
self.zadd(
name=f'{queue_name}.delayed',
mapping={
delayed_item: delayed_item_score,
},
nx=False,
)

return None

def queue_pop_bulk(
self,
queue_name: str,
number_of_items: int,
) -> typing.List[typing.Any]:
pipeline = self.pipeline()
pipeline.lrange(queue_name, 0, number_of_items - 1)
pipeline.ltrim(queue_name, number_of_items, -1)
value = pipeline.execute()

regular_items = value[0]
number_of_regular_items = len(regular_items)
if number_of_regular_items == number_of_items:
return regular_items
else:
delayed_items_to_pull = number_of_items - number_of_regular_items
delayed_items = self.delayed_queue_pop_bulk_script(
keys=[
f'{queue_name}.delayed',
],
args=[
delayed_items_to_pull,
int(time.time()),
],
)

return regular_items + delayed_items


class Connector(
_connector.Connector,
):
Expand Down Expand Up @@ -395,3 +462,71 @@ def lock(
redis_connection=redis_connection,
name=name,
)


class OrderedSetConnector(
Connector,
):
name: str = 'ordered_set_redis'

def __init__(
self,
nodes: typing.List[typing.Dict[str, typing.Any]],
) -> None:
self.connections = [
OrderedSetQueueRedis(
host=node['host'],
port=node['port'],
password=node['password'],
db=node['database'],
retry_on_timeout=True,
socket_keepalive=True,
socket_connect_timeout=10,
socket_timeout=60,
single_connection_client=True,
)
for node in nodes
]
self.number_of_connections = len(self.connections)
self.current_connection_index = random.randint(0, self.number_of_connections - 1)

def queue_push(
self,
queue_name: str,
item: bytes,
priority: str = 'NORMAL',
consumable_from: int = 0,
) -> bool:
if consumable_from == 0:
if priority == 'NORMAL':
consumable_from = 1
self.next_connection.zadd(
name=queue_name,
mapping={
item: consumable_from,
},
nx=False,
)

return True

def queue_push_bulk(
self,
queue_name: str,
items: typing.Iterable[bytes],
priority: str = 'NORMAL',
consumable_from: int = 0,
) -> bool:
if consumable_from == 0:
if priority == 'NORMAL':
consumable_from = 1
self.next_connection.zadd(
name=queue_name,
mapping={
item: consumable_from
for item in items
},
nx=False,
)

return True
2 changes: 2 additions & 0 deletions sergeant/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def init_broker(
connector_obj = connector.mongo.Connector(**self.config.connector.params)
elif self.config.connector.type == connector.redis.Connector.name:
connector_obj = connector.redis.Connector(**self.config.connector.params)
elif self.config.connector.type == connector.redis.OrderedSetConnector.name:
connector_obj = connector.redis.OrderedSetConnector(**self.config.connector.params)
else:
raise ValueError(f'connector type {self.config.connector.type} is not supported')

Expand Down