Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ For further detail about kernel spec customisation see [command line usage](http

- When a message is received the `msg_handler` is called with:
- 'job' (a dict of `msg`, `received_time` and `ident`)
- the [`channel`](#channel)
- The `channel`
- `msg_type`
- A function `send_reply`

Expand Down
55 changes: 12 additions & 43 deletions src/async_kernel/caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from async_kernel import utils
from async_kernel.common import Fixed
from async_kernel.pending import Pending, PendingCancelled, PendingGroup, PendingTracker, checkpoint
from async_kernel.typing import Backend, CallerCreateOptions, CallerState, NoValue, PendingCreateOptions, T
from async_kernel.typing import Backend, CallerCreateOptions, CallerState, NoValue, T

with contextlib.suppress(ImportError):
# Monkey patch sniffio.current_async_library` with aiologic's version which does a better job.
Expand Down Expand Up @@ -596,7 +596,6 @@ def schedule_call(
func: Callable[..., CoroutineType[Any, Any, T] | T],
args: tuple,
kwargs: dict,
pending_create_options: PendingCreateOptions | None = None,
context: contextvars.Context | None = None,
/,
**metadata: Any,
Expand All @@ -611,14 +610,13 @@ def schedule_call(
func: The function to be called. If it returns a coroutine, it will be awaited and its result will be returned.
args: Arguments corresponding to in the call to `func`.
kwargs: Keyword arguments to use with in the call to `func`.
pending_create_options: Options are passed to [Pending][async_kernel.pending.Pending].
context: The context to use, if not provided the current context is used.
**metadata: Additional metadata to store in the instance.
"""
if self._state in {CallerState.stopping, CallerState.stopped}:
msg = f"{self} is {self._state.name}!"
raise RuntimeError(msg)
pen = Pending(pending_create_options, func=func, args=args, kwargs=kwargs, caller=self, **metadata)
pen = Pending(func=func, args=args, kwargs=kwargs, caller=self, **metadata)
self._queue.append((context or contextvars.copy_context(), pen))
self._resume()
return pen
Expand Down Expand Up @@ -735,26 +733,23 @@ def queue_get(self, func: Callable) -> Pending[None] | None:
"""
return self._queue_map.get(hash(func))

def queue_call_advanced(
def queue_call(
self,
func: Callable[P, T | CoroutineType[Any, Any, T]],
args: tuple,
kwargs: dict,
/,
*,
allow_tracking: NoValue | bool = NoValue, # pyright: ignore[reportInvalidTypeForm]
track: bool = True,
*args: P.args,
**kwargs: P.kwargs,
) -> Pending[T]:
"""
Queue the execution of `func` in a queue unique to it and the caller instance (thread-safe).

The returned pending is 'resettable' and will provide the result of the most recent successful
call once the queue has been emptied. Exceptions are not set, instead the result would be `None`.

Args:
func: The function.
args: Arguments to use with `func`.
kwargs: Keyword arguments to use with `func`.
allow_tracking: Used for the first call for `func`. Defaults to the value of `track` if not provided.
track: Allow the present call to be tracked by a [PendingTracker][async_kernel.pending.PendingTracker].
This includes the subclasses [PendingManager][async_kernel.pending.PendingManager] and [PendingGroup][async_kernel.pending.PendingGroup].
*args: Arguments to use with `func`.
**kwargs: Keyword arguments to use with `func`.

Returns:
Pending: The pending where the queue loop is running.
Expand Down Expand Up @@ -807,39 +802,13 @@ async def queue_loop() -> None:
finally:
self._queue_map.pop(key)

options = PendingCreateOptions(allow_tracking=track if allow_tracking is NoValue else allow_tracking)
self._queue_map[key] = pen_ = self.schedule_call(
queue_loop, (), {}, options, key=key, queue=queue, resume=noop
)
self._queue_map[key] = pen_ = self.schedule_call(queue_loop, (), {}, key=key, queue=queue, resume=noop)
pen_.metadata["queue"].append((func, args, kwargs))
pen_.metadata["resume"]()
if track:
if pen_.trackers:
PendingTracker.add_to_pending_trackers(pen_)
return pen_ # pyright: ignore[reportReturnType]

def queue_call(
self,
func: Callable[P, T | CoroutineType[Any, Any, T]],
/,
*args: P.args,
**kwargs: P.kwargs,
) -> Pending[T]:
"""
Queue the execution of `func` in a queue unique to it and the caller instance (thread-safe).

The returned pending is 'resettable' and will provide the result of the most recent successful
call once the queue has been emptied. Exceptions are not set, instead the result would be `None`.

Args:
func: The function.
*args: Arguments to use with `func`.
**kwargs: Keyword arguments to use with `func`.

Returns:
Pending: The pending where the queue loop is running.
"""
return self.queue_call_advanced(func, args, kwargs, track=True)

def queue_close(self, func: Callable | int) -> None:
"""
Close the execution queue associated with `func` (thread-safe).
Expand Down
2 changes: 1 addition & 1 deletion src/async_kernel/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def msg_handler(
case RunMode.direct:
self.callers[channel].call_direct(handler, job)
case RunMode.queue:
self.callers[channel].queue_call_advanced(handler, (job,), {}, allow_tracking=True, track=False)
self.callers[channel].queue_call(handler, job).trackers = () # A slight optimisation
case RunMode.task:
self.callers[channel].call_soon(handler, job)
case RunMode.thread:
Expand Down
Loading
Loading