Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class DefaultTypedCommandGateway[Cmd <: DomainCommand](
}
}

val gateway = system.systemActorOf(CommandGatewayGuardian(), "typedcommandgateway-" + aggregateRootCreator.getClass.getSimpleName)
val gateway =
system.systemActorOf(CommandGatewayGuardian(), "typedcommandgateway-" + aggregateRootCreator.getClass.getSimpleName)

override def ask[Res](aggregateRootId: String, replyTo: ActorRef[Res] => Cmd)(
implicit validator: ValidateableCommand[Cmd]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,58 @@

package io.cafienne.bounded.aggregate

import akka.actor.typed.Behavior
import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplyEffect}
import com.typesafe.scalalogging.Logger
import io.cafienne.bounded.aggregate.typed.TypedAggregateRootManager

import scala.concurrent.duration._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.persistence.RecoveryCompleted
import io.cafienne.bounded.SampleProtocol.{CommandMetaData, MetaData}
import io.cafienne.bounded.aggregate.TypedSimpleAggregate.SimpleAggregateCommand

//This another aggregate is for testing the use of command gateways with multiple aggregates.
object TypedAnotherAggregate {
val aggregateRootTag = "ar-another"
import TypedSimpleAggregate._

final case class AnotherAddCommand(
aggregateRootId: String,
metaData: CommandMetaData,
item: String,
replyTo: ActorRef[Response]
) extends SimpleAggregateCommand

final case class AnotherAdded(id: String, metaData: MetaData, item: String) extends SimpleAggregateEvent

final case class AnotherAggregateState(items: List[String] = List.empty[String]) {
def update(evt: SimpleAggregateEvent): AnotherAggregateState = {
evt match {
case evt: Created => this
case evt: ItemAdded => this.copy(items = items.::(evt.item))
case evt: StoppedAfter => this
case evt: AnotherAdded => this.copy(items = items.::(evt.item))
}
}
}

var replayed = false

final val logger = Logger(TypedSimpleAggregate.getClass.getSimpleName)


// Command handler logic
def commandHandler(
timers: TimerScheduler[SimpleAggregateCommand]
): (SimpleAggregateState, SimpleAggregateCommand) => ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = {
): (AnotherAggregateState, SimpleAggregateCommand) => ReplyEffect[SimpleAggregateEvent, AnotherAggregateState] = {
(state, command) =>
logger.debug("Received command: " + command)
command match {
case cmd: Create => createAggregate(cmd)
case cmd: AddItem => addItem(cmd)
case cmd: Create => createAggregate(cmd)
case cmd: AddItem => addItem(cmd)
case cmd: AnotherAddCommand => addAnother(cmd)
case cmd: Stop =>
logger.debug("Stopping Aggregate {}", cmd.aggregateRootId)
Effect.stop().thenReply(cmd.replyTo)(_ => OK)
Expand All @@ -48,50 +71,55 @@ object TypedAnotherAggregate {
}
}

private def createAggregate(cmd: Create): ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = {
private def createAggregate(cmd: Create): ReplyEffect[SimpleAggregateEvent, AnotherAggregateState] = {
logger.debug(s"Create Aggregate (replayed: $replayed)" + cmd)
Effect.persist(Created(cmd.aggregateRootId, TestMetaData.fromCommand(cmd.metaData))).thenReply(cmd.replyTo)(_ ⇒ OK)
}

private def addItem(cmd: AddItem): ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = {
private def addItem(cmd: AddItem): ReplyEffect[SimpleAggregateEvent, AnotherAggregateState] = {
logger.debug("AddItem " + cmd)
Effect
.persist(ItemAdded(cmd.aggregateRootId, TestMetaData.fromCommand(cmd.metaData), cmd.item))
.thenReply(cmd.replyTo)(_ ⇒ OK)
}

private def addAnother(cmd: AnotherAddCommand): ReplyEffect[SimpleAggregateEvent, AnotherAggregateState] = {
logger.debug("AddItem " + cmd)
Effect
.persist(AnotherAdded(cmd.aggregateRootId, TestMetaData.fromCommand(cmd.metaData), cmd.item))
.thenReply(cmd.replyTo)(_ ⇒ OK)
}

// event handler to keep internal aggregate state
val eventHandler: (SimpleAggregateState, SimpleAggregateEvent) => SimpleAggregateState = { (state, event) =>
val eventHandler: (AnotherAggregateState, SimpleAggregateEvent) => AnotherAggregateState = { (state, event) =>
logger.debug("updating state with {}", event)
state.update(event)
}

}

import TypedSimpleAggregate._
class AnotherAggregateManager() extends TypedAggregateRootManager[SimpleAggregateCommand] {
import TypedSimpleAggregate._
class AnotherAggregateManager() extends TypedAggregateRootManager[TypedSimpleAggregate.SimpleAggregateCommand] {
final val logger = Logger(this.getClass.getSimpleName)

override def behavior(id: String): Behavior[SimpleAggregateCommand] = {
logger.debug("Create aggregate behavior for {}", id)
override def behavior(id: String): Behavior[TypedSimpleAggregate.SimpleAggregateCommand] = {
logger.debug("Create Another aggregate behavior for {}", id)
Behaviors.withTimers { timers ⇒
EventSourcedBehavior
.withEnforcedReplies(
PersistenceId(aggregateRootTag, id),
SimpleAggregateState(List.empty[String]),
commandHandler(timers),
eventHandler
PersistenceId(TypedAnotherAggregate.aggregateRootTag, id),
TypedAnotherAggregate.AnotherAggregateState(List.empty[String]),
TypedAnotherAggregate.commandHandler(timers),
TypedAnotherAggregate.eventHandler
)
.receiveSignal {
case (state, b: RecoveryCompleted) => {
logger.debug("Received RecoveryCompleted for actor with state {}", state)
replayed = true
TypedAnotherAggregate.replayed = true
}
}
}
}

override def entityTypeKey: EntityTypeKey[SimpleAggregateCommand] =
EntityTypeKey[SimpleAggregateCommand](aggregateRootTag)
EntityTypeKey[SimpleAggregateCommand](TypedAnotherAggregate.aggregateRootTag)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AsyncFlatSpecLike
import com.typesafe.config.ConfigFactory
import io.cafienne.bounded.aggregate.TypedAnotherAggregate.AnotherAddCommand

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
Expand Down Expand Up @@ -151,13 +152,6 @@ class TypedCommandGatewaySpec
}
}

"Command Gateway" should "be able to be created multiple times to route to other aggregates" in {
val creator = new AnotherAggregateManager()
val anotherTypedCommandGateway = new DefaultTypedCommandGateway[SimpleAggregateCommand](system, creator, 6.seconds)

assert(anotherTypedCommandGateway != null)
}

protected override def afterAll(): Unit = {
Await.ready(typedCommandGateway.shutdown(), 5.seconds).map(_ => (): Unit)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2016-2021 Cafienne B.V. <https://www.cafienne.io/bounded>
*/

package io.cafienne.bounded.aggregate

import akka.actor.testkit.typed.scaladsl.{LogCapturing, LoggingTestKit, ManualTime, ScalaTestWithActorTestKit}
import akka.actor.typed.ActorSystem
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import io.cafienne.bounded.aggregate.TypedAnotherAggregate.AnotherAddCommand
import io.cafienne.bounded.aggregate.typed.DefaultTypedCommandGateway
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AsyncFlatSpecLike

import java.time.OffsetDateTime
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

//ManualTime.config.withFallback(
class TypedMultiCommandGatewaySpec extends ScalaTestWithActorTestKit(ConfigFactory.parseString(s"""
akka.persistence.publish-plugin-commands = on
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.journal.inmem.test-serialization = on
akka.actor.warn-about-java-serializer-usage = off
# snapshot store plugin is NOT defined, things should still work
akka.persistence.snapshot-store.local.dir = "target/snapshots-${classOf[TypedMultiCommandGatewaySpec].getName}/"
#PLEASE NOTE THAT CoordinatedShutdown needs to be disabled as below in order to run the test properly
#SEE https://doc.akka.io/docs/akka/current/coordinated-shutdown.html at the end of the page
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.cluster.run-coordinated-shutdown-when-down = off
""")) with AsyncFlatSpecLike with ScalaFutures with BeforeAndAfterAll {

import TypedSimpleAggregate._

implicit val commandValidator = new ValidateableCommand[SimpleAggregateCommand] {
override def validate(cmd: SimpleAggregateCommand): Future[SimpleAggregateCommand] =
Future.successful(cmd)
}

implicit val actorSystem: ActorSystem[_] = system
implicit val ec = system.executionContext
implicit val scheduler = system.scheduler

// val manualTime: ManualTime = ManualTime()

behavior of "Multi Typed Command Gateway"

implicit val gatewayTimeout = Timeout(10.seconds)

val commandMetaData = AggregateCommandMetaData(OffsetDateTime.now(), None)
val creator = new SimpleAggregateManager()
val typedCommandGateway = new DefaultTypedCommandGateway[SimpleAggregateCommand](system, creator, 6.seconds)

val anotherCreator = new AnotherAggregateManager()
val anotherTypedCommandGateway =
new DefaultTypedCommandGateway[SimpleAggregateCommand](system, anotherCreator, 6.seconds)

"Command Gateway" should "be able to be created multiple times to route to other aggregates" in {
assert(anotherTypedCommandGateway != null)
}

"Command Gateway" should "be able to be send commands to multiple aggregates" in {
val probe = testKit.createTestProbe[Response]()
val aggregateId = "ar1"

assert(anotherTypedCommandGateway != null)
val otherAggregateId = "other1"
whenReady(
anotherTypedCommandGateway.tell(AnotherAddCommand(otherAggregateId, commandMetaData, "other item 3", probe.ref))
) { answer =>
answer shouldEqual (())
val probeAnswer2 = probe.expectMessage(OK)
assert(probeAnswer2.equals(OK))
}

whenReady(typedCommandGateway.tell(AddItem(aggregateId, commandMetaData, "item 1", probe.ref))) { answer =>
answer shouldEqual (())
val probeAnswer2 = probe.expectMessage(OK)
assert(probeAnswer2.equals(OK))
}

}

protected override def afterAll(): Unit = {
Await.ready(typedCommandGateway.shutdown(), 5.seconds).map(_ => (): Unit)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object TypedSimpleAggregate {
final case class AggregateCommandMetaData(timestamp: OffsetDateTime, userContext: Option[UserContext])
extends CommandMetaData

sealed trait SimpleAggregateCommand extends DomainCommand
trait SimpleAggregateCommand extends DomainCommand

final case class Create(aggregateRootId: String, metaData: CommandMetaData, replyTo: ActorRef[Response])
extends SimpleAggregateCommand
Expand All @@ -48,8 +48,7 @@ object TypedSimpleAggregate {
final case class Stop(aggregateRootId: String, metaData: CommandMetaData, replyTo: ActorRef[Response])
extends SimpleAggregateCommand

final case class InternalStop(aggregateRootId: String, metaData: CommandMetaData)
extends SimpleAggregateCommand
final case class InternalStop(aggregateRootId: String, metaData: CommandMetaData) extends SimpleAggregateCommand

final case class StopAfter(
aggregateRootId: String,
Expand All @@ -65,7 +64,7 @@ object TypedSimpleAggregate {
extends SimpleDirectAggregateQuery

// Event Type
sealed trait SimpleAggregateEvent extends DomainEvent
trait SimpleAggregateEvent extends DomainEvent
final case class Created(id: String, metaData: MetaData) extends SimpleAggregateEvent
final case class ItemAdded(id: String, metaData: MetaData, item: String) extends SimpleAggregateEvent
final case class StoppedAfter(id: String, metaData: MetaData, waitInSecs: Integer) extends SimpleAggregateEvent
Expand Down Expand Up @@ -104,6 +103,9 @@ object TypedSimpleAggregate {
case cmd: InternalStop =>
logger.debug("InternalStop for aggregate {}", cmd.aggregateRootId)
Effect.stop().thenNoReply()
case other =>
logger.error("Received {} that cannot be handled by {}", other, this.getClass.getSimpleName)
throw new RuntimeException(s"Command $other cannot be handled by SimpleAggregate")
}
}

Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

version in ThisBuild := "0.3.2"
version in ThisBuild := "0.3.3-SNAPSHOT"