Skip to content

Add tests for ReplicatedEntityBehavior #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,111 @@ class ReplicatedEntityBehaviorSpec extends WordSpec with BeforeAndAfterAll with
private[this] val logEntryIndexGenerator = new AtomicInteger(1)
private[this] def nextLogEntryIndex() = LogEntryIndex(logEntryIndexGenerator.getAndIncrement())

"ReplicatedEntityBehavior" when {

"Inactive" should {

"stash ProcessCommand until recovery completed" in {
val bankAccount = testkit.spawn(BankAccountBehavior(entityContext))

// The entity will stash a command.
val depositReplyProbe = bankAccount.askWithTestProbe(BankAccountBehavior.Deposit(10, _))
// Assert: The entity doesn't reply to the command before it completes a recovery.
depositReplyProbe.expectNoMessage()

// Activate the entity.
val lastApplied = LogEntryIndex(6)
bankAccount.asEntity ! RaftProtocol.Activate(snapshotStoreProbe.ref.toClassic, recoveryIndex = lastApplied)

// Assert: The entity doesn't reply to the command before it completes the recovery.
depositReplyProbe.expectNoMessage()

// Recover the entity.
locally {
val metadata = SnapshotProtocol.EntitySnapshotMetadata(normalizedEntityId, lastApplied)
val state = SnapshotProtocol.EntityState(BankAccountBehavior.Account(100))
recoverWithState(bankAccount.asEntity, SnapshotProtocol.EntitySnapshot(metadata, state))
}

// The entity will unstash and handle the command after it completes the recovery.
locally {
val replicate = shardProbe.expectMessageType[RaftProtocol.Replicate]
replicate.replyTo ! RaftProtocol.ReplicationSucceeded(replicate.event, LogEntryIndex(7), replicate.instanceId)
depositReplyProbe.expectMessageType[BankAccountBehavior.DepositSuccess].balance should be(110)
}

testkit.stop(bankAccount)
}

"stash TakeSnapshot until recovery completed" in {
val bankAccount = testkit.spawn(BankAccountBehavior(entityContext))

val snapshotMetadata = SnapshotProtocol.EntitySnapshotMetadata(normalizedEntityId, LogEntryIndex(5))
val lastApplied = LogEntryIndex(6)
val newSnapshotMetadata = SnapshotProtocol.EntitySnapshotMetadata(normalizedEntityId, lastApplied)

// The entity will stash a TakeSnapshot.
val takeSnapshotReplyProbe = testkit.createTestProbe[RaftProtocol.Snapshot]()
bankAccount.asEntity ! RaftProtocol.TakeSnapshot(newSnapshotMetadata, takeSnapshotReplyProbe.ref.toClassic)
// Assert: The entity doesn't reply to the TakeSnapshot before it completes a recovery.
takeSnapshotReplyProbe.expectNoMessage()

// Activate the entity.
bankAccount.asEntity ! RaftProtocol.Activate(snapshotStoreProbe.ref.toClassic, recoveryIndex = lastApplied)

// Assert: The entity doesn't reply to the TakeSnapshot before it completes the recovery.
takeSnapshotReplyProbe.expectNoMessage()

// Recover the entity.
val state = SnapshotProtocol.EntityState(BankAccountBehavior.Account(100))
recoverWithState(bankAccount.asEntity, SnapshotProtocol.EntitySnapshot(snapshotMetadata, state))

// The entity will unstash and handle the TakeSnapshot after it completes the recovery.
takeSnapshotReplyProbe.expectMessage(RaftProtocol.Snapshot(newSnapshotMetadata, state))

testkit.stop(bankAccount)
}

"stash Replica until recovery completed" in {
val bankAccount = testkit.spawn(BankAccountBehavior(entityContext))

// The entity has a balance of 100. It will deposit 10 to the balance.
val bankAccountState = BankAccountBehavior.Account(100)
val bankAccountNewEvent = BankAccountBehavior.Deposited(10)

val lastApplied = LogEntryIndex(5)

// The entity will stash a Replica.
bankAccount.asEntity ! RaftProtocol.Replica(
LogEntry(LogEntryIndex(6), EntityEvent(Option(normalizedEntityId), bankAccountNewEvent), Term(2)),
)

// Activate the entity.
bankAccount.asEntity ! RaftProtocol.Activate(snapshotStoreProbe.ref.toClassic, recoveryIndex = lastApplied)

// Recover the entity.
locally {
val metadata = SnapshotProtocol.EntitySnapshotMetadata(normalizedEntityId, lastApplied)
val state = SnapshotProtocol.EntityState(bankAccountState)
recoverWithState(bankAccount.asEntity, SnapshotProtocol.EntitySnapshot(metadata, state))
}

// The entity will unstash and handle the Replica after it completes the recovery.
// The entity should have a new balance of 110 (= 100 + 10).
locally {
val getBalanceReplyProbe = bankAccount.askWithTestProbe(BankAccountBehavior.GetBalance)
val replicate = shardProbe.expectMessageType[RaftProtocol.Replicate]
replicate.replyTo ! RaftProtocol.ReplicationSucceeded(replicate.event, LogEntryIndex(7), replicate.instanceId)
getBalanceReplyProbe.expectMessage(BankAccountBehavior.AccountBalance(110))
}

testkit.stop(bankAccount)
}

}

}

"ReplicatedEntityBehavior" should {

"process command that updates the entity state" in {
Expand Down Expand Up @@ -249,14 +354,7 @@ class ReplicatedEntityBehaviorSpec extends WordSpec with BeforeAndAfterAll with
import SnapshotProtocol._
val metadata = EntitySnapshotMetadata(normalizedEntityId, nextLogEntryIndex())
val state = EntityState(BankAccountBehavior.Account(100))
inside(snapshotStoreProbe.receiveMessage()) {
case SnapshotProtocol.FetchSnapshot(_, replyTo) =>
replyTo ! SnapshotProtocol.SnapshotFound(EntitySnapshot(metadata, state))
}
inside(shardProbe.receiveMessage()) {
case FetchEntityEvents(_, _, _, replyTo) =>
replyTo ! FetchEntityEventsResponse(Seq())
}
recoverWithState(bankAccount.asEntity, EntitySnapshot(metadata, state))

// process a command with ensuring consistency
val replyProbe = bankAccount.askWithTestProbe(BankAccountBehavior.GetBalance)
Expand Down Expand Up @@ -364,14 +462,7 @@ class ReplicatedEntityBehaviorSpec extends WordSpec with BeforeAndAfterAll with
import SnapshotProtocol._
val metadata = EntitySnapshotMetadata(normalizedEntityId, nextLogEntryIndex())
val state = EntityState(BankAccountBehavior.Account(100))
inside(snapshotStoreProbe.receiveMessage()) {
case SnapshotProtocol.FetchSnapshot(_, replyTo) =>
replyTo ! SnapshotProtocol.SnapshotFound(EntitySnapshot(metadata, state))
}
inside(shardProbe.receiveMessage()) {
case FetchEntityEvents(_, _, _, replyTo) =>
replyTo ! FetchEntityEventsResponse(Seq())
}
recoverWithState(bankAccount.asEntity, EntitySnapshot(metadata, state))

// unstash the command when recovery completed
val replicate = shardProbe.expectMessageType[RaftProtocol.Replicate]
Expand All @@ -383,21 +474,68 @@ class ReplicatedEntityBehaviorSpec extends WordSpec with BeforeAndAfterAll with
testkit.stop(bankAccount)
}

"stash TakeSnapshot until recovery completed" in {
val bankAccount = spawnEntity(BankAccountBehavior(entityContext))

val snapshotMetadata = SnapshotProtocol.EntitySnapshotMetadata(normalizedEntityId, LogEntryIndex(5))
val newSnapshotMetadata = SnapshotProtocol.EntitySnapshotMetadata(normalizedEntityId, LogEntryIndex(6))

// The entity will stash a TakeSnapshot.
val takeSnapshotReplyProbe = testkit.createTestProbe[RaftProtocol.Snapshot]()
bankAccount.asEntity ! RaftProtocol.TakeSnapshot(newSnapshotMetadata, takeSnapshotReplyProbe.ref.toClassic)
// Assert: The entity doesn't reply to the TakeSnapshot before it completes a recovery.
takeSnapshotReplyProbe.expectNoMessage()

// Recover the entity.
val state = SnapshotProtocol.EntityState(BankAccountBehavior.Account(100))
recoverWithState(bankAccount.asEntity, SnapshotProtocol.EntitySnapshot(snapshotMetadata, state))

// The entity will unstash and handle the TakeSnapshot after it completes the recovery.
takeSnapshotReplyProbe.expectMessage(RaftProtocol.Snapshot(newSnapshotMetadata, state))

testkit.stop(bankAccount)
}

"stash Replica until recovery completed" in {
val bankAccount = spawnEntity(BankAccountBehavior(entityContext))

val snapshotMetadata = SnapshotProtocol.EntitySnapshotMetadata(normalizedEntityId, LogEntryIndex(5))

// The entity has a balance of 100. It will deposit 10 to the balance.
val bankAccountState = BankAccountBehavior.Account(100)
val bankAccountNewEvent = BankAccountBehavior.Deposited(10)

// The entity will stash a Replica.
bankAccount.asEntity ! RaftProtocol.Replica(
LogEntry(LogEntryIndex(6), EntityEvent(Option(normalizedEntityId), bankAccountNewEvent), Term(2)),
)

// Recover the entity.
locally {
val state = SnapshotProtocol.EntityState(bankAccountState)
recoverWithState(bankAccount.asEntity, SnapshotProtocol.EntitySnapshot(snapshotMetadata, state))
}

// The entity will unstash and handle the Replica after it completes the recovery.
// The entity should have a new balance of 110 (= 100 + 10).
locally {
val getBalanceReplyProbe = bankAccount.askWithTestProbe(BankAccountBehavior.GetBalance)
val replicate = shardProbe.expectMessageType[RaftProtocol.Replicate]
replicate.replyTo ! RaftProtocol.ReplicationSucceeded(replicate.event, LogEntryIndex(7), replicate.instanceId)
getBalanceReplyProbe.expectMessage(BankAccountBehavior.AccountBalance(110))
}

testkit.stop(bankAccount)
}

"stash command until replication completed" in {
val bankAccount = spawnEntity(BankAccountBehavior(entityContext))

// recover the entity
import SnapshotProtocol._
val metadata = EntitySnapshotMetadata(normalizedEntityId, nextLogEntryIndex())
val state = EntityState(BankAccountBehavior.Account(100))
inside(snapshotStoreProbe.receiveMessage()) {
case SnapshotProtocol.FetchSnapshot(_, replyTo) =>
replyTo ! SnapshotProtocol.SnapshotFound(EntitySnapshot(metadata, state))
}
inside(shardProbe.receiveMessage()) {
case FetchEntityEvents(_, _, _, replyTo) =>
replyTo ! FetchEntityEventsResponse(Seq())
}
recoverWithState(bankAccount.asEntity, EntitySnapshot(metadata, state))

val getBalanceProbe = bankAccount.askWithTestProbe(BankAccountBehavior.GetBalance)

Expand Down Expand Up @@ -428,14 +566,7 @@ class ReplicatedEntityBehaviorSpec extends WordSpec with BeforeAndAfterAll with
import SnapshotProtocol._
val metadata = EntitySnapshotMetadata(normalizedEntityId, nextLogEntryIndex())
val state = EntityState(BankAccountBehavior.Account(100))
inside(snapshotStoreProbe.receiveMessage()) {
case SnapshotProtocol.FetchSnapshot(_, replyTo) =>
replyTo ! SnapshotProtocol.SnapshotFound(EntitySnapshot(metadata, state))
}
inside(shardProbe.receiveMessage()) {
case FetchEntityEvents(_, _, _, replyTo) =>
replyTo ! FetchEntityEventsResponse(Seq())
}
recoverWithState(bankAccount.asEntity, EntitySnapshot(metadata, state))
}

val clientProbe = bankAccount.askWithTestProbe(BankAccountBehavior.GetBalance)
Expand Down Expand Up @@ -691,4 +822,20 @@ class ReplicatedEntityBehaviorSpec extends WordSpec with BeforeAndAfterAll with
replyTo ! FetchEntityEventsResponse(Seq())
}
}

private def recoverWithState(
entity: ActorRef[RaftProtocol.EntityCommand],
entitySnapshot: SnapshotProtocol.EntitySnapshot,
): Unit = {
val entityId = entitySnapshot.metadata.entityId
inside(snapshotStoreProbe.receiveMessage()) {
case SnapshotProtocol.FetchSnapshot(`entityId`, replyTo) =>
replyTo ! SnapshotProtocol.SnapshotFound(entitySnapshot)
}
inside(shardProbe.receiveMessage()) {
case FetchEntityEvents(_, _, _, replyTo) =>
replyTo ! FetchEntityEventsResponse(Seq())
}
}

}