Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExperimentAxisQuery uses the thread pool from ContextBase #184

Merged
merged 8 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python-spec/src/somacore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .query import AxisColumnNames
from .query import AxisQuery
from .query import ExperimentAxisQuery
from .types import ContextBase

try:
# This trips up mypy since it's a generated file:
Expand Down Expand Up @@ -59,4 +60,5 @@
"AxisColumnNames",
"AxisQuery",
"ExperimentAxisQuery",
"ContextBase",
)
7 changes: 4 additions & 3 deletions python-spec/src/somacore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing_extensions import LiteralString, Self

from . import options
from .types import ContextBase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit: in somacore import modules, not things inside modules, so from . import types and use types.ContextBase below



class SOMAObject(metaclass=abc.ABCMeta):
Expand All @@ -24,7 +25,7 @@ def open(
uri: str,
mode: options.OpenMode = "r",
*,
context: Optional[Any] = None,
context: Optional[ContextBase] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this a little further and it turns out there is a deeper wrinkle to this: we wouldn’t be able to override

class SOMAObject(...):
  @classmethod
  def open(..., context: Optional[ContextBase] = None) -> Self: ...

with

class ImplObject(somacore.SOMAObject):
  @classmethod
  def open(..., context: Optional[ImplContext] = None) -> Self: ...
  # not allowed, because `ImplContext` is a narrowing of `ContextBase`,
  # which would violate the Liskov principle:
  # https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides

We could do a bunch of chicanery with another entry in the list of generics, so it would look something like:

_CtxT = TypeVar("_CtxT", bound=ContextBase)
class SOMAObject(Generic[_CtxT]):
  def open(..., context: Optional[_CtxT] = None) -> Self: ...

# elsewhere:

class ImplObject(somacore.SOMAObject[ImplContext]):
  def open(..., context: Optional[ImplContext] = None) -> Self: ...

but that would also require adding a _CtxT to every SOMA object, which would be particularly unwieldy on the already highly-genericised Measurement type:

class Measurement(
collection.BaseCollection[_RootSO],
Generic[_DF, _NDColl, _DenseNDColl, _SparseNDColl, _RootSO],
):

so instead, I think the easier thing to do is to opt out of type-checking for this parameter when being passed in, which, while imperfect, will allow it to be overridden with type information in the implementation. So the end result looks like:

class SOMAObject(metaclass=abc.ABCMeta):
  @classmethod
  def open(..., context: Optional[Any] = None) -> Self:
    ...
  # Ignore type-checking the specific type allowed in `open`

  @property
  def context(self) -> Optional[types.ContextBase]: ...
  # Returning a subclass is OK, though.

Then, at the implementation, we can say:

class ImplObject(SOMAObject):
  @classmethod
  def open(..., context: Optional[ImplContext] = None) -> Self: ...
  # Allowed since the superclass `context` parameter type is unchecked.

  @property
  def context(self) -> Optional[ImplContext]: ...
  # Allowed since you can narrow returned types.

platform_config: Optional[options.PlatformConfig] = None,
) -> Self:
"""Opens the SOMA object of this type at the given URI.
Expand All @@ -42,7 +43,7 @@ def open(

@classmethod
@abc.abstractmethod
def exists(cls, uri: str, *, context: Optional[Any] = None) -> bool:
def exists(cls, uri: str, *, context: Optional[ContextBase] = None) -> bool:
"""Checks whether a SOMA object of this type is stored at the URI.

Args:
Expand All @@ -65,7 +66,7 @@ def uri(self) -> str:
raise NotImplementedError()

@property
def context(self) -> Any:
def context(self) -> Optional[ContextBase]:
"""A value storing implementation-specific configuration information.

This contains long-lived (i.e., not call-specific) information that is
Expand Down
5 changes: 3 additions & 2 deletions python-spec/src/somacore/collection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import abc
from typing import Any, MutableMapping, Optional, Sequence, Type, TypeVar, overload
from typing import MutableMapping, Optional, Sequence, Type, TypeVar, overload

import pyarrow as pa
from typing_extensions import Final, Self

from . import base
from . import data
from . import options
from .types import ContextBase

_Elem = TypeVar("_Elem", bound=base.SOMAObject)
"""Element Type for a SOMA collection."""
Expand Down Expand Up @@ -35,7 +36,7 @@ def create(
uri: str,
*,
platform_config: Optional[options.PlatformConfig] = None,
context: Optional[Any] = None,
context: Optional[ContextBase] = None,
) -> Self:
"""Creates a new collection of this type at the given URI.

Expand Down
5 changes: 3 additions & 2 deletions python-spec/src/somacore/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from . import base
from . import options
from .types import ContextBase

_RO_AUTO = options.ResultOrder.AUTO

Expand All @@ -48,7 +49,7 @@ def create(
index_column_names: Sequence[str] = (options.SOMA_JOINID,),
domain: Optional[Sequence[Optional[Tuple[Any, Any]]]] = None,
platform_config: Optional[options.PlatformConfig] = None,
context: Optional[Any] = None,
context: Optional[ContextBase] = None,
) -> Self:
"""Creates a new ``DataFrame`` at the given URI.

Expand Down Expand Up @@ -231,7 +232,7 @@ def create(
type: pa.DataType,
shape: Sequence[Optional[int]],
platform_config: Optional[options.PlatformConfig] = None,
context: Optional[Any] = None,
context: Optional[ContextBase] = None,
) -> Self:
"""Creates a new ND array of the current type at the given URI.

Expand Down
5 changes: 4 additions & 1 deletion python-spec/src/somacore/ephemeral/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .. import experiment
from .. import measurement
from .. import options
from ..types import ContextBase

_Elem = TypeVar("_Elem", bound=base.SOMAObject)

Expand Down Expand Up @@ -51,7 +52,9 @@ def open(cls, *args, **kwargs) -> NoReturn:
)

@classmethod
def exists(cls, uri: str, *, context: Any = None) -> Literal[False]:
def exists(
cls, uri: str, *, context: Optional[ContextBase] = None
) -> Literal[False]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In light of the above, this can be left as context: Any = None (i.e. no changes to this file at all).

del uri, context # All unused.
# Ephemeral collections are in-memory only and do not otherwise exist.
return False
Expand Down
18 changes: 14 additions & 4 deletions python-spec/src/somacore/query/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .. import data
from .. import measurement
from .. import options
from ..types import ContextBase
from . import _fast_csr
from . import axis
from . import types
Expand Down Expand Up @@ -587,11 +588,16 @@ def _var_df(self) -> data.DataFrame:

@property
def _threadpool(self) -> futures.ThreadPoolExecutor:
"""Creates a thread pool just in time."""
"""
Returns the threadpool provided by the experiment's context.
If not available, creates a thread pool just in time."""
if (
self.experiment.context is not None
and self.experiment.context.threadpool is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since context and context.threadpool will be nonzero if they are set at all, this can be

context = self.experiment.context
if context and context.threadpool:
  return context.threadpool

without the need for is not None.

):
return self.experiment.context.threadpool

if self._threadpool_ is None:
# TODO: the user should be able to set their own threadpool, a la asyncio's
# loop.set_default_executor(). This is important for managing the level of
# concurrency, etc.
self._threadpool_ = futures.ThreadPoolExecutor()
return self._threadpool_

Expand Down Expand Up @@ -797,6 +803,10 @@ def ms(self) -> Mapping[str, measurement.Measurement]:
def obs(self) -> data.DataFrame:
...

@property
def context(self) -> Optional[ContextBase]:
...


class _HasObsVar(Protocol[_T_co]):
"""Something which has an ``obs`` and ``var`` field.
Expand Down
10 changes: 10 additions & 0 deletions python-spec/src/somacore/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import sys
from concurrent import futures
from typing import TYPE_CHECKING, NoReturn, Optional, Sequence, Type, TypeVar

from typing_extensions import Protocol, TypeGuard
Expand Down Expand Up @@ -75,3 +76,12 @@ def is_slice_of(__obj: object, __typ: Type[_T]) -> TypeGuard[Slice[_T]]:
and (__obj.stop is None or isinstance(__obj.stop, __typ))
and (__obj.step is None or isinstance(__obj.step, __typ))
)


class ContextBase(Protocol):
"""A protocol for a context manager that can be used as a base class.
If a threadpool is specified as part of the context, it will be used by the
implementer. Otherwise, the implementer will use its own threadpool.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"it will be used by experiment queries", maybe? the implementer would be the one providing it

"""

threadpool: Optional[futures.ThreadPoolExecutor]