Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.

Commit 5d52863

Browse files
authored
[Federation] Send typing events (#572)
* GetJoinedHosts from federation server db * Add dummy api.OutputTypingEvent * Add a typing server consumer to federation sender * Update queue to support EDU events * Update OutputTypingEvent format * Use SendEDU in federation server, remove dummy/api * Add helpful comments * fix typo * remove origin field * Count EDUs in sendCounter
1 parent bab4d14 commit 5d52863

File tree

8 files changed

+220
-27
lines changed

8 files changed

+220
-27
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
package consumers
14+
15+
import (
16+
"context"
17+
"encoding/json"
18+
19+
"github.com/matrix-org/dendrite/common"
20+
"github.com/matrix-org/dendrite/common/config"
21+
"github.com/matrix-org/dendrite/federationsender/queue"
22+
"github.com/matrix-org/dendrite/federationsender/storage"
23+
"github.com/matrix-org/dendrite/typingserver/api"
24+
"github.com/matrix-org/gomatrixserverlib"
25+
log "github.com/sirupsen/logrus"
26+
"gopkg.in/Shopify/sarama.v1"
27+
)
28+
29+
// OutputTypingEventConsumer consumes events that originate in typing server.
30+
type OutputTypingEventConsumer struct {
31+
consumer *common.ContinualConsumer
32+
db *storage.Database
33+
queues *queue.OutgoingQueues
34+
ServerName gomatrixserverlib.ServerName
35+
}
36+
37+
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers.
38+
func NewOutputTypingEventConsumer(
39+
cfg *config.Dendrite,
40+
kafkaConsumer sarama.Consumer,
41+
queues *queue.OutgoingQueues,
42+
store *storage.Database,
43+
) *OutputTypingEventConsumer {
44+
consumer := common.ContinualConsumer{
45+
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
46+
Consumer: kafkaConsumer,
47+
PartitionStore: store,
48+
}
49+
c := &OutputTypingEventConsumer{
50+
consumer: &consumer,
51+
queues: queues,
52+
db: store,
53+
ServerName: cfg.Matrix.ServerName,
54+
}
55+
consumer.ProcessMessage = c.onMessage
56+
57+
return c
58+
}
59+
60+
// Start consuming from typing servers
61+
func (t *OutputTypingEventConsumer) Start() error {
62+
return t.consumer.Start()
63+
}
64+
65+
// onMessage is called for OutputTypingEvent received from the typing servers.
66+
// Parses the msg, creates a matrix federation EDU and sends it to joined hosts.
67+
func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
68+
// Extract the typing event from msg.
69+
var ote api.OutputTypingEvent
70+
if err := json.Unmarshal(msg.Value, &ote); err != nil {
71+
// Skip this msg but continue processing messages.
72+
log.WithError(err).Errorf("typingserver output log: message parse failed")
73+
return nil
74+
}
75+
76+
joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID)
77+
if err != nil {
78+
return err
79+
}
80+
81+
names := make([]gomatrixserverlib.ServerName, len(joined))
82+
for i := range joined {
83+
names[i] = joined[i].ServerName
84+
}
85+
86+
edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
87+
if edu.Content, err = json.Marshal(map[string]interface{}{
88+
"room_id": ote.Event.RoomID,
89+
"user_id": ote.Event.UserID,
90+
"typing": ote.Event.Typing,
91+
}); err != nil {
92+
return err
93+
}
94+
95+
return t.queues.SendEDU(edu, t.ServerName, names)
96+
}

src/github.com/matrix-org/dendrite/federationsender/federationsender.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,18 @@ func SetupFederationSenderComponent(
3838

3939
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)
4040

41-
consumer := consumers.NewOutputRoomEventConsumer(
41+
rsConsumer := consumers.NewOutputRoomEventConsumer(
4242
base.Cfg, base.KafkaConsumer, queues,
4343
federationSenderDB, queryAPI,
4444
)
45-
if err = consumer.Start(); err != nil {
45+
if err = rsConsumer.Start(); err != nil {
4646
logrus.WithError(err).Panic("failed to start room server consumer")
4747
}
48+
49+
tsConsumer := consumers.NewOutputTypingEventConsumer(
50+
base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
51+
)
52+
if err := tsConsumer.Start(); err != nil {
53+
logrus.WithError(err).Panic("failed to start typing server consumer")
54+
}
4855
}

src/github.com/matrix-org/dendrite/federationsender/queue/destinationqueue.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ type destinationQueue struct {
3333
origin gomatrixserverlib.ServerName
3434
destination gomatrixserverlib.ServerName
3535
// The running mutex protects running, sentCounter, lastTransactionIDs and
36-
// pendingEvents.
36+
// pendingEvents, pendingEDUs.
3737
runningMutex sync.Mutex
3838
running bool
3939
sentCounter int
4040
lastTransactionIDs []gomatrixserverlib.TransactionID
4141
pendingEvents []*gomatrixserverlib.Event
42+
pendingEDUs []*gomatrixserverlib.EDU
4243
}
4344

4445
// Send event adds the event to the pending queue for the destination.
@@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
5455
}
5556
}
5657

58+
// sendEDU adds the EDU event to the pending queue for the destination.
59+
// If the queue is empty then it starts a background goroutine to
60+
// start sending event to that destination.
61+
func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
62+
oq.runningMutex.Lock()
63+
defer oq.runningMutex.Unlock()
64+
oq.pendingEDUs = append(oq.pendingEDUs, e)
65+
if !oq.running {
66+
oq.running = true
67+
go oq.backgroundSend()
68+
}
69+
}
70+
5771
func (oq *destinationQueue) backgroundSend() {
5872
for {
5973
t := oq.next()
@@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() {
8296
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
8397
oq.runningMutex.Lock()
8498
defer oq.runningMutex.Unlock()
85-
if len(oq.pendingEvents) == 0 {
99+
100+
if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
86101
oq.running = false
87102
return nil
88103
}
104+
89105
var t gomatrixserverlib.Transaction
90106
now := gomatrixserverlib.AsTimestamp(time.Now())
91107
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter))
@@ -96,11 +112,20 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
96112
if t.PreviousIDs == nil {
97113
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
98114
}
115+
99116
oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}
117+
100118
for _, pdu := range oq.pendingEvents {
101119
t.PDUs = append(t.PDUs, *pdu)
102120
}
103121
oq.pendingEvents = nil
104122
oq.sentCounter += len(t.PDUs)
123+
124+
for _, edu := range oq.pendingEDUs {
125+
t.EDUs = append(t.EDUs, *edu)
126+
}
127+
oq.pendingEDUs = nil
128+
oq.sentCounter += len(t.EDUs)
129+
105130
return &t
106131
}

src/github.com/matrix-org/dendrite/federationsender/queue/queue.go

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent(
4747
destinations []gomatrixserverlib.ServerName,
4848
) error {
4949
if origin != oqs.origin {
50-
// TODO: Support virtual hosting by allowing us to send events using
51-
// different origin server names.
52-
// For now assume we are always asked to send as the single server configured
53-
// in the dendrite config.
50+
// TODO: Support virtual hosting; gh issue #577.
5451
return fmt.Errorf(
5552
"sendevent: unexpected server to send as: got %q expected %q",
5653
origin, oqs.origin,
@@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent(
7673
}
7774
oqs.queues[destination] = oq
7875
}
76+
7977
oq.sendEvent(ev)
8078
}
79+
80+
return nil
81+
}
82+
83+
// SendEDU sends an EDU event to the destinations
84+
func (oqs *OutgoingQueues) SendEDU(
85+
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
86+
destinations []gomatrixserverlib.ServerName,
87+
) error {
88+
if origin != oqs.origin {
89+
// TODO: Support virtual hosting; gh issue #577.
90+
return fmt.Errorf(
91+
"sendevent: unexpected server to send as: got %q expected %q",
92+
origin, oqs.origin,
93+
)
94+
}
95+
96+
// Remove our own server from the list of destinations.
97+
destinations = filterDestinations(oqs.origin, destinations)
98+
99+
log.WithFields(log.Fields{
100+
"destinations": destinations, "edu_type": e.Type,
101+
}).Info("Sending EDU event")
102+
103+
oqs.queuesMutex.Lock()
104+
defer oqs.queuesMutex.Unlock()
105+
for _, destination := range destinations {
106+
oq := oqs.queues[destination]
107+
if oq == nil {
108+
oq = &destinationQueue{
109+
origin: oqs.origin,
110+
destination: destination,
111+
client: oqs.client,
112+
}
113+
oqs.queues[destination] = oq
114+
}
115+
116+
oq.sendEDU(e)
117+
}
118+
81119
return nil
82120
}
83121

src/github.com/matrix-org/dendrite/federationsender/storage/joined_hosts_table.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,22 @@ func (s *joinedHostsStatements) deleteJoinedHosts(
9797
return err
9898
}
9999

100-
func (s *joinedHostsStatements) selectJoinedHosts(
100+
func (s *joinedHostsStatements) selectJoinedHostsWithTx(
101101
ctx context.Context, txn *sql.Tx, roomID string,
102102
) ([]types.JoinedHost, error) {
103103
stmt := common.TxStmt(txn, s.selectJoinedHostsStmt)
104+
return joinedHostsFromStmt(ctx, stmt, roomID)
105+
}
106+
107+
func (s *joinedHostsStatements) selectJoinedHosts(
108+
ctx context.Context, roomID string,
109+
) ([]types.JoinedHost, error) {
110+
return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
111+
}
112+
113+
func joinedHostsFromStmt(
114+
ctx context.Context, stmt *sql.Stmt, roomID string,
115+
) ([]types.JoinedHost, error) {
104116
rows, err := stmt.QueryContext(ctx, roomID)
105117
if err != nil {
106118
return nil, err
@@ -118,5 +130,6 @@ func (s *joinedHostsStatements) selectJoinedHosts(
118130
ServerName: gomatrixserverlib.ServerName(serverName),
119131
})
120132
}
133+
121134
return result, nil
122135
}

src/github.com/matrix-org/dendrite/federationsender/storage/storage.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (d *Database) UpdateRoom(
9292
}
9393
}
9494

95-
joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID)
95+
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
9696
if err != nil {
9797
return err
9898
}
@@ -110,3 +110,12 @@ func (d *Database) UpdateRoom(
110110
})
111111
return
112112
}
113+
114+
// GetJoinedHosts returns the currently joined hosts for room,
115+
// as known to federationserver.
116+
// Returns an error if something goes wrong.
117+
func (d *Database) GetJoinedHosts(
118+
ctx context.Context, roomID string,
119+
) ([]types.JoinedHost, error) {
120+
return d.selectJoinedHosts(ctx, roomID)
121+
}

src/github.com/matrix-org/dendrite/typingserver/api/output.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@
1313
package api
1414

1515
// OutputTypingEvent is an entry in typing server output kafka log.
16+
// This contains the event with extra fields used to create 'm.typing' event
17+
// in clientapi & federation.
1618
type OutputTypingEvent struct {
1719
// The Event for the typing edu event.
1820
Event TypingEvent `json:"event"`
21+
// Users typing in the room when the event was generated.
22+
TypingUsers []string `json:"typing_users"`
1923
}
2024

2125
// TypingEvent represents a matrix edu event of type 'm.typing'.
2226
type TypingEvent struct {
23-
Type string `json:"type"`
24-
RoomID string `json:"room_id"`
25-
Content TypingEventContent `json:"content"`
26-
}
27-
28-
// TypingEventContent for TypingEvent
29-
type TypingEventContent struct {
30-
UserIDs []string `json:"user_ids"`
27+
Type string `json:"type"`
28+
RoomID string `json:"room_id"`
29+
UserID string `json:"user_id"`
30+
Typing bool `json:"typing"`
3131
}

src/github.com/matrix-org/dendrite/typingserver/input/input.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,24 +53,29 @@ func (t *TypingServerInputAPI) InputTypingEvent(
5353
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
5454
}
5555

56-
return t.sendUpdateForRoom(ite.RoomID)
56+
return t.sendEvent(ite)
5757
}
5858

59-
func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error {
60-
userIDs := t.Cache.GetTypingUsers(roomID)
61-
event := &api.TypingEvent{
62-
Type: gomatrixserverlib.MTyping,
63-
RoomID: roomID,
64-
Content: api.TypingEventContent{UserIDs: userIDs},
59+
func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
60+
userIDs := t.Cache.GetTypingUsers(ite.RoomID)
61+
ev := &api.TypingEvent{
62+
Type: gomatrixserverlib.MTyping,
63+
RoomID: ite.RoomID,
64+
UserID: ite.UserID,
6565
}
66-
eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event})
66+
ote := &api.OutputTypingEvent{
67+
Event: *ev,
68+
TypingUsers: userIDs,
69+
}
70+
71+
eventJSON, err := json.Marshal(ote)
6772
if err != nil {
6873
return err
6974
}
7075

7176
m := &sarama.ProducerMessage{
7277
Topic: string(t.OutputTypingEventTopic),
73-
Key: sarama.StringEncoder(roomID),
78+
Key: sarama.StringEncoder(ite.RoomID),
7479
Value: sarama.ByteEncoder(eventJSON),
7580
}
7681

0 commit comments

Comments
 (0)