Skip to content

Commit

Permalink
PubSub (#19)
Browse files Browse the repository at this point in the history
* Disable debug messages in release build

* PubSub draft

* PubSub done

* on_message signature with topic
  • Loading branch information
FeodorFitsner authored Jun 10, 2022
1 parent 81d719b commit 3e2de16
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 5 deletions.
8 changes: 4 additions & 4 deletions client/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import 'web_socket_client.dart';
const bool isProduction = bool.fromEnvironment('dart.vm.product');

void main([List<String>? args]) async {
// if (isProduction) {
// // ignore: avoid_returning_null_for_void
// debugPrint = (String? message, {int? wrapWidth}) => null;
// }
if (isProduction) {
// ignore: avoid_returning_null_for_void
debugPrint = (String? message, {int? wrapWidth}) => null;
}

await setupDesktop();

Expand Down
1 change: 1 addition & 0 deletions sdk/python/flet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from flet.popup_menu_button import PopupMenuButton, PopupMenuItem
from flet.progress_bar import ProgressBar
from flet.progress_ring import ProgressRing
from flet.pubsub import PubSub
from flet.radio import Radio
from flet.radio_group import RadioGroup
from flet.ref import Ref
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/flet/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import uuid

from flet.protocol import *
from flet.pubsub import PubSubHub
from flet.reconnecting_websocket import ReconnectingWebSocket


Expand All @@ -18,6 +19,7 @@ def __init__(self, ws: ReconnectingWebSocket):
self.page_name = None
self.page_url = None
self.sessions = {}
self.pubsubhub = PubSubHub()

@property
def on_event(self):
Expand Down
8 changes: 7 additions & 1 deletion sdk/python/flet/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from flet.control_event import ControlEvent
from flet.floating_action_button import FloatingActionButton
from flet.protocol import Command
from flet.pubsub import PubSub
from flet.snack_bar import SnackBar
from flet.theme import Theme

Expand Down Expand Up @@ -49,12 +50,12 @@ def __init__(self, conn: Connection, session_id):
self._last_event = None
self._event_available = threading.Event()
self._fetch_page_details()
self.lock = threading.Lock()

self.__offstage = Offstage()
self.__appbar = None
self.__theme = None
self.__dark_theme = None
self.__pubsub = PubSub(conn.pubsubhub, session_id)

def __enter__(self):
return self
Expand Down Expand Up @@ -243,6 +244,11 @@ def index(self):
def session_id(self):
return self._session_id

# pubsub
@property
def pubsub(self):
return self.__pubsub

# controls
@property
def controls(self):
Expand Down
140 changes: 140 additions & 0 deletions sdk/python/flet/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import logging
import threading
from typing import Callable, Dict, Iterable


class PubSubHub:
def __init__(self):
self.__lock = threading.Lock()
self.__subscribers: Dict[str, Callable] = {} # key: session_id, value: handler
self.__topic_subscribers: Dict[
str, Dict[str, Callable]
] = {} # key: topic, value: dict[session_id, handler]
self.__subscriber_topics: Dict[
str, Dict[str, Callable]
] = {} # key: session_id, value: dict[topic, handler]

def send_all(self, message: any):
logging.debug(f"pubsub.send_all({message})")
with self.__lock:
for handler in self.__subscribers.values():
self.__send(handler, [message])

def send_all_on_topic(self, topic: str, message: any):
logging.debug(f"pubsub.send_all_on_topic({topic}, {message})")
with self.__lock:
if topic in self.__topic_subscribers:
for handler in self.__topic_subscribers[topic].values():
self.__send(handler, [topic, message])

def send_others(self, except_session_id: str, message: any):
logging.debug(f"pubsub.send_others({except_session_id}, {message})")
with self.__lock:
for session_id, handler in self.__subscribers.items():
if except_session_id != session_id:
self.__send(handler, [message])

def send_others_on_topic(self, except_session_id: str, topic: str, message: any):
logging.debug(
f"pubsub.send_others_on_topic({except_session_id}, {topic}, {message})"
)
with self.__lock:
if topic in self.__topic_subscribers:
for session_id, handler in self.__topic_subscribers[topic].values():
if except_session_id != session_id:
self.__send(handler, [topic, message])

def subscribe(self, session_id: str, handler: Callable):
logging.debug(f"pubsub.subscribe({session_id})")
with self.__lock:
self.__subscribers[session_id] = handler

def subscribe_topic(self, session_id: str, topic: str, handler: Callable):
logging.debug(f"pubsub.subscribe_topic({session_id}, {topic})")
with self.__lock:
topic_subscribers = self.__topic_subscribers.get(topic)
if topic_subscribers == None:
topic_subscribers = {}
self.__topic_subscribers[topic] = topic_subscribers
topic_subscribers[session_id] = handler
subscriber_topics = self.__subscriber_topics.get(session_id)
if subscriber_topics == None:
subscriber_topics = {}
self.__subscriber_topics[session_id] = subscriber_topics
subscriber_topics[topic] = handler

def unsubscribe(self, session_id: str):
logging.debug(f"pubsub.unsubscribe({session_id})")
with self.__lock:
self.__unsubscribe(session_id)

def unsubscribe_topic(self, session_id: str, topic: str):
logging.debug(f"pubsub.unsubscribe({session_id}, {topic})")
with self.__lock:
self.__unsubscribe_topic(session_id, topic)

def unsubscribe_all(self, session_id: str):
logging.debug(f"pubsub.unsubscribe_all({session_id})")
with self.__lock:
self.__unsubscribe(session_id)
if session_id in self.__subscriber_topics:
for topic in self.__subscriber_topics[session_id].keys():
self.__unsubscribe_topic(session_id, topic)

def __unsubscribe(self, session_id: str):
logging.debug(f"pubsub.__unsubscribe({session_id})")
self.__subscribers.pop(session_id)

def __unsubscribe_topic(self, session_id: str, topic: str):
logging.debug(f"pubsub.__unsubscribe_topic({session_id}, {topic})")
topic_subscribers = self.__topic_subscribers.get(topic)
if topic_subscribers != None:
topic_subscribers.pop(session_id)
if len(topic_subscribers) == 0:
self.__topic_subscribers.pop(topic)
subscriber_topics = self.__subscriber_topics.get(session_id)
if subscriber_topics != None:
subscriber_topics.pop(topic)
if len(subscriber_topics) == 0:
self.__subscriber_topics.pop(session_id)

def __send(self, handler: Callable, args: Iterable):
th = threading.Thread(
target=handler,
args=args,
daemon=True,
)
th.start()


class PubSub:
def __init__(self, pubsub: PubSubHub, session_id: str):
self.__pubsub = pubsub
self.__session_id = session_id

def send_all(self, message: any):
self.__pubsub.send_all(message)

def send_all_on_topic(self, topic: str, message: any):
self.__pubsub.send_all_on_topic(topic, message)

def send_others(self, message: any):
self.__pubsub.send_others(self.__session_id, message)

def send_others_on_topic(self, topic: str, message: any):
self.__pubsub.send_others_on_topic(self.__session_id, topic, message)

def subscribe(self, handler: Callable):
self.__pubsub.subscribe(self.__session_id, handler)

def subscribe_topic(self, topic: str, handler: Callable):
self.__pubsub.subscribe_topic(self.__session_id, topic, handler)

def unsubscribe(self):
self.__pubsub.unsubscribe(self.__session_id)

def unsubscribe_topic(self, topic: str):
self.__pubsub.unsubscribe_topic(self.__session_id, topic)

def unsubscribe_all(self):
self.__pubsub.unsubscribe_all(self.__session_id)

0 comments on commit 3e2de16

Please sign in to comment.