Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add a module API to send an HTTP push notification
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Velten committed Apr 4, 2023
1 parent 6d10337 commit debbc8d
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 61 deletions.
1 change: 1 addition & 0 deletions changelog.d/15387.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a module API to send an HTTP push notification.
35 changes: 35 additions & 0 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Generator,
Iterable,
List,
Mapping,
Optional,
Tuple,
TypeVar,
Expand Down Expand Up @@ -105,6 +106,7 @@
ON_LEGACY_SEND_MAIL_CALLBACK,
ON_USER_REGISTRATION_CALLBACK,
)
from synapse.push.httppusher import HttpPusher
from synapse.rest.client.login import LoginResponse
from synapse.storage import DataStore
from synapse.storage.background_updates import (
Expand Down Expand Up @@ -248,6 +250,7 @@ def __init__(self, hs: "HomeServer", auth_handler: AuthHandler) -> None:
self._registration_handler = hs.get_registration_handler()
self._send_email_handler = hs.get_send_email_handler()
self._push_rules_handler = hs.get_push_rules_handler()
self._pusherpool = hs.get_pusherpool()
self._device_handler = hs.get_device_handler()
self.custom_template_dir = hs.config.server.custom_template_directory
self._callbacks = hs.get_module_api_callbacks()
Expand Down Expand Up @@ -1226,6 +1229,38 @@ async def sleep(self, seconds: float) -> None:

await self._clock.sleep(seconds)

async def send_http_push_notification(
self,
user_id: str,
device_id: str,
content: JsonDict,
tweaks: Mapping[str, str] = {},
) -> bool:
"""Send an HTTP push notification that is forwarded to the registered push gateway
for the specified device.
Added in Synapse v1.82.0.
Args:
user_id: The user ID of the device where to send the push notification.
device_id: The device ID of the device where to send the push notification.
content: A dict of values that will be put in the `notification` field of the push
(cf Push Gatway spec). `devices` field will be overrided if included.
tweaks: A dict of `tweaks` that will be inserted in the `devices` section, cf spec.
Returns:
True if at least one push was succesfully sent, False in case of error or if no
pusher is registered for the specified device.
"""
sent = False
if user_id in self._pusherpool.pushers:
for p in self._pusherpool.pushers[user_id].values():
if isinstance(p, HttpPusher) and p.device_id == device_id:
res = await p.dispatch_push(content, tweaks)
if res is not False:
sent = True
return sent

async def send_mail(
self,
recipient: str,
Expand Down
134 changes: 73 additions & 61 deletions synapse/push/httppusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Tuple, Union

from prometheus_client import Counter

Expand All @@ -27,6 +27,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.storage.databases.main.event_push_actions import HttpPushAction
from synapse.types import JsonDict

from . import push_tools

Expand Down Expand Up @@ -56,7 +57,7 @@
)


def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]:
def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, str]:
"""
Converts a list of actions into a `tweaks` dict (which can then be passed to
the push gateway).
Expand Down Expand Up @@ -101,6 +102,7 @@ def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
self._storage_controllers = self.hs.get_storage_controllers()
self.app_display_name = pusher_config.app_display_name
self.device_display_name = pusher_config.device_display_name
self.device_id = pusher_config.device_id
self.pushkey_ts = pusher_config.ts
self.data = pusher_config.data
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
Expand Down Expand Up @@ -324,7 +326,7 @@ async def _process_one(self, push_action: HttpPushAction) -> bool:
event = await self.store.get_event(push_action.event_id, allow_none=True)
if event is None:
return True # It's been redacted
rejected = await self.dispatch_push(event, tweaks, badge)
rejected = await self.dispatch_push_event(event, tweaks, badge)
if rejected is False:
return False

Expand All @@ -342,9 +344,12 @@ async def _process_one(self, push_action: HttpPushAction) -> bool:
await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
return True

async def _build_notification_dict(
self, event: EventBase, tweaks: Dict[str, bool], badge: int
) -> Dict[str, Any]:
async def _build_event_notification(
self,
event: EventBase,
tweaks: Mapping[str, str],
badge: int,
) -> Tuple[JsonDict, Mapping[str, str]]:
priority = "low"
if (
event.type == EventTypes.Encrypted
Expand All @@ -358,80 +363,70 @@ async def _build_notification_dict(
# This was checked in the __init__, but mypy doesn't seem to know that.
assert self.data is not None
if self.data.get("format") == "event_id_only":
d: Dict[str, Any] = {
"notification": {
"event_id": event.event_id,
"room_id": event.room_id,
"counts": {"unread": badge},
"prio": priority,
"devices": [
{
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": self.data_minus_url,
}
],
}
content: JsonDict = {
"event_id": event.event_id,
"room_id": event.room_id,
"counts": {"unread": badge},
"prio": priority,
}
return d
return content, {}

ctx = await push_tools.get_context_for_event(
self._storage_controllers, event, self.user_id
)

d = {
"notification": {
"id": event.event_id, # deprecated: remove soon
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"sender": event.user_id,
"prio": priority,
"counts": {
"unread": badge,
# 'missed_calls': 2
},
"devices": [
{
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": self.data_minus_url,
"tweaks": tweaks,
}
],
}
content = {
"id": event.event_id, # deprecated: remove soon
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"sender": event.user_id,
"prio": priority,
"counts": {
"unread": badge,
# 'missed_calls': 2
},
}
if event.type == "m.room.member" and event.is_state():
d["notification"]["membership"] = event.content["membership"]
d["notification"]["user_is_target"] = event.state_key == self.user_id
content["membership"] = event.content["membership"]
content["user_is_target"] = event.state_key == self.user_id
if self.hs.config.push.push_include_content and event.content:
d["notification"]["content"] = event.content
content["content"] = event.content

# We no longer send aliases separately, instead, we send the human
# readable name of the room, which may be an alias.
if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
d["notification"]["sender_display_name"] = ctx["sender_display_name"]
content["sender_display_name"] = ctx["sender_display_name"]
if "name" in ctx and len(ctx["name"]) > 0:
d["notification"]["room_name"] = ctx["name"]
content["room_name"] = ctx["name"]

return (content, tweaks)

def _build_notification_dict(
self, content: JsonDict, tweaks: Mapping[str, str]
) -> JsonDict:
device = {
"app_id": self.app_id,
"pushkey": self.pushkey,
"pushkey_ts": int(self.pushkey_ts / 1000),
"data": self.data_minus_url,
}
if tweaks:
device["tweaks"] = tweaks

return d
content["devices"] = [device]

return {"notification": content}

async def dispatch_push(
self, event: EventBase, tweaks: Dict[str, bool], badge: int
self, content: JsonDict, tweaks: Mapping[str, str] = {}
) -> Union[bool, Iterable[str]]:
notification_dict = await self._build_notification_dict(event, tweaks, badge)
if not notification_dict:
return []
notif_dict = self._build_notification_dict(content, tweaks)
try:
resp = await self.http_client.post_json_get_json(
self.url, notification_dict
)
resp = await self.http_client.post_json_get_json(self.url, notif_dict)
except Exception as e:
logger.warning(
"Failed to push event %s to %s: %s %s",
event.event_id,
"Failed to push data to %s: %s %s",
self.name,
type(e),
e,
Expand All @@ -440,10 +435,27 @@ async def dispatch_push(
rejected = []
if "rejected" in resp:
rejected = resp["rejected"]
if not rejected:
self.badge_count_last_call = badge
return rejected

async def dispatch_push_event(
self,
event: EventBase,
tweaks: Mapping[str, str],
badge: int,
) -> Union[bool, Iterable[str]]:
content, tweaks = await self._build_event_notification(event, tweaks, badge)
if not content:
return []

res = await self.dispatch_push(content, tweaks)

if res is False:
return False
if not res:
self.badge_count_last_call = badge

return res

async def _send_badge(self, badge: int) -> None:
"""
Args:
Expand Down

0 comments on commit debbc8d

Please sign in to comment.