Skip to content

Commit

Permalink
Added the ability to publish items in IOGroup to pubsub, and made set…
Browse files Browse the repository at this point in the history
…ting defaults and deleting keys an option.
  • Loading branch information
washad committed Jun 19, 2019
1 parent b32057d commit efaa19a
Show file tree
Hide file tree
Showing 18 changed files with 123 additions and 28 deletions.
7 changes: 7 additions & 0 deletions build/lib/pyrediseasyio/abstract_reader_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

class AbstractReaderWriter(ABC):

@abstractmethod
def publish(self, value: str, channel: str = None):
""" Publishes a value to pub/sub """
pass

@abstractmethod
def read(self, addr: str) -> object:
"""This method provides a response to a read request, based on last load"""
Expand All @@ -15,3 +20,5 @@ def write(self, addr: str, value: object):





12 changes: 9 additions & 3 deletions build/lib/pyrediseasyio/io_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

class IOGroup(ReaderWriter):

def __init__(self, host='localhost', port=6379, db=0):
super().__init__(host=host, port=port, db=db)
def __init__(self, host='localhost', port=6379, db=0,
set_defaults_on_startup: bool = False,
delete_keys_on_startup: bool = False, **kwargs):

super().__init__(host=host, port=port, db=db, **kwargs)
member_names = [d for d in dir(self) if not d.startswith('__')]
self.members = []
for name in member_names:
Expand All @@ -16,7 +19,10 @@ def __init__(self, host='localhost', port=6379, db=0):
continue
attr._reader_writer = self
self.members.append(name)
attr.write(attr.default)
if delete_keys_on_startup:
self.delete_key(attr.addr)
if set_defaults_on_startup:
attr.write(attr.default)
except AttributeError:
pass

Expand Down
16 changes: 11 additions & 5 deletions build/lib/pyrediseasyio/reader_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@ def __init__(self, host='localhost', port=6379, db=0,
self._pubsub.subscribe(self._channels)
self.get_messages()

def flush_keys(self):
def flush_db(self):
self._server.flushdb()

def flush_all(self):
self._server.flushall()

def delete_key(self, key: str):
self._server.delete(key)

@staticmethod
def _get_channel_and_data_from_message(message: dict):
c = message.get('channel')
Expand All @@ -54,19 +60,19 @@ def get_next_message(self, get_subscription_message=False):
return None
return self._get_channel_and_data_from_message(message)

def get_messages(self, get_subscription_messages=False, limit=100):
def get_messages(self, include_subscription_messages=False, limit=100):
"""
Checks for waiting messages and returns all of them, an empty list if none are waiting.
:param get_subscription_messages: If this is true, messages will also include first-time subscription
:param include_subscription_messages: If this is true, messages will also include first-time subscription
notifications. It is generally better to leave this False as the data response won't match expectations.
:param limit: Applies a cap to the number of messages that will be returned.
:return: Returns a list of tuples (channel, data) or an empty list if no messages are found.
"""
messages = []
msg = self.get_next_message(get_subscription_messages)
msg = self.get_next_message(include_subscription_messages)
while msg is not None:
messages.append(msg)
msg = self.get_next_message(get_subscription_messages)
msg = self.get_next_message(include_subscription_messages)
if len(messages) >= limit:
break
return messages
Expand Down
6 changes: 6 additions & 0 deletions build/lib/pyrediseasyio/single_io.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pyrediseasyio.abstract_reader_writer import AbstractReaderWriter
import threading
from str2bool import str2bool
import json

lock = threading.Lock()

Expand Down Expand Up @@ -71,6 +72,11 @@ def value(self):
def _convert_type(value):
return value

def publish(self, value, channel: str = None):
value = self._convert_type(value)
data = json.dumps({self.addr: value})
self._reader_writer.publish(data, channel)

def read(self):
if self._reader_writer is None:
return None
Expand Down
4 changes: 2 additions & 2 deletions build/lib/tests/test_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def _publish_many(self, count: int, wait: float = 0):
def setUp(self):
self.pubsub1 = ReaderWriter(channel="Channel1")
self.pubsub2 = ReaderWriter(channel="Channel1")
self.pubsub1.flush_keys()
self.pubsub2.flush_keys()
self.pubsub1.flush_db()
self.pubsub2.flush_db()

def test_get_single_message(self):
pubsub1, pubsub2 = self.pubsub1, self.pubsub2
Expand Down
27 changes: 24 additions & 3 deletions build/lib/tests/test_read_write.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import unittest

from pyrediseasyio.io_group import IOGroup
from pyrediseasyio.single_io import BooleanIO, IntIO, FloatIO
from pyrediseasyio.single_io import BooleanIO, IntIO, FloatIO, StringIO
from assertpy import assert_that


Expand All @@ -12,13 +11,20 @@ class TestGroup(IOGroup):
Int2 = IntIO("Integer 2", "Int2", default=34)
Float1 = FloatIO("Float 1", "Float1", default=1.2)

def __init__(self):
super().__init__(channel="TestChannel", delete_keys_on_startup = True)


class TestGroup2(IOGroup):
Bool1 = BooleanIO("Boolean 1", "Bool1", False)
Bool2 = BooleanIO("Boolean 2", "Bool2", True)
Int1 = IntIO("Integer 1", "Int1")
Int2 = IntIO("Integer 2", "Int2", default=34)
Float1 = FloatIO("Float 1", "Float1", default=1.2)
Message1 = StringIO("String 1", "String1")

def __init__(self):
super().__init__(channel="TestChannel", delete_keys_on_startup=True)


class TestGroup3(IOGroup):
Expand All @@ -28,7 +34,7 @@ def __init__(self, db=2):
self.Int1 = IntIO("Integer 1", "Int1")
self.Int2 = IntIO("Integer 2", "Int2", default=34)
self.Float1 = FloatIO("Float 1", "Float1", default=1.2)
super().__init__(db=db)
super().__init__(db=db, delete_keys_on_startup=True)


class TestReadWrite(unittest.TestCase):
Expand All @@ -37,6 +43,9 @@ def setUp(self):
self.group = TestGroup()
self.group2 = TestGroup2()
self.group3 = TestGroup3()
self.group.flush_db()
self.group2.flush_db()
self.group3.flush_db()

def test_that_defaults_are_applied(self):
group = self.group
Expand Down Expand Up @@ -123,4 +132,16 @@ def test_arithmatic_operations(self):
assert_that(group3.Float1 / group1.Float1).is_equal_to(2)
assert_that(group3.Float1 - group1.Float1).is_equal_to(10)

def test_that_pubsub_possible_on_io_group(self):
group = self.group
group2 = self.group2

group2.Message1.publish("This is a test")

msg = None
for m in group.listen():
msg = m
break

assert_that("This is a test" in msg[1]).is_true()

Binary file added dist/pyrediseasyio-0.0.11-py3-none-any.whl
Binary file not shown.
Binary file added dist/pyrediseasyio-0.0.11.tar.gz
Binary file not shown.
Binary file removed dist/pyrediseasyio-0.0.9-py3-none-any.whl
Binary file not shown.
Binary file removed dist/pyrediseasyio-0.0.9.tar.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion pyrediseasyio.egg-info/PKG-INFO
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: pyrediseasyio
Version: 0.0.9
Version: 0.0.11
Summary: A set of tools for simplifying reading and writing of single values to/from Redis.
Home-page: https://github.com/washad/PyRedisEasyIO
Author: Steve Jackson
Expand Down
7 changes: 7 additions & 0 deletions pyrediseasyio/abstract_reader_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

class AbstractReaderWriter(ABC):

@abstractmethod
def publish(self, value: str, channel: str = None):
""" Publishes a value to pub/sub """
pass

@abstractmethod
def read(self, addr: str) -> object:
"""This method provides a response to a read request, based on last load"""
Expand All @@ -15,3 +20,5 @@ def write(self, addr: str, value: object):





12 changes: 9 additions & 3 deletions pyrediseasyio/io_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

class IOGroup(ReaderWriter):

def __init__(self, host='localhost', port=6379, db=0):
super().__init__(host=host, port=port, db=db)
def __init__(self, host='localhost', port=6379, db=0,
set_defaults_on_startup: bool = False,
delete_keys_on_startup: bool = False, **kwargs):

super().__init__(host=host, port=port, db=db, **kwargs)
member_names = [d for d in dir(self) if not d.startswith('__')]
self.members = []
for name in member_names:
Expand All @@ -16,7 +19,10 @@ def __init__(self, host='localhost', port=6379, db=0):
continue
attr._reader_writer = self
self.members.append(name)
attr.write(attr.default)
if delete_keys_on_startup:
self.delete_key(attr.addr)
if set_defaults_on_startup:
attr.write(attr.default)
except AttributeError:
pass

Expand Down
16 changes: 11 additions & 5 deletions pyrediseasyio/reader_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@ def __init__(self, host='localhost', port=6379, db=0,
self._pubsub.subscribe(self._channels)
self.get_messages()

def flush_keys(self):
def flush_db(self):
self._server.flushdb()

def flush_all(self):
self._server.flushall()

def delete_key(self, key: str):
self._server.delete(key)

@staticmethod
def _get_channel_and_data_from_message(message: dict):
c = message.get('channel')
Expand All @@ -54,19 +60,19 @@ def get_next_message(self, get_subscription_message=False):
return None
return self._get_channel_and_data_from_message(message)

def get_messages(self, get_subscription_messages=False, limit=100):
def get_messages(self, include_subscription_messages=False, limit=100):
"""
Checks for waiting messages and returns all of them, an empty list if none are waiting.
:param get_subscription_messages: If this is true, messages will also include first-time subscription
:param include_subscription_messages: If this is true, messages will also include first-time subscription
notifications. It is generally better to leave this False as the data response won't match expectations.
:param limit: Applies a cap to the number of messages that will be returned.
:return: Returns a list of tuples (channel, data) or an empty list if no messages are found.
"""
messages = []
msg = self.get_next_message(get_subscription_messages)
msg = self.get_next_message(include_subscription_messages)
while msg is not None:
messages.append(msg)
msg = self.get_next_message(get_subscription_messages)
msg = self.get_next_message(include_subscription_messages)
if len(messages) >= limit:
break
return messages
Expand Down
6 changes: 6 additions & 0 deletions pyrediseasyio/single_io.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pyrediseasyio.abstract_reader_writer import AbstractReaderWriter
import threading
from str2bool import str2bool
import json

lock = threading.Lock()

Expand Down Expand Up @@ -71,6 +72,11 @@ def value(self):
def _convert_type(value):
return value

def publish(self, value, channel: str = None):
value = self._convert_type(value)
data = json.dumps({self.addr: value})
self._reader_writer.publish(data, channel)

def read(self):
if self._reader_writer is None:
return None
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pyrediseasyio",
version="0.0.9",
version="0.0.11",
author="Steve Jackson",
author_email="washad@gmail.com",
description="A set of tools for simplifying reading and writing of single values to/from Redis.",
Expand Down
7 changes: 5 additions & 2 deletions tests/test_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import time
import threading

from pyrediseasyio.io_group import IOGroup
from pyrediseasyio.reader_writer import ReaderWriter
from assertpy import assert_that




class PubSubTests(unittest.TestCase):

def _publish_many(self, count: int, wait: float = 0):
Expand All @@ -17,8 +20,8 @@ def _publish_many(self, count: int, wait: float = 0):
def setUp(self):
self.pubsub1 = ReaderWriter(channel="Channel1")
self.pubsub2 = ReaderWriter(channel="Channel1")
self.pubsub1.flush_keys()
self.pubsub2.flush_keys()
self.pubsub1.flush_db()
self.pubsub2.flush_db()

def test_get_single_message(self):
pubsub1, pubsub2 = self.pubsub1, self.pubsub2
Expand Down
27 changes: 24 additions & 3 deletions tests/test_read_write.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import unittest

from pyrediseasyio.io_group import IOGroup
from pyrediseasyio.single_io import BooleanIO, IntIO, FloatIO
from pyrediseasyio.single_io import BooleanIO, IntIO, FloatIO, StringIO
from assertpy import assert_that


Expand All @@ -12,13 +11,20 @@ class TestGroup(IOGroup):
Int2 = IntIO("Integer 2", "Int2", default=34)
Float1 = FloatIO("Float 1", "Float1", default=1.2)

def __init__(self):
super().__init__(channel="TestChannel", delete_keys_on_startup = True)


class TestGroup2(IOGroup):
Bool1 = BooleanIO("Boolean 1", "Bool1", False)
Bool2 = BooleanIO("Boolean 2", "Bool2", True)
Int1 = IntIO("Integer 1", "Int1")
Int2 = IntIO("Integer 2", "Int2", default=34)
Float1 = FloatIO("Float 1", "Float1", default=1.2)
Message1 = StringIO("String 1", "String1")

def __init__(self):
super().__init__(channel="TestChannel", delete_keys_on_startup=True)


class TestGroup3(IOGroup):
Expand All @@ -28,7 +34,7 @@ def __init__(self, db=2):
self.Int1 = IntIO("Integer 1", "Int1")
self.Int2 = IntIO("Integer 2", "Int2", default=34)
self.Float1 = FloatIO("Float 1", "Float1", default=1.2)
super().__init__(db=db)
super().__init__(db=db, delete_keys_on_startup=True)


class TestReadWrite(unittest.TestCase):
Expand All @@ -37,6 +43,9 @@ def setUp(self):
self.group = TestGroup()
self.group2 = TestGroup2()
self.group3 = TestGroup3()
self.group.flush_db()
self.group2.flush_db()
self.group3.flush_db()

def test_that_defaults_are_applied(self):
group = self.group
Expand Down Expand Up @@ -123,4 +132,16 @@ def test_arithmatic_operations(self):
assert_that(group3.Float1 / group1.Float1).is_equal_to(2)
assert_that(group3.Float1 - group1.Float1).is_equal_to(10)

def test_that_pubsub_possible_on_io_group(self):
group = self.group
group2 = self.group2

group2.Message1.publish("This is a test")

msg = None
for m in group.listen():
msg = m
break

assert_that("This is a test" in msg[1]).is_true()

0 comments on commit efaa19a

Please sign in to comment.