Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test to check timeline ordering semantics #699

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 153 additions & 1 deletion tests/federation_room_send_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package tests

import (
"encoding/json"
"fmt"
"net/url"
"testing"
"time"

"github.com/matrix-org/complement"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"golang.org/x/exp/slices"

"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/federation"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/match"
"github.com/matrix-org/complement/must"
"github.com/matrix-org/complement/runtime"
)

// TODO:
Expand Down Expand Up @@ -68,3 +77,146 @@ func TestOutboundFederationSend(t *testing.T) {
// the remote homeserver then waits for the desired event to appear in a transaction
waiter.Wait(t, 5*time.Second)
}

// Test ordering behaviour between /sync and /messages, when unexpected earlier federation events are injected.
// This aims to ensure that clients cannot miss events in the following case:
//
// consider two homeservers in the same room starting at some event 0. Then:
// - network partition the servers
// - HS1 sends 1, HS2 sends 1'
// - a local user on HS1 sends 2,3,4
// - network partition is fixed. HS2 sends 1' to HS1.
// - a local user on HS1 sends 5,6,7
// - At this point, from HS1's pov, the streaming ordering is [1,2,3,4,1',5,6,7] and the topological order is [1 | 1', 2,3,4,5,6,7]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// - At this point, from HS1's pov, the streaming ordering is [1,2,3,4,1',5,6,7] and the topological order is [1 | 1', 2,3,4,5,6,7]
// - At this point, from HS1's pov, the streaming ordering is [1,2,3,4,1',5,6,7] and the topological order is [1 | 1', 2,3,4,5,6,7]

That is Synapse's choice of topological ordering. Valid ones are basically [(1, 2, 3, 4) | 1', 5, 6, 7] (i.e. 1' could appear anywhere before/after/between 1, 2, 3, 4).

// - client requests timeline limit = 4 => [1',5,6,7] is returned because /sync is stream ordering. Everything is fine so far.
// - using the prev_batch token here in /messages SHOULD return 4,3,2,1, to ensure clients cannot missing 4,3,2.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And potentially 1', depending, e.g. if you do /messages?limit=1 repeatedly 1' would turn up again currently.

// It may actually decide to filter topologically and just return 0, which would be incorrect.
func TestNetworkPartitionOrdering(t *testing.T) {
runtime.SkipIf(t, runtime.Dendrite) // 500s trying to /backfill
deployment := complement.Deploy(t, 1)
defer deployment.Destroy(t)

// create a remote homeserver
srv := federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandleMakeSendJoinRequests(),
federation.HandleTransactionRequests(nil, nil),
)
srv.UnexpectedRequestsAreErrors = false // might get backfill requests
cancel := srv.Listen()
defer cancel()

alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
bob := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

// the remote homeserver creates a public room
ver := alice.GetDefaultRoomVersion(t)
charlie := srv.UserID("charlie")
serverRoom := srv.MustMakeRoom(t, ver, federation.InitialRoomEvents(ver, charlie))
roomAlias := srv.MakeAliasMapping("flibble", serverRoom.RoomID)

// the local homeserver joins the room
alice.MustJoinRoom(t, roomAlias, []string{deployment.GetConfig().HostnameRunningComplement})
bob.MustJoinRoom(t, roomAlias, []string{deployment.GetConfig().HostnameRunningComplement})
// bob requests the last 4 timeline events. We don't care about it right now but do want the since token
_, bobSince := bob.MustSync(t, client.SyncReq{
Filter: `{"room":{"timeline":{"limit":4}}}`,
})

// create 1' on the remote homeserver but don't send it
event1prime := srv.MustCreateEvent(t, serverRoom, federation.Event{
Sender: charlie,
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "Event 1'",
},
})
serverRoom.AddEvent(event1prime)

// send 1,2,3,4 on the local homeserver
var eventIDs []string
for i := 0; i < 4; i++ {
eventIDs = append(eventIDs, alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": fmt.Sprintf("event %d", i+1),
},
}))
}

// remote homeserver now injects event 1'
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event1prime.JSON()}, nil)

// ensure it gets there
alice.MustSyncUntil(t, client.SyncReq{TimeoutMillis: "1000"}, client.SyncTimelineHasEventID(serverRoom.RoomID, event1prime.EventID()))

// send 5,6,7
for i := 5; i <= 7; i++ {
eventIDs = append(eventIDs, alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": fmt.Sprintf("event %d", i),
},
}))
}
t.Logf("events 1,2,3,4,5,6,7 = %v", eventIDs)

// now bob requests timeline limit 4 in the room, should see 1',5,6,7
res, _ := bob.MustSync(t, client.SyncReq{
Filter: `{"room":{"timeline":{"limit":4}}}`,
Since: bobSince,
})
wantEventIDs := append([]string{
event1prime.EventID()}, eventIDs[len(eventIDs)-3:]...,
)
i := 0
timeline := res.Get(fmt.Sprintf("rooms.join.%s.timeline", client.GjsonEscape(serverRoom.RoomID)))
must.MatchGJSON(t, timeline,
match.JSONKeyArrayOfSize("events", 4),
match.JSONArrayEach("events", func(r gjson.Result) error {
wantEventID := wantEventIDs[i]
gotEventID := r.Get("event_id").Str
if gotEventID != wantEventID {
return fmt.Errorf("got event id %v want %v", gotEventID, wantEventID)
}
if gotEventID == "" {
return fmt.Errorf("missing event ID")
}
i++
return nil
}),
)

// now use the prev_batch token in /messages, we should see 4,3,2,1
prevBatch := timeline.Get("prev_batch").Str
must.NotEqual(t, prevBatch, "", "missing prev_batch")
queryParams := url.Values{}
queryParams.Set("dir", "b")
queryParams.Set("limit", "4")
queryParams.Set("from", prevBatch)
wantEventIDs = eventIDs[0:4]
slices.Reverse(wantEventIDs)
t.Logf("want scrollback events %v", wantEventIDs)
i = 0
scrollbackRes := alice.MustDo(t, "GET", []string{"_matrix", "client", "v3", "rooms", serverRoom.RoomID, "messages"}, client.WithQueries(queryParams))
must.MatchResponse(t, scrollbackRes, match.HTTPResponse{
JSON: []match.JSON{
match.JSONKeyArrayOfSize("chunk", 4),
match.JSONArrayEach("chunk", func(j gjson.Result) error {
wantEventID := wantEventIDs[i]
gotEventID := j.Get("event_id").Str
if gotEventID != wantEventID {
return fmt.Errorf("got event id %v want %v", gotEventID, wantEventID)
}
if gotEventID == "" {
return fmt.Errorf("missing event ID")
}
i++
return nil
}),
},
})
}
Loading