Skip to content

Commit

Permalink
ExperimentAxisQuery uses the thread pool from ContextBase (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
ebezzi authored Feb 12, 2024
1 parent 61f6cc9 commit 9b282eb
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
2 changes: 2 additions & 0 deletions python-spec/src/somacore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .query import AxisColumnNames
from .query import AxisQuery
from .query import ExperimentAxisQuery
from .types import ContextBase

try:
# This trips up mypy since it's a generated file:
Expand Down Expand Up @@ -59,4 +60,5 @@
"AxisColumnNames",
"AxisQuery",
"ExperimentAxisQuery",
"ContextBase",
)
3 changes: 2 additions & 1 deletion python-spec/src/somacore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing_extensions import LiteralString, Self

from . import options
from . import types


class SOMAObject(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -65,7 +66,7 @@ def uri(self) -> str:
raise NotImplementedError()

@property
def context(self) -> Any:
def context(self) -> Optional[types.ContextBase]:
"""A value storing implementation-specific configuration information.
This contains long-lived (i.e., not call-specific) information that is
Expand Down
16 changes: 12 additions & 4 deletions python-spec/src/somacore/query/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .. import data
from .. import measurement
from .. import options
from .. import types as base_types
from . import _fast_csr
from . import axis
from . import types
Expand Down Expand Up @@ -587,11 +588,14 @@ def _var_df(self) -> data.DataFrame:

@property
def _threadpool(self) -> futures.ThreadPoolExecutor:
"""Creates a thread pool just in time."""
"""
Returns the threadpool provided by the experiment's context.
If not available, creates a thread pool just in time."""
context = self.experiment.context
if context and context.threadpool:
return context.threadpool

if self._threadpool_ is None:
# TODO: the user should be able to set their own threadpool, a la asyncio's
# loop.set_default_executor(). This is important for managing the level of
# concurrency, etc.
self._threadpool_ = futures.ThreadPoolExecutor()
return self._threadpool_

Expand Down Expand Up @@ -797,6 +801,10 @@ def ms(self) -> Mapping[str, measurement.Measurement]:
def obs(self) -> data.DataFrame:
...

@property
def context(self) -> Optional[base_types.ContextBase]:
...


class _HasObsVar(Protocol[_T_co]):
"""Something which has an ``obs`` and ``var`` field.
Expand Down
10 changes: 10 additions & 0 deletions python-spec/src/somacore/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import sys
from concurrent import futures
from typing import TYPE_CHECKING, NoReturn, Optional, Sequence, Type, TypeVar

from typing_extensions import Protocol, TypeGuard
Expand Down Expand Up @@ -75,3 +76,12 @@ def is_slice_of(__obj: object, __typ: Type[_T]) -> TypeGuard[Slice[_T]]:
and (__obj.stop is None or isinstance(__obj.stop, __typ))
and (__obj.step is None or isinstance(__obj.step, __typ))
)


class ContextBase(Protocol):
"""A protocol for a context manager that can be used as a base class.
If a threadpool is specified as part of the context, it will be used by
experiment queries. Otherwise, the implementer will use its own threadpool.
"""

threadpool: Optional[futures.ThreadPoolExecutor]

0 comments on commit 9b282eb

Please sign in to comment.