Skip to content

Commit

Permalink
Clean up and document a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Krever committed Feb 13, 2024
1 parent 08d2397 commit 6bb33cc
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 34 deletions.
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,39 @@ The following items are planned in scope of this PoC
- [ ] Handling posponed executions (await)
- [ ] Pekko backend PoC
- [ ] Full example

## Design

### What it does?

* workflows are built using `WIO` monad
* a workflow supports following operations:
* running side-effectful/non-deterministic comuptations
* receiving signals that can modify the workflow state
* querying the workflow state
* recovering workflow state without re-triggering of side-effecting operations
* `WIO` is just a pure value object describing the workflow
* to run it you need an interpreter with ability to persist events in a journal and read them

### How it works?

* on the first run
* it executes IOs on its path. Each IO has to produce an event that is persisted in the journal.
* event handlers are allowed to modify the workflow state
* workflow stops when signal is expected and moves forward once signal is received
* during recovery (e.g. after service restart)
* events are read from the journal and applied to the workflow
* all IOs and signals are skipped if the corresponding event is registered
* once events are exhausted the workflow continues to run as usual

Caveats:
* all the IOs need to be idempotent, we can't gaurantee exactly-once execution, only at-least-once
* workflow migrations (modify the workflow structure, e.g. order of operations) is a very complicated topic
and will be described separately in due time

Internals:
* [WIO.scala](src%2Fmain%2Fscala%2Fworkflow4s%2Fwio%2FWIO.scala)[`WIO`](src/main/scala/workflow4s/wio/WIO.scala) - the basic building block and algebra defining the supported operations
* [Interpreter.scala](src%2Fmain%2Fscala%2Fworkflow4s%2Fwio%2FInterpreter.scala) - the logic for handling particular operations
* [ActiveWorkflow.scala](src%2Fmain%2Fscala%2Fworkflow4s%2Fwio%2FActiveWorkflow.scala) - its the workflow state which also the interpretation result
* [SimpleActor.scala](src%2Fmain%2Fscala%2Fworkflow4s%2Fwio%2Fsimple%2FSimpleActor.scala) - a very simple implementation of a mutable actor, until we have a proper, pekko-based, example
* [WithdrawalWorkflowTest.scala](src%2Ftest%2Fscala%2Fworkflow4s%2Fexample%2FWithdrawalWorkflowTest.scala) - a test intended to showcase actor usage and the general behaviour
8 changes: 3 additions & 5 deletions src/main/scala/workflow4s/example/WithdrawalWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import workflow4s.example.WithdrawalSignal.CreateWithdrawal
import workflow4s.wio.{SignalDef, WIO}

object WithdrawalWorkflow {

val createWithdrawalSignal = SignalDef[CreateWithdrawal, Unit]()
val dataQuery = SignalDef[Unit, WithdrawalData]()

}

class WithdrawalWorkflow(service: WithdrawalService) {
Expand All @@ -20,10 +18,10 @@ class WithdrawalWorkflow(service: WithdrawalService) {
_ <- initSignal
_ <- calculateFees
} yield (),
hadnleDataQuery,
handleDataQuery,
)

private def initSignal =
private def initSignal: WIO[Nothing, Unit, WithdrawalData] =
WIO
.handleSignal[WithdrawalData](createWithdrawalSignal) { (_, signal) =>
IO(WithdrawalInitiated(signal.amount))
Expand All @@ -34,7 +32,7 @@ class WithdrawalWorkflow(service: WithdrawalService) {
.runIO[WithdrawalData](state => service.calculateFees(state.asInstanceOf[WithdrawalData.Initiated].amount).map(WithdrawalEvent.FeeSet))
.handleEvent { (state, event) => (state.asInstanceOf[WithdrawalData.Initiated].copy(fee = Some(event.fee)), ()) }

private def hadnleDataQuery =
private def handleDataQuery =
WIO.handleQuery[WithdrawalData](dataQuery) { (state, _) => state }

}
2 changes: 1 addition & 1 deletion src/main/scala/workflow4s/wio/ActiveWorkflow.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package workflow4s.wio

import workflow4s.wio.Interpreter.{EventResponse, ProceedResponse}
import workflow4s.wio.Interpreter.{EventResponse, ProceedResponse, SignalResponse, QueryResponse}

case class ActiveWorkflow[St, +Out](state: St, wio: WIO.Total[St], interpreter: Interpreter[St], value: Out) {

Expand Down
14 changes: 13 additions & 1 deletion src/main/scala/workflow4s/wio/Interpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package workflow4s.wio

import cats.effect.IO
import cats.implicits.catsSyntaxOptionId
import workflow4s.wio.Interpreter.{EventResponse, ProceedResponse}
import workflow4s.wio.Interpreter.{EventResponse, ProceedResponse, QueryResponse, SignalResponse}
import workflow4s.wio.WIO.{EventHandler, HandleSignal}

class Interpreter[St](journal: JournalPersistance) {
Expand Down Expand Up @@ -123,4 +123,16 @@ object Interpreter {
case class Noop[St]() extends ProceedResponse[St]
}

sealed trait SignalResponse[St, Resp]
object SignalResponse {
case class Ok[St, Resp](value: IO[(ActiveWorkflow[St, Any], Resp)]) extends SignalResponse[St, Resp]
case class UnexpectedSignal[St, Resp]() extends SignalResponse[St, Resp]
}

sealed trait QueryResponse[Resp]
object QueryResponse {
case class Ok[Resp](value: Resp) extends QueryResponse[Resp]
case class UnexpectedQuery[Resp]() extends QueryResponse[Resp]
}

}
12 changes: 0 additions & 12 deletions src/main/scala/workflow4s/wio/QueryResponse.scala

This file was deleted.

12 changes: 0 additions & 12 deletions src/main/scala/workflow4s/wio/SignalResponse.scala

This file was deleted.

4 changes: 2 additions & 2 deletions src/main/scala/workflow4s/wio/simple/SimpleActor.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package workflow4s.wio.simple

import cats.effect.unsafe.IORuntime
import workflow4s.wio.Interpreter.{EventResponse, ProceedResponse}
import workflow4s.wio.{ActiveWorkflow, QueryResponse, SignalDef, SignalResponse}
import workflow4s.wio.Interpreter.{EventResponse, ProceedResponse, QueryResponse, SignalResponse}
import workflow4s.wio.{ActiveWorkflow, SignalDef}

class SimpleActor[State]( /*private*/ var wf: ActiveWorkflow[State, Any])(implicit IORuntime: IORuntime) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import cats.effect.unsafe.implicits.global
import org.scalatest.freespec.AnyFreeSpec
import workflow4s.example.WithdrawalService.Fee
import workflow4s.example.WithdrawalSignal.CreateWithdrawal
import workflow4s.wio.Interpreter.QueryResponse
import workflow4s.wio.simple.SimpleActor.EventResponse
import workflow4s.wio.simple.{InMemoryJournal, SimpleActor}
import workflow4s.wio.{ActiveWorkflow, Interpreter, QueryResponse}
import workflow4s.wio.{ActiveWorkflow, Interpreter}

class WithdrawalWorkflowTest extends AnyFreeSpec {

Expand Down

0 comments on commit 6bb33cc

Please sign in to comment.