Skip to content

Commit

Permalink
Bugfixes and QoL changes. (#22)
Browse files Browse the repository at this point in the history
* Fixed keyboardinterrupt to stop the program.

* Added support for argumentless handler functions.

* Moved kafka configuration warning log filter.

* Added function to get admin client.

* Added 'auto.offset.reset': 'earliest' in examples.

* Using Cache contextmanager now returns a Cache instance.

* Added escape option for for literal values starting with $ in CLI config file.

* Using absolute cache path in CLI config file.
  • Loading branch information
Menziess authored May 12, 2023
1 parent 58bf057 commit ca7f9c9
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 23 deletions.
4 changes: 3 additions & 1 deletion docs/source/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Topic can be used to send and receive kafka messages.

topic = Topic('emoji', {
'bootstrap.servers': 'localhost:29091',
'auto.offset.reset': 'earliest',
'group.instance.id': 'demo',
'group.id': 'demo',
}, offset=-2)
Expand Down Expand Up @@ -155,7 +156,7 @@ If there's no incoming data, generators can be used to trigger handler functions
sleep(interval)

@snap(timer())
def handler(msg):
def handler():
print(strftime('%H:%M:%S', localtime()))

stream()
Expand Down Expand Up @@ -183,6 +184,7 @@ Codecs are used for serializing and deserializing data.

topic = Topic('codec-demo', {
'bootstrap.servers': 'localhost:29091',
'auto.offset.reset': 'earliest',
'group.instance.id': 'demo',
'group.id': 'demo',
}, offset=-2, codec=JsonCodec())
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Snapstream can be summarized as:

t = Topic('emoji', {
'bootstrap.servers': 'localhost:29091',
'auto.offset.reset': 'earliest',
'group.instance.id': 'demo',
'group.id': 'demo',
})
Expand Down
7 changes: 6 additions & 1 deletion snapstream/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Snapstream public objects."""

import logging
from inspect import signature
from typing import Any, Callable, Generator, Iterable

Expand All @@ -18,6 +19,8 @@
'READ_FROM_END',
]

logging.basicConfig()


def _sink_output(s: Callable[..., None], output: Any) -> None:
if not isinstance(output, tuple) and isinstance(s, (Cache)):
Expand Down Expand Up @@ -63,8 +66,10 @@ def _handler(msg, kwargs):
parameters = signature(f).parameters.values()
if any(p.kind == p.VAR_KEYWORD for p in parameters):
output = f(msg, **kwargs)
else:
elif parameters:
output = f(msg)
else:
output = f()
_handle_generator_or_function(sink, output)

for it in iterable:
Expand Down
29 changes: 15 additions & 14 deletions snapstream/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from os import path
from re import search
from sys import argv, exit
from typing import Optional
from typing import Callable, Optional

from rocksdict import AccessType
from toolz import curry
from toolz import compose, curry, identity

from snapstream import READ_FROM_END, Cache, Topic
from snapstream.codecs import AvroCodec
Expand Down Expand Up @@ -60,22 +60,22 @@ def get_args(args=argv[1:]) -> Namespace:
return parser.parse_args(args)


def default_topic_entry(name: str) -> dict:
def default_topic_entry(name: str, prep: Callable) -> dict:
"""Create default topic entry."""
return {
'type': 'Topic',
'name': name,
'name': prep(name),
'conf': {
'bootstrap.servers': 'localhost:29091',
}
}


def default_cache_entry(path: str) -> dict:
def default_cache_entry(path: str, prep: Callable) -> dict:
"""Create default topic entry."""
return {
'type': 'Cache',
'name': path,
'path': prep(path),
'conf': {}
}

Expand All @@ -97,22 +97,22 @@ def get_config_entry(config_path: str, args: Namespace) -> dict:
config = []

# Find prop having certain key
prop = {
'topic': 'name',
'cache': 'path',
prep, prop = {
'topic': [identity, 'name'],
'cache': [compose(path.abspath, path.expanduser), 'path'],
}[args.action]
key = getattr(args, prop)
if entry := {
key: _
for _ in config
if _.get(prop) == key
if _.get(prop) == prep(key)
}.get(key):
return entry

# If not found, create entry
entry = (
default_topic_entry(args.name) if args.action == 'topic'
else default_cache_entry(args.path) if args.action == 'cache'
default_topic_entry(args.name, prep) if args.action == 'topic'
else default_cache_entry(args.path, prep) if args.action == 'cache'
else {}
)
config.append(entry)
Expand All @@ -126,8 +126,9 @@ def replace_variable_references(entry: dict, args: Namespace) -> dict:
conf = entry['conf']
for k, v in conf.items():
if v.startswith('$'):
updated_v = get_variable(v[1:], args.secrets_base_path)
conf[k] = updated_v
conf[k] = get_variable(v[1:], args.secrets_base_path)
if v.startswith(r'\$'):
conf[k] = v.replace()
entry['conf'] = conf
return entry

Expand Down
6 changes: 3 additions & 3 deletions snapstream/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ def __setitem__(self, key, val) -> None:
"""Set item in db."""
self.db[key] = val

def __enter__(self) -> Rdict:
def __enter__(self) -> 'Cache':
"""Contextmanager."""
return self.db.__enter__()
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit contextmanager."""
self.db.__exit__(exc_type, exc_val, exc_tb)
self.close()

def set_dumps(self, dumps: Callable[[Any], bytes]) -> None:
"""Set custom dumps function."""
Expand Down
1 change: 1 addition & 0 deletions snapstream/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from toolz import curry

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def deserialize_json(msg: bytes) -> dict:
Expand Down
13 changes: 9 additions & 4 deletions snapstream/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from snapstream.codecs import ICodec
from snapstream.utils import KafkaIgnoredPropertyFilter, Singleton

logger = logging.getLogger(__file__)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addFilter(KafkaIgnoredPropertyFilter())

READ_FROM_START = -2
READ_FROM_END = -1
Expand Down Expand Up @@ -53,8 +55,6 @@ def distribute_messages(it, queue, kwargs):

def start(self, **kwargs):
"""Start the streams."""
logger.addFilter(KafkaIgnoredPropertyFilter())

queue = Queue(maxsize=1)
threads = [
Thread(
Expand All @@ -73,7 +73,7 @@ def start(self, **kwargs):
if exception := queue.get():
raise exception
except KeyboardInterrupt:
logger.info('You stopped the program.')
exit()
finally:
self.iterables = set()

Expand Down Expand Up @@ -238,6 +238,7 @@ class Topic(ITopic):
>>> topic = Topic('emoji', {
... 'bootstrap.servers': 'localhost:29091',
... 'auto.offset.reset': 'earliest',
... 'group.id': 'demo',
... })
Expand Down Expand Up @@ -276,6 +277,10 @@ def __init__(
self.codec = codec
self.dry = dry

def admin(self):
"""Get admin client."""
return AdminClient(self.conf)

def create_topic(self, *args, **kwargs) -> None:
"""Create topic."""
admin = AdminClient(self.conf)
Expand Down
1 change: 1 addition & 0 deletions snapstream/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from toolz.curried import compose, curry, last

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_variable(
Expand Down

0 comments on commit ca7f9c9

Please sign in to comment.