Skip to content

Commit

Permalink
Accept /state requests in create_outlier_event (#1231)
Browse files Browse the repository at this point in the history
* Accept `/state` reqeusts in `create_outlier_event`

A follow-up to #1223: Another place that we can get `/state` requests.

* Register `/state` listeners earlier

Per comments on #1231: we ought to set up the listeners for `/state`
earlier. We also create a helper function to make it easier.
  • Loading branch information
richvdh authored Mar 31, 2022
1 parent 74c3cca commit 81c3c06
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 132 deletions.
111 changes: 83 additions & 28 deletions tests/50federation/00prepare.pl
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,47 @@ sub create_federation_server
});
};


=head2 await_and_handle_request_state
$fut = await_and_handle_request_state(
$inbound_server, $event_id, [ $state_event, $state_event, ... ],
auth_chain => [ $auth_event, $auth_event, ... ],
);
Awaits an inbound request to `/_matrix/federation/v1/state/$room_id?event_id=$event_id`,
and, when it arrives, sends a response with the given state.
I<auth_chain> is optional; if omitted, the auth chain is calculated based on
the given state events.
=cut

sub await_and_handle_request_state {
my ( $inbound_server, $room, $event_id, $state_events, %args ) = @_;

my $auth_chain = $args{auth_chain} // [
map { $inbound_server->datastore->get_auth_chain_events( $room->id_for_event( $_ )) } @$state_events
];

$inbound_server->await_request_v1_state(
$room->room_id, $event_id,
)->then( sub {
my ( $req, @params ) = @_;
log_if_fail "/state request", \@params;

my $resp = {
pdus => $state_events,
auth_chain => $auth_chain,
};

log_if_fail "/state response", $resp;
$req->respond_json( $resp );
});
}
push @EXPORT, qw( await_and_handle_request_state );


=head2 send_and_await_event
send_and_await_event( $outbound_client, $room, $server_user, %fields ) -> then( sub {
Expand Down Expand Up @@ -232,6 +273,8 @@ sub send_and_await_outlier {

log_if_fail "create_outlier_event: events Q, R, S", [ $outlier_event_id_Q, $backfilled_event_id_R, $sent_event_id_S ];

my $state_req_fut;

Future->needs_all(
# send S
$outbound_client->send_event(
Expand All @@ -258,35 +301,47 @@ sub send_and_await_outlier {
log_if_fail "create_outlier_event: /get_missing_events response", $resp;
$req->respond_json( $resp );
Future->done(1);
}),

# there will still be a gap, so then we expect a state_ids request
$inbound_server->await_request_state_ids(
$room_id, $outlier_event_id_Q,
)->then( sub {
my ( $req, @params ) = @_;
log_if_fail "create_outlier_event: /state_ids request", \@params;

my $resp = {
pdu_ids => [
map { $room->id_for_event( $_ ) } values( %initial_room_state ),
],
auth_chain_ids => $room->event_ids_from_refs( $outlier_event_Q->{auth_events} ),
};

log_if_fail "create_outlier_event: /state_ids response", $resp;
$req->respond_json( $resp );
Future->done(1);
}),
)->then( sub {
# wait for S to turn up in /sync
await_sync_timeline_contains(
$receiving_user, $room_id, check => sub {
my ( $event ) = @_;
log_if_fail "create_outlier_event: Got event", $event;
my $event_id = $event->{event_id};
return $event_id eq $sent_event_id_S;
},

# there will still be a gap, so then we expect a state_ids request
$inbound_server->await_request_state_ids(
$room_id, $outlier_event_id_Q,
)->then( sub {
my ( $req, @params ) = @_;
log_if_fail "create_outlier_event: /state_ids request", \@params;

my $resp = {
pdu_ids => [
map { $room->id_for_event( $_ ) } values( %initial_room_state ),
],
auth_chain_ids => $room->event_ids_from_refs( $outlier_event_Q->{auth_events} ),
};

log_if_fail "create_outlier_event: /state_ids response", $resp;

# once we respond to `/state_ids`, the server may send a /state request;
# be prepared to answer that. (it may, alternatively, send individual
# /event requests)
$state_req_fut = await_and_handle_request_state(
$inbound_server, $room, $outlier_event_id_Q, [ values( %initial_room_state ) ]
);

$req->respond_json( $resp );
Future->done(1);
}),
)->then( sub {
# wait for either S to turn up in /sync, or $state_req_fut to fail.
Future->wait_any(
$state_req_fut->then( sub { Future->new() } ),

await_sync_timeline_contains(
$receiving_user, $room_id, check => sub {
my ( $event ) = @_;
log_if_fail "create_outlier_event: Got event", $event;
my $event_id = $event->{event_id};
return $event_id eq $sent_event_id_S;
},
),
);
})->then_done( $outlier_event_Q, $backfilled_event_R, $sent_event_S );
}
Expand Down
36 changes: 11 additions & 25 deletions tests/50federation/33room-get-missing-events.pl
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ sub sytest_user_and_room_fixture {

log_if_fail "events Q, R, S", [ $event_id_Q, $event_id_R, $event_id_S ];

my $state_req_fut;

Future->needs_all(
# send S
$outbound_client->send_event(
Expand Down Expand Up @@ -373,37 +375,21 @@ sub sytest_user_and_room_fixture {
};

log_if_fail "/state_ids response", $resp;

# once we respond to `/state_ids`, the server may send a /state request;
# be prepared to answer that. (it may, alternatively, send individual
# /event requests)
$state_req_fut = await_and_handle_request_state(
$inbound_server, $room2, $event_id_Q, [ values( %initial_room2_state ) ]
);

$req->respond_json( $resp );
Future->done(1);
}),
)->then( sub {
# the server may send a /state request; be prepared to answer that.
# (it may, alternatively, send individual /event requests)
my $state_req_fut = $inbound_server->await_request_v1_state(
$room2->{room_id}, $event_id_Q,
)->then( sub {
my ( $req, @params ) = @_;
log_if_fail "/state request", \@params;

my $resp = {
pdus => [ values( %initial_room2_state ) ],
auth_chain => [
map { $inbound_server->datastore->get_event( $_ ) } @{ $room2->event_ids_from_refs( $event_Q->{auth_events} ) },
],
};

log_if_fail "/state response", $resp;
$req->respond_json( $resp );

# return a future which never completes, so that wait_any is not
# satisfied.
return Future->new();
});


# wait for either S to turn up in /sync, or $state_req_fut to fail.
Future->wait_any(
$state_req_fut,
$state_req_fut->then( sub { Future->new() } ),

await_sync_timeline_contains(
$creator_user, $room2->room_id, check => sub {
Expand Down
40 changes: 11 additions & 29 deletions tests/50federation/34room-backfill.pl
Original file line number Diff line number Diff line change
Expand Up @@ -304,33 +304,7 @@

log_if_fail "events P, Q, R, S", [ $event_id_P, $event_id_Q, $event_id_R, $event_id_S ];

# the server may send a /state request; be prepared to answer that.
# (it may, alternatively, send individual /event requests)
my $state_req_fut = $inbound_server->await_request_v1_state(
$room2->{room_id}, $event_id_Q,
)->then( sub {
my ( $req, @params ) = @_;
log_if_fail "/state request (1)", \@params;

my %state = %{ $room2->{current_state} };
my $resp = {
pdus => [ values( %state ) ],

# XXX we're supposed to return the whole auth chain here,
# not just Q's auth_events. It doesn't matter too much
# here though.
auth_chain => [
map { $inbound_server->datastore->get_event( $_ ) } @{ $room2->event_ids_from_refs( $event_Q->{auth_events} ) },
],
};

log_if_fail "/state response (1)", $resp;
$req->respond_json( $resp );

# return a future which never completes, so that wait_any is not
# satisfied.
return Future->new();
});
my $state_req_fut;

Future->needs_all(
# kick things off by sending S over federation
Expand Down Expand Up @@ -381,6 +355,14 @@
};

log_if_fail "/state_ids response (1)", $resp;

# once we respond to /state_ids, the server may send a /state request;
# be prepared to answer that. (it may, alternatively, send individual
# /event requests)
$state_req_fut = await_and_handle_request_state(
$inbound_server, $room2, $event_id_Q, [ values( %state ) ]
);

$req->respond_json( $resp );
Future->done(1);
}),
Expand All @@ -395,14 +377,14 @@

# wait for either S to turn up in /sync, or $state_req_fut to fail.
Future->wait_any(
$state_req_fut,
$state_req_fut->then( sub { Future->new() } ),

await_sync_timeline_contains(
$creator_user, $room2_id,
check => sub {
$_[0]->{event_id} eq $event_id_S
},
),
),
);
})->then( sub {
my $filter = $json->encode( { room => { timeline => { limit => 2 }}} );
Expand Down
74 changes: 24 additions & 50 deletions tests/50federation/36state.pl
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ sub get_state_ids_from_server {
$room_state_at_y{$k} = $event;
}

my $state_req_fut;

Future->needs_all(
$outbound_client->send_event(
event => $sent_event_c,
Expand Down Expand Up @@ -603,38 +605,21 @@ sub get_state_ids_from_server {

log_if_fail "/state_ids response", $resp;

# once we respond to `/state_ids`, the server may send a /state request;
# be prepared to answer that. (it may, alternatively, send individual
# /event requests)
$state_req_fut = await_and_handle_request_state(
$inbound_server, $room, $missing_event_y->{event_id}, [ values( %room_state_at_y ) ]
);

$req->respond_json( $resp );

Future->done(1);
}),
)->then( sub {
# the server may send a /state request; be prepared to answer that.
# (it may, alternatively, send individual /event requests)
my $state_req_fut = $inbound_server->await_request_v1_state(
$room_id, $missing_event_y->{event_id},
)->then( sub {
my ( $req, @params ) = @_;
log_if_fail "/state request", \@params;

my $resp = {
pdus => [ values( %room_state_at_y ) ],
auth_chain => [
map { $inbound_server->datastore->get_auth_chain_events( $room->id_for_event( $_ )) }
values( %room_state_at_y )
],
};

log_if_fail "/state response", $resp;
$req->respond_json( $resp );

# return a future which never completes, so that wait_any is not
# satisfied.
return Future->new();
});

# creator user should eventually receive the events
# Wait for either the creator user to receive the events, or $state_req_fut to fail.
Future->wait_any(
$state_req_fut,
$state_req_fut->then( sub { Future->new() } ),

Future->needs_all(
await_sync_timeline_contains($creator, $room_id, check => sub {
Expand Down Expand Up @@ -803,6 +788,8 @@ sub get_state_ids_from_server {
", " . $missing_event_y->{event_id};
log_if_fail "Sent event C: " . $sent_event_c->{event_id};

my $state_req_fut;

Future->needs_all(
$outbound_client->send_event(
event => $sent_event_c,
Expand Down Expand Up @@ -835,45 +822,32 @@ sub get_state_ids_from_server {
)->then( sub {
my ( $req ) = @_;

my @state = values( %{ $room->{current_state} } );
my $resp = {
pdu_ids => [
map { $_->{event_id} } values( %{ $room->{current_state} } ),
map { $_->{event_id} } @state,
],
auth_chain_ids => [],
};

log_if_fail "/state_ids response", $resp;

# once we respond to `/state_ids`, the server may send a /state request;
# be prepared to answer that. (it may, alternatively, send individual
# /event requests)
$state_req_fut = await_and_handle_request_state(
$inbound_server, $room, $missing_event_y->{event_id}, \@state,
auth_chain => [],
);

$req->respond_json( $resp );

Future->done(1);
}),
)->then( sub {
# the server may send a /state request; be prepared to answer that.
# (it may, alternatively, send individual /event requests)
my $state_req_fut = $inbound_server->await_request_v1_state(
$room_id, $missing_event_y->{event_id},
)->then( sub {
my ( $req, @params ) = @_;
log_if_fail "/state request", \@params;

my $resp = {
pdus => [ values( %{ $room->{current_state} } ) ],
auth_chain => [],
};

log_if_fail "/state response", $resp;
$req->respond_json( $resp );

# return a future which never completes, so that wait_any is not
# satisfied.
return Future->new();
});


# creator user should eventually receive X and C.
Future->wait_any(
$state_req_fut,
$state_req_fut->then_done( Future->new() ),

Future->needs_all(
await_sync_timeline_contains( $creator, $room_id, check => sub {
Expand Down

0 comments on commit 81c3c06

Please sign in to comment.