Skip to content

Commit c93d30a

Browse files
committed
bridgev2: add option to deduplicate Matrix messages by event or transaction ID
1 parent 72f6229 commit c93d30a

File tree

7 files changed

+69
-28
lines changed

7 files changed

+69
-28
lines changed

bridgev2/bridgeconfig/config.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,24 +58,25 @@ type CleanupOnLogouts struct {
5858
}
5959

6060
type BridgeConfig struct {
61-
CommandPrefix string `yaml:"command_prefix"`
62-
PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"`
63-
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
64-
AsyncEvents bool `yaml:"async_events"`
65-
SplitPortals bool `yaml:"split_portals"`
66-
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
67-
NoBridgeInfoStateKey bool `yaml:"no_bridge_info_state_key"`
68-
BridgeStatusNotices string `yaml:"bridge_status_notices"`
69-
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`
70-
BridgeNotices bool `yaml:"bridge_notices"`
71-
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
72-
OnlyBridgeTags []event.RoomTag `yaml:"only_bridge_tags"`
73-
MuteOnlyOnCreate bool `yaml:"mute_only_on_create"`
74-
OutgoingMessageReID bool `yaml:"outgoing_message_re_id"`
75-
CleanupOnLogout CleanupOnLogouts `yaml:"cleanup_on_logout"`
76-
Relay RelayConfig `yaml:"relay"`
77-
Permissions PermissionConfig `yaml:"permissions"`
78-
Backfill BackfillConfig `yaml:"backfill"`
61+
CommandPrefix string `yaml:"command_prefix"`
62+
PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"`
63+
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
64+
AsyncEvents bool `yaml:"async_events"`
65+
SplitPortals bool `yaml:"split_portals"`
66+
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
67+
NoBridgeInfoStateKey bool `yaml:"no_bridge_info_state_key"`
68+
BridgeStatusNotices string `yaml:"bridge_status_notices"`
69+
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`
70+
BridgeNotices bool `yaml:"bridge_notices"`
71+
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
72+
OnlyBridgeTags []event.RoomTag `yaml:"only_bridge_tags"`
73+
MuteOnlyOnCreate bool `yaml:"mute_only_on_create"`
74+
DeduplicateMatrixMessages bool `yaml:"deduplicate_matrix_messages"`
75+
OutgoingMessageReID bool `yaml:"outgoing_message_re_id"`
76+
CleanupOnLogout CleanupOnLogouts `yaml:"cleanup_on_logout"`
77+
Relay RelayConfig `yaml:"relay"`
78+
Permissions PermissionConfig `yaml:"permissions"`
79+
Backfill BackfillConfig `yaml:"backfill"`
7980
}
8081

8182
type MatrixConfig struct {

bridgev2/bridgeconfig/upgrade.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func doUpgrade(helper up.Helper) {
3737
helper.Copy(up.Bool, "bridge", "tag_only_on_create")
3838
helper.Copy(up.List, "bridge", "only_bridge_tags")
3939
helper.Copy(up.Bool, "bridge", "mute_only_on_create")
40+
helper.Copy(up.Bool, "bridge", "deduplicate_matrix_messages")
4041
helper.Copy(up.Bool, "bridge", "cleanup_on_logout", "enabled")
4142
helper.Copy(up.Str, "bridge", "cleanup_on_logout", "manual", "private")
4243
helper.Copy(up.Str, "bridge", "cleanup_on_logout", "manual", "relayed")

bridgev2/database/message.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,23 @@ type Message struct {
4343
ThreadRoot networkid.MessageID
4444
ReplyTo networkid.MessageOptionalPartID
4545

46+
SendTxnID networkid.RawTransactionID
47+
4648
Metadata any
4749
}
4850

4951
const (
5052
getMessageBaseQuery = `
5153
SELECT rowid, bridge_id, id, part_id, mxid, room_id, room_receiver, sender_id, sender_mxid,
52-
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, metadata
54+
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id,
55+
send_txn_id, metadata
5356
FROM message
5457
`
5558
getAllMessagePartsByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3`
5659
getMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 AND part_id=$4`
5760
getMessagePartByRowIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND rowid=$2`
5861
getMessageByMXIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND mxid=$2`
62+
getMessageByTxnIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND (mxid=$3 OR send_txn_id=$4)`
5963
getLastMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 ORDER BY part_id DESC LIMIT 1`
6064
getFirstMessagePartByIDQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3 ORDER BY part_id ASC LIMIT 1`
6165
getMessagesBetweenTimeQuery = getMessageBaseQuery + `WHERE bridge_id=$1 AND room_id=$2 AND room_receiver=$3 AND timestamp>$4 AND timestamp<=$5`
@@ -74,16 +78,17 @@ const (
7478
insertMessageQuery = `
7579
INSERT INTO message (
7680
bridge_id, id, part_id, mxid, room_id, room_receiver, sender_id, sender_mxid,
77-
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id, metadata
81+
timestamp, edit_count, double_puppeted, thread_root_id, reply_to_id, reply_to_part_id,
82+
send_txn_id, metadata
7883
)
79-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
84+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
8085
RETURNING rowid
8186
`
8287
updateMessageQuery = `
8388
UPDATE message SET id=$2, part_id=$3, mxid=$4, room_id=$5, room_receiver=$6, sender_id=$7, sender_mxid=$8,
8489
timestamp=$9, edit_count=$10, double_puppeted=$11, thread_root_id=$12, reply_to_id=$13,
85-
reply_to_part_id=$14, metadata=$15
86-
WHERE bridge_id=$1 AND rowid=$16
90+
reply_to_part_id=$14, send_txn_id=$15, metadata=$16
91+
WHERE bridge_id=$1 AND rowid=$17
8792
`
8893
deleteAllMessagePartsByIDQuery = `
8994
DELETE FROM message WHERE bridge_id=$1 AND (room_receiver=$2 OR room_receiver='') AND id=$3
@@ -105,6 +110,10 @@ func (mq *MessageQuery) GetPartByMXID(ctx context.Context, mxid id.EventID) (*Me
105110
return mq.QueryOne(ctx, getMessageByMXIDQuery, mq.BridgeID, mxid)
106111
}
107112

113+
func (mq *MessageQuery) GetPartByTxnID(ctx context.Context, receiver networkid.UserLoginID, mxid id.EventID, txnID networkid.RawTransactionID) (*Message, error) {
114+
return mq.QueryOne(ctx, getMessageByTxnIDQuery, mq.BridgeID, receiver, mxid, txnID)
115+
}
116+
108117
func (mq *MessageQuery) GetLastPartByID(ctx context.Context, receiver networkid.UserLoginID, id networkid.MessageID) (*Message, error) {
109118
return mq.QueryOne(ctx, getLastMessagePartByIDQuery, mq.BridgeID, receiver, id)
110119
}
@@ -178,11 +187,12 @@ func (mq *MessageQuery) CountMessagesInPortal(ctx context.Context, key networkid
178187

179188
func (m *Message) Scan(row dbutil.Scannable) (*Message, error) {
180189
var timestamp int64
181-
var threadRootID, replyToID, replyToPartID sql.NullString
190+
var threadRootID, replyToID, replyToPartID, sendTxnID sql.NullString
182191
var doublePuppeted sql.NullBool
183192
err := row.Scan(
184193
&m.RowID, &m.BridgeID, &m.ID, &m.PartID, &m.MXID, &m.Room.ID, &m.Room.Receiver, &m.SenderID, &m.SenderMXID,
185-
&timestamp, &m.EditCount, &doublePuppeted, &threadRootID, &replyToID, &replyToPartID, dbutil.JSON{Data: m.Metadata},
194+
&timestamp, &m.EditCount, &doublePuppeted, &threadRootID, &replyToID, &replyToPartID, &sendTxnID,
195+
dbutil.JSON{Data: m.Metadata},
186196
)
187197
if err != nil {
188198
return nil, err
@@ -196,6 +206,9 @@ func (m *Message) Scan(row dbutil.Scannable) (*Message, error) {
196206
m.ReplyTo.PartID = (*networkid.PartID)(&replyToPartID.String)
197207
}
198208
}
209+
if sendTxnID.Valid {
210+
m.SendTxnID = networkid.RawTransactionID(sendTxnID.String)
211+
}
199212
return m, nil
200213
}
201214

@@ -210,7 +223,8 @@ func (m *Message) sqlVariables() []any {
210223
return []any{
211224
m.BridgeID, m.ID, m.PartID, m.MXID, m.Room.ID, m.Room.Receiver, m.SenderID, m.SenderMXID,
212225
m.Timestamp.UnixNano(), m.EditCount, m.IsDoublePuppeted, dbutil.StrPtr(m.ThreadRoot),
213-
dbutil.StrPtr(m.ReplyTo.MessageID), m.ReplyTo.PartID, dbutil.JSON{Data: m.Metadata},
226+
dbutil.StrPtr(m.ReplyTo.MessageID), m.ReplyTo.PartID, dbutil.StrPtr(m.SendTxnID),
227+
dbutil.JSON{Data: m.Metadata},
214228
}
215229
}
216230

bridgev2/database/upgrades/00-latest.sql

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
-- v0 -> v21 (compatible with v9+): Latest revision
1+
-- v0 -> v22 (compatible with v9+): Latest revision
22
CREATE TABLE "user" (
33
bridge_id TEXT NOT NULL,
44
mxid TEXT NOT NULL,
@@ -108,6 +108,7 @@ CREATE TABLE message (
108108
thread_root_id TEXT,
109109
reply_to_id TEXT,
110110
reply_to_part_id TEXT,
111+
send_txn_id TEXT,
111112
metadata jsonb NOT NULL,
112113

113114
CONSTRAINT message_room_fkey FOREIGN KEY (bridge_id, room_id, room_receiver)
@@ -117,7 +118,8 @@ CREATE TABLE message (
117118
REFERENCES ghost (bridge_id, id)
118119
ON DELETE CASCADE ON UPDATE CASCADE,
119120
CONSTRAINT message_real_pkey UNIQUE (bridge_id, room_receiver, id, part_id),
120-
CONSTRAINT message_mxid_unique UNIQUE (bridge_id, mxid)
121+
CONSTRAINT message_mxid_unique UNIQUE (bridge_id, mxid),
122+
CONSTRAINT message_txn_id_unique UNIQUE (bridge_id, room_receiver, send_txn_id)
121123
);
122124
CREATE INDEX message_room_idx ON message (bridge_id, room_id, room_receiver);
123125

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- v22 (compatible with v9+): Add message send transaction ID column
2+
ALTER TABLE message ADD COLUMN send_txn_id TEXT;
3+
-- only: postgres
4+
ALTER TABLE message ADD CONSTRAINT message_txn_id_unique UNIQUE (bridge_id, room_receiver, send_txn_id);
5+
-- only: sqlite
6+
CREATE UNIQUE INDEX message_txn_id_unique ON message (bridge_id, room_receiver, send_txn_id);

bridgev2/matrix/mxmain/example-config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ bridge:
3838
# Should room mute status only be synced when creating the portal?
3939
# Like tags, mutes can't currently be synced back to the remote network.
4040
mute_only_on_create: true
41+
# Should the bridge check the db to ensure that incoming events haven't been handled before
42+
deduplicate_matrix_messages: false
4143

4244
# What should be done to portal rooms when a user logs out or is logged out?
4345
# Permitted values:

bridgev2/portal.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,18 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *UserLogin
951951
ThreadRoot: threadRoot,
952952
ReplyTo: replyTo,
953953
}
954+
if portal.Bridge.Config.DeduplicateMatrixMessages {
955+
if part, err := portal.Bridge.DB.Message.GetPartByTxnID(ctx, portal.Receiver, evt.ID, wrappedMsgEvt.InputTransactionID); err != nil {
956+
log.Err(err).Msg("Failed to check db if message is already sent")
957+
} else if part != nil {
958+
log.Debug().
959+
Stringer("message_mxid", part.MXID).
960+
Stringer("input_event_id", evt.ID).
961+
Msg("Message already sent, ignoring")
962+
return
963+
}
964+
}
965+
954966
var resp *MatrixMessageResponse
955967
if msgContent != nil {
956968
resp, err = sender.Client.HandleMatrixMessage(ctx, wrappedMsgEvt)
@@ -1091,6 +1103,9 @@ func (evt *MatrixMessage) fillDBMessage(message *database.Message) *database.Mes
10911103
if message.SenderMXID == "" {
10921104
message.SenderMXID = evt.Event.Sender
10931105
}
1106+
if message.SendTxnID != "" {
1107+
message.SendTxnID = evt.InputTransactionID
1108+
}
10941109
return message
10951110
}
10961111

0 commit comments

Comments
 (0)