Skip to content

x42005e1f/culsans

Repository files navigation

culsans

Mixed sync-async queue, supposed to be used for communicating between classic synchronous (threaded) code and asynchronous one, between two asynchronous codes in different threads, and for any other combination that you want. Based on the queue module. Built on the aiologic package. Inspired by the janus library.

Like Culsans god, the queue object from the library has two faces: synchronous and asynchronous interface. Unlike Janus library, synchronous interface supports eventlet, gevent, and threading, while asynchronous interface supports asyncio, curio, and trio.

Synchronous is fully compatible with standard queue, asynchronous one follows asyncio queue design.

Installation

Install from PyPI (stable):

pip install culsans

Or from GitHub (latest):

pip install git+https://github.com/x42005e1f/culsans.git

You can also use other package managers, such as uv.

Usage

Three queues are available:

  • Queue
  • LifoQueue
  • PriorityQueue

Each has two properties: sync_q and async_q.

Use the first to get synchronous interface and the second to get asynchronous one.

Example

import anyio
import culsans


def sync_run(sync_q: culsans.SyncQueue[int]) -> None:
    for i in range(100):
        sync_q.put(i)
    else:
        sync_q.join()


async def async_run(async_q: culsans.AsyncQueue[int]) -> None:
    for i in range(100):
        value = await async_q.get()

        assert value == i

        async_q.task_done()


async def main() -> None:
    queue: culsans.Queue[int] = culsans.Queue()

    async with anyio.create_task_group() as tasks:
        tasks.start_soon(anyio.to_thread.run_sync, sync_run, queue.sync_q)
        tasks.start_soon(async_run, queue.async_q)

    queue.shutdown()


anyio.run(main)

Extras

Both interfaces support some additional features that are not found in the original queues.

growing & shrinking

You can dynamically change the upperbound limit on the number of items that can be placed in the queue with queue.maxsize = N. If it increases (growing), the required number of waiting putters will be woken up. If it decreases (shrinking), items exceeding the new limit will remain in the queue, but all putters will be blocked until enough items are retrieved from the queue. And if maxsize is less than or equal to zero, all putters will be woken up.

async with anyio.create_task_group() as tasks:
    async_q = culsans.Queue(1).async_q

    for i in range(4):
        tasks.start_soon(async_q.put, i)

    await anyio.sleep(1e-3)
    assert async_q.qsize() == 1

    async_q.maxsize = 2  # growing

    await anyio.sleep(1e-3)
    assert async_q.qsize() == 2

    async_q.maxsize = 1  # shrinking

    await anyio.sleep(1e-3)
    assert async_q.qsize() == 2

    async_q.get_nowait()

    await anyio.sleep(1e-3)
    assert async_q.qsize() == 1

    async_q.maxsize = 0  # now the queue size is infinite

    await anyio.sleep(1e-3)
    assert async_q.qsize() == 3

peek() & peek_nowait()

If you want to check the first item of the queue, but do not want to remove that item from the queue, you can use the peek() and peek_nowait() methods instead of the get() and get_nowait() methods.

sync_q = culsans.Queue().sync_q

sync_q.put("spam")

assert sync_q.peekable()
assert sync_q.peek() == "spam"
assert sync_q.peek_nowait() == "spam"
assert sync_q.qsize() == 1

These methods can be considered an implementation of partial compatibility with gevent queues.

clear()

In some scenarios it may be necessary to clear the queue. But it is inefficient to do this through a loop, and it causes additional difficulties when it is also necessary to ensure that no new items can be added during the clearing process. For this purpose, there is the atomic clear() method that clears the queue most efficiently.

async with anyio.create_task_group() as tasks:
    async_q = culsans.Queue(3).async_q

    for i in range(5):
        tasks.start_soon(async_q.put, i)

    await anyio.sleep(1e-3)
    assert async_q.qsize() == 3
    assert async_q.clearable()

    async_q.clear()  # clearing

    await anyio.sleep(1e-3)
    assert async_q.qsize() == 2
    assert async_q.get_nowait() == 3
    assert async_q.get_nowait() == 4

Roughly equivalent to:

def clear(queue):
    while True:
        try:
            queue.get_nowait()
        except Empty:
            break
        else:
            queue.task_done()

Subclasses

You can create your own queues by inheriting from existing queue classes as if you were using the queue module. For example, this is how you can create an unordered queue that contains only unique items:

from culsans import Queue


class UniqueQueue(Queue):
    def _init(self, maxsize):
        self.data = set()

    def _qsize(self):
        return len(self.data)

    def _put(self, item):
        self.data.add(item)

    def _get(self):
        return self.data.pop()

    def _peekable(self):
        return False

    _peek = None

    def _clearable(self):
        return True

    def _clear(self):
        self.data.clear()
sync_q = UniqueQueue().sync_q

sync_q.put_nowait(23)
sync_q.put_nowait(42)
sync_q.put_nowait(23)

assert sync_q.qsize() == 2
assert sorted(sync_q.get_nowait() for _ in range(2)) == [23, 42]

All eight of these methods are called in exclusive access mode, so you can freely create your subclasses without thinking about whether your methods are thread-safe or not.

Checkpoints

Sometimes it is useful when each asynchronous call switches execution to the next task and checks for cancellation and timeouts. For example, if you want to distribute CPU usage across all tasks.

The culsans library adopts aiologic's checkpoints, but unlike it does not guarantee that there will only be one per asynchronous call, due to design specifics.

See the aiologic documentation for details on how to control checkpoints.

Compatibility

If you want to use culsans as a backport of the standard queues to older versions of Python (for example, if you need the shutdown() method), you can replace something like this:

sync_q = queue.Queue()
async_q = asyncio.Queue()

with this:

sync_q = culsans.Queue().sync_q
async_q = culsans.Queue().async_q

And if you are using janus in your application and want to switch to culsans, all you have to do is replace this:

import janus

with this:

import culsans as janus

and everything will work!

Documentation

Read the Docs: https://culsans.readthedocs.io (official)

DeepWiki: https://deepwiki.com/x42005e1f/culsans (AI generated; lying!)

There are also related posts:

Communication channels

GitHub Discussions: https://github.com/x42005e1f/culsans/discussions (ideas, questions)

GitHub Issues: https://github.com/x42005e1f/culsans/issues (bug tracker)

You can also send an email to 0x42005e1f@gmail.com with any feedback.

Project status

The project is developed and maintained by one person in his spare time and is not a commercial product. The author is not a professional programmer, so you may encounter some misunderstandings. However, he has been programming as a hobby for over a decade, delving into specific topics (often esoteric ones), and until 2024, he made almost no public contributions (you can find some if you try hard; for example, one very ugly one in Java from 2019), as he set high standards for himself. The author often makes mistakes, so he constantly double-checks and improves himself (which is well reflected in how often he edits his own comments) — as a result, he relies heavily on careful theoretical analysis and proactive bug fixing.

No AI tools are used in the development (nor are IDE tools, for that matter). The only exception is text translation, since the author is not a native English speaker, but the texts themselves are not generated (they are written and edited manually by a human). Unicode characters are also actively used (via a compose key), as the author likes beautiful texts, such as they were before the avoidance of signs of AI writing. — It may not be too obvious, but this paragraph addresses the sore subject of "you are an AI", expressed in... not the best ways.

It is published for the simple reason that the author considered it noteworthy and not too ugly. The topic is quite non-trivial, so although contributions are not prohibited, they will be very, very difficult if you decide to make them (except for some very simple ones). The functionality provided is still being perfected, so the development status is alpha.

What is the goal of the project? To realize the author's vision. Is it worth trusting what is available now? Well, the choice is yours. But the project is already being used, so why not give it a try?

License

The culsans library is REUSE-compliant and is offered under multiple licenses:

  • All original source code is licensed under ISC.
  • All original test code is licensed under 0BSD.
  • All documentation is licensed under CC-BY-4.0.
  • All configuration is licensed under CC0-1.0.
  • Some test code borrowed from python/cpython is licensed under PSF-2.0.

For more accurate information, check the individual files.