Skip to content

Commit

Permalink
fix: set loop for services where missing (faust-streaming#441)
Browse files Browse the repository at this point in the history
* fix: set loop for services where missing

* chore: linting
  • Loading branch information
dada-engineer authored Feb 1, 2023
1 parent c20e11c commit 0af3c8a
Show file tree
Hide file tree
Showing 38 changed files with 55 additions and 106 deletions.
97 changes: 49 additions & 48 deletions extra/tools/verify_doc_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,36 @@
from yarl import URL


SETTINGS: Path = Path('docs/userguide/settings.rst')
SETTINGS: Path = Path("docs/userguide/settings.rst")

app = faust.App('verify_defaults')
app = faust.App("verify_defaults")

ignore_settings: Set[str] = {
'id',
'tabledir',
'reply_to',
'broker_consumer',
'broker_producer',
"id",
"tabledir",
"reply_to",
"broker_consumer",
"broker_producer",
}

builtin_locals: Dict[str, Any] = {
'aiohttp': aiohttp,
'app': app,
'datetime': datetime,
'datadir': app.conf.datadir,
'faust': faust,
'logging': logging,
'mode': mode,
'socket': socket,
'timedelta': timedelta,
'web_host': socket.gethostname(),
'web_port': 6066,
'VERSION': faust.__version__,
'uuid': uuid,
'URL': URL,
"aiohttp": aiohttp,
"app": app,
"datetime": datetime,
"datadir": app.conf.datadir,
"faust": faust,
"logging": logging,
"mode": mode,
"socket": socket,
"timedelta": timedelta,
"web_host": socket.gethostname(),
"web_port": 6066,
"VERSION": faust.__version__,
"uuid": uuid,
"URL": URL,
}

RE_REF = re.compile(r'^:(\w+):`')
RE_REF = re.compile(r"^:(\w+):`")


class Error(NamedTuple):
Expand All @@ -63,7 +63,7 @@ def verify_settings(rst_path: Path) -> Iterator[Error]:
actual = actual.value
if actual != default:
yield Error(
reason='mismatch',
reason="mismatch",
setting=setting_name,
default=default,
actual=actual,
Expand All @@ -74,53 +74,54 @@ def report_errors(errors: Iterator[Error]) -> int:
num_errors: int = 0
for num_errors, e in enumerate(errors, start=1):
if num_errors == 1:
carp(f'{sys.argv[0]}: Errors in docs/userguide/settings.rst:')
carp(f' + Setting {e.reason} {e.setting}:')
carp(f' documentation: {e.default!r}')
carp(f' actual: {e.actual!r}')
carp(f"{sys.argv[0]}: Errors in docs/userguide/settings.rst:")
carp(f" + Setting {e.reason} {e.setting}:")
carp(f" documentation: {e.default!r}")
carp(f" actual: {e.actual!r}")
if num_errors:
carp(f'Found {num_errors} error(s).', file=sys.stderr)
carp(f"Found {num_errors} error(s).", file=sys.stderr)
else:
print(f'{sys.argv[0]}: All OK :-)', file=sys.stdout)
print(f"{sys.argv[0]}: All OK :-)", file=sys.stdout)
return num_errors


def carp(msg, *, file: IO = sys.stderr, **kwargs: Any) -> None:
print(msg, file=file, **kwargs)


def find_settings_in_rst(rst_path: Path,
locals: Dict[str, Any] = None,
builtin_locals: Dict[str, Any] = builtin_locals,
ignore_settings: Set[str] = ignore_settings):
def find_settings_in_rst(
rst_path: Path,
locals: Dict[str, Any] = None,
builtin_locals: Dict[str, Any] = builtin_locals,
ignore_settings: Set[str] = ignore_settings,
):
setting: str = None
default: Any = None
app = faust.App('_verify_doc_defaults')
app = faust.App("_verify_doc_defaults")
_globals = dict(globals())
# Add setting default to globals
# so that defaults referencing another setting work.
# E.g.:
# :default: :setting:`broker_api_version`
_globals.update({
name: getattr(app.conf, name)
for name in app.conf.setting_names()
})
_globals.update(
{name: getattr(app.conf, name) for name in app.conf.setting_names()}
)
local_ns: Dict[str, Any] = {**builtin_locals, **(locals or {})}
for line in rst_path.read_text().splitlines():
if line.startswith('.. setting::'):
if line.startswith(".. setting::"):
if setting and not default and setting not in ignore_settings:
raise Exception(f'No default value for {setting}')
setting = line.split('::')[-1].strip()
elif ':default:' in line:
if '``' in line:
line, sep, rest = line.rpartition('``')
default = line.split(':default:')[-1].strip()
default = default.strip('`')
default = RE_REF.sub('', default)
raise Exception(f"No default value for {setting}")
setting = line.split("::")[-1].strip()
elif ":default:" in line:
if "``" in line:
line, sep, rest = line.rpartition("``")
default = line.split(":default:")[-1].strip()
default = default.strip("`")
default = RE_REF.sub("", default)
default_value = eval(default, _globals, local_ns)
if setting not in ignore_settings:
yield setting, default_value


if __name__ == '__main__':
if __name__ == "__main__":
sys.exit(report_errors(verify_settings(SETTINGS)))
3 changes: 1 addition & 2 deletions faust/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def __init__(
"Agent concurrency must be 1 when using isolated partitions"
)
self.use_reply_headers = use_reply_headers
Service.__init__(self)
Service.__init__(self, loop=app.loop)

def on_init_dependencies(self) -> Iterable[ServiceT]:
"""Return list of services dependencies required to start agent."""
Expand Down Expand Up @@ -1090,7 +1090,6 @@ def shortlabel(self) -> str:


class AgentTestWrapper(Agent, AgentTestWrapperT): # pragma: no cover

_stream: StreamT

def __init__(
Expand Down
1 change: 1 addition & 0 deletions faust/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ class Command(abc.ABC): # noqa: B024
@classmethod
def as_click_command(cls) -> Callable: # pragma: no cover
"""Convert command into :pypi:`click` command."""

# This is what actually registers the commands into the
# :pypi:`click` command-line interface (the ``def cli`` main above).
# __init_subclass__ calls this for the side effect of being
Expand Down
2 changes: 1 addition & 1 deletion faust/livecheck/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def __init__(
# signal attributes have the correct signal instance.
self.__dict__.update(self.signals)

Service.__init__(self, **kwargs)
Service.__init__(self, loop=app.loop, **kwargs)

@Service.timer(10.0)
async def _sampler(self) -> None:
Expand Down
2 changes: 0 additions & 2 deletions faust/models/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ def prepare_value(


class NumberField(FieldDescriptor[T]):

max_value: Optional[int]
min_value: Optional[int]

Expand Down Expand Up @@ -439,7 +438,6 @@ def validate(self, value: Decimal) -> Iterable[ValidationError]:


class CharField(FieldDescriptor[CharacterType]):

max_length: Optional[int]
min_length: Optional[int]
trim_whitespace: bool
Expand Down
1 change: 0 additions & 1 deletion faust/models/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def __format__(self, format_spec: str) -> str:


class _FrameLocal(UserString, Generic[T]):

_field_name: str
_tag_type: str
_frame: str
Expand Down
1 change: 0 additions & 1 deletion faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,6 @@ def get_key(withdrawal):
if topic is not None:
channel = topic
else:

prefix = ""
if self.prefix and not cast(TopicT, self.channel).has_prefix:
prefix = self.prefix + "-"
Expand Down
2 changes: 1 addition & 1 deletion faust/tables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def __init__(
synchronize_all_active_partitions: bool = False,
**kwargs: Any,
) -> None:
Service.__init__(self, **kwargs)
Service.__init__(self, loop=app.loop, **kwargs)
self.app = app
self.name = cast(str, name) # set lazily so CAN BE NONE!
self.default = default
Expand Down
2 changes: 1 addition & 1 deletion faust/tables/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(self, table: Table, **kwargs: Any) -> None:
self.table_name = self.table.name
self.data = {}
self._dirty = set()
Service.__init__(self, **kwargs)
Service.__init__(self, loop=table.loop, **kwargs)

def send_changelog_event(self, key: Any, operation: int, value: Any) -> None:
"""Send changelog event to the tables changelog topic."""
Expand Down
1 change: 0 additions & 1 deletion faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1363,7 +1363,6 @@ def verify_recovery_event_path(self, now: float, tp: TP) -> None:


class ThreadDelegateConsumer(Consumer):

_thread: ConsumerThread

#: Main thread method queue.
Expand Down
4 changes: 0 additions & 4 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,6 @@ async def commit_transaction(self, transactional_id: str) -> None:
"""Commit transaction by id."""
try:
async with self._trn_locks[transactional_id]:

transaction_producer = self._transaction_producers.get(transactional_id)
if transaction_producer:
await transaction_producer.commit_transaction()
Expand All @@ -1170,7 +1169,6 @@ async def abort_transaction(self, transactional_id: str) -> None:
"""Abort and rollback transaction by id."""
try:
async with self._trn_locks[transactional_id]:

transaction_producer = self._transaction_producers.get(transactional_id)
if transaction_producer:
await transaction_producer.abort_transaction()
Expand Down Expand Up @@ -1210,7 +1208,6 @@ async def commit_transactions(
for transactional_id, offsets in tid_to_offset_map.items():
# get the producer
async with self._trn_locks[transactional_id]:

transaction_producer = self._transaction_producers.get(transactional_id)
if transaction_producer:
logger.debug(
Expand Down Expand Up @@ -1346,7 +1343,6 @@ async def send(
try:
if transactional_id:
async with self._trn_locks[transactional_id]:

return cast(
Awaitable[RecordMetadata],
await transaction_producer.send(
Expand Down
3 changes: 0 additions & 3 deletions faust/types/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class _AppT:


class ActorT(ServiceT, Generic[_T]):

agent: "AgentT"
stream: StreamT
it: _T
Expand Down Expand Up @@ -122,7 +121,6 @@ class AwaitableActorT(ActorT[Awaitable], Awaitable):


class AgentT(ServiceT, Generic[_T]):

name: str
app: _AppT
concurrency: int
Expand Down Expand Up @@ -325,7 +323,6 @@ def human_tracebacks(self) -> str:


class AgentTestWrapperT(AgentT, AsyncIterable):

new_value_processed: asyncio.Condition
original_channel: ChannelT
results: MutableMapping[int, Any]
Expand Down
2 changes: 0 additions & 2 deletions faust/types/assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class _AppT:


class PartitionAssignorT(abc.ABC):

replicas: int
app: _AppT

Expand Down Expand Up @@ -69,7 +68,6 @@ def tables_metadata(self) -> HostToPartitionMap:


class LeaderAssignorT(ServiceT):

app: _AppT

@abc.abstractmethod
Expand Down
1 change: 0 additions & 1 deletion faust/types/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class _SchemaT:


class EventT(Generic[T], AsyncContextManager):

app: _AppT
key: K
value: V
Expand Down
1 change: 0 additions & 1 deletion faust/types/fixups.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class _AppT:


class FixupT(abc.ABC):

app: _AppT

@abc.abstractmethod
Expand Down
1 change: 0 additions & 1 deletion faust/types/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ def validation_errors(self) -> List[ValidationError]:


class FieldDescriptorT(Generic[T]):

field: str
input_name: str
output_name: str
Expand Down
1 change: 0 additions & 1 deletion faust/types/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ class SensorT(SensorInterfaceT, ServiceT):


class SensorDelegateT(SensorInterfaceT, Iterable):

# Delegate calls to many sensors.

@abc.abstractmethod
Expand Down
2 changes: 0 additions & 2 deletions faust/types/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class _Message:


class RegistryT(abc.ABC):

key_serializer: CodecArg
value_serializer: CodecArg

Expand Down Expand Up @@ -72,7 +71,6 @@ def dumps_value(


class SchemaT(Generic[KT, VT]):

key_type: Optional[_ModelArg] = None
value_type: Optional[_ModelArg] = None

Expand Down
1 change: 0 additions & 1 deletion faust/types/stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class _CollectionT:


class StoreT(ServiceT, FastUserDict[KT, VT]):

url: URL
app: _AppT
table: _CollectionT
Expand Down
1 change: 0 additions & 1 deletion faust/types/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def _human_channel(self) -> str:


class StreamT(AsyncIterable[T_co], JoinableT, ServiceT):

app: _AppT
channel: AsyncIterator[T_co]
outbox: Optional[asyncio.Queue] = None
Expand Down
1 change: 0 additions & 1 deletion faust/types/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class _SchemaT:


class TopicT(ChannelT):

#: Iterable/Sequence of topic names to subscribe to.
topics: Sequence[str]

Expand Down
Loading

0 comments on commit 0af3c8a

Please sign in to comment.