Description
Before opening, please confirm:
- I have searched for duplicate or closed issues and discussions.
Language and Async Model
Java
Amplify Categories
GraphQL API, DataStore
Gradle script dependencies
implementation 'com.amplifyframework:aws-api:2.14.11'
implementation 'com.amplifyframework:aws-datastore:2.14.11'
coreLibraryDesugaring 'com.android.tools:desugar_jdk_libs:1.1.5'
Environment information
------------------------------------------------------------
Gradle 8.0
------------------------------------------------------------
Build time: 2023-02-13 13:15:21 UTC
Revision: 62ab9b7c7f884426cf79fbedcf07658b2dbe9e97
Kotlin: 1.8.10
Groovy: 3.0.13
Ant: Apache Ant(TM) version 1.10.11 compiled on July 10 2021
JVM: 17.0.6 (JetBrains s.r.o. 17.0.6+0-b2043.56-10027231)
OS: Windows 10 10.0 amd64
Please include any relevant guides or documentation you're referencing
No response
Describe the bug
Since adding Amplify to our production Android app, I have been seeing a significant number of java.util.concurrent.TimeoutException crashes with the message: "The source did not signal an event for 5000 milliseconds and has been terminated."
Full error message: "The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 5000 milliseconds and has been terminated."
This crash has already affected ~2% of my users, which is significant because our crash rate is usually below 0.3%. Is there an issue in my implementation, or could this be an issue in the Amplify library? (The message does mention "missing onError handler in the subscribe() method call".) I'm using DataStore and GraphQL subscriptions; a code snippet showing how I use Amplify is included below.
P.S. Interestingly, I saw a similar crash reported in this GitHub issue from 2020.
Full stack trace (if helpful)
Fatal Exception: kf.e
The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 5000 milliseconds and has been terminated.
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.accept (CallbackCompletableObserver.java:47)
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.accept (CallbackCompletableObserver.java:26)
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.onError (CallbackCompletableObserver.java:64)
io.reactivex.rxjava3.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onError (CompletablePeek.java:95)
io.reactivex.rxjava3.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onError (CompletablePeek.java:95)
io.reactivex.rxjava3.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.onError (CompletableSubscribeOn.java:74)
io.reactivex.rxjava3.internal.operators.completable.CompletableTimeout$DisposeTask.run (CompletableTimeout.java:109)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:41)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:28)
java.util.concurrent.FutureTask.run (FutureTask.java:264)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:307)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:644)
java.lang.Thread.run (Thread.java:1012)
Caused by java.util.concurrent.TimeoutException
The source did not signal an event for 5000 milliseconds and has been terminated.
io.reactivex.rxjava3.internal.operators.completable.CompletableTimeout$DisposeTask.run (CompletableTimeout.java:109)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:41)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:28)
java.util.concurrent.FutureTask.run (FutureTask.java:264)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:307)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:644)
java.lang.Thread.run (Thread.java:1012)
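The trace above shows CompletableTimeout$DisposeTask handing a TimeoutException to CallbackCompletableObserver.onError, and the message says that no onError handler was passed to subscribe(). As a rough illustration of why that combination ends in a crash, here is a minimal, stdlib-only Java model. It is not RxJava's or Amplify's code: the class MissingErrorHandlerSketch and the method runAndCapture are hypothetical names, and the IllegalStateException only stands in for the wrapper exception RxJava raises when onError is missing. The sketch assumes the failure is delivered on a background timer thread, as the trace suggests.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Simplified model only; NOT RxJava's or Amplify's implementation.
final class MissingErrorHandlerSketch {

    /**
     * Simulates a source that never signals, guarded by a timeout.
     * Returns the exception that escaped to the thread's uncaught-exception
     * handler, or null if the onError callback consumed the timeout.
     */
    static Throwable runAndCapture(long timeoutMillis, Consumer<Throwable> onError) {
        AtomicReference<Throwable> uncaught = new AtomicReference<>();
        Thread timer = new Thread(() -> {
            try {
                Thread.sleep(timeoutMillis); // the source stays silent
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            TimeoutException timeout = new TimeoutException(
                    "The source did not signal an event for " + timeoutMillis
                            + " milliseconds and has been terminated.");
            if (onError != null) {
                // A handler was supplied: the failure is an ordinary callback.
                onError.accept(timeout);
            } else {
                // No handler: the failure has nowhere to go and leaves the thread.
                throw new IllegalStateException("missing onError handler", timeout);
            }
        });
        // On Android the default handler terminates the app; here we only record it.
        timer.setUncaughtExceptionHandler((thread, error) -> uncaught.set(error));
        timer.start();
        try {
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return uncaught.get();
    }

    public static void main(String[] args) {
        Throwable escaped = runAndCapture(50, null);
        System.out.println("Without onError, escaped: " + escaped);

        Throwable none = runAndCapture(50, error ->
                System.out.println("Handled by onError: " + error.getMessage()));
        System.out.println("With onError, escaped: " + none);
    }
}
```

In RxJava 3 itself, the corresponding hooks are the two-argument subscribe(onComplete, onError) overload and RxJavaPlugins.setErrorHandler(...). Whether the subscribe() call in this trace sits in application code or inside the library cannot be determined from the report alone.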
GraphQL schema
input AMPLIFY { globalAuthRule: AuthRule = { allow: public } } # FOR TESTING ONLY!

enum UnblockStatus {
  inProgress
  cancelled
  success
  error
}

type UnblockRequest @model @auth(rules: [{ allow: public }]) {
  id: ID!
  userEmail: String!
  barrier: String!
  barrierDifficulty: Int!
  timestamp: AWSTimestamp!
}

type UnblockResult @model @auth(rules: [{ allow: public }]) {
  id: ID!
  userEmail: String!
  timestamp: AWSTimestamp!
  unblockStatus: UnblockStatus!
}
Reproduction steps (if applicable)
No response
Code Snippet
// This is the relevant code where I send / update data
public void sendUnblockResult(Context context, UnblockStatus unblockStatus) {
    UnblockResult unblockResult = UnblockResult.builder()
            .userEmail(AppConfig.getInstance(context).getUser().getEmail())
            .timestamp(Temporal.Timestamp.now())
            .unblockStatus(unblockStatus)
            .build();
    Amplify.DataStore.save(unblockResult,
        success -> {
            Log.i(AMPLIFY_DEBUG_TAG, "Saved item: " + success.item());
            setLatestUnblockResultId(context, success.item().getId());
        },
        error -> Log.e(AMPLIFY_DEBUG_TAG, "Could not save item to DataStore", error)
    );
}

public void updateUnblockStatus(Context context, UnblockStatus unblockStatus) {
    String unblockId = getLatestUnblockId(context);
    Amplify.DataStore.query(UnblockResult.class, Where.matches(UnblockResult.ID.eq(unblockId)),
        matches -> {
            if (matches.hasNext()) {
                UnblockResult unblockResult = matches.next();
                UnblockResult updatedUnblockResult = unblockResult.copyOfBuilder()
                        .unblockStatus(unblockStatus)
                        .timestamp(Temporal.Timestamp.now())
                        .build();
                Amplify.DataStore.save(updatedUnblockResult,
                    updated -> Log.i(AMPLIFY_DEBUG_TAG, "Updated unblock result: " + updatedUnblockResult),
                    failure -> Log.e(AMPLIFY_DEBUG_TAG, "Update failed.", failure)
                );
            }
        },
        failure -> {
            Log.e(AMPLIFY_DEBUG_TAG, "Update failed.", failure);
        }
    );
}

// And this is how I subscribe to updates, from my MainActivity
Amplify.DataStore.observe(UnblockRequest.class,
    cancelable -> Log.i(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation began."),
    unblockRequestChanged -> {
        // Do some stuff
    },
    failure -> Log.e(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation failed", failure),
    () -> Log.i(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation complete")
);
Log output
// Put your logs below this line
amplifyconfiguration.json
No response
Additional information and screenshots
No response