Skip to content

Commit

Permalink
Make optional types explicit (faust-streaming#255)
Browse files Browse the repository at this point in the history
* Fix optional typing

* Fix import sort / formatting

* Fix formatting

* Update black formatter to 22.1.0

* Fix imports for tests

- Also fix optional types for worker

* Black formatting

* Black formatted

* Revert black changes

* Fix test to reflect optional type

* Use different typing syntax for test

* Revert. Test case doesn't work with optional types

* Remove unused import

Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
  • Loading branch information
dbrattli and patkivikram authored Feb 7, 2022
1 parent bc3cc15 commit 96ce8f0
Show file tree
Hide file tree
Showing 75 changed files with 959 additions and 815 deletions.
2 changes: 1 addition & 1 deletion docs/includes/settingref.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ consider doing retries (send to separate topic):
main_topic = app.topic('main')
deadletter_topic = app.topic('main_deadletter')

async def send_request(value, timeout: float = None) -> None:
async def send_request(value, timeout: Optional[float] = None) -> None:
await app.http_client.get('http://foo.com', timeout=timeout)

@app.agent(main_topic)
Expand Down
4 changes: 2 additions & 2 deletions docs/userguide/application.rst
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,8 @@ This can be used to attach custom headers to Kafka messages:
sender: AppT,
key: bytes = None,
value: bytes = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: List[Tuple[str, bytes]] = None,
**kwargs: Any) -> None:
test = current_test()
Expand Down
4 changes: 2 additions & 2 deletions docs/userguide/models.rst
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ time and last modified time:
.. sourcecode:: python

class MyBaseRecord(Record, abstract=True):
time_created: float = None
time_modified: float = None
time_created: Optional[float] = None
time_modified: Optional[float] = None

Inherit from this model to create a new model
having the fields by default:
Expand Down
6 changes: 3 additions & 3 deletions faust/agents/actor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Actor - Individual Agent instances."""
from typing import Any, AsyncGenerator, AsyncIterator, Coroutine, Set, cast
from typing import Any, AsyncGenerator, AsyncIterator, Coroutine, Optional, Set, cast

from mode import Service
from mode.utils.tracebacks import format_agen_stack, format_coro_stack
Expand All @@ -22,8 +22,8 @@ def __init__(
agent: AgentT,
stream: StreamT,
it: _T,
index: int = None,
active_partitions: Set[TP] = None,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
**kwargs: Any,
) -> None:
self.agent = agent
Expand Down
75 changes: 39 additions & 36 deletions faust/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,18 @@ def __init__(
fun: AgentFun,
*,
app: AppT,
name: str = None,
name: Optional[str] = None,
channel: Union[str, ChannelT] = None,
concurrency: int = 1,
sink: Iterable[SinkT] = None,
on_error: AgentErrorHandler = None,
supervisor_strategy: Type[SupervisorStrategyT] = None,
help: str = None,
schema: SchemaT = None,
help: Optional[str] = None,
schema: Optional[SchemaT] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
isolated_partitions: bool = False,
use_reply_headers: bool = None,
use_reply_headers: Optional[bool] = None,
**kwargs: Any,
) -> None:
self.app = app
Expand Down Expand Up @@ -242,8 +242,8 @@ async def _start_one(
*,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
stream: StreamT = None,
channel: ChannelT = None,
stream: Optional[StreamT] = None,
channel: Optional[ChannelT] = None,
) -> ActorT:
# an index of None means there's only one instance,
# and `index is None` is used as a test by functions that
Expand All @@ -261,7 +261,7 @@ async def _start_one_supervised(
self,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
stream: StreamT = None,
stream: Optional[StreamT] = None,
) -> ActorT:
aref = await self._start_one(
index=index,
Expand Down Expand Up @@ -456,7 +456,7 @@ def clone(self, *, cls: Type[AgentT] = None, **kwargs: Any) -> AgentT:

def test_context(
self,
channel: ChannelT = None,
channel: Optional[ChannelT] = None,
supervisor_strategy: SupervisorStrategyT = None,
on_error: AgentErrorHandler = None,
**kwargs: Any,
Expand Down Expand Up @@ -487,7 +487,7 @@ def _prepare_channel(
self,
channel: Union[str, ChannelT] = None,
internal: bool = True,
schema: SchemaT = None,
schema: Optional[SchemaT] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
**kwargs: Any,
Expand All @@ -514,10 +514,10 @@ def _prepare_channel(
def __call__(
self,
*,
index: int = None,
active_partitions: Set[TP] = None,
stream: StreamT = None,
channel: ChannelT = None,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
stream: Optional[StreamT] = None,
channel: Optional[ChannelT] = None,
) -> ActorRefT:
"""Create new actor instance for this agent."""
# The agent function can be reused by other agents/tasks.
Expand Down Expand Up @@ -545,9 +545,9 @@ def actor_from_stream(
self,
stream: Optional[StreamT],
*,
index: int = None,
active_partitions: Set[TP] = None,
channel: ChannelT = None,
index: Optional[int] = None,
active_partitions: Optional[Set[TP]] = None,
channel: Optional[ChannelT] = None,
) -> ActorRefT:
"""Create new actor from stream."""
we_created_stream = False
Expand Down Expand Up @@ -601,7 +601,10 @@ def add_sink(self, sink: SinkT) -> None:
self._sinks.append(sink)

def stream(
self, channel: ChannelT = None, active_partitions: Set[TP] = None, **kwargs: Any
self,
channel: Optional[ChannelT] = None,
active_partitions: Optional[Set[TP]] = None,
**kwargs: Any,
) -> StreamT:
"""Create underlying stream used by this agent."""
if channel is None:
Expand Down Expand Up @@ -631,9 +634,9 @@ async def _start_task(
*,
index: Optional[int],
active_partitions: Optional[Set[TP]] = None,
stream: StreamT = None,
channel: ChannelT = None,
beacon: NodeT = None,
stream: Optional[StreamT] = None,
channel: Optional[ChannelT] = None,
beacon: Optional[NodeT] = None,
) -> ActorRefT:
# If the agent is an async function we simply start it,
# if it returns an AsyncIterable/AsyncGenerator we start a task
Expand Down Expand Up @@ -750,8 +753,8 @@ async def cast(
value: V = None,
*,
key: K = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
) -> None:
"""RPC operation: like :meth:`ask` but do not expect reply.
Expand All @@ -772,11 +775,11 @@ async def ask(
value: V = None,
*,
key: K = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
) -> Any:
"""RPC operation: ask agent for result of processing value.
Expand All @@ -803,11 +806,11 @@ async def ask_nowait(
value: V = None,
*,
key: K = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
force: bool = False,
) -> ReplyPromise:
"""RPC operation: ask agent for result of processing value.
Expand Down Expand Up @@ -835,7 +838,7 @@ def _create_req(
key: K = None,
value: V = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
headers: HeadersArg = None,
) -> Tuple[V, Optional[HeadersArg]]:
if reply_to is None:
Expand Down Expand Up @@ -871,14 +874,14 @@ async def send(
*,
key: K = None,
value: V = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: MessageSentCallback = None,
callback: Optional[MessageSentCallback] = None,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
force: bool = False,
) -> Awaitable[RecordMetadata]:
"""Send message to topic used by agent."""
Expand Down Expand Up @@ -1091,7 +1094,7 @@ class AgentTestWrapper(Agent, AgentTestWrapperT): # pragma: no cover
_stream: StreamT

def __init__(
self, *args: Any, original_channel: ChannelT = None, **kwargs: Any
self, *args: Any, original_channel: Optional[ChannelT] = None, **kwargs: Any
) -> None:
super().__init__(*args, **kwargs)
self.results = {}
Expand Down Expand Up @@ -1131,7 +1134,7 @@ async def put(
value_serializer: CodecArg = None,
*,
reply_to: ReplyToArg = None,
correlation_id: str = None,
correlation_id: Optional[str] = None,
wait: bool = True,
) -> EventT:
if reply_to:
Expand Down Expand Up @@ -1164,7 +1167,7 @@ def to_message(
*,
partition: Optional[int] = None,
offset: int = 0,
timestamp: float = None,
timestamp: Optional[float] = None,
timestamp_type: int = 0,
headers: HeadersArg = None,
) -> Message:
Expand Down
17 changes: 9 additions & 8 deletions faust/app/_attached.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
List,
MutableMapping,
NamedTuple,
Optional,
Union,
cast,
)
Expand Down Expand Up @@ -91,13 +92,13 @@ async def maybe_put(
channel: Union[ChannelT, str],
key: K = None,
value: V = None,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: SchemaT = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: MessageSentCallback = None,
callback: Optional[MessageSentCallback] = None,
force: bool = False,
) -> Awaitable[RecordMetadata]:
"""Attach message to source topic offset.
Expand Down Expand Up @@ -145,13 +146,13 @@ def put(
channel: Union[str, ChannelT],
key: K,
value: V,
partition: int = None,
timestamp: float = None,
partition: Optional[int] = None,
timestamp: Optional[float] = None,
headers: HeadersArg = None,
schema: SchemaT = None,
schema: Optional[SchemaT] = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: MessageSentCallback = None,
callback: Optional[MessageSentCallback] = None,
) -> Awaitable[RecordMetadata]:
"""Attach message to source topic offset."""
# This attaches message to be published when source message' is
Expand Down
Loading

0 comments on commit 96ce8f0

Please sign in to comment.