Skip to content

Commit

Permalink
Feature/make reduce synchronous (#18)
Browse files Browse the repository at this point in the history
* fix race condition

fix race condition when tying a Bloc lifecycle to the ViewModel lifecycle

* some library updates

* Duplicate send fun in Bloc

duplicate send function in Bloc so it uses an Action instead of a Proposal argument

* Make reduce synchronous

* delete obsolete class ReducerContainer

* Revert "delete obsolete class ReducerContainer"

This reverts commit d1a8324.

* Revert "Make reduce synchronous"

This reverts commit 8201b7b.

* dispatch is synchronous now

in thunks and initializers calling dispatch now synchronously triggers the reducers
the reducers are still added to the reducer queue but the thunk/initializer executing is suspended till the reducer has finished

* Improve documentation for the different contexts

* Improve side effect tests

* Side effect test with reducer and thunk

* Improve lifecycle tests

* improve initializer tests

* Add test for dispatch and reduce in initializer

* Improve thunk tests

* reduce was only partly running blocking when triggered from thunks and initializers

* Update the documentation

* Create new version

---------

Co-authored-by: Emanuel Moecklin <emanuel.moecklin@vivint.com>
  • Loading branch information
1gravity and 1gravity authored Jun 29, 2023
1 parent 44deed3 commit a911759
Show file tree
Hide file tree
Showing 30 changed files with 714 additions and 353 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ Note, this readme offers a quick overview of the framework. For more in-depth in
```kotlin
dependencies {
// the core library
implementation("com.1gravity:bloc-core:0.10.0")
implementation("com.1gravity:bloc-core:0.11.0")

// add to use the framework together with Redux
implementation("com.1gravity:bloc-redux:0.10.0")
implementation("com.1gravity:bloc-redux:0.11.0")

// useful extensions for Android and Jetpack/JetBrains Compose
implementation("com.1gravity:bloc-compose:0.10.0")
implementation("com.1gravity:bloc-compose:0.11.0")
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ fun ViewModel.blocContext(): BlocContext =
* The ViewModel would have to extend some BaseViewModel and we don't want that.
*/
private fun ViewModel.viewModelLifeCycle(): Lifecycle = object : LifecycleOwner {
override fun getLifecycle() = lifecycleRegistry
private val lifecycleRegistry = LifecycleRegistry(this)

override val lifecycle = lifecycleRegistry

init {
viewModelScope.launch(Dispatchers.Main) {
// viewModelScope is tied to Dispatchers.Main.immediate but we want to be explicit here
viewModelScope.launch(Dispatchers.Main.immediate) {
lifecycleRegistry.currentState = Lifecycle.State.CREATED // triggers onCreate()
lifecycleRegistry.currentState = Lifecycle.State.STARTED // triggers onStart()
lifecycleRegistry.currentState = Lifecycle.State.RESUMED // triggers onResume()
Expand Down
6 changes: 6 additions & 0 deletions bloc-core/src/commonMain/kotlin/com/onegravity/bloc/Bloc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,10 @@ public abstract class Bloc<out State : Any, in Action : Any, SideEffect : Any> :
sideEffect: BlocObserver<SideEffect>?
)

/**
* Sink.send(Action)
*/
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
abstract override fun send(action: Action)

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.suspendCoroutine

private const val QUEUE_INITIAL_SIZE = 10

Expand Down Expand Up @@ -73,17 +75,29 @@ internal class BlocImpl<State : Any, Action : Any, SideEffect : Any, Proposal :
reducers = reducers
)

// this reducer emits the proposal directly to the BlocState, no reduce functionality
private val reducer: (proposal: Proposal) -> Unit = { proposal ->
reduceProcessor.reduce { Effect(proposal, emptyList()) }
/**
* Function that sends the proposal directly to the BlocState and waits for the process to
* finish by suspending execution.
*/
private val reducer: suspend (proposal: Proposal) -> Unit = { proposal ->
suspendCoroutine { continuation ->
reduceProcessor.reduce({ Effect(proposal, emptyList()) }, continuation)
}
}

/**
* Reduces the given action by suspending the coroutine until the action has been processed.
*/
private suspend fun reduce(action: Action) = suspendCoroutine { continuation ->
reduceProcessor.send(action, continuation)
}

private val thunkProcessor = ThunkProcessor(
lifecycle = blocLifecycle,
state = blocState,
dispatcher = thunkDispatcher,
thunks = thunks,
dispatch = reduceProcessor::send,
dispatch = { reduce(it) },
reduce = reducer
)

Expand All @@ -92,7 +106,10 @@ internal class BlocImpl<State : Any, Action : Any, SideEffect : Any, Proposal :
state = blocState,
dispatcher = initDispatcher,
initializer = initialize,
dispatch = { thunkProcessor.send(it) },
dispatch = {
val processed = thunkProcessor.send(it)
if (processed.not()) reduce(it)
},
reduce = reducer
)

Expand Down Expand Up @@ -123,15 +140,22 @@ internal class BlocImpl<State : Any, Action : Any, SideEffect : Any, Proposal :
* The Sink to dispatch Actions.
* All it does is add the Action to a queue to be processed asynchronously.
*/
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun send(action: Action) {
when {
// we need to cache actions if the initializer is still running
blocLifecycle.isStarting() -> actionQueue += ActionQueueElement(action = action)

// thunks are always processed first
// ThunkProcessor will send the action to ReduceProcessor if there's no matching thunk
blocLifecycle.isStarted() -> thunkProcessor.send(action)
blocLifecycle.isStarted() -> {
val processed = thunkProcessor.send(action)
if (processed.not()) {
// reducers are meant to run on the main thread -> using runBlocking here is OK
runBlocking {
reduce(action)
}
}
}

else -> { /* NOP*/ }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ internal class InitializeProcessor<State : Any, Action : Any, Proposal : Any>(
private val state: BlocState<State, Proposal>,
dispatcher: CoroutineDispatcher = Dispatchers.Default,
private var initializer: Initializer<State, Action, Proposal>? = null,
private val dispatch: (Action) -> Unit,
private val reduce: (proposal: Proposal) -> Unit
private val dispatch: suspend (Action) -> Unit,
private val reduce: suspend (proposal: Proposal) -> Unit
) {

/**
Expand Down Expand Up @@ -68,7 +68,7 @@ internal class InitializeProcessor<State : Any, Action : Any, Proposal : Any>(
coroutineHelper.launch {
if (mutex.tryLock(this@InitializeProcessor)) {
val context = InitializerContext(
state = state.value,
getState = { state.value },
dispatch = dispatch,
reduce = reduce,
launchBlock = coroutineHelper::launch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.receiveAsFlow
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume

/**
* The ReduceProcessor is responsible for processing the reduce { }, reduceAnd { } and
Expand Down Expand Up @@ -73,8 +75,10 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop

coroutineHelper.launch {
for (element in reduceChannel) {
element.action?.run(::runReducers)
element.reducer?.run(::runReducer)
// reducers triggered by actions
element.action?.let { runReducers(it, element.continuation) }
// reducers triggered MVVM+ style
element.reducer?.let { runReducer(it, element.continuation) }
}
}
}
Expand All @@ -83,24 +87,27 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop
* BlocDSL:
* reduce { } -> run a Reducer Redux style
*/
internal fun send(action: Action) {
internal fun send(action: Action, continuation: Continuation<Unit>) {
logger.d("received reducer with action ${action.trimOutput()}")
reduceChannel.trySend(ReducerContainer(action))
reduceChannel.trySend(ReducerContainer(action = action, continuation = continuation))
}

/**
* BlocExtension interface implementation:
* reduce { } -> run a Reducer MVVM+ style
*/
internal fun reduce(reduce: ReducerNoAction<State, Effect<Proposal, SideEffect>>) {
internal fun reduce(
reduce: ReducerNoAction<State, Effect<Proposal, SideEffect>>,
continuation: Continuation<Unit>? = null
) {
logger.d("received reducer without action")
reduceChannel.trySend(ReducerContainer(reducer = reduce))
reduceChannel.trySend(ReducerContainer(reducer = reduce, continuation = continuation))
}

/**
* Triggered to execute reducers with a matching Action
*/
private fun runReducers(action: Action) {
private fun runReducers(action: Action, continuation: Continuation<Unit>?) {
logger.d("run reducers for action ${action.trimOutput()}")
getMatchingReducers(action).fold(false) { proposalEmitted, matcherReducer ->
val (_, reducer, expectsProposal) = matcherReducer
Expand All @@ -122,6 +129,7 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop
else -> proposalEmitted
}
}
continuation?.resume(Unit)
}

private fun getMatchingReducers(action: Action) = reducers
Expand All @@ -144,11 +152,15 @@ internal class ReduceProcessor<State : Any, Action : Any, SideEffect : Any, Prop
/**
* Triggered to execute a specific reducer (dispatched MVVM+ style)
*/
private fun runReducer(reduce: ReducerNoAction<State, Effect<Proposal, SideEffect>>) {
private fun runReducer(
reduce: ReducerNoAction<State, Effect<Proposal, SideEffect>>,
continuation: Continuation<Unit>?
) {
val context = ReducerContextNoAction(state.value, coroutineHelper::launch)
val (proposal, sideEffects) = context.reduce()
proposal?.let(state::send)
sideEffects.forEach(sideEffectChannel::trySend)
continuation?.resume(Unit)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package com.onegravity.bloc.internal

import com.onegravity.bloc.utils.Effect
import com.onegravity.bloc.utils.ReducerNoAction
import kotlin.coroutines.Continuation

/**
* Wrapper class for reducers that are submitted Redux style (send(Action)) or MVVM+ style (reduce { })
*
* @param action Action if the reducer was triggered by an Action
* @param reducer Reducer function if the reducer was triggered MVVM+ style
* @param continuation Continuation if the caller is suspending till the reducer is done
*/
internal data class ReducerContainer<State : Any, Action : Any, SideEffect : Any, Proposal : Any>(
val action: Action? = null,
val reducer: ReducerNoAction<State, Effect<Proposal, SideEffect>>? = null
val reducer: ReducerNoAction<State, Effect<Proposal, SideEffect>>? = null,
val continuation: Continuation<Unit>?
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ internal class ThunkProcessor<State : Any, Action : Any, Proposal : Any>(
private val state: BlocState<State, Proposal>,
dispatcher: CoroutineDispatcher = Dispatchers.Default,
private val thunks: List<MatcherThunk<State, Action, Action, Proposal>> = emptyList(),
private val dispatch: (action: Action) -> Unit,
private val reduce: (proposal: Proposal) -> Unit
private val dispatch: suspend (action: Action) -> Unit,
private val reduce: suspend (proposal: Proposal) -> Unit
) {

/**
Expand Down Expand Up @@ -65,13 +65,13 @@ internal class ThunkProcessor<State : Any, Action : Any, Proposal : Any>(
* BlocDSL:
* thunk { } -> run a thunk Redux style
*/
internal fun send(action: Action) {
internal fun send(action: Action): Boolean {
logger.d("received thunk with action ${action.trimOutput()}")
if (thunks.any { it.matcher == null || it.matcher.matches(action) }) {
thunkChannel.trySend(action)
} else {
dispatch(action)
return true
}
return false
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class BlocBuilder<State : Any, Action : Any, SE : Any, Proposal : Any> {
public fun onCreate(initialize: Initializer<State, Action, Proposal>) {
when (_initialize) {
null -> _initialize = initialize
else -> logger.w("Initializer already defined -> ignoring this one")
else -> error("Initializer already defined, there can be only one!")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import kotlinx.coroutines.flow.FlowCollector
*
* It needs to be a class so generic types aren't erased in Swift.
*/
public abstract class BlocState<out State : Any, in Proposal : Any> : StateStream<State>,
public abstract class BlocState<out State : Any, in Proposal : Any> :
StateStream<State>,
Sink<Proposal> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ package com.onegravity.bloc.utils
* // dispatch(action)
* }
* ```
*
* @param getState returns the current state
* @param dispatch dispatches an action
* @param reduce reduces a proposal
* @param launchBlock launch a coroutine without exposing the bloc's CoroutineScope,
* it's internal to allow for JobConfig default values via extension functions
*/
public data class InitializerContext<State, Action, Proposal>(
val state: State,
val getState: GetState<State>,
val dispatch: Dispatcher<Action>,
val reduce: (proposal: Proposal) -> Unit,
val reduce: suspend (proposal: Proposal) -> Unit,
internal val launchBlock: Launch
)

Expand All @@ -27,12 +33,19 @@ public data class InitializerContext<State, Action, Proposal>(
* thunk(EnumAction) {
* }
* ```
*
* @param getState returns the current state
* @param action the action that triggered the thunk
* @param dispatch dispatches an action
* @param reduce reduces a proposal
* @param launchBlock launch a coroutine without exposing the bloc's CoroutineScope,
* it's internal to allow for JobConfig default values via extension functions
*/
public data class ThunkContext<State, Action, A : Action, Proposal>(
val getState: GetState<State>,
val action: A,
val dispatch: Dispatcher<Action>,
val reduce: (proposal: Proposal) -> Unit,
val reduce: suspend (proposal: Proposal) -> Unit,
internal val launchBlock: Launch
)

Expand All @@ -42,11 +55,17 @@ public data class ThunkContext<State, Action, A : Action, Proposal>(
* fun doSomething() = thunk {
* }
* ```
*
* @param getState returns the current state
* @param dispatch dispatches an action
* @param reduce reduces a proposal
* @param launchBlock launch a coroutine without exposing the bloc's CoroutineScope,
* it's internal to allow for JobConfig default values via extension functions
*/
public data class ThunkContextNoAction<State, Action, Proposal>(
val getState: GetState<State>,
val dispatch: Dispatcher<Action>,
val reduce: (proposal: Proposal) -> Unit,
val reduce: suspend (proposal: Proposal) -> Unit,
internal val launchBlock: Launch
)

Expand All @@ -61,6 +80,11 @@ public data class ThunkContextNoAction<State, Action, Proposal>(
* reduce(EnumAction) {
* }
* ```
*
* @param state the current state
* @param action the action that triggered the reducer
* @param launchBlock launch a coroutine without exposing the bloc's CoroutineScope,
* it's internal to allow for JobConfig default values via extension functions
*/
public data class ReducerContext<State, Action>(
val state: State,
Expand All @@ -74,6 +98,10 @@ public data class ReducerContext<State, Action>(
* fun doSomething() = reduce {
* }
* ```
*
* @param state the current state
* @param launchBlock launch a coroutine without exposing the bloc's CoroutineScope,
* it's internal to allow for JobConfig default values via extension functions
*/
public data class ReducerContextNoAction<State>(
val state: State,
Expand Down
5 changes: 3 additions & 2 deletions bloc-core/src/commonTest/kotlin/com/onegravity/bloc/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ fun <State : Any, Action : Any, SideEffect : Any> collectSideEffects(
suspend fun <State : Any, Action : Any, SideEffect : Any> testState(
bloc: Bloc<State, Action, SideEffect>,
action: Action?,
expected: State
expected: State,
delay: Long = 10
) {
if (action != null) bloc.send(action)
delay(100)
delay(delay)
assertEquals(expected, bloc.value)
}
Loading

0 comments on commit a911759

Please sign in to comment.