Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumers

import (
"context"
"encoding/json"

"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/typingserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
"gopkg.in/Shopify/sarama.v1"
)

// OutputTypingEventConsumer consumes events that originate in typing server.
type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer
db *storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
}

// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers.
func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
c := &OutputTypingEventConsumer{
consumer: &consumer,
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
}
consumer.ProcessMessage = c.onMessage

return c
}

// Start consuming from typing servers
func (t *OutputTypingEventConsumer) Start() error {
return t.consumer.Start()
}

// onMessage is called for OutputTypingEvent received from the typing servers.
// Parses the msg, creates a matrix federation EDU and sends it to joined hosts.
func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("typingserver output log: message parse failed")
return nil
}

joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID)
if err != nil {
return err
}

names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}

edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
if edu.Content, err = json.Marshal(map[string]interface{}{
"room_id": ote.Event.RoomID,
"user_id": ote.Event.UserID,
"typing": ote.Event.Typing,
}); err != nil {
return err
}

return t.queues.SendEDU(edu, t.ServerName, names)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ func SetupFederationSenderComponent(

queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)

consumer := consumers.NewOutputRoomEventConsumer(
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,
federationSenderDB, queryAPI,
)
if err = consumer.Start(); err != nil {
if err = rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start room server consumer")
}

tsConsumer := consumers.NewOutputTypingEventConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ type destinationQueue struct {
origin gomatrixserverlib.ServerName
destination gomatrixserverlib.ServerName
// The running mutex protects running, sentCounter, lastTransactionIDs and
// pendingEvents.
// pendingEvents, pendingEDUs.
runningMutex sync.Mutex
running bool
sentCounter int
lastTransactionIDs []gomatrixserverlib.TransactionID
pendingEvents []*gomatrixserverlib.Event
pendingEDUs []*gomatrixserverlib.EDU
}

// Send event adds the event to the pending queue for the destination.
Expand All @@ -54,6 +55,19 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.Event) {
}
}

// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending event to that destination.
func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
oq.pendingEDUs = append(oq.pendingEDUs, e)
if !oq.running {
oq.running = true
go oq.backgroundSend()
}
}

func (oq *destinationQueue) backgroundSend() {
for {
t := oq.next()
Expand Down Expand Up @@ -82,10 +96,12 @@ func (oq *destinationQueue) backgroundSend() {
func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
oq.runningMutex.Lock()
defer oq.runningMutex.Unlock()
if len(oq.pendingEvents) == 0 {

if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 {
oq.running = false
return nil
}

var t gomatrixserverlib.Transaction
now := gomatrixserverlib.AsTimestamp(time.Now())
t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.sentCounter))
Expand All @@ -96,11 +112,20 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction {
if t.PreviousIDs == nil {
t.PreviousIDs = []gomatrixserverlib.TransactionID{}
}

oq.lastTransactionIDs = []gomatrixserverlib.TransactionID{t.TransactionID}

for _, pdu := range oq.pendingEvents {
t.PDUs = append(t.PDUs, *pdu)
}
oq.pendingEvents = nil
oq.sentCounter += len(t.PDUs)

for _, edu := range oq.pendingEDUs {
t.EDUs = append(t.EDUs, *edu)
}
oq.pendingEDUs = nil
oq.sentCounter += len(t.EDUs)

return &t
}
46 changes: 42 additions & 4 deletions src/github.com/matrix-org/dendrite/federationsender/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ func (oqs *OutgoingQueues) SendEvent(
destinations []gomatrixserverlib.ServerName,
) error {
if origin != oqs.origin {
// TODO: Support virtual hosting by allowing us to send events using
// different origin server names.
// For now assume we are always asked to send as the single server configured
// in the dendrite config.
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q",
origin, oqs.origin,
Expand All @@ -76,8 +73,49 @@ func (oqs *OutgoingQueues) SendEvent(
}
oqs.queues[destination] = oq
}

oq.sendEvent(ev)
}

return nil
}

// SendEDU sends an EDU event to the destinations
func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
destinations []gomatrixserverlib.ServerName,
) error {
if origin != oqs.origin {
// TODO: Support virtual hosting; gh issue #577.
return fmt.Errorf(
"sendevent: unexpected server to send as: got %q expected %q",
origin, oqs.origin,
)
}

// Remove our own server from the list of destinations.
destinations = filterDestinations(oqs.origin, destinations)

log.WithFields(log.Fields{
"destinations": destinations, "edu_type": e.Type,
}).Info("Sending EDU event")

oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
for _, destination := range destinations {
oq := oqs.queues[destination]
if oq == nil {
oq = &destinationQueue{
origin: oqs.origin,
destination: destination,
client: oqs.client,
}
oqs.queues[destination] = oq
}

oq.sendEDU(e)
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,22 @@ func (s *joinedHostsStatements) deleteJoinedHosts(
return err
}

func (s *joinedHostsStatements) selectJoinedHosts(
func (s *joinedHostsStatements) selectJoinedHostsWithTx(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]types.JoinedHost, error) {
stmt := common.TxStmt(txn, s.selectJoinedHostsStmt)
return joinedHostsFromStmt(ctx, stmt, roomID)
}

func (s *joinedHostsStatements) selectJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
}

func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) {
rows, err := stmt.QueryContext(ctx, roomID)
if err != nil {
return nil, err
Expand All @@ -118,5 +130,6 @@ func (s *joinedHostsStatements) selectJoinedHosts(
ServerName: gomatrixserverlib.ServerName(serverName),
})
}

return result, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *Database) UpdateRoom(
}
}

joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID)
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
Expand All @@ -110,3 +110,12 @@ func (d *Database) UpdateRoom(
})
return
}

// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}
16 changes: 8 additions & 8 deletions src/github.com/matrix-org/dendrite/typingserver/api/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
package api

// OutputTypingEvent is an entry in typing server output kafka log.
// This contains the event with extra fields used to create 'm.typing' event
// in clientapi & federation.
type OutputTypingEvent struct {
// The Event for the typing edu event.
Event TypingEvent `json:"event"`
// Users typing in the room when the event was generated.
TypingUsers []string `json:"typing_users"`
}

// TypingEvent represents a matrix edu event of type 'm.typing'.
type TypingEvent struct {
Type string `json:"type"`
RoomID string `json:"room_id"`
Content TypingEventContent `json:"content"`
}

// TypingEventContent for TypingEvent
type TypingEventContent struct {
UserIDs []string `json:"user_ids"`
Type string `json:"type"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Typing bool `json:"typing"`
}
23 changes: 14 additions & 9 deletions src/github.com/matrix-org/dendrite/typingserver/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,29 @@ func (t *TypingServerInputAPI) InputTypingEvent(
t.Cache.RemoveUser(ite.UserID, ite.RoomID)
}

return t.sendUpdateForRoom(ite.RoomID)
return t.sendEvent(ite)
}

func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error {
userIDs := t.Cache.GetTypingUsers(roomID)
event := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: roomID,
Content: api.TypingEventContent{UserIDs: userIDs},
func (t *TypingServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
userIDs := t.Cache.GetTypingUsers(ite.RoomID)
ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
RoomID: ite.RoomID,
UserID: ite.UserID,
}
eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event})
ote := &api.OutputTypingEvent{
Event: *ev,
TypingUsers: userIDs,
}

eventJSON, err := json.Marshal(ote)
if err != nil {
return err
}

m := &sarama.ProducerMessage{
Topic: string(t.OutputTypingEventTopic),
Key: sarama.StringEncoder(roomID),
Key: sarama.StringEncoder(ite.RoomID),
Value: sarama.ByteEncoder(eventJSON),
}

Expand Down