Skip to content

Commit

Permalink
Model execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Krever committed Mar 8, 2024
1 parent a5c3b9a commit 6494f7b
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object EventEvaluator {
}
def onNoop(wio: WIO.Noop): DispatchResult = None
override def onNamed(wio: WIO.Named[Err, Out, StIn, StOut]): DispatchResult = recurse(wio.base, state)
override def onPure(wio: WIO.Pure[Err, Out, StIn, StOut]): DispatchResult = None
override def onPure(wio: WIO.Pure[Err, Out, StIn, StOut]): DispatchResult = Some(NewValue(wio.value(state)))

override def onHandleError[ErrIn <: Err](wio: WIO.HandleError[Err, Out, StIn, StOut, ErrIn]): DispatchResult = {
recurse(wio.base, state).map(applyHandleError(wio, _))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package workflow4s.example

import workflow4s.example.WithdrawalService.Fee
import workflow4s.example.WithdrawalService.{Fee, Iban}
import workflow4s.example.checks.ChecksState

sealed trait WithdrawalData

object WithdrawalData {
case class Empty(txId: String) extends WithdrawalData {
def initiated(amount: BigDecimal) = Initiated(txId, amount)
case class Empty(txId: String) extends WithdrawalData {
def initiated(amount: BigDecimal, recipient: Iban) = Initiated(txId, amount, recipient)
}
case class Initiated(txId: String, amount: BigDecimal) extends WithdrawalData {
def validated(fee: Fee) = Validated(txId, amount, fee)
case class Initiated(txId: String, amount: BigDecimal, recipient: Iban) extends WithdrawalData {
def validated(fee: Fee) = Validated(txId, amount, recipient, fee)
}
case class Validated(txId: String, amount: BigDecimal, fee: Fee) extends WithdrawalData {
def checked(checksState: ChecksState) = Checked(txId, amount, fee, checksState)
case class Validated(txId: String, amount: BigDecimal, recipient: Iban, fee: Fee) extends WithdrawalData {
def checked(checksState: ChecksState) = Checked(txId, amount, recipient, fee, checksState)
}
case class Checked(txId: String, amount: BigDecimal, recipient: Iban, fee: Fee, checkResults: ChecksState) extends WithdrawalData {
def netAmount = amount - fee.value
def executed(externalTxId: String) = Executed(txId, amount, recipient, fee, checkResults, externalTxId)
}
case class Checked(txId: String, amount: BigDecimal, fee: Fee, checkResults: ChecksState) extends WithdrawalData

case class Executed(txId: String, amount: BigDecimal, fee: Fee, checkResults: ChecksState, externalTransactionId: String) extends WithdrawalData
case class Executed(txId: String, amount: BigDecimal, recipient: Iban, fee: Fee, checkResults: ChecksState, externalTransactionId: String)
extends WithdrawalData

case class Completed() extends WithdrawalData
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package workflow4s.example

import workflow4s.example.WithdrawalService.Fee
import workflow4s.example.WithdrawalService.{ExecutionResponse, Fee, Iban}
import workflow4s.wio.JournalWrite

sealed trait WithdrawalEvent
object WithdrawalEvent {
case class WithdrawalInitiated(amount: BigDecimal)
case class WithdrawalInitiated(amount: BigDecimal, recipient: Iban)
implicit val WithdrawalInitiatedJournalWrite: JournalWrite[WithdrawalInitiated] = null

case class FeeSet(fee: Fee)
Expand All @@ -17,4 +17,10 @@ object WithdrawalEvent {
case class NotEnoughFunds() extends MoneyLocked
}
implicit val MoneyLockedWrite: JournalWrite[MoneyLocked] = null

case class ExecutionInitiated(response: ExecutionResponse)
implicit val executionInitiatedWrite: JournalWrite[ExecutionInitiated] = null

case class ExecutionCompleted(status: WithdrawalSignal.ExecutionCompleted)
implicit val ExecutionCompletedWrite: JournalWrite[ExecutionCompleted] = null
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ sealed trait WithdrawalRejection

object WithdrawalRejection {

case class NotEnoughFunds() extends WithdrawalRejection
case class RejectedInChecks(txId: String) extends WithdrawalRejection
case class RejectedByExecutionEngine(txId: String) extends WithdrawalRejection
case class NotEnoughFunds() extends WithdrawalRejection
case class RejectedInChecks(txId: String) extends WithdrawalRejection
case class RejectedByExecutionEngine(txId: String, error: String) extends WithdrawalRejection

}
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package workflow4s.example

import cats.effect.IO
import workflow4s.example.WithdrawalService.{Fee, NotEnoughFunds}
import workflow4s.example.WithdrawalService.{ExecutionResponse, Fee, Iban, NotEnoughFunds}

trait WithdrawalService {
def calculateFees(amount: BigDecimal): IO[Fee]

def putMoneyOnHold(amount: BigDecimal): IO[Either[NotEnoughFunds, Unit]]

def initiateExecution(amount: BigDecimal, recepient: Iban): IO[ExecutionResponse]
}

object WithdrawalService {

case class NotEnoughFunds()

case class Fee(value: BigDecimal)

case class Iban(value: String)

sealed trait ExecutionResponse
object ExecutionResponse {
case class Accepted(externalId: String) extends ExecutionResponse
case class Rejected(error: String) extends ExecutionResponse
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package workflow4s.example

import workflow4s.example.WithdrawalService.Iban

object WithdrawalSignal {

case class CreateWithdrawal(amount: BigDecimal)
case class CreateWithdrawal(amount: BigDecimal, recipient: Iban)

sealed trait ExecutionCompleted
object ExecutionCompleted {
case object Succeeded extends ExecutionCompleted
case object Failed extends ExecutionCompleted
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package workflow4s.example
import cats.effect.IO
import cats.implicits.catsSyntaxEitherId
import workflow4s.example.WithdrawalEvent.{MoneyLocked, WithdrawalInitiated}
import workflow4s.example.WithdrawalSignal.CreateWithdrawal
import workflow4s.example.WithdrawalWorkflow.{createWithdrawalSignal, dataQuery}
import workflow4s.example.WithdrawalService.ExecutionResponse
import workflow4s.example.WithdrawalSignal.{CreateWithdrawal, ExecutionCompleted}
import workflow4s.example.WithdrawalWorkflow.{createWithdrawalSignal, dataQuery, executionCompletedSignal}
import workflow4s.example.checks.{ChecksEngine, ChecksInput, ChecksState, Decision}
import workflow4s.wio.{SignalDef, WIO}

object WithdrawalWorkflow {
val createWithdrawalSignal = SignalDef[CreateWithdrawal, Unit]()
val dataQuery = SignalDef[Unit, WithdrawalData]()
val createWithdrawalSignal = SignalDef[CreateWithdrawal, Unit]()
val dataQuery = SignalDef[Unit, WithdrawalData]()
val executionCompletedSignal = SignalDef[ExecutionCompleted, Unit]()
}

class WithdrawalWorkflow(service: WithdrawalService, checksEngine: ChecksEngine) {
Expand All @@ -32,20 +34,23 @@ class WithdrawalWorkflow(service: WithdrawalService, checksEngine: ChecksEngine)

val workflowDeclarative: WIO[WithdrawalRejection, Unit, WithdrawalData.Empty, Nothing] =
handleDataQuery(
(receiveWithdrawal >>>
calculateFees >>>
putMoneyOnHold >>>
runChecks >>>
execute >>>
releaseFunds).handleError(handleRejection),
) >>> handleDataQuery(WIO.Noop())
(
receiveWithdrawal >>>
calculateFees >>>
putMoneyOnHold >>>
runChecks >>>
execute >>>
releaseFunds
).handleError(handleRejection) >>> WIO.Noop(),
)

private def receiveWithdrawal: WIO[Nothing, Unit, WithdrawalData.Empty, WithdrawalData.Initiated] =
WIO
.handleSignal[WithdrawalData.Empty](createWithdrawalSignal) { (_, signal) =>
IO(WithdrawalInitiated(signal.amount))
// TODO validate amount to be positive (show rejected signal)
IO(WithdrawalInitiated(signal.amount, signal.recipient))
}
.handleEvent { (st, event) => (st.initiated(event.amount), ()) }
.handleEvent { (st, event) => (st.initiated(event.amount, event.recipient), ()) }
.produceResponse((_, _) => ())
.autoNamed()

Expand Down Expand Up @@ -89,8 +94,36 @@ class WithdrawalWorkflow(service: WithdrawalService, checksEngine: ChecksEngine)
}
} yield ()

// TODO can fail with provider fatal failure, need retries
private def execute: WIO[WithdrawalRejection.RejectedByExecutionEngine, Unit, WithdrawalData.Checked, WithdrawalData.Executed] = WIO.Noop()
// TODO can fail with provider fatal failure, need retries, needs cancellation
private def execute: WIO[WithdrawalRejection.RejectedByExecutionEngine, Unit, WithdrawalData.Checked, WithdrawalData.Executed] =
initiateExecution >>> awaitExecutionCompletion

private def initiateExecution: WIO[WithdrawalRejection.RejectedByExecutionEngine, Unit, WithdrawalData.Checked, WithdrawalData.Executed] =
WIO
.runIO[WithdrawalData.Checked](s =>
service
.initiateExecution(s.netAmount, s.recipient)
.map(WithdrawalEvent.ExecutionInitiated),
)
.handleEventWithError((s, event) =>
event.response match {
case ExecutionResponse.Accepted(externalId) => Right(s.executed(externalId) -> ())
case ExecutionResponse.Rejected(error) => Left(WithdrawalRejection.RejectedByExecutionEngine(s.txId, error))
},
)
.autoNamed()

private def awaitExecutionCompletion: WIO[WithdrawalRejection.RejectedByExecutionEngine, Unit, WithdrawalData.Executed, WithdrawalData.Executed] =
WIO
.handleSignal[WithdrawalData.Executed](executionCompletedSignal)((state, sig) => IO(WithdrawalEvent.ExecutionCompleted(sig)))
.handleEventWithError((s, e: WithdrawalEvent.ExecutionCompleted) =>
e.status match {
case ExecutionCompleted.Succeeded => Right(s, ())
case ExecutionCompleted.Failed => Left(WithdrawalRejection.RejectedByExecutionEngine(s.txId, "Execution failed"))
},
)
.produceResponse((_, _) => ())
.autoNamed()

private def releaseFunds: WIO[Nothing, Unit, WithdrawalData.Executed, WithdrawalData.Completed] = WIO.Noop()

Expand All @@ -99,9 +132,9 @@ class WithdrawalWorkflow(service: WithdrawalService, checksEngine: ChecksEngine)

private def handleRejection(r: WithdrawalRejection): WIO[Nothing, Unit, Any, WithdrawalData.Completed] =
r match {
case WithdrawalRejection.NotEnoughFunds() => WIO.setState(WithdrawalData.Completed())
case WithdrawalRejection.RejectedInChecks(txId) => cancelFunds(txId)
case WithdrawalRejection.RejectedByExecutionEngine(txId) => cancelFunds(txId)
case WithdrawalRejection.NotEnoughFunds() => WIO.setState(WithdrawalData.Completed())
case WithdrawalRejection.RejectedInChecks(txId) => cancelFunds(txId)
case WithdrawalRejection.RejectedByExecutionEngine(txId, error) => cancelFunds(txId)
}

private def cancelFunds(txId: String): WIO[Nothing, Unit, Any, WithdrawalData.Completed] = WIO.Noop()
Expand Down
Loading

0 comments on commit 6494f7b

Please sign in to comment.