xadd & xread & xreadgroup use invariant dict typing #129

Closed
@imnotjames

Like several other typing choices made by the previous maintainers, the mappings passed to xadd, xread, and xreadgroup are typed as Dict, whose type parameters are invariant, so they do not accept values like dict[str, str]. Invariance makes sense when you're concerned about someone setting an incompatible value into the dict or mapping. It makes less sense for cases like this, where we just want to read the values. (I think? I'm not an expert on Python typing!)
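
To make the difference concrete, here is a minimal sketch. `StreamId`, `mutate`, and `read_only` are hypothetical names for illustration only and are not part of the library. The type-checker behaviour is described in comments, since it does not show up at runtime.

```python
from typing import Dict, List, Mapping, Union

# Hypothetical, simplified stand-in for the library's stream ID type.
StreamId = Union[int, str]


def mutate(streams: Dict[str, StreamId]) -> None:
    # Valid for the declared type, but it would put an int into a dict
    # that the caller believes is dict[str, str].
    streams["other"] = 0


def read_only(streams: Mapping[str, StreamId]) -> List[str]:
    # Mapping has no mutating methods, so its value type is covariant and
    # a dict[str, str] is an acceptable argument.
    return [f"{key}={value}" for key, value in streams.items()]


ids: Dict[str, str] = {"incoming-text": "$"}

# mutate(ids)  # a type checker should reject this call: Dict is invariant
print(read_only(ids))  # accepted: the function can only read from the mapping
```

Note that Mapping is only covariant in its value type. Its key type is still invariant, which is what the typing.py comment quoted further down is about.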

This can be seen with xreadgroup, when passing in the streams and stream IDs:

def xreadgroup(
    self,
    groupname: str,
    consumername: str,
    streams: Dict[KeyT, StreamIdT],
    count: Union[int, None] = None,
    block: Union[int, None] = None,
    noack: bool = False,
) -> ResponseT:

If I try to call it like this (shown here with xread, which takes the same kind of streams parameter):

stream = { "incoming-text": "$" }
payload = cast(Any, client.xread(stream, None, 0))

Of course, type checkers like Pyright don't like this and emit an error about it, because the type parameters of Dict are invariant.

Pyright: Argument of type "dict[str, str]" cannot be assigned to parameter "streams" of type "Dict[KeyT, Unknown]" in function "xread"
   "dict[str, str]" is not assignable to "Dict[KeyT, Unknown]"
     Type parameter "_KT@dict" is invariant, but "str" is not the same as "KeyT" (reportArgumentType)

There's a helpful comment about this in the typing.py file.

# Mapping is not covariant in the key type, which prevents
# Mapping[_StringLikeT, X] from accepting arguments of type Dict[str, X]. Using
# a TypeVar instead of a Union allows mappings with any of the permitted types
# to be passed. Care is needed if there is more than one such mapping in a
# type signature because they will all be required to be the same key type.
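
A small sketch of what that comment describes. The definitions of `KeyT` and `AnyKeyT` below are assumed to mirror the ones in typing.py, and `count_union`, `count_typevar`, and `merge` are hypothetical functions used only for illustration.

```python
from typing import Mapping, TypeVar, Union

# Assumed to mirror the definitions the quoted comment sits next to.
KeyT = Union[bytes, str, memoryview]
AnyKeyT = TypeVar("AnyKeyT", bytes, str, memoryview)


def count_union(streams: Mapping[KeyT, str]) -> int:
    # Mapping is invariant in its key type, so a checker rejects a variable
    # typed dict[str, str] here: str is not the same type as KeyT.
    return len(streams)


def count_typevar(streams: Mapping[AnyKeyT, str]) -> int:
    # AnyKeyT is solved per call to exactly one of bytes, str, or memoryview,
    # so dict[str, str] and dict[bytes, str] both match.
    return len(streams)


def merge(a: Mapping[AnyKeyT, str], b: Mapping[AnyKeyT, str]) -> int:
    # The caveat from the comment: both mappings must use the same key type.
    return len(a) + len(b)


str_keys = {"incoming-text": "$"}
bytes_keys = {b"incoming-text": "$"}

print(count_typevar(str_keys), count_typevar(bytes_keys))
print(merge(str_keys, str_keys))
# merge(str_keys, bytes_keys)  # a checker should reject mixing key types
```

All three functions behave the same at runtime. The difference is only in what a static type checker accepts.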

Using this, we can apply the same approach to StreamIdT to create a TypeVar version that accepts a mapping with any one of the permitted value types.

AnyStreamIdT = TypeVar("AnyStreamIdT", int, bytes, str, memoryview)
 def xreadgroup( 
     self, 
     groupname: str, 
     consumername: str, 
-     streams: Dict[KeyT, StreamIdT], 
+     streams: Mapping[AnyKeyT, AnyStreamIdT],
     count: Union[int, None] = None, 
     block: Union[int, None] = None, 
     noack: bool = False, 
 ) -> ResponseT: 

It seems to work well! I tested it with the above example with xreadgroup, at least. I can't find any issues with this approach, and it seems to be used elsewhere in the codebase, even if not here.
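
For anyone who wants to try the proposed signature in isolation, here is a self-contained sketch. `xreadgroup_args` is a hypothetical helper, not the library's implementation: it only assembles the arguments in the order the Redis XREADGROUP command expects, and the TypeVars are assumed to match the ones named in the diff above.

```python
from typing import List, Mapping, Optional, TypeVar, Union

# Assumed to match the TypeVars named in the diff above.
AnyKeyT = TypeVar("AnyKeyT", bytes, str, memoryview)
AnyStreamIdT = TypeVar("AnyStreamIdT", int, bytes, str, memoryview)

Arg = Union[int, bytes, str, memoryview]


def xreadgroup_args(
    groupname: str,
    consumername: str,
    streams: Mapping[AnyKeyT, AnyStreamIdT],
    count: Optional[int] = None,
    block: Optional[int] = None,
    noack: bool = False,
) -> List[Arg]:
    """Hypothetical helper: build the XREADGROUP argument list."""
    args: List[Arg] = ["XREADGROUP", "GROUP", groupname, consumername]
    if count is not None:
        args += ["COUNT", count]
    if block is not None:
        args += ["BLOCK", block]
    if noack:
        args.append("NOACK")
    # All stream keys come first, followed by their IDs in the same order.
    args.append("STREAMS")
    args.extend(streams.keys())
    args.extend(streams.values())
    return args


# A plain dict[str, str] should now pass a type checker without a cast.
stream = {"incoming-text": "$"}
print(xreadgroup_args("workers", "worker-1", stream, block=0))
```

The group and consumer names in the usage lines are made up. Running a checker such as Pyright over this file is a quick way to confirm whether the Mapping plus TypeVar signature accepts the call.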