diff --git a/client/lib/main.dart b/client/lib/main.dart index 86ef16695..84874e416 100644 --- a/client/lib/main.dart +++ b/client/lib/main.dart @@ -20,10 +20,10 @@ import 'web_socket_client.dart'; const bool isProduction = bool.fromEnvironment('dart.vm.product'); void main([List? args]) async { - // if (isProduction) { - // // ignore: avoid_returning_null_for_void - // debugPrint = (String? message, {int? wrapWidth}) => null; - // } + if (isProduction) { + // ignore: avoid_returning_null_for_void + debugPrint = (String? message, {int? wrapWidth}) => null; + } await setupDesktop(); diff --git a/sdk/python/flet/__init__.py b/sdk/python/flet/__init__.py index c60e2d595..2b6f43ac0 100644 --- a/sdk/python/flet/__init__.py +++ b/sdk/python/flet/__init__.py @@ -26,6 +26,7 @@ from flet.popup_menu_button import PopupMenuButton, PopupMenuItem from flet.progress_bar import ProgressBar from flet.progress_ring import ProgressRing +from flet.pubsub import PubSub from flet.radio import Radio from flet.radio_group import RadioGroup from flet.ref import Ref diff --git a/sdk/python/flet/connection.py b/sdk/python/flet/connection.py index 886d12f09..6fe0408d4 100644 --- a/sdk/python/flet/connection.py +++ b/sdk/python/flet/connection.py @@ -4,6 +4,7 @@ import uuid from flet.protocol import * +from flet.pubsub import PubSubHub from flet.reconnecting_websocket import ReconnectingWebSocket @@ -18,6 +19,7 @@ def __init__(self, ws: ReconnectingWebSocket): self.page_name = None self.page_url = None self.sessions = {} + self.pubsubhub = PubSubHub() @property def on_event(self): diff --git a/sdk/python/flet/page.py b/sdk/python/flet/page.py index dc5a83121..c47f64ca7 100644 --- a/sdk/python/flet/page.py +++ b/sdk/python/flet/page.py @@ -22,6 +22,7 @@ from flet.control_event import ControlEvent from flet.floating_action_button import FloatingActionButton from flet.protocol import Command +from flet.pubsub import PubSub from flet.snack_bar import SnackBar from flet.theme import Theme @@ -49,12 +50,12 @@ def __init__(self, conn: Connection, session_id): self._last_event = None self._event_available = threading.Event() self._fetch_page_details() - self.lock = threading.Lock() self.__offstage = Offstage() self.__appbar = None self.__theme = None self.__dark_theme = None + self.__pubsub = PubSub(conn.pubsubhub, session_id) def __enter__(self): return self @@ -243,6 +244,11 @@ def index(self): def session_id(self): return self._session_id + # pubsub + @property + def pubsub(self): + return self.__pubsub + # controls @property def controls(self): diff --git a/sdk/python/flet/pubsub.py b/sdk/python/flet/pubsub.py new file mode 100644 index 000000000..e202a50d8 --- /dev/null +++ b/sdk/python/flet/pubsub.py @@ -0,0 +1,140 @@ +import logging +import threading +from typing import Callable, Dict, Iterable + + +class PubSubHub: + def __init__(self): + self.__lock = threading.Lock() + self.__subscribers: Dict[str, Callable] = {} # key: session_id, value: handler + self.__topic_subscribers: Dict[ + str, Dict[str, Callable] + ] = {} # key: topic, value: dict[session_id, handler] + self.__subscriber_topics: Dict[ + str, Dict[str, Callable] + ] = {} # key: session_id, value: dict[topic, handler] + + def send_all(self, message: any): + logging.debug(f"pubsub.send_all({message})") + with self.__lock: + for handler in self.__subscribers.values(): + self.__send(handler, [message]) + + def send_all_on_topic(self, topic: str, message: any): + logging.debug(f"pubsub.send_all_on_topic({topic}, {message})") + with self.__lock: + if topic in self.__topic_subscribers: + for handler in self.__topic_subscribers[topic].values(): + self.__send(handler, [topic, message]) + + def send_others(self, except_session_id: str, message: any): + logging.debug(f"pubsub.send_others({except_session_id}, {message})") + with self.__lock: + for session_id, handler in self.__subscribers.items(): + if except_session_id != session_id: + self.__send(handler, [message]) + + def send_others_on_topic(self, except_session_id: str, topic: str, message: any): + logging.debug( + f"pubsub.send_others_on_topic({except_session_id}, {topic}, {message})" + ) + with self.__lock: + if topic in self.__topic_subscribers: + for session_id, handler in self.__topic_subscribers[topic].values(): + if except_session_id != session_id: + self.__send(handler, [topic, message]) + + def subscribe(self, session_id: str, handler: Callable): + logging.debug(f"pubsub.subscribe({session_id})") + with self.__lock: + self.__subscribers[session_id] = handler + + def subscribe_topic(self, session_id: str, topic: str, handler: Callable): + logging.debug(f"pubsub.subscribe_topic({session_id}, {topic})") + with self.__lock: + topic_subscribers = self.__topic_subscribers.get(topic) + if topic_subscribers == None: + topic_subscribers = {} + self.__topic_subscribers[topic] = topic_subscribers + topic_subscribers[session_id] = handler + subscriber_topics = self.__subscriber_topics.get(session_id) + if subscriber_topics == None: + subscriber_topics = {} + self.__subscriber_topics[session_id] = subscriber_topics + subscriber_topics[topic] = handler + + def unsubscribe(self, session_id: str): + logging.debug(f"pubsub.unsubscribe({session_id})") + with self.__lock: + self.__unsubscribe(session_id) + + def unsubscribe_topic(self, session_id: str, topic: str): + logging.debug(f"pubsub.unsubscribe({session_id}, {topic})") + with self.__lock: + self.__unsubscribe_topic(session_id, topic) + + def unsubscribe_all(self, session_id: str): + logging.debug(f"pubsub.unsubscribe_all({session_id})") + with self.__lock: + self.__unsubscribe(session_id) + if session_id in self.__subscriber_topics: + for topic in self.__subscriber_topics[session_id].keys(): + self.__unsubscribe_topic(session_id, topic) + + def __unsubscribe(self, session_id: str): + logging.debug(f"pubsub.__unsubscribe({session_id})") + self.__subscribers.pop(session_id) + + def __unsubscribe_topic(self, session_id: str, topic: str): + logging.debug(f"pubsub.__unsubscribe_topic({session_id}, {topic})") + topic_subscribers = self.__topic_subscribers.get(topic) + if topic_subscribers != None: + topic_subscribers.pop(session_id) + if len(topic_subscribers) == 0: + self.__topic_subscribers.pop(topic) + subscriber_topics = self.__subscriber_topics.get(session_id) + if subscriber_topics != None: + subscriber_topics.pop(topic) + if len(subscriber_topics) == 0: + self.__subscriber_topics.pop(session_id) + + def __send(self, handler: Callable, args: Iterable): + th = threading.Thread( + target=handler, + args=args, + daemon=True, + ) + th.start() + + +class PubSub: + def __init__(self, pubsub: PubSubHub, session_id: str): + self.__pubsub = pubsub + self.__session_id = session_id + + def send_all(self, message: any): + self.__pubsub.send_all(message) + + def send_all_on_topic(self, topic: str, message: any): + self.__pubsub.send_all_on_topic(topic, message) + + def send_others(self, message: any): + self.__pubsub.send_others(self.__session_id, message) + + def send_others_on_topic(self, topic: str, message: any): + self.__pubsub.send_others_on_topic(self.__session_id, topic, message) + + def subscribe(self, handler: Callable): + self.__pubsub.subscribe(self.__session_id, handler) + + def subscribe_topic(self, topic: str, handler: Callable): + self.__pubsub.subscribe_topic(self.__session_id, topic, handler) + + def unsubscribe(self): + self.__pubsub.unsubscribe(self.__session_id) + + def unsubscribe_topic(self, topic: str): + self.__pubsub.unsubscribe_topic(self.__session_id, topic) + + def unsubscribe_all(self): + self.__pubsub.unsubscribe_all(self.__session_id)