Skip to content

Commit 90387fe

Browse files
authored
Merge pull request #1184 from ably/AIT-84/message-publish-result
[AIT-84] feat: add support for returning serials on `publish`
2 parents 85af5c4 + 982be70 commit 90387fe

File tree

10 files changed

+322
-206
lines changed

10 files changed

+322
-206
lines changed

.github/workflows/integration-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
uses: actions/setup-java@v4
2020
with:
2121
java-version: '17'
22-
distribution: 'corretto'
22+
distribution: 'temurin'
2323

2424
- name: Set up Gradle
2525
uses: gradle/actions/setup-gradle@v3

android/src/main/java/io/ably/lib/push/PushChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void subscribeClient() throws AblyException {
4949
* @throws AblyException
5050
*/
5151
public void subscribeClientAsync(CompletionListener listener) {
52-
subscribeClientImpl().async(new CompletionListener.ToCallback(listener));
52+
subscribeClientImpl().async(new CompletionListener.ToCallback<>(listener));
5353
}
5454

5555
protected Http.Request<Void> subscribeClientImpl() {
@@ -83,7 +83,7 @@ public void subscribeDevice() throws AblyException {
8383
* @throws AblyException
8484
*/
8585
public void subscribeDeviceAsync(CompletionListener listener) {
86-
subscribeDeviceImpl().async(new CompletionListener.ToCallback(listener));
86+
subscribeDeviceImpl().async(new CompletionListener.ToCallback<>(listener));
8787
}
8888

8989
protected Http.Request<Void> subscribeDeviceImpl() {
@@ -131,7 +131,7 @@ public void unsubscribeClient() throws AblyException {
131131
* @throws AblyException
132132
*/
133133
public void unsubscribeClientAsync(CompletionListener listener) {
134-
unsubscribeClientImpl().async(new CompletionListener.ToCallback(listener));
134+
unsubscribeClientImpl().async(new CompletionListener.ToCallback<>(listener));
135135
}
136136

137137
protected Http.Request<Void> unsubscribeClientImpl() {
@@ -163,7 +163,7 @@ public void unsubscribeDevice() throws AblyException {
163163
* @throws AblyException
164164
*/
165165
public void unsubscribeDeviceAsync(CompletionListener listener) {
166-
unsubscribeDeviceImpl().async(new CompletionListener.ToCallback(listener));
166+
unsubscribeDeviceImpl().async(new CompletionListener.ToCallback<>(listener));
167167
}
168168

169169
protected Http.Request<Void> unsubscribeDeviceImpl() {

java/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ tasks.register<Test>("testRealtimeSuite") {
6767
}
6868
retry {
6969
maxRetries.set(3)
70-
maxFailures.set(8)
70+
maxFailures.set(15)
7171
failOnPassedAfterRetry.set(false)
7272
failOnSkippedAfterRetry.set(false)
7373
}

lib/src/main/java/io/ably/lib/push/PushBase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void publish(Param[] recipient, JsonObject payload) throws AblyException
8080
* @throws AblyException
8181
*/
8282
public void publishAsync(Param[] recipient, JsonObject payload, final CompletionListener listener) {
83-
publishImpl(recipient, payload).async(new CompletionListener.ToCallback(listener));
83+
publishImpl(recipient, payload).async(new CompletionListener.ToCallback<>(listener));
8484
}
8585

8686
private Http.Request<Void> publishImpl(final Param[] recipient, final JsonObject payload) {
@@ -275,7 +275,7 @@ public void remove(String deviceId) throws AblyException {
275275
* @param listener A listener to be notified of success or failure.
276276
*/
277277
public void removeAsync(String deviceId, CompletionListener listener) {
278-
removeImpl(deviceId).async(new CompletionListener.ToCallback(listener));
278+
removeImpl(deviceId).async(new CompletionListener.ToCallback<>(listener));
279279
}
280280

281281
protected Http.Request<Void> removeImpl(final String deviceId) {
@@ -310,7 +310,7 @@ public void removeWhere(Param[] params) throws AblyException {
310310
* @param listener A listener to be notified of success or failure.
311311
*/
312312
public void removeWhereAsync(Param[] params, CompletionListener listener) {
313-
removeWhereImpl(params).async(new CompletionListener.ToCallback(listener));
313+
removeWhereImpl(params).async(new CompletionListener.ToCallback<>(listener));
314314
}
315315

316316
protected Http.Request<Void> removeWhereImpl(Param[] params) {
@@ -435,7 +435,7 @@ public void remove(ChannelSubscription subscription) throws AblyException {
435435
* @throws AblyException
436436
*/
437437
public void removeAsync(ChannelSubscription subscription, CompletionListener listener) {
438-
removeImpl(subscription).async(new CompletionListener.ToCallback(listener));
438+
removeImpl(subscription).async(new CompletionListener.ToCallback<>(listener));
439439
}
440440

441441
protected Http.Request<Void> removeImpl(ChannelSubscription subscription) {
@@ -476,7 +476,7 @@ public void removeWhere(Param[] params) throws AblyException {
476476
* @throws AblyException
477477
*/
478478
public void removeWhereAsync(Param[] params, CompletionListener listener) {
479-
removeWhereImpl(params).async(new CompletionListener.ToCallback(listener));
479+
removeWhereImpl(params).async(new CompletionListener.ToCallback<>(listener));
480480
}
481481

482482
protected Http.Request<Void> removeWhereImpl(Param[] params) {

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.ably.lib.types.ProtocolMessage;
4444
import io.ably.lib.types.ProtocolMessage.Action;
4545
import io.ably.lib.types.ProtocolMessage.Flag;
46+
import io.ably.lib.types.PublishResult;
4647
import io.ably.lib.types.Summary;
4748
import io.ably.lib.types.UpdateDeleteResult;
4849
import io.ably.lib.util.CollectionUtils;
@@ -445,6 +446,16 @@ private static void callCompletionListenerError(CompletionListener listener, Err
445446
}
446447
}
447448

449+
private static void callCompletionListenerError(Callback<PublishResult> listener, ErrorInfo err) {
450+
if(listener != null) {
451+
try {
452+
listener.onError(err);
453+
} catch(Throwable t) {
454+
Log.e(TAG, "Unexpected exception calling CompletionListener", t);
455+
}
456+
}
457+
}
458+
448459
private void setAttached(ProtocolMessage message) {
449460
clearAttachTimers();
450461
properties.attachSerial = message.channelSerial;
@@ -1026,8 +1037,9 @@ private void unsubscribeImpl(String name, MessageListener listener) {
10261037
* @param data the message payload
10271038
* @throws AblyException
10281039
*/
1040+
@NonBlocking
10291041
public void publish(String name, Object data) throws AblyException {
1030-
publish(name, data, null);
1042+
publish(name, data, (Callback<PublishResult>) null);
10311043
}
10321044

10331045
/**
@@ -1038,8 +1050,9 @@ public void publish(String name, Object data) throws AblyException {
10381050
* @param message A {@link Message} object.
10391051
* @throws AblyException
10401052
*/
1053+
@NonBlocking
10411054
public void publish(Message message) throws AblyException {
1042-
publish(message, null);
1055+
publish(message, (Callback<PublishResult>) null);
10431056
}
10441057

10451058
/**
@@ -1050,8 +1063,9 @@ public void publish(Message message) throws AblyException {
10501063
* @param messages An array of {@link Message} objects.
10511064
* @throws AblyException
10521065
*/
1066+
@NonBlocking
10531067
public void publish(Message[] messages) throws AblyException {
1054-
publish(messages, null);
1068+
publish(messages, (Callback<PublishResult>) null);
10551069
}
10561070

10571071
/**
@@ -1067,12 +1081,36 @@ public void publish(Message[] messages) throws AblyException {
10671081
* <p>
10681082
* This listener is invoked on a background thread.
10691083
* @throws AblyException
1084+
* @deprecated Use {@link #publish(String, Object, Callback)} instead.
10701085
*/
1086+
@Deprecated
1087+
@NonBlocking
10711088
public void publish(String name, Object data, CompletionListener listener) throws AblyException {
10721089
Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name);
10731090
publish(new Message[] {new Message(name, data)}, listener);
10741091
}
10751092

1093+
/**
1094+
* Publishes a single message to the channel with the given event name and payload.
1095+
* When publish is called with this client library, it won't attempt to implicitly attach to the channel,
1096+
* so long as <a href="https://ably.com/docs/realtime/channels#transient-publish">transient publishing</a> is available in the library.
1097+
* Otherwise, the client will implicitly attach.
1098+
* <p>
1099+
* Spec: RTL6i
1100+
* @param name the event name
1101+
* @param data the message payload
1102+
* @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation,
1103+
* receiving a {@link PublishResult} with message serial(s) on success.
1104+
* <p>
1105+
* This callback is invoked on a background thread.
1106+
* @throws AblyException
1107+
*/
1108+
@NonBlocking
1109+
public void publish(String name, Object data, Callback<PublishResult> callback) throws AblyException {
1110+
Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name);
1111+
publish(new Message[] {new Message(name, data)}, callback);
1112+
}
1113+
10761114
/**
10771115
* Publishes a message to the channel.
10781116
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
@@ -1083,12 +1121,33 @@ public void publish(String name, Object data, CompletionListener listener) throw
10831121
* <p>
10841122
* This listener is invoked on a background thread.
10851123
* @throws AblyException
1124+
* @deprecated Use {@link #publish(Message, Callback)} instead.
10861125
*/
1126+
@Deprecated
1127+
@NonBlocking
10871128
public void publish(Message message, CompletionListener listener) throws AblyException {
10881129
Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name);
10891130
publish(new Message[] {message}, listener);
10901131
}
10911132

1133+
/**
1134+
* Publishes a message to the channel.
1135+
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
1136+
* <p>
1137+
* Spec: RTL6i
1138+
* @param message A {@link Message} object.
1139+
* @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation,
1140+
* receiving a {@link PublishResult} with message serial(s) on success.
1141+
* <p>
1142+
* This callback is invoked on a background thread.
1143+
* @throws AblyException
1144+
*/
1145+
@NonBlocking
1146+
public void publish(Message message, Callback<PublishResult> callback) throws AblyException {
1147+
Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name);
1148+
publish(new Message[] {message}, callback);
1149+
}
1150+
10921151
/**
10931152
* Publishes an array of messages to the channel.
10941153
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
@@ -1099,8 +1158,16 @@ public void publish(Message message, CompletionListener listener) throws AblyExc
10991158
* <p>
11001159
* This listener is invoked on a background thread.
11011160
* @throws AblyException
1161+
* @deprecated Use {@link #publish(Message[], Callback)} instead.
11021162
*/
1163+
@Deprecated
1164+
@NonBlocking
11031165
public synchronized void publish(Message[] messages, CompletionListener listener) throws AblyException {
1166+
publish(messages, Listeners.fromCompletionListener(listener));
1167+
}
1168+
1169+
@NonBlocking
1170+
public synchronized void publish(Message[] messages, Callback<PublishResult> listener) throws AblyException {
11041171
Log.v(TAG, "publish(Message[]); channel = " + this.name);
11051172
ConnectionManager connectionManager = ably.connection.connectionManager;
11061173
ConnectionManager.State connectionState = connectionManager.getConnectionState();
@@ -1127,7 +1194,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener
11271194
case suspended:
11281195
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000));
11291196
default:
1130-
connectionManager.send(msg, queueMessages, Listeners.fromCompletionListener(listener));
1197+
connectionManager.send(msg, queueMessages, listener);
11311198
}
11321199
}
11331200

lib/src/main/java/io/ably/lib/realtime/CompletionListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ public void onError(ErrorInfo reason) {
4343
}
4444
}
4545

46-
class ToCallback implements Callback<Void> {
46+
class ToCallback<T> implements Callback<T> {
4747
private CompletionListener listener;
4848
public ToCallback(CompletionListener listener) {
4949
this.listener = listener;
5050
}
5151

5252
@Override
53-
public void onSuccess(Void v) {
53+
public void onSuccess(T v) {
5454
listener.onSuccess();
5555
}
5656

0 commit comments

Comments
 (0)