Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support wildcard device_ids for direct to device messages #1084

Merged
merged 1 commit into from
Sep 8, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions synapse/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,33 +130,51 @@ def add_messages_txn(txn, now_ms, stream_id):

def _add_messages_to_local_device_inbox_txn(self, txn, stream_id,
messages_by_user_then_device):
local_users_and_devices = set()
local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
messages_json_for_user = {}
devices = messages_by_device.keys()
sql = (
"SELECT user_id, device_id FROM devices"
" WHERE user_id = ? AND device_id IN ("
+ ",".join("?" * len(devices))
+ ")"
)
# TODO: Maybe this needs to be done in batches if there are
# too many local devices for a given user.
txn.execute(sql, [user_id] + devices)
local_users_and_devices.update(map(tuple, txn.fetchall()))
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
sql = (
"SELECT device_id FROM devices"
" WHERE user_id = ?"
)
txn.execute(sql, (user_id,))
message_json = ujson.dumps(messages_by_device["*"])
for row in txn.fetchall():
# Add the message for all devices for this user on this
# server.
device = row[0]
messages_json_for_user[device] = message_json
else:
sql = (
"SELECT device_id FROM devices"
" WHERE user_id = ? AND device_id IN ("
+ ",".join("?" * len(devices))
+ ")"
)
# TODO: Maybe this needs to be done in batches if there are
# too many local devices for a given user.
txn.execute(sql, [user_id] + devices)
for row in txn.fetchall():
# Only insert into the local inbox if the device exists on
# this server
device = row[0]
message_json = ujson.dumps(messages_by_device[device])
messages_json_for_user[device] = message_json

local_by_user_then_device[user_id] = messages_json_for_user

sql = (
"INSERT INTO device_inbox"
" (user_id, device_id, stream_id, message_json)"
" VALUES (?,?,?,?)"
)
rows = []
for user_id, messages_by_device in messages_by_user_then_device.items():
for device_id, message in messages_by_device.items():
message_json = ujson.dumps(message)
# Only insert into the local inbox if the device exists on
# this server
if (user_id, device_id) in local_users_and_devices:
rows.append((user_id, device_id, stream_id, message_json))
for user_id, messages_by_device in local_by_user_then_device.items():
for device_id, message_json in messages_by_device.items():
rows.append((user_id, device_id, stream_id, message_json))

txn.executemany(sql, rows)

Expand Down