Skip to content

Commit

Permalink
🗄️ Do not delay emission of items in watch() (#3950)
Browse files Browse the repository at this point in the history
* add a test

* buffer the flow

* add isLast and use it as a hint to postpone the emission of the last item to after the watcher has started

* update apiDump

* minor change to trigger a new CI run after yesterday's Github issues
  • Loading branch information
martinbonnin authored Mar 25, 2022
1 parent a33a29d commit 9bd26f4
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 29 deletions.
4 changes: 3 additions & 1 deletion apollo-api/api/apollo-api.api
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ public final class com/apollographql/apollo3/api/ApolloResponse {
public final field errors Ljava/util/List;
public final field executionContext Lcom/apollographql/apollo3/api/ExecutionContext;
public final field extensions Ljava/util/Map;
public final field isLast Z
public final field operation Lcom/apollographql/apollo3/api/Operation;
public final field requestUuid Ljava/util/UUID;
public synthetic fun <init> (Ljava/util/UUID;Lcom/apollographql/apollo3/api/Operation;Lcom/apollographql/apollo3/api/Operation$Data;Ljava/util/List;Ljava/util/Map;Lcom/apollographql/apollo3/api/ExecutionContext;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> (Ljava/util/UUID;Lcom/apollographql/apollo3/api/Operation;Lcom/apollographql/apollo3/api/Operation$Data;Ljava/util/List;Ljava/util/Map;Lcom/apollographql/apollo3/api/ExecutionContext;ZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun dataAssertNoErrors ()Lcom/apollographql/apollo3/api/Operation$Data;
public final fun hasErrors ()Z
public final fun newBuilder ()Lcom/apollographql/apollo3/api/ApolloResponse$Builder;
Expand All @@ -98,6 +99,7 @@ public final class com/apollographql/apollo3/api/ApolloResponse$Builder {
public final fun build ()Lcom/apollographql/apollo3/api/ApolloResponse;
public final fun errors (Ljava/util/List;)Lcom/apollographql/apollo3/api/ApolloResponse$Builder;
public final fun extensions (Ljava/util/Map;)Lcom/apollographql/apollo3/api/ApolloResponse$Builder;
public final fun isLast (Z)Lcom/apollographql/apollo3/api/ApolloResponse$Builder;
public final fun requestUuid (Ljava/util/UUID;)Lcom/apollographql/apollo3/api/ApolloResponse$Builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ private constructor(
*/
@JvmField
val executionContext: ExecutionContext,

/**
* Indicates that this [ApolloResponse] is the last [ApolloResponse] in a given [Flow] and that no
* other items are expected.
*
* This is used as a hint by the watchers to make sure to subscribe before the last item is emitted.
*
* There can be false negatives where [isLast] is false if the producer does not know in advance if
* other items are emitted. For an example, the CacheAndNetwork fetch policy doesn't emit the network
* item if it fails.
*
* There must not be false positives. If [isLast] is true, no other items must follow.
*/
@JvmField
val isLast: Boolean,
) {

/**
Expand All @@ -72,6 +87,7 @@ private constructor(
.errors(errors)
.extensions(extensions)
.addExecutionContext(executionContext)
.isLast(isLast)
}

class Builder<D : Operation.Data>(
Expand All @@ -82,6 +98,7 @@ private constructor(
private var executionContext: ExecutionContext = ExecutionContext.Empty
private var errors: List<Error>? = null
private var extensions: Map<String, Any?>? = null
private var isLast = false

fun addExecutionContext(executionContext: ExecutionContext) = apply {
this.executionContext = this.executionContext + executionContext
Expand All @@ -99,6 +116,10 @@ private constructor(
this.requestUuid = requestUuid
}

fun isLast(isLast: Boolean) = apply {
this.isLast = isLast
}

fun build(): ApolloResponse<D> {
@Suppress("DEPRECATION")
return ApolloResponse(
Expand All @@ -107,7 +128,8 @@ private constructor(
data = data,
executionContext = executionContext,
extensions = extensions ?: emptyMap(),
errors = errors ,
errors = errors,
isLast = isLast,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@ import com.apollographql.apollo3.exception.ApolloException
import com.apollographql.apollo3.exception.CacheMissException
import com.apollographql.apollo3.interceptor.ApolloInterceptor
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlin.jvm.JvmName
import kotlin.jvm.JvmOverloads
Expand Down Expand Up @@ -135,27 +131,48 @@ fun <D : Query.Data> ApolloCall<D>.watch(
): Flow<ApolloResponse<D>> {
return flow {
var lastResponse: ApolloResponse<D>? = null
var response: ApolloResponse<D>? = null

toFlow()
.catch {
if (it !is ApolloException || fetchThrows) throw it
}.collect {
response = it

if (it.isLast) {
if (lastResponse != null) {
/**
* If we ever come here it means some interceptors built a new Flow and forgot to reset the isLast flag
* Better safe than sorry: emit them when we realize that. This will introduce a delay in the response.
*/
println("ApolloGraphQL: extra response received after the last one")
emit(lastResponse!!)
}
/**
* Remember the last response so that we can send it after we subscribe to the store
*
* This allows callers to use the last element as a synchronisation point to modify the store and still have the watcher
* receive subsequent updates
*
* See https://github.com/apollographql/apollo-kotlin/pull/3853
*/
lastResponse = it
} else {
emit(it)
}
}

copy().fetchPolicyInterceptor(refetchPolicyInterceptor)
.watch(response?.data) { _, _ ->
// If the exception is ignored (refetchThrows is false), we should continue watching - so retry
!refetchThrows
}.onStart {
if (lastResponse != null) {
emit(lastResponse!!)
}
lastResponse = it
}.collect {
emit(it)
}

emitAll(
copy().fetchPolicyInterceptor(refetchPolicyInterceptor)
.watch(lastResponse?.data) { _, _ ->
// If the exception is ignored (refetchThrows is false), we should continue watching - so retry
!refetchThrows
}.onStart {
if (lastResponse != null) {
emit(lastResponse!!)
}
}
)
}
}

Expand Down Expand Up @@ -312,6 +329,9 @@ fun <D : Mutation.Data> ApolloCall<D>.optimisticUpdates(data: D) = addExecutionC
internal val <D : Operation.Data> ApolloRequest<D>.fetchPolicyInterceptor
get() = executionContext[FetchPolicyContext]?.interceptor ?: CacheFirstInterceptor

internal val <D : Operation.Data> ApolloCall<D>.fetchPolicyInterceptor
get() = executionContext[FetchPolicyContext]?.interceptor ?: CacheFirstInterceptor

private val <T> MutableExecutionOptions<T>.refetchPolicyInterceptor
get() = executionContext[RefetchPolicyContext]?.interceptor ?: CacheOnlyInterceptor

Expand Down Expand Up @@ -533,4 +553,3 @@ internal fun <D : Operation.Data> ApolloRequest.Builder<D>.fetchFromCache(fetchF

internal val <D : Operation.Data> ApolloRequest<D>.fetchFromCache
get() = executionContext[FetchFromCacheContext]?.value ?: false

Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ val CacheAndNetworkInterceptor = object : ApolloInterceptor {
}.singleOrNull()

if (cacheResponse != null) {
emit(cacheResponse)
emit(cacheResponse.newBuilder().isLast(false).build())
}

val networkResponse = chain.proceed(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ internal class ApolloCacheInterceptor(
.cacheHit(true)
.build()
)
.isLast(true)
.build()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private constructor(

emitAll(
bodies.map { body ->
// On native, body will be freezed in worker.doWork, but reading a BufferedSource is a mutating operation.
// On native, body will be frozen in worker.doWork, but reading a BufferedSource is a mutating operation.
// So we read the bytes into a ByteString first, and create a new BufferedSource from it after freezing.
val bodyByteString = if (platform() == Platform.Native) body.readByteString() else null
// Do not capture request
Expand Down Expand Up @@ -124,6 +124,7 @@ private constructor(
headers = httpResponse.headers
)
)
.isLast(true)
.build()
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class QueueTestNetworkTransport : NetworkTransport {
val response = lock.withLock { queue.removeFirstOrNull() } ?: error("No more responses in queue")
if (response is TestResponse.NetworkError) throw ApolloNetworkException("Network error queued in QueueTestNetworkTransport")
@Suppress("UNCHECKED_CAST")
return flowOf((response as TestResponse.Response).response as ApolloResponse<D>)
val apolloResponse = (response as TestResponse.Response).response as ApolloResponse<D>
return flowOf(apolloResponse.newBuilder().isLast(true).build())
}

fun <D : Operation.Data> enqueue(response: ApolloResponse<D>) {
Expand Down Expand Up @@ -56,7 +57,8 @@ class MapTestNetworkTransport : NetworkTransport {
?: error("No response registered for operation ${request.operation}")
if (response is TestResponse.NetworkError) throw ApolloNetworkException("Network error queued in QueueTestNetworkTransport")
@Suppress("UNCHECKED_CAST")
return flowOf((response as TestResponse.Response).response as ApolloResponse<D>)
val apolloResponse = (response as TestResponse.Response).response as ApolloResponse<D>
return flowOf(apolloResponse.newBuilder().isLast(true).build())
}

fun <D : Operation.Data> register(operation: Operation<D>, response: ApolloResponse<D>) {
Expand Down
40 changes: 35 additions & 5 deletions tests/integration-tests/src/commonTest/kotlin/test/WatcherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package test
import IdCacheKeyGenerator
import com.apollographql.apollo3.ApolloClient
import com.apollographql.apollo3.annotations.ApolloExperimental
import com.apollographql.apollo3.api.ApolloResponse
import com.apollographql.apollo3.api.CustomScalarAdapters
import com.apollographql.apollo3.cache.normalized.ApolloStore
import com.apollographql.apollo3.cache.normalized.FetchPolicy
import com.apollographql.apollo3.cache.normalized.api.CacheHeaders
import com.apollographql.apollo3.cache.normalized.api.MemoryCacheFactory
import com.apollographql.apollo3.cache.normalized.fetchPolicy
import com.apollographql.apollo3.cache.normalized.normalizedCache
import com.apollographql.apollo3.cache.normalized.refetchPolicy
import com.apollographql.apollo3.cache.normalized.store
import com.apollographql.apollo3.cache.normalized.watch
Expand All @@ -19,25 +19,26 @@ import com.apollographql.apollo3.integration.normalizer.EpisodeHeroNameWithIdQue
import com.apollographql.apollo3.integration.normalizer.HeroAndFriendsNamesWithIDsQuery
import com.apollographql.apollo3.integration.normalizer.StarshipByIdQuery
import com.apollographql.apollo3.integration.normalizer.type.Episode
import com.apollographql.apollo3.testing.MapTestNetworkTransport
import com.apollographql.apollo3.mockserver.MockResponse
import com.apollographql.apollo3.mockserver.MockServer
import com.apollographql.apollo3.testing.QueueTestNetworkTransport
import com.apollographql.apollo3.testing.enqueue
import com.apollographql.apollo3.testing.enqueueTestNetworkError
import com.apollographql.apollo3.testing.enqueueTestResponse
import com.apollographql.apollo3.testing.receiveOrTimeout
import com.apollographql.apollo3.testing.runTest
import com.benasher44.uuid.uuid4
import com.benasher44.uuid.uuidFrom
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlin.test.Ignore
import kotlinx.coroutines.withTimeout
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.fail
Expand Down Expand Up @@ -500,6 +501,35 @@ class WatcherTest {
job.cancel()
}

@Test
fun cacheAndNetworkEmitsCacheImmediately() = runTest {
// This doesn't use TestNetworkTransport because we need timing control
val mockServer = MockServer()
val apolloClient = ApolloClient.Builder()
.normalizedCache(MemoryCacheFactory())
.serverUrl(mockServer.url())
.build()

val query = EpisodeHeroNameQuery(Episode.EMPIRE)

// Set up the cache with a "R2-D2" name
mockServer.enqueue(query, episodeHeroNameData)
apolloClient.query(query).fetchPolicy(FetchPolicy.NetworkOnly).execute()

// Prepare next call to be a network error
mockServer.enqueue(MockResponse(delayMillis = Long.MAX_VALUE))

withTimeout(500) {
// make sure we get the cache only result
val response = apolloClient.query(query).fetchPolicy(FetchPolicy.CacheAndNetwork).watch().first()
assertEquals("R2-D2", response.data?.hero?.name)
}

mockServer.stop()
apolloClient.dispose()
}


/**
* watchCacheAndNetwork() with a network error on the initial call
*/
Expand Down

0 comments on commit 9bd26f4

Please sign in to comment.