Added rocksdict level locks. (#63)
* Added rocksdict level locks.

* Can specify a pool of locks to increase concurrency.

* Fix types for curried functions.

* Cleanup.

* Updated docs.

* Added missing docstring.
Menziess authored Mar 22, 2024
1 parent 56f0293 commit 685ab0e
Showing 11 changed files with 132 additions and 38 deletions.
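
The heart of the change is a fixed pool of re-entrant locks, with each key hashed onto one lock. A minimal standalone sketch of the pattern (class and method names here are illustrative, not the committed API)::

    from contextlib import contextmanager
    from threading import RLock

    class LockPool:
        """Fixed pool of re-entrant locks shared by arbitrarily many keys."""

        def __init__(self, number_of_locks: int = 16):
            self._locks = [RLock() for _ in range(number_of_locks)]

        @contextmanager
        def acquire(self, key):
            # Hash the key onto one pooled lock; distinct keys may share a
            # lock (a little extra waiting), but memory stays bounded.
            lock = self._locks[hash(key) % len(self._locks)]
            with lock:
                yield lock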
1 change: 0 additions & 1 deletion docs/source/conf.py
@@ -35,4 +35,3 @@
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output

html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static']
30 changes: 14 additions & 16 deletions docs/source/examples.rst
@@ -11,6 +11,7 @@ Topic
Topic can be used to send and receive kafka messages.

- Data is sent to kafka when ``topic`` is called as a function
- Messages can be consumed by iterating over the ``topic`` object

::

@@ -29,7 +30,6 @@ Topic can be used to send and receive kafka messages.
for msg in topic:
    print(msg.key(), msg.value().decode())

- Messages are consumed when ``topic`` is iterated over

::

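The collapsed lines of this example construct the topic; roughly, under assumed names (the broker address and call signature are guesses from context)::

    from snapstream import Topic

    topic = Topic('emoji', {'bootstrap.servers': 'localhost:29092'})

    topic('🏆')                # calling the topic produces a message
    for msg in topic:          # iterating over it consumes messages
        print(msg.key(), msg.value().decode())
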
@@ -44,6 +44,7 @@ Cache
Cache can be used to persist data.

- Data is cached when ``cache`` is called as a function
- Data is stored in sst files in the specified folder: ``db``

::

@@ -52,31 +53,32 @@ Cache can be used to persist data.
cache = Cache('db')

cache('prize', '🏆')

- Data is stored in sst files in the provided folder: ``db``

::

cache['phone'] = '📞'

for x, y in cache.items():
    print(x, y)

- Cache is also subscriptable

::

phone 📞
prize 🏆

To avoid race conditions, lock database keys using the ``transaction`` context manager:

::

with cache.transaction('fish'):
    cache['fish'] = '🐟'

Cache is a basic wrapper around `rocksdict <https://congyuwang.github.io/RocksDict/rocksdict.html>`_.

Conf
----

Conf can be used to set default kafka configurations.

- Conf is a Singleton class, there can only be a single instance
- Conf is a Singleton class, only one instance exists
- Configurations can be overridden per topic

::

@@ -101,8 +103,6 @@ Conf can be used to set default kafka configurations.
print(topic1.conf)
print(topic2.conf)

- Default configurations can be overridden per topic

::

{'bootstrap.servers': 'localhost:29092', 'group.id': 'default-demo'}
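
Since ``Conf`` is a singleton, setting defaults once is enough; a short sketch of the override pattern (values illustrative)::

    from snapstream import Conf, Topic

    # Defaults apply to every topic created afterwards
    Conf({'bootstrap.servers': 'localhost:29092', 'group.id': 'default-demo'})

    # Per-topic configuration is merged over those defaults
    topic = Topic('emoji', {'group.id': 'demo'})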
@@ -138,6 +138,8 @@ Timer

If there's no incoming data, generators can be used to trigger handler functions.

- The ``timer()`` function returns a generator that yields ``None`` every 1.0 seconds

::

from time import localtime, sleep, strftime
@@ -155,8 +157,6 @@ If there's no incoming data, generators can be used to trigger handler functions

stream()

- The ``timer()`` function returns a generator that yields ``None`` every 1.0 seconds

::

23:25:10
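
The generator driving this example is tiny; a minimal sketch of such a ``timer()`` (the implementation is assumed, only the 1.0-second behavior comes from the bullet above)::

    from time import sleep
    from typing import Iterator

    def timer(interval: float = 1.0) -> Iterator[None]:
        # Acts as an iterable source: yield None, then wait
        while True:
            yield
            sleep(interval)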
@@ -169,7 +169,7 @@ Codec

Codecs are used for serializing and deserializing data.

- Data that's passed to ``topic`` is automatically json serialized
- Using ``JsonCodec``, values are automatically converted to and from json

::

@@ -188,8 +188,6 @@ Codecs are used for serializing and deserializing data.
for msg in topic:
    print(msg.value())

- Data that's read from ``topic`` is automatically deserialized

::

{'msg': '🐟'}
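
Under the hood a JSON codec is little more than an encode/decode round-trip; roughly (a sketch, not the library's exact code)::

    import json

    def serialize_json(msg: dict) -> bytes:
        return json.dumps(msg).encode()

    def deserialize_json(data: bytes) -> dict:
        return json.loads(data.decode())

    assert deserialize_json(serialize_json({'msg': '🐟'})) == {'msg': '🐟'}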
2 changes: 1 addition & 1 deletion docs/source/kafka.rst
@@ -41,7 +41,7 @@ Here's the hello-world application using Kafka:

stream()

Some basic principles to remember:
The principles remain the same:

- Any `iterable <https://pythonbasics.org/iterable/>`_ may act as a source of data
- Any callable can be used as a sink
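
A compact illustration of both principles, adapted from the hello-world above (decorator semantics assumed from that example)::

    from snapstream import snap, stream

    messages = ['🏆', '📞']           # any iterable is a source

    @snap(messages, sink=[print])     # any callable is a sink
    def handle(msg):
        return f'handled {msg}'

    stream()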
14 changes: 10 additions & 4 deletions docs/source/testing.rst
@@ -72,14 +72,20 @@ It uses:
def value(self):
    return self.return_value

def timestamp(self):
    return (1, dt.now().timestamp() * 1000)


class FakeCache(defaultdict):

    def __init__(self, contents={}):
        return super().__init__(lambda: None, contents)
    def __init__(self, contents={}):
        return super().__init__(lambda: None, contents)

    def __call__(self, key, val, *args) -> None:
        self.__setitem__(key, val)

    def values(self, *args, **kwargs):
        return super().values()
    def values(self, *args, **kwargs):
        return [_ for _ in super().values() if _]


def test_produce(mocker):
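
A hypothetical test using the fake above (the scenario and names are made up for illustration)::

    def test_fake_cache():
        fake = FakeCache({'price': 100})
        fake('price', 105)                  # callable, like the real Cache
        assert fake['price'] == 105
        assert fake['missing'] is None      # defaultdict returns None for absent keys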
4 changes: 2 additions & 2 deletions snapstream/__main__.py
@@ -170,7 +170,7 @@ def inspect_topic(entry: dict, args: Namespace):
key = msg.key().decode() if msg.key() is not None else ''
offset = msg.offset()
val = msg.value()
if key_filter(str(key)) and val_filter(str(val)):  # pyright: ignore
if key_filter(str(key)) and val_filter(str(val)):
    print()
    if timestamp and timestamp < start_time:
        print('>>> timestamp:', timestamp_str, '(catching up)')
@@ -194,7 +194,7 @@ def inspect_cache(entry: dict, args: Namespace):
key_filter = curry(regex_filter)(args.key_filter)
val_filter = curry(regex_filter)(args.val_filter)
for key, val in cache.items():
    if key_filter(str(key)) and val_filter(str(val)):  # pyright: ignore
    if key_filter(str(key)) and val_filter(str(val)):
        if args.columns and not isinstance(val, dict):
            raise ValueError(f'Columns could not be extracted from {type(val)}: {val}')
        print()
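
The curried filters above bind the pattern once and reuse the resulting predicate per record; sketched with an assumed ``regex_filter`` (the real one lives in ``snapstream.utils``)::

    from re import match

    from toolz import curry

    def regex_filter(pattern, text) -> bool:
        # Assumed behavior: an empty pattern lets everything through
        return bool(match(pattern, text)) if pattern else True

    key_filter = curry(regex_filter)('user-.*')
    assert key_filter('user-42') is True
    assert key_filter('order-1') is False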
45 changes: 38 additions & 7 deletions snapstream/caching.py
@@ -1,12 +1,25 @@
"""Snapstream caching."""

import os
from contextlib import contextmanager
from threading import RLock
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from rocksdict import (AccessType, ColumnFamily, CompactOptions,
                       DBCompactionStyle, DBCompressionType,
                       FifoCompactOptions, IngestExternalFileOptions, Options,
                       Rdict, RdictIter, ReadOptions, Snapshot, WriteOptions)
from rocksdict import (
    AccessType,
    ColumnFamily,
    CompactOptions,
    DBCompactionStyle,
    DBCompressionType,
    FifoCompactOptions,
    IngestExternalFileOptions,
    Options,
    Rdict,
    RdictIter,
    ReadOptions,
    Snapshot,
    WriteOptions,
)
from rocksdict.rocksdict import RdictItems, RdictKeys, RdictValues

MB, MINUTES = 1024 * 1024, 60
@@ -30,7 +43,8 @@ def __init__(
    options: Union[Options, None] = None,
    column_families: Union[Dict[str, Options], None] = None,
    access_type=AccessType.read_write(),
    target_table_size=25 * MB
    target_table_size=25 * MB,
    number_of_locks=16
) -> None:
    """Create instance that holds rocksdb reference.
@@ -40,6 +54,8 @@ def __init__(
    https://congyuwang.github.io/RocksDict/rocksdict.html
    """
    self.name = path
    self._number_of_locks = number_of_locks
    self._locks = [RLock() for _ in range(self._number_of_locks)]
    options = options or self._default_options(target_table_size)
    column_families = column_families or {
        key: options
@@ -73,6 +89,13 @@ def _default_options(target_table_size: int):
    options.set_delete_obsolete_files_period_micros(10 * 1000)
    return options

@contextmanager
def _get_lock(self, key):
    """Get lock from a pool of locks based on key."""
    index = hash(key) % self._number_of_locks
    with (lock := self._locks[index]):
        yield lock

def __call__(self, key, val, *args) -> None:
    """Call cache to set item."""
    self.__setitem__(key, val)
@@ -94,7 +117,8 @@ def __getitem__(self, key) -> Any:

def __setitem__(self, key, val) -> None:
    """Set item in db."""
    self.db[key] = val
    with self._get_lock(key):
        self.db[key] = val

def __enter__(self) -> 'Cache':
    """Contextmanager."""
@@ -120,6 +144,12 @@ def set_write_options(self, write_opt: WriteOptions) -> None:
"""Set custom write options."""
return self.db.set_write_options(write_opt)

@contextmanager
def transaction(self, key) -> Any:
    """Lock the db entry while using the context manager."""
    with self._get_lock(key):
        yield self

def get(
    self,
    key: Union[str, int, float, bytes, bool, List[str], List[int], List[float], List[bytes], List[bool]],
@@ -136,7 +166,8 @@ def put(
    write_opt: Union[WriteOptions, None] = None
) -> None:
    """Put item in database using key."""
    return self.db.put(key, value, write_opt)
    with self._get_lock(key):
        return self.db.put(key, value, write_opt)

def delete(
    self,
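
Two properties of this design are worth noting: ``RLock`` is re-entrant, so ``transaction`` can wrap ``__setitem__`` on the same key in the same thread without deadlocking, and hashing keys onto a fixed pool means unrelated keys occasionally share a lock, costing a little waiting but never correctness. A small usage sketch under the assumptions above::

    from threading import Thread

    cache = Cache('db', number_of_locks=32)   # larger pool, fewer collisions

    def writer(n: int) -> None:
        # Writes to different keys usually hit different pooled locks
        cache[f'key-{n}'] = n

    threads = [Thread(target=writer, args=(i,)) for i in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()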
4 changes: 4 additions & 0 deletions snapstream/codecs.py
@@ -10,6 +10,8 @@
from avro.schema import Schema, parse
from toolz import curry

from snapstream.utils import with_type_hint

logger = logging.getLogger(__name__)


@@ -24,6 +26,7 @@ def serialize_json(msg: dict) -> bytes:
    return dumped.encode()


@with_type_hint
@curry
def deserialize_avro(schema: Schema, msg: bytes) -> object:
    """Deserialize avro message using provided schema."""
@@ -37,6 +40,7 @@ def deserialize_avro(schema: Schema, msg: bytes) -> object:
        raise


@with_type_hint
@curry
def serialize_avro(schema: Schema, msg: dict) -> bytes:
    """Serialize avro message using provided schema."""
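
Because the avro helpers are curried, a schema can be bound once and reused; a sketch assuming the helpers round-trip avro's binary encoding::

    from avro.schema import parse

    schema = parse(
        '{"type": "record", "name": "Msg",'
        ' "fields": [{"name": "msg", "type": "string"}]}'
    )

    encode = serialize_avro(schema)     # partially applied via curry
    decode = deserialize_avro(schema)
    assert decode(encode({'msg': '🐟'})) == {'msg': '🐟'}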
12 changes: 10 additions & 2 deletions snapstream/core.py
@@ -6,8 +6,16 @@
from queue import Queue
from re import sub
from threading import Thread, current_thread
from typing import (Any, Callable, Dict, Iterable, Iterator, Optional, Set,
                    Tuple)
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    Optional,
    Set,
    Tuple,
)

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
11 changes: 10 additions & 1 deletion snapstream/utils.py
@@ -4,7 +4,7 @@
from os import environ, getenv, listdir
from pathlib import Path
from re import match, sub
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional

from toolz.curried import compose, curry, last

@@ -113,3 +113,12 @@ def filter(self, record):
        and
        'property and will be ignored' in record.getMessage()
    )


def with_type_hint(func: Callable[..., Any]) -> Callable[..., Any]:
    """Retain function types after applying decorators."""
    def decorator(curried_func: Callable[..., Any]) -> Callable[..., Any]:
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return curried_func(*args, **kwargs)
        return wrapper
    return decorator(func)
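
``with_type_hint`` exists because ``toolz.curry`` returns an object that type checkers can't see through; re-wrapping it in a plain function restores a ``Callable`` hint, which is why the ``# pyright: ignore`` comments above could be dropped. A sketch of applying it outside the library::

    from toolz import curry

    @with_type_hint
    @curry
    def add(x: int, y: int) -> int:
        return x + y

    add_five = add(5)     # partial application still works through the wrapper
    assert add_five(3) == 8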
33 changes: 33 additions & 0 deletions tests/test_caching.py
@@ -1,3 +1,6 @@
from threading import Thread
from time import sleep

import pytest


@@ -33,3 +36,33 @@ def test_iterability(cache):
    assert list(cache.keys()) == [123]
    assert list(cache.values()) == [123]
    assert list(cache.items()) == [(123, 123)]


def test_transaction(cache):
    """Test transaction."""
    key, result = '123', []

    def try_access_locked_cache():
        result.append(cache[key])
        cache[key] = 'b'
        result.append(cache[key])

    t = Thread(target=try_access_locked_cache)

    with cache.transaction(key):
        cache[key] = 'a'

        # Within the transaction the thread reads cache[key] and appends
        # its value to result, but its attempt to alter it blocks on the lock
        t.start()
        t.join(timeout=0.01)
        if t.is_alive():
            result.append("Timeout")

        assert result == ['a', 'Timeout']
        assert cache[key] == 'a'

    # The thread is still blocked here; once the transaction exits,
    # its pending write goes through and 'b' is eventually stored
    sleep(0.01)
    assert cache[key] == 'b'
14 changes: 10 additions & 4 deletions tests/test_codecs.py
@@ -1,9 +1,15 @@
from avro.schema import parse
import pytest
from avro.schema import parse

from snapstream.codecs import (AvroCodec, ICodec, JsonCodec, deserialize_avro,
                               deserialize_json, serialize_avro,
                               serialize_json)
from snapstream.codecs import (
    AvroCodec,
    ICodec,
    JsonCodec,
    deserialize_avro,
    deserialize_json,
    serialize_avro,
    serialize_json,
)


def test_deserialize_json(raw_msg, json_msg):
