Skip to content

Commit 0433d9f

Browse files
committed
fix: resolve presence re-entry on channel re-attach
- Ensure presence update messages are correctly sent. - Add logging for presence updates in `Presence`, improving debuggability. - Update presence re-entry tests.
1 parent 96fbe1b commit 0433d9f

File tree

3 files changed

+98
-112
lines changed

3 files changed

+98
-112
lines changed

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,13 +461,13 @@ private void setAttached(ProtocolMessage message) {
461461
if(state == ChannelState.attached) {
462462
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
463463
if (!message.hasFlag(Flag.resumed)) { // RTL12
464-
presence.onAttached(message.hasFlag(Flag.has_presence));
465464
emitUpdate(message.error, false);
465+
presence.onAttached(message.hasFlag(Flag.has_presence));
466466
}
467467
}
468468
else {
469-
presence.onAttached(message.hasFlag(Flag.has_presence));
470469
setState(ChannelState.attached, message.error, message.hasFlag(Flag.resumed));
470+
presence.onAttached(message.hasFlag(Flag.has_presence));
471471
}
472472
}
473473

lib/src/main/java/io/ably/lib/realtime/Presence.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,9 +755,11 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
755755
case initialized:
756756
channel.attach();
757757
case attaching:
758+
Log.v(TAG, "updatePresence(); put message in pending presence queue");
758759
pendingPresence.add(new QueuedPresence(msg, listener));
759760
break;
760761
case attached:
762+
Log.v(TAG, "updatePresence(); send message to connection manager");
761763
ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name);
762764
message.presence = new PresenceMessage[] { msg };
763765
ConnectionManager connectionManager = ably.connection.connectionManager;

lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java

Lines changed: 94 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.UUID;
27+
import java.util.function.Predicate;
2728

2829
import com.google.gson.JsonArray;
2930
import com.google.gson.JsonElement;
@@ -2250,135 +2251,118 @@ public void realtime_presence_get_throws_when_channel_failed() throws AblyExcept
22502251
*
22512252
* Tests RTP17, RTP19, RTP19a, RTP5f, RTP6b
22522253
*/
2253-
@Ignore("FIXME: fix exception")
22542254
@Test
2255-
public void realtime_presence_suspended_reenter() throws AblyException {
2256-
AblyRealtime ably = null;
2257-
try {
2258-
MockWebsocketFactory mockTransport = new MockWebsocketFactory();
2259-
DebugOptions opts = new DebugOptions(testVars.keys[0].keyStr);
2260-
fillInOptions(opts);
2261-
opts.transportFactory = mockTransport;
2255+
public void realtime_presence_suspended_reenter() throws Exception {
2256+
MockWebsocketFactory mockTransport = new MockWebsocketFactory();
2257+
DebugOptions opts = new DebugOptions(testVars.keys[0].keyStr);
2258+
fillInOptions(opts);
2259+
opts.transportFactory = mockTransport;
2260+
final String channelName = "presence_suspended_reenter" + testParams.name;
2261+
mockTransport.allowSend();
22622262

2263-
for (int i=0; i<2; i++) {
2264-
final String channelName = "presence_suspended_reenter" + testParams.name + i;
2263+
try (AblyRealtime ably = new AblyRealtime(opts)) {
22652264

2266-
mockTransport.allowSend();
2265+
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);
2266+
connectionWaiter.waitFor(ConnectionState.connected);
22672267

2268-
ably = new AblyRealtime(opts);
2268+
final Channel channel = ably.channels.get(channelName);
2269+
channel.attach();
2270+
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
22692271

2270-
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);
2271-
connectionWaiter.waitFor(ConnectionState.connected);
2272+
channelWaiter.waitFor(ChannelState.attached);
22722273

2273-
final Channel channel = ably.channels.get(channelName);
2274-
channel.attach();
2275-
ChannelWaiter channelWaiter = new ChannelWaiter(channel);
2274+
final String presenceData = "PRESENCE_DATA";
2275+
final String connId = ably.connection.id;
22762276

2277-
channelWaiter.waitFor(ChannelState.attached);
2277+
/*
2278+
* On the first run to test RTP19a we don't enter client1 so the server on
2279+
* return from suspend sees no presence data and sends ATTACHED without HAS_PRESENCE
2280+
* The client then should remove all the members from the presence map and then
2281+
* re-enter client2. On the second loop run we enter client1 and receive ATTACHED with
2282+
* HAS_PRESENCE
2283+
*/
2284+
final boolean[] wrongPresenceEmitted = new boolean[] {false};
2285+
final ArrayList<PresenceMessage> leaveMessages = new ArrayList<>();
2286+
/* Subscribe for message type, test RTP6b */
2287+
channel.presence.subscribe(Action.leave, new Presence.PresenceListener() {
2288+
@Override
2289+
public void onPresenceMessage(PresenceMessage message) {
2290+
leaveMessages.add(message);
2291+
}
2292+
});
22782293

2279-
final String presenceData = "PRESENCE_DATA";
2280-
final String connId = ably.connection.id;
2294+
/*
2295+
* We put testClientId2 presence data into the client library presence map but we
2296+
* don't send it to the server
2297+
*/
22812298

2282-
/*
2283-
* On the first run to test RTP19a we don't enter client1 so the server on
2284-
* return from suspend sees no presence data and sends ATTACHED without HAS_PRESENCE
2285-
* The client then should remove all the members from the presence map and then
2286-
* re-enter client2. On the second loop run we enter client1 and receive ATTACHED with
2287-
* HAS_PRESENCE
2288-
*/
2289-
final boolean[] wrongPresenceEmitted = new boolean[] {false};
2290-
if (i == 1) {
2291-
CompletionWaiter completionWaiter = new CompletionWaiter();
2292-
channel.presence.enterClient(testClientId1, presenceData, completionWaiter);
2293-
completionWaiter.waitFor();
2299+
channel.presence.enterClient(testClientId1, presenceData);
2300+
2301+
ProtocolMessage msg = new ProtocolMessage();
2302+
msg.connectionId = "randomConnectionId";
2303+
msg.action = ProtocolMessage.Action.sync;
2304+
msg.channel = channelName;
2305+
msg.presence = new PresenceMessage[]{
2306+
new PresenceMessage() {{
2307+
action = Action.present;
2308+
id = String.format("%s:0:0", "randomConnectionId");
2309+
timestamp = System.currentTimeMillis();
2310+
clientId = testClientId2;
2311+
connectionId = "randomConnectionId";
2312+
data = presenceData;
2313+
}}
2314+
};
2315+
ably.connection.connectionManager.onMessage(null, msg);
22942316

2295-
// RTP5f: after this point there should be no presence event for client1
2296-
channel.presence.subscribe(new Presence.PresenceListener() {
2297-
@Override
2298-
public void onPresenceMessage(PresenceMessage message) {
2299-
if (message.clientId.equals(testClientId1))
2300-
wrongPresenceEmitted[0] = true;
2301-
}
2302-
});
2303-
}
2317+
ably.connection.connectionManager.requestState(ConnectionState.suspended);
2318+
channelWaiter.waitFor(ChannelState.suspended);
23042319

2305-
final ArrayList<PresenceMessage> leaveMessages = new ArrayList<>();
2306-
/* Subscribe for message type, test RTP6b */
2307-
channel.presence.subscribe(Action.leave, new Presence.PresenceListener() {
2308-
@Override
2309-
public void onPresenceMessage(PresenceMessage message) {
2310-
leaveMessages.add(message);
2311-
}
2312-
});
2313-
2314-
/*
2315-
* We put testClientId2 presence data into the client library presence map but we
2316-
* don't send it to the server
2317-
*/
2318-
2319-
mockTransport.blockSend();
2320-
channel.presence.enterClient(testClientId2, presenceData);
2321-
2322-
ProtocolMessage msg = new ProtocolMessage();
2323-
msg.connectionId = connId;
2324-
msg.action = ProtocolMessage.Action.sync;
2325-
msg.channel = channelName;
2326-
msg.presence = new PresenceMessage[]{
2327-
new PresenceMessage() {{
2328-
action = Action.present;
2329-
id = String.format("%s:0:0", connId);
2330-
timestamp = System.currentTimeMillis();
2331-
clientId = testClientId2;
2332-
connectionId = connId;
2333-
data = presenceData;
2334-
}}
2335-
};
2336-
ably.connection.connectionManager.onMessage(null, msg);
2337-
2338-
mockTransport.allowSend();
2339-
2340-
ably.connection.connectionManager.requestState(ConnectionState.suspended);
2341-
channelWaiter.waitFor(ChannelState.suspended);
2342-
2343-
/*
2344-
* When restoring from suspended state server will send sync message erasing
2345-
* testClientId2 record from the presence map. Client should re-send presence message
2346-
* for testClientId2 and restore its presence data.
2347-
*/
2348-
2349-
ably.connection.connectionManager.requestState(ConnectionState.connected);
2350-
channelWaiter.waitFor(ChannelState.attached);
2351-
long reconnectTimestamp = System.currentTimeMillis();
2352-
2353-
try {
2354-
Thread.sleep(500);
2355-
} catch (InterruptedException e) {
2320+
/*
2321+
* When restoring from suspended state server will send sync message erasing
2322+
* testClientId2 record from the presence map. Client should re-send presence message
2323+
* for testClientId2 and restore its presence data.
2324+
*/
2325+
2326+
ably.connection.connectionManager.requestState(ConnectionState.connected);
2327+
channelWaiter.waitFor(ChannelState.attached);
2328+
long reconnectTimestamp = System.currentTimeMillis();
2329+
2330+
PaginatedResult<PresenceMessage> presentMembers;
2331+
2332+
try (AblyRest restClient = new AblyRest(opts)) {
2333+
long timeout = 10_000;
2334+
presentMembers = restClient.channels.get(channelName).presence.get(null);
2335+
while (presentMembers.items().length != 1 && System.currentTimeMillis() - reconnectTimestamp < timeout) {
2336+
Thread.sleep(250);
2337+
presentMembers = restClient.channels.get(channelName).presence.get(null);
23562338
}
2339+
}
23572340

2358-
AblyRest ablyRest = new AblyRest(opts);
2359-
io.ably.lib.rest.Channel restChannel = ablyRest.channels.get(channelName);
2360-
assertEquals("Verify presence data is received by the server",
2361-
restChannel.presence.get(null).items().length, i==0 ? 1 : 2);
2341+
assertEquals("Verify presence data is received by the server",
2342+
1, presentMembers.items().length);
23622343

2363-
/* In both cases we should have one leave message in the leaveMessages */
2364-
assertEquals("Verify exactly one LEAVE message was generated", leaveMessages.size(), 1);
2344+
assertEquals(testClientId1, presentMembers.items()[0].clientId);
23652345

2366-
PresenceMessage leaveMessage = leaveMessages.get(0);
2367-
assertEquals("Verify LEAVE message follows specs",leaveMessage.action, Action.leave);
2368-
assertEquals("Verify LEAVE message follows specs",leaveMessage.clientId, testClientId2);
2369-
assertEquals("Verify LEAVE message follows specs",leaveMessage.data, presenceData);
2370-
assertTrue("Verify LEAVE message follows specs", Math.abs(leaveMessage.timestamp-reconnectTimestamp) < 2000);
23712346

2372-
/* According to RTP5f there should be no presence event emitted for client1 */
2373-
assertFalse("Verify no presence event emitted on return from suspend on SYNC for client1",
2374-
wrongPresenceEmitted[0]);
2375-
}
2376-
} finally {
2377-
if(ably != null)
2378-
ably.close();
2347+
/* In both cases we should have one leave message in the leaveMessages */
2348+
assertEquals("Verify exactly one LEAVE message was generated", leaveMessages.size(), 1);
2349+
2350+
PresenceMessage leaveMessage = leaveMessages.get(0);
2351+
assertEquals("Verify LEAVE message follows specs",leaveMessage.action, Action.leave);
2352+
assertEquals("Verify LEAVE message follows specs",leaveMessage.clientId, testClientId2);
2353+
assertEquals("Verify LEAVE message follows specs",leaveMessage.data, presenceData);
2354+
assertTrue("Verify LEAVE message follows specs", Math.abs(leaveMessage.timestamp-reconnectTimestamp) < 2000);
2355+
2356+
/* According to RTP5f there should be no presence event emitted for client1 */
2357+
assertFalse("Verify no presence event emitted on return from suspend on SYNC for client1",
2358+
wrongPresenceEmitted[0]);
23792359
}
23802360
}
23812361

2362+
private void waitForPresence(Predicate<PaginatedResult<PresenceMessage>> predicate) throws AblyException {
2363+
2364+
}
2365+
23822366
/**
23832367
* Test presence message map behaviour (RTP2 features)
23842368
* Tests RTP2a, RTP2b1, RTP2b2, RTP2c, RTP2d, RTP2g, RTP18c, RTP6a features

0 commit comments

Comments
 (0)