Skip to content

Commit

Permalink
Docstrings - strict pydocstyle
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Apr 9, 2019
1 parent f9b93d8 commit 948497e
Show file tree
Hide file tree
Showing 94 changed files with 977 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ flakecheck:
$(FLAKE8) "$(PROJ)" "$(TESTDIR)" examples/

docstylecheck:
$(PYDOCSTYLE) --match-dir '(?!types)' "$(PROJ)"
$(PYDOCSTYLE) --match-dir '(?!types|assignor)' "$(PROJ)"

vulture:
$(VULTURE) "$(PROJ)" "$(TESTDIR)" "$(EXAMPLESDIR)" \
Expand Down
2 changes: 1 addition & 1 deletion docs/includes/blurb.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ It's inspired by tools such as `Kafka Streams`_, `Apache Spark`_,
a radically more straightforward approach to stream processing.

Modern web applications are increasingly built as a collection
of microservices and even before this, it has been difficult to write
of micro services and even before this, it has been difficult to write
data reporting operations at scale. In a reactive stream based system,
you don't have to strain your database with costly queries. In Faust,
a streaming data pipeline updates information as events happen
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Fixups
faust.fixups.base
faust.fixups.django

Livecheck
LiveCheck
=========

.. toctree::
Expand Down
4 changes: 4 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Bryson
Buckens
Bujniewicz
Buttu
CLI
CPython
Carvalho
Cassandra
Expand Down Expand Up @@ -88,6 +89,7 @@ Dein
Delalande
Demir
Deo
Deque
Django
Dmitry
Dubus
Expand Down Expand Up @@ -496,6 +498,7 @@ dburi
de
deprecated
deprecations
deque
der
deserialization
deserialize
Expand Down Expand Up @@ -563,6 +566,7 @@ memoize
memoized
metadata
metavar
middleware
misconfiguration
misconfigure
misconfigured
Expand Down
8 changes: 4 additions & 4 deletions docs/userguide/livecheck.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
================================================================
LiveCheck: End-to-end test for prodiction/staging.
LiveCheck: End-to-end test for production/staging.
================================================================

What is the problem with unit tests?
Expand All @@ -10,7 +10,7 @@ A staging environment is still desirable.

Enables you to:

- track requests as they travel through your microservice architecture.
- track requests as they travel through your micro service architecture.
- define contracts that should be met at every step

all by writing a class that looks like a regular unit test.
Expand All @@ -20,9 +20,9 @@ subsystems are down. Tests are executed based on probability, so you can
run tests for every request, or for just 30%, 50%, or even 0.1% of your
requests.

This is not just for microservice architectures, it's for any asynchronous
This is not just for micro service architectures, it's for any asynchronous
system. A monolith sending celery tasks is a good example, you could
track and babysit at every step of a workflow to make sure things
track and babysit at every step of a work flow to make sure things
progress as they should.

Every stage of your production pipeline could be tested for
Expand Down
1 change: 1 addition & 0 deletions faust/_cython/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Cython optimized Faust features."""
1 change: 1 addition & 0 deletions faust/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Agents."""
from .agent import Agent, AgentFun, AgentT, SinkT, current_agent
from .manager import AgentManager, AgentManagerT
from .replies import ReplyConsumer
Expand Down
6 changes: 6 additions & 0 deletions faust/agents/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,36 @@ def __init__(self,
Service.__init__(self, **kwargs)

async def on_start(self) -> None:
    """Start the actor by scheduling its main coroutine task."""
    task = self.actor_task
    assert task
    self.add_future(task)

async def on_stop(self) -> None:
    """Shut down the actor by cancelling its stream iteration."""
    self.cancel()

async def on_isolated_partition_revoked(self, tp: TP) -> None:
    """Call when an isolated partition is being revoked.

    Cancels the current task for the partition, then stops
    the actor entirely.
    """
    self.log.debug('Cancelling current task in actor for partition %r', tp)
    self.cancel()
    self.log.info('Stopping actor for revoked partition %r...', tp)
    await self.stop()
    # BUGFIX: the original call omitted ``tp``, so the ``%r``
    # placeholder had no argument (a lazy-formatting error when
    # the log record is rendered).
    self.log.debug('Actor for revoked partition %r stopped', tp)

async def on_isolated_partition_assigned(self, tp: TP) -> None:
    """Handle assignment of an isolated topic partition to this actor."""
    self.log.dev('Actor was assigned to %r', tp)

def cancel(self) -> None:
    """Tell actor to stop reading from the stream.

    Works by throwing ``StopAsyncIteration`` into the stream's channel.
    """
    channel = cast(ChannelT, self.stream.channel)
    channel._throw(StopAsyncIteration())

def __repr__(self) -> str:
    """Return short string representation used for debugging."""
    return '<{0}>'.format(self.shortlabel)

@property
def label(self) -> str:
"""Return human readable description of actor."""
s = self.agent._agent_label(name_suffix='*')
if self.stream.active_partitions:
partitions = {
Expand Down
61 changes: 61 additions & 0 deletions faust/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def __init__(self,
Service.__init__(self)

def on_init_dependencies(self) -> Iterable[ServiceT]:
    """Return list of services dependencies required to start agent."""
    # The agent service runs as a direct child of the app,
    # so we reattach our beacon there and declare no dependencies.
    self.beacon.reattach(self.app.beacon)
    return []
Expand Down Expand Up @@ -267,6 +268,7 @@ async def _start_for_partitions(self,
return await self._start_one_supervised(None, active_partitions)

async def on_start(self) -> None:
    """Call when an agent starts."""
    supervisor = self._new_supervisor()
    self.supervisor = supervisor
    await self._on_start_supervisor()

Expand Down Expand Up @@ -322,6 +324,7 @@ def _get_active_partitions(self) -> Optional[Set[TP]]:
return active_partitions

async def on_stop(self) -> None:
"""Call when an agent stops."""
# Agents iterate over infinite streams, so we cannot wait for it
# to stop.
# Instead we cancel it and this forces the stream to ack the
Expand All @@ -340,10 +343,12 @@ async def _stop_supervisor(self) -> None:
self.supervisor = None

def cancel(self) -> None:
    """Cancel agent and its actor instances running in this process."""
    for actor_ref in self._actors:
        actor_ref.cancel()

async def on_partitions_revoked(self, revoked: Set[TP]) -> None:
"""Call when partitions are revoked."""
T = traced_from_parent_span()
if self.isolated_partitions:
# isolated: start/stop actors for each partition
Expand All @@ -352,13 +357,15 @@ async def on_partitions_revoked(self, revoked: Set[TP]) -> None:
await T(self.on_shared_partitions_revoked)(revoked)

async def on_partitions_assigned(self, assigned: Set[TP]) -> None:
    """Call when partitions are assigned.

    Dispatches to the isolated or shared handler depending on
    how this agent consumes partitions.
    """
    T = traced_from_parent_span()
    handler = (
        self.on_isolated_partitions_assigned
        if self.isolated_partitions
        else self.on_shared_partitions_assigned
    )
    await T(handler)(assigned)

async def on_isolated_partitions_revoked(self, revoked: Set[TP]) -> None:
"""Call when isolated partitions are revoked."""
self.log.dev('Partitions revoked')
T = traced_from_parent_span()
for tp in revoked:
Expand All @@ -367,6 +374,7 @@ async def on_isolated_partitions_revoked(self, revoked: Set[TP]) -> None:
await T(aref.on_isolated_partition_revoked)(tp)

async def on_isolated_partitions_assigned(self, assigned: Set[TP]) -> None:
    """Call when isolated partitions are assigned."""
    T = traced_from_parent_span()
    # Sort for a deterministic actor start-up order.
    for partition in sorted(assigned):
        await T(self._assign_isolated_partition)(partition)
Expand Down Expand Up @@ -402,12 +410,15 @@ async def _start_isolated(self, tp: TP) -> ActorT:
return await self._start_for_partitions({tp})

async def on_shared_partitions_revoked(self, revoked: Set[TP]) -> None:
    """Call when non-isolated partitions are revoked (no-op hook)."""
    ...

async def on_shared_partitions_assigned(self, assigned: Set[TP]) -> None:
    """Call when non-isolated partitions are assigned (no-op hook)."""
    ...

def info(self) -> Mapping:
"""Return agent attributes as a dictionary."""
return {
'app': self.app,
'fun': self.fun,
Expand All @@ -422,13 +433,19 @@ def info(self) -> Mapping:
}

def clone(self, *, cls: Type[AgentT] = None, **kwargs: Any) -> AgentT:
    """Create clone of this agent object.

    Keyword arguments can be passed to override any argument
    supported by :class:`Agent.__init__ <Agent>`.
    """
    return (cls or type(self))(**{**self.info(), **kwargs})

def test_context(self,
channel: ChannelT = None,
supervisor_strategy: SupervisorStrategyT = None,
on_error: AgentErrorHandler = None,
**kwargs: Any) -> AgentTestWrapperT: # pragma: no cover
"""Create new unit-testing wrapper for this agent."""
# flow control into channel queues are disabled at startup,
# so need to resume that.
self.app.flow_control.resume()
Expand Down Expand Up @@ -471,6 +488,7 @@ def __call__(self, *,
active_partitions: Set[TP] = None,
stream: StreamT = None,
channel: ChannelT = None) -> ActorRefT:
"""Create new actor instance for this agent."""
# The agent function can be reused by other agents/tasks.
# For example:
#
Expand Down Expand Up @@ -499,6 +517,7 @@ def actor_from_stream(self,
index: int = None,
active_partitions: Set[TP] = None,
channel: ChannelT = None) -> ActorRefT:
"""Create new actor from stream."""
we_created_stream = False
actual_stream: StreamT
if stream is None:
Expand Down Expand Up @@ -532,13 +551,15 @@ def actor_from_stream(self,
)

def add_sink(self, sink: SinkT) -> None:
    """Add new sink to further handle results from this agent.

    Adding the same sink twice is a no-op.
    """
    if sink in self._sinks:
        return
    self._sinks.append(sink)

def stream(self,
channel: ChannelT = None,
active_partitions: Set[TP] = None,
**kwargs: Any) -> StreamT:
"""Create underlying stream used by this agent."""
if channel is None:
channel = cast(TopicT, self.channel_iterator).clone(
is_iterator=False,
Expand Down Expand Up @@ -672,6 +693,11 @@ async def cast(self,
partition: int = None,
timestamp: float = None,
headers: HeadersArg = None) -> None:
"""RPC operation: like :meth:`ask` but do not expect reply.
Cast here is like "casting a spell", and will not expect
a reply back from the agent.
"""
await self.send(
key=key,
value=value,
Expand All @@ -689,6 +715,11 @@ async def ask(self,
headers: HeadersArg = None,
reply_to: ReplyToArg = None,
correlation_id: str = None) -> Any:
"""RPC operation: ask agent for result of processing value.
This version will wait until the result is available
and return the processed value.
"""
p = await self.ask_nowait(
value,
key=key,
Expand All @@ -714,6 +745,11 @@ async def ask_nowait(self,
reply_to: ReplyToArg = None,
correlation_id: str = None,
force: bool = False) -> ReplyPromise:
"""RPC operation: ask agent for result of processing value.
This version does not wait for the result to arrive,
but instead returns a promise of future evaluation.
"""
if reply_to is None:
raise TypeError('Missing reply_to argument')
reply_to = self._get_strtopic(reply_to)
Expand Down Expand Up @@ -805,6 +841,11 @@ async def map(self,
key: K = None,
reply_to: ReplyToArg = None,
) -> AsyncIterator: # pragma: no cover
"""RPC map operation on a list of values.
A map operation iterates over results as they arrive.
See :meth:`join` and :meth:`kvjoin` if you want them in order.
"""
# Map takes only values, but can provide one key that is used for all.
async for value in self.kvmap(
((key, v) async for v in aiter(values)), reply_to):
Expand All @@ -815,6 +856,11 @@ async def kvmap(
items: Union[AsyncIterable[Tuple[K, V]], Iterable[Tuple[K, V]]],
reply_to: ReplyToArg = None,
) -> AsyncIterator[str]: # pragma: no cover
"""RPC map operation on a list of ``(key, value)`` pairs.
A map operation iterates over results as they arrive.
See :meth:`join` and :meth:`kvjoin` if you want them in order.
"""
# kvmap takes (key, value) pairs.
reply_to = self._get_strtopic(reply_to or self.app.conf.reply_to)

Expand Down Expand Up @@ -843,6 +889,11 @@ async def join(self,
key: K = None,
reply_to: ReplyToArg = None,
) -> List[Any]: # pragma: no cover
"""RPC map operation on a list of values.
A join returns the results in order, and only returns once
all values have been processed.
"""
return await self.kvjoin(
((key, value) async for value in aiter(values)),
reply_to=reply_to,
Expand All @@ -852,6 +903,11 @@ async def kvjoin(
self,
items: Union[AsyncIterable[Tuple[K, V]], Iterable[Tuple[K, V]]],
reply_to: ReplyToArg = None) -> List[Any]: # pragma: no cover
"""RPC map operation on list of ``(key, value)`` pairs.
A join returns the results in order, and only returns once
all values have been processed.
"""
reply_to = self._get_strtopic(reply_to or self.app.conf.reply_to)
barrier = BarrierState(reply_to)

Expand Down Expand Up @@ -901,13 +957,15 @@ def _repr_info(self) -> str:
return shorten_fqdn(self.name)

def get_topic_names(self) -> Iterable[str]:
    """Return list of topic names this agent subscribes to.

    Returns an empty list when the channel is not topic-based.
    """
    channel = self.channel
    return channel.topics if isinstance(channel, TopicT) else []

@property
def channel(self) -> ChannelT:
"""Return channel used by agent."""
if self._channel is None:
self._channel = self._prepare_channel(
self._channel_arg,
Expand All @@ -923,6 +981,7 @@ def channel(self, channel: ChannelT) -> None:

@property
def channel_iterator(self) -> AsyncIterator:
"""Return channel agent iterates over."""
# The channel is "memoized" here, so subsequent access to
# instance.channel_iterator will return the same value.
if self._channel_iterator is None:
Expand All @@ -937,6 +996,7 @@ def channel_iterator(self, it: AsyncIterator) -> None:

@property
def label(self) -> str:
    """Return a human-readable description of this agent."""
    return self._agent_label()

def _agent_label(self, name_suffix: str = '') -> str:
Expand All @@ -946,6 +1006,7 @@ def _agent_label(self, name_suffix: str = '') -> str:

@property
def shortlabel(self) -> str:
    """Return a short description of this agent."""
    return self._agent_label()


Expand Down
Loading

0 comments on commit 948497e

Please sign in to comment.