Skip to content

Commit

Permalink
100% coverage for faust.tables.wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Nov 20, 2018
1 parent a9a663f commit b23285f
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 25 deletions.
9 changes: 5 additions & 4 deletions faust/tables/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Base class Collection for Table and future data structures."""
import abc
import time
from collections import defaultdict
from datetime import datetime
from heapq import heappop, heappush
from time import time
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -362,19 +362,20 @@ def _relative_now(self, event: EventT = None) -> float:
# get current timestamp
event = event if event is not None else current_event()
if event is None:
return time()
return time.time()
return self._partition_latest_timestamp[event.message.partition]

def _relative_event(self, event: EventT = None) -> float:
event = event if event is not None else current_event()
# get event timestamp
if event is None:
raise RuntimeError('Outside of stream iteration')
raise RuntimeError('Operation outside of stream iteration')
return event.message.timestamp

def _relative_field(self, field: FieldDescriptorT) -> RelativeHandler:
def to_value(event: EventT = None) -> Union[float, datetime]:
if event is None:
raise RuntimeError('Outside of stream iteration')
raise RuntimeError('Operation outside of stream iteration')
return field.getattr(cast(ModelT, event.value))

return to_value
Expand Down
61 changes: 46 additions & 15 deletions faust/tables/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,28 +72,40 @@ def __iter__(self) -> Iterator[Tuple[Any, Any]]:
table = cast(Table, wrapper.table)
timestamp = wrapper.get_timestamp(self.event)
for key in wrapper._keys():
yield key, table._windowed_timestamp(key, timestamp)
try:
yield key, table._windowed_timestamp(key, timestamp)
except KeyError:
pass

def now(self) -> Iterator[Tuple[Any, Any]]:
wrapper = cast(WindowWrapper, self._mapping)
table = cast(Table, wrapper.table)
for key in wrapper._keys():
yield key, table._windowed_now(key)
try:
yield key, table._windowed_now(key)
except KeyError:
pass

def current(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
wrapper = cast(WindowWrapper, self._mapping)
table = cast(Table, wrapper.table)
timestamp = table._relative_event(event or self.event)
for key in wrapper._keys():
yield key, table._windowed_timestamp(key, timestamp)
try:
yield key, table._windowed_timestamp(key, timestamp)
except KeyError:
pass

def delta(self,
d: Seconds,
event: EventT = None) -> Iterator[Tuple[Any, Any]]:
wrapper = cast(WindowWrapper, self._mapping)
table = cast(Table, wrapper.table)
for key in wrapper._keys():
yield key, table._windowed_delta(key, d, event or self.event)
try:
yield key, table._windowed_delta(key, d, event or self.event)
except KeyError:
pass


class WindowedValuesView(WindowedValuesViewT):
Expand All @@ -109,26 +121,38 @@ def __iter__(self) -> Iterator[Any]:
table = cast(Table, wrapper.table)
timestamp = wrapper.get_timestamp(self.event)
for key in wrapper._keys():
yield table._windowed_timestamp(key, timestamp)
try:
yield table._windowed_timestamp(key, timestamp)
except KeyError:
pass

def now(self) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
table = cast(Table, wrapper.table)
for key in wrapper._keys():
yield table._windowed_now(key)
try:
yield table._windowed_now(key)
except KeyError:
pass

def current(self, event: EventT = None) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
table = cast(Table, wrapper.table)
timestamp = table._relative_event(event or self.event)
for key in wrapper._keys():
yield table._windowed_timestamp(key, timestamp)
try:
yield table._windowed_timestamp(key, timestamp)
except KeyError:
pass

def delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
table = cast(Table, wrapper.table)
for key in wrapper._keys():
yield table._windowed_delta(key, d, event or self.event)
try:
yield table._windowed_delta(key, d, event or self.event)
except KeyError:
pass


class WindowSet(WindowSetT, FastUserDict):
Expand Down Expand Up @@ -262,13 +286,18 @@ class WindowWrapper(WindowWrapperT):
accessed, instead :class:`WindowSet` is returned so that
the values can be further reduced to the wanted time period.
"""
key_index: bool = False
key_index_table: Optional[TableT] = None

def __init__(self, table: TableT, *,
relative_to: RelativeArg = None,
key_index: bool = False) -> None:
key_index: bool = False,
key_index_table: TableT = None) -> None:
self.table = table
if key_index:
self.key_index = key_index
self.key_index_table = key_index_table

if self.key_index and self.key_index_table is None:
self.key_index_table = self.table.app.Table(
f'{self.table.name}-key_index', value_type=int)
self._get_relative_timestamp = self._relative_handler(relative_to)
Expand All @@ -277,6 +306,8 @@ def clone(self, relative_to: RelativeArg) -> WindowWrapperT:
return type(self)(
table=self.table,
relative_to=relative_to or self._get_relative_timestamp,
key_index=self.key_index,
key_index_table=self.key_index_table,
)

@property
Expand All @@ -297,14 +328,14 @@ def relative_to_stream(self) -> WindowWrapperT:

def get_timestamp(self, event: EventT = None) -> float:
event = event or current_event()
if event is None:
raise TypeError('Operation outside of stream iteration')
get_relative_timestamp = self.get_relative_timestamp
if get_relative_timestamp:
timestamp = get_relative_timestamp(event)
if isinstance(timestamp, datetime):
return timestamp.timestamp()
return timestamp
if event is None:
raise RuntimeError('Operation outside of stream iteration')
return event.message.timestamp

def on_recover(self, fun: RecoverCallback) -> RecoverCallback:
Expand Down Expand Up @@ -337,9 +368,6 @@ def __delitem__(self, key: Any) -> None:
self.on_del_key(key)
cast(Table, self.table)._del_windowed(key, self.get_timestamp())

def __iter__(self) -> Iterator:
return iter(self.table)

def __len__(self) -> int:
if self.key_index_table is not None:
return len(self.key_index_table)
Expand All @@ -361,6 +389,9 @@ def _relative_handler(
raise ImproperlyConfigured(
f'Relative cannot be type {type(relative_to)}')

def __iter__(self) -> Iterator:
return self._keys()

def keys(self) -> KeysView:
return WindowedKeysView(self)

Expand Down
27 changes: 26 additions & 1 deletion t/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import time
from typing import NamedTuple
import pytest
from mode.utils.mocks import MagicMock
from mode.utils.mocks import MagicMock, patch

sentinel = object()

Expand Down Expand Up @@ -63,3 +65,26 @@ def setitem(self, dic, name, value=sentinel, new=MagicMock, **kwargs):
value = self._value_or_mock(value, new, name, dic, **kwargs)
self.monkeypatch.setitem(dic, name, value)
return value


class TimeMarks(NamedTuple):
time: float = None
monotonic: float = None


@pytest.yield_fixture()
def freeze_time(event_loop, request):
marks = request.node.get_closest_marker('time')
timestamp = time.time()
monotimestamp = time.monotonic()

with patch('time.time') as time_:
with patch('time.monotonic') as monotonic_:
options = TimeMarks(**{
**{'time': timestamp,
'monotonic': monotimestamp},
**((marks.kwargs or {}) if marks else {}),
})
time_.return_value = options.time
monotonic_.return_value = options.monotonic
yield options
Loading

0 comments on commit b23285f

Please sign in to comment.