Skip to content

PubSub #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import 'web_socket_client.dart';
const bool isProduction = bool.fromEnvironment('dart.vm.product');

void main([List<String>? args]) async {
// if (isProduction) {
// // ignore: avoid_returning_null_for_void
// debugPrint = (String? message, {int? wrapWidth}) => null;
// }
if (isProduction) {
// ignore: avoid_returning_null_for_void
debugPrint = (String? message, {int? wrapWidth}) => null;
}

await setupDesktop();

Expand Down
1 change: 1 addition & 0 deletions sdk/python/flet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from flet.popup_menu_button import PopupMenuButton, PopupMenuItem
from flet.progress_bar import ProgressBar
from flet.progress_ring import ProgressRing
from flet.pubsub import PubSub
from flet.radio import Radio
from flet.radio_group import RadioGroup
from flet.ref import Ref
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/flet/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import uuid

from flet.protocol import *
from flet.pubsub import PubSubHub
from flet.reconnecting_websocket import ReconnectingWebSocket


Expand All @@ -18,6 +19,7 @@ def __init__(self, ws: ReconnectingWebSocket):
self.page_name = None
self.page_url = None
self.sessions = {}
self.pubsubhub = PubSubHub()

@property
def on_event(self):
Expand Down
8 changes: 7 additions & 1 deletion sdk/python/flet/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from flet.control_event import ControlEvent
from flet.floating_action_button import FloatingActionButton
from flet.protocol import Command
from flet.pubsub import PubSub
from flet.snack_bar import SnackBar
from flet.theme import Theme

Expand Down Expand Up @@ -49,12 +50,12 @@ def __init__(self, conn: Connection, session_id):
self._last_event = None
self._event_available = threading.Event()
self._fetch_page_details()
self.lock = threading.Lock()

self.__offstage = Offstage()
self.__appbar = None
self.__theme = None
self.__dark_theme = None
self.__pubsub = PubSub(conn.pubsubhub, session_id)

def __enter__(self):
return self
Expand Down Expand Up @@ -243,6 +244,11 @@ def index(self):
def session_id(self):
return self._session_id

# pubsub
@property
def pubsub(self):
return self.__pubsub

# controls
@property
def controls(self):
Expand Down
140 changes: 140 additions & 0 deletions sdk/python/flet/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import logging
import threading
from typing import Callable, Dict, Iterable


class PubSubHub:
def __init__(self):
self.__lock = threading.Lock()
self.__subscribers: Dict[str, Callable] = {} # key: session_id, value: handler
self.__topic_subscribers: Dict[
str, Dict[str, Callable]
] = {} # key: topic, value: dict[session_id, handler]
self.__subscriber_topics: Dict[
str, Dict[str, Callable]
] = {} # key: session_id, value: dict[topic, handler]

def send_all(self, message: any):
logging.debug(f"pubsub.send_all({message})")
with self.__lock:
for handler in self.__subscribers.values():
self.__send(handler, [message])

def send_all_on_topic(self, topic: str, message: any):
logging.debug(f"pubsub.send_all_on_topic({topic}, {message})")
with self.__lock:
if topic in self.__topic_subscribers:
for handler in self.__topic_subscribers[topic].values():
self.__send(handler, [topic, message])

def send_others(self, except_session_id: str, message: any):
logging.debug(f"pubsub.send_others({except_session_id}, {message})")
with self.__lock:
for session_id, handler in self.__subscribers.items():
if except_session_id != session_id:
self.__send(handler, [message])

def send_others_on_topic(self, except_session_id: str, topic: str, message: any):
logging.debug(
f"pubsub.send_others_on_topic({except_session_id}, {topic}, {message})"
)
with self.__lock:
if topic in self.__topic_subscribers:
for session_id, handler in self.__topic_subscribers[topic].values():
if except_session_id != session_id:
self.__send(handler, [topic, message])

def subscribe(self, session_id: str, handler: Callable):
logging.debug(f"pubsub.subscribe({session_id})")
with self.__lock:
self.__subscribers[session_id] = handler

def subscribe_topic(self, session_id: str, topic: str, handler: Callable):
logging.debug(f"pubsub.subscribe_topic({session_id}, {topic})")
with self.__lock:
topic_subscribers = self.__topic_subscribers.get(topic)
if topic_subscribers == None:
topic_subscribers = {}
self.__topic_subscribers[topic] = topic_subscribers
topic_subscribers[session_id] = handler
subscriber_topics = self.__subscriber_topics.get(session_id)
if subscriber_topics == None:
subscriber_topics = {}
self.__subscriber_topics[session_id] = subscriber_topics
subscriber_topics[topic] = handler

def unsubscribe(self, session_id: str):
logging.debug(f"pubsub.unsubscribe({session_id})")
with self.__lock:
self.__unsubscribe(session_id)

def unsubscribe_topic(self, session_id: str, topic: str):
logging.debug(f"pubsub.unsubscribe({session_id}, {topic})")
with self.__lock:
self.__unsubscribe_topic(session_id, topic)

def unsubscribe_all(self, session_id: str):
logging.debug(f"pubsub.unsubscribe_all({session_id})")
with self.__lock:
self.__unsubscribe(session_id)
if session_id in self.__subscriber_topics:
for topic in self.__subscriber_topics[session_id].keys():
self.__unsubscribe_topic(session_id, topic)

def __unsubscribe(self, session_id: str):
logging.debug(f"pubsub.__unsubscribe({session_id})")
self.__subscribers.pop(session_id)

def __unsubscribe_topic(self, session_id: str, topic: str):
logging.debug(f"pubsub.__unsubscribe_topic({session_id}, {topic})")
topic_subscribers = self.__topic_subscribers.get(topic)
if topic_subscribers != None:
topic_subscribers.pop(session_id)
if len(topic_subscribers) == 0:
self.__topic_subscribers.pop(topic)
subscriber_topics = self.__subscriber_topics.get(session_id)
if subscriber_topics != None:
subscriber_topics.pop(topic)
if len(subscriber_topics) == 0:
self.__subscriber_topics.pop(session_id)

def __send(self, handler: Callable, args: Iterable):
th = threading.Thread(
target=handler,
args=args,
daemon=True,
)
th.start()


class PubSub:
def __init__(self, pubsub: PubSubHub, session_id: str):
self.__pubsub = pubsub
self.__session_id = session_id

def send_all(self, message: any):
self.__pubsub.send_all(message)

def send_all_on_topic(self, topic: str, message: any):
self.__pubsub.send_all_on_topic(topic, message)

def send_others(self, message: any):
self.__pubsub.send_others(self.__session_id, message)

def send_others_on_topic(self, topic: str, message: any):
self.__pubsub.send_others_on_topic(self.__session_id, topic, message)

def subscribe(self, handler: Callable):
self.__pubsub.subscribe(self.__session_id, handler)

def subscribe_topic(self, topic: str, handler: Callable):
self.__pubsub.subscribe_topic(self.__session_id, topic, handler)

def unsubscribe(self):
self.__pubsub.unsubscribe(self.__session_id)

def unsubscribe_topic(self, topic: str):
self.__pubsub.unsubscribe_topic(self.__session_id, topic)

def unsubscribe_all(self):
self.__pubsub.unsubscribe_all(self.__session_id)