Missing onError handler in subscribe() method call #2817

@JornR94

Language and Async Model

Java

Amplify Categories

GraphQL API, DataStore

Gradle script dependencies

    implementation 'com.amplifyframework:aws-api:2.14.11'
    implementation 'com.amplifyframework:aws-datastore:2.14.11'
    coreLibraryDesugaring 'com.android.tools:desugar_jdk_libs:1.1.5'

Environment information

------------------------------------------------------------
Gradle 8.0
------------------------------------------------------------

Build time:   2023-02-13 13:15:21 UTC
Revision:     62ab9b7c7f884426cf79fbedcf07658b2dbe9e97

Kotlin:       1.8.10
Groovy:       3.0.13
Ant:          Apache Ant(TM) version 1.10.11 compiled on July 10 2021
JVM:          17.0.6 (JetBrains s.r.o. 17.0.6+0-b2043.56-10027231)
OS:           Windows 10 10.0 amd64

Please include any relevant guides or documentation you're referencing

No response

Describe the bug

Since adding Amplify to our production Android app, I'm seeing a significant number of java.util.concurrent.TimeoutException crashes with the message: "The source did not signal an event for 5000 milliseconds and has been terminated."
Full error message: "The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 5000 milliseconds and has been terminated."

This crash has already affected ~2% of my users, which is significant because our crash rate is usually below 0.3%. Is there an issue in my implementation, or could this be an issue in the Amplify library? (The message does mention "missing onError handler in the subscribe() method call".) I'm using DataStore and GraphQL subscriptions; a code snippet showing how I use Amplify is included below.

P.S. Interestingly, I saw a similar crash reported in this GitHub issue from 2020
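
For readers unfamiliar with how this kind of message arises, here is a simplified model of the mechanism the error text describes. It is a sketch in plain Java, not RxJava or Amplify code: the class `TimeoutModel` and its `subscribe` method are hypothetical names invented for illustration. The assumption it encodes is that when a timed operation fails and the subscriber supplied no error callback, the wrapped error is handed to the thread's uncaught-exception handler, which on Android typically ends the process.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Hypothetical, simplified model of a timed operation with an optional error callback. */
final class TimeoutModel {

    /**
     * Simulates a source that never signals and therefore times out after timeoutMs.
     * Blocks until the timeout task has run so the outcome is easy to observe.
     *
     * @param onError error callback, or null to model a subscribe() call without one
     */
    static void subscribe(long timeoutMs, Consumer<Throwable> onError) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.schedule(() -> {
            TimeoutException timeout = new TimeoutException(
                    "The source did not signal an event for " + timeoutMs
                            + " milliseconds and has been terminated.");
            if (onError != null) {
                // An error callback exists: the caller decides what to do (log, retry, ...).
                onError.accept(timeout);
            } else {
                // No error callback: wrap the error and hand it to the thread's
                // uncaught-exception handler (assumed here to mirror the reported crash path).
                Thread current = Thread.currentThread();
                current.getUncaughtExceptionHandler().uncaughtException(
                        current,
                        new IllegalStateException("missing onError handler", timeout));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);

        // Already-scheduled delayed tasks still run after shutdown() by default.
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(timeoutMs + 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model, `TimeoutModel.subscribe(5000, null)` ends up in the uncaught-exception handler, while `TimeoutModel.subscribe(5000, error -> System.err.println(error))` delivers the same `TimeoutException` to the callback and nothing escapes. Whether the subscription in the stack trace below belongs to application code or to the library is not something this sketch can settle.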

Full stack trace (if helpful)

Fatal Exception: kf.e
The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException: The source did not signal an event for 5000 milliseconds and has been terminated.
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.accept (CallbackCompletableObserver.java:47)
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.accept (CallbackCompletableObserver.java:26)
io.reactivex.rxjava3.internal.observers.CallbackCompletableObserver.onError (CallbackCompletableObserver.java:64)
io.reactivex.rxjava3.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onError (CompletablePeek.java:95)
io.reactivex.rxjava3.internal.operators.completable.CompletablePeek$CompletableObserverImplementation.onError (CompletablePeek.java:95)
io.reactivex.rxjava3.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.onError (CompletableSubscribeOn.java:74)
io.reactivex.rxjava3.internal.operators.completable.CompletableTimeout$DisposeTask.run (CompletableTimeout.java:109)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:41)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:28)
java.util.concurrent.FutureTask.run (FutureTask.java:264)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:307)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:644)
java.lang.Thread.run (Thread.java:1012)

Caused by java.util.concurrent.TimeoutException
The source did not signal an event for 5000 milliseconds and has been terminated.
io.reactivex.rxjava3.internal.operators.completable.CompletableTimeout$DisposeTask.run (CompletableTimeout.java:109)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:41)
io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call (ScheduledDirectTask.java:28)
java.util.concurrent.FutureTask.run (FutureTask.java:264)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:307)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:644)
java.lang.Thread.run (Thread.java:1012)

GraphQL schema

input AMPLIFY { globalAuthRule: AuthRule = { allow: public } } # FOR TESTING ONLY!

enum UnblockStatus {
    inProgress
    cancelled
    success
    error
}

type UnblockRequest @model @auth(rules: [{ allow: public }]) {
    id: ID!
    userEmail: String!
    barrier: String!
    barrierDifficulty: Int!
    timestamp: AWSTimestamp!
}

type UnblockResult @model @auth(rules: [{ allow: public }]) {
    id: ID!
    userEmail: String!
    timestamp: AWSTimestamp!
    unblockStatus: UnblockStatus!
}

Reproduction steps (if applicable)

No response

Code Snippet

// This is the relevant code where I send / update data

    public void sendUnblockResult(Context context, UnblockStatus unblockStatus) {
        UnblockResult unblockResult = UnblockResult.builder()
                .userEmail(AppConfig.getInstance(context).getUser().getEmail())
                .timestamp(Temporal.Timestamp.now())
                .unblockStatus(unblockStatus)
                .build();
        Amplify.DataStore.save(unblockResult,
                success -> {
                    Log.i(AMPLIFY_DEBUG_TAG, "Saved item: " + success.item());
                    setLatestUnblockResultId(context, success.item().getId());
                },
                error -> Log.e(AMPLIFY_DEBUG_TAG, "Could not save item to DataStore", error)
        );
    }

    public void updateUnblockStatus(Context context, UnblockStatus unblockStatus) {
        String unblockId = getLatestUnblockId(context);
        Amplify.DataStore.query(UnblockResult.class, Where.matches(UnblockResult.ID.eq(unblockId)),
                matches -> {
                    if (matches.hasNext()) {
                        UnblockResult unblockResult = matches.next();
                        UnblockResult updatedUnblockResult = unblockResult.copyOfBuilder()
                                .unblockStatus(unblockStatus)
                                .timestamp(Temporal.Timestamp.now())
                                .build();
                        Amplify.DataStore.save(updatedUnblockResult,
                                updated -> Log.i(AMPLIFY_DEBUG_TAG, "Updated unblock result: " + updatedUnblockResult),
                                failure -> Log.e(AMPLIFY_DEBUG_TAG, "Update failed.", failure)
                        );
                    }
                },
                failure -> {
                    Log.e(AMPLIFY_DEBUG_TAG, "Update failed.", failure);
                }
        );
    }


// And this is how I subscribe to updates, from my MainActivity
        Amplify.DataStore.observe(UnblockRequest.class,
                cancelable -> Log.i(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation began."),
                unblockRequestChanged -> {
                    // Do some stuff
                },
                failure -> Log.e(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation failed", failure),
                () -> Log.i(AmplifyManager.AMPLIFY_DEBUG_TAG, "Observation complete")
        );

amplifyconfiguration.json

No response

Additional information and screenshots

No response

Labels

bug (Something isn't working), datastore (DataStore category/plugins)
