Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BE2] Added RumorState handler #1890

Merged
merged 8 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ package ch.epfl.pop.decentralized

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.event.slf4j.Logger
import akka.pattern.AskableActorRef
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.{ConnectionMediator, GossipManager}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.{GreetServer, Rumor}
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
import ch.epfl.pop.model.objects.{Base64Data, Channel, PublicKey, RumorData}
import ch.epfl.pop.model.objects.{Channel, PublicKey, RumorData}
import ch.epfl.pop.pubsub.AskPatternConstants
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError}
import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumor, DbActorReadRumorData}
import ch.epfl.pop.storage.DbActor.{DbActorAck, DbActorReadRumorData}

import scala.concurrent.Await
import scala.util.{Failure, Random, Success}
import scala.util.Random

final case class GossipManager(
dbActorRef: AskableActorRef,
Expand Down Expand Up @@ -174,12 +172,12 @@ object GossipManager extends AskPatternConstants {
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"GossipManager received an unexpected message:$graphMessage while monitoring responses", None))
}

def startGossip(gossipManager: AskableActorRef, actorRef: ActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
def startGossip(gossipManager: AskableActorRef, clientRef: ActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.getParamsMessage match
case Some(message) =>
// Start gossiping only if message comes from a real actor (and not from processing pipeline)
if (actorRef != Actor.noSender)
if (clientRef != Actor.noSender)
gossipManager ? StartGossip(Map(jsonRpcRequest.getParamsChannel -> List(message)))
case None => /* Do nothing */
Right(jsonRpcRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ class ResultObject(val result: Option[ResultType]) {
case (Some(a), Some(b)) => a == b
case (None, None) => true
case _ => false
case _ => false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ final case class RumorState(state: Map[PublicKey, Int]) extends Params {
def toJsonString: String = {
this.toJson.toString
}

def isMissingRumorsFrom(otherRumorState: RumorState): Map[PublicKey, List[Int]] = {
this.state.flatMap { (publicKey, rumorId) =>
otherRumorState.state.get(publicKey) match
case Some(otherRumorId) =>
if (otherRumorId > rumorId)
Some(publicKey -> List.range(rumorId + 1, otherRumorId + 1))
else None
case None => None
} ++ {
otherRumorState.state.filter((pk, _) => !this.state.contains(pk)).map((pk, rumorId) => pk -> List.range(0, rumorId + 1))
}
}
}

object RumorState extends Parsable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ object PublishSubscribe {
val portGetMessagesById = 6
val portGreetServer = 7
val portRumor = 8
val totalPorts = 9
val portRumorState = 9
val totalPorts = 10

/* building blocks */
val input = builder.add(Flow[GraphMessage].collect { case msg: GraphMessage => msg })
Expand All @@ -139,6 +140,7 @@ object PublishSubscribe {
case MethodType.get_messages_by_id => portGetMessagesById
case MethodType.greet_server => portGreetServer
case MethodType.rumor => portRumor
case MethodType.rumor_state => portRumorState
case _ => portPipelineError
}

Expand All @@ -156,6 +158,7 @@ object PublishSubscribe {
val rumorPartition = builder.add(ParamsHandler.rumorHandler(dbActorRef, messageRegistry))
val gossipManagerPartition = builder.add(GossipManager.gossipHandler(gossipManager))
val gossipStartPartition = builder.add(GossipManager.startGossip(gossipManager, clientActorRef))
val rumorStatePartition = builder.add(ParamsHandler.rumorStateHandler(dbActorRef))

val merger = builder.add(Merge[GraphMessage](totalPorts))

Expand All @@ -171,6 +174,7 @@ object PublishSubscribe {
methodPartitioner.out(portGetMessagesById) ~> getMessagesByIdPartition ~> merger
methodPartitioner.out(portGreetServer) ~> greetServerPartition ~> merger
methodPartitioner.out(portRumor) ~> gossipManagerPartition ~> rumorPartition ~> merger
methodPartitioner.out(portRumorState) ~> rumorStatePartition ~> merger

/* close the shape */
FlowShape(input.in, merger.out)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.AskableActorRef
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.ConnectionMediator
import ch.epfl.pop.model.network.{ErrorObject, JsonRpcRequest, JsonRpcResponse, MethodType, ResultObject}
import ch.epfl.pop.model.network.{ErrorObject, JsonRpcRequest, JsonRpcResponse, MethodType, ResultObject, ResultRumor}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.{GreetServer, Rumor}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState}
import ch.epfl.pop.model.objects.{Channel, PublicKey}
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError}
import ch.epfl.pop.pubsub.{AskPatternConstants, ClientActor, MessageRegistry, PubSubMediator}
import ch.epfl.pop.storage.DbActor.{DbActorReadRumor, ReadRumor, WriteRumor}
import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.storage.DbActor.{DbActorGenerateRumorStateAns, DbActorReadRumor, ReadRumor, WriteRumor}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
Expand Down Expand Up @@ -131,4 +132,19 @@ object ParamsHandler extends AskPatternConstants {

}

def rumorStateHandler(dbActorRef: AskableActorRef): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(jsonRpcRequest: JsonRpcRequest) =>
jsonRpcRequest.method match
case MethodType.rumor_state =>
val rumorState = jsonRpcRequest.getParams.asInstanceOf[RumorState]
val generateRumorStateAns = dbActorRef ? DbActor.GenerateRumorStateAns(rumorState)
Await.result(generateRumorStateAns, duration) match
case DbActorGenerateRumorStateAns(rumorList) =>
Right(JsonRpcResponse(RpcValidator.JSON_RPC_VERSION, new ResultObject(ResultRumor(rumorList)), jsonRpcRequest.id))
case _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"RumorStateHandler was not able to generate rumor state answer", jsonRpcRequest.id))
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"RumorStateHandler received a message with unexpected method :$graphMessage", jsonRpcRequest.id))
case graphMessage @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"RumorStateHandler received an unexpected message:$graphMessage", None))

}

}
41 changes: 40 additions & 1 deletion be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.pattern.AskableActorRef
import ch.epfl.pop.decentralized.ConnectionMediator
import ch.epfl.pop.json.MessageDataProtocol
import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat
import ch.epfl.pop.model.network.method.Rumor
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.lao.GreetLao
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
Expand Down Expand Up @@ -425,6 +425,27 @@ final case class DbActor(
}
}

private def getRumorState: RumorState = {
val allPublicKeys = storage.filterKeysByPrefix(storage.RUMOR_DATA_KEY).map(key => PublicKey(Base64Data(key.replaceFirst(storage.RUMOR_DATA_KEY, ""))))
val allRumorData = allPublicKeys.flatMap {
publicKey =>
Try(readRumorData(publicKey)) match
case Success(rumorData: RumorData) => Some(publicKey -> rumorData.lastRumorId())
case Failure(ex) => None
}.toMap
RumorState(allRumorData)
}

private def generateRumorStateAns(rumorState: RumorState): List[Rumor] = {
val localRumorState = getRumorState
val missingRumors = rumorState.isMissingRumorsFrom(localRumorState)
missingRumors.flatMap { (publicKey, rumorIdList) =>
rumorIdList.map { id =>
readRumor((publicKey, id)).get
}
}.toList
}

override def receive: Receive = LoggingReceive {
case Write(channel, message) =>
log.info(s"Actor $self (db) received a WRITE request on channel '$channel'")
Expand Down Expand Up @@ -627,6 +648,13 @@ final case class DbActor(
case failure => sender() ! failure.recover(Status.Failure(_))
}

case GenerateRumorStateAns(rumorState: RumorState) =>
log.info(s"Actor $self (db) received a GenerateRumorStateAns request")
Try(generateRumorStateAns(rumorState)) match {
case Success(rumorList) => sender() ! DbActorGenerateRumorStateAns(rumorList)
case failure => sender() ! failure.recover(Status.Failure(_))
}

case m =>
log.info(s"Actor $self (db) received an unknown message")
sender() ! Status.Failure(DbActorNAckException(ErrorCodes.INVALID_ACTION.id, s"database actor received a message '$m' that it could not recognize"))
Expand Down Expand Up @@ -863,6 +891,13 @@ object DbActor {
*/
final case class ReadRumorData(senderPk: PublicKey) extends Event

/** Requests the db to build a list of rumors that we have that are missing to the rumorState
* @param rumorState
* Map of last seen rumorId per Publickey that we want to complete
*/

final case class GenerateRumorStateAns(rumorState: RumorState) extends Event

// DbActor DbActorMessage correspond to messages the actor may emit
sealed trait DbActorMessage

Expand Down Expand Up @@ -952,6 +987,10 @@ object DbActor {
*/
final case class DbActorReadRumorData(rumorIds: RumorData) extends DbActorMessage

/** Response for a [[GenerateRumorStateAns]]
*/
final case class DbActorGenerateRumorStateAns(rumorList: List[Rumor]) extends DbActorMessage

/** Response for a general db actor ACK
*/
final case class DbActorAck() extends DbActorMessage
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ch.epfl.pop.model.network.method

import ch.epfl.pop.model.objects.{Base64Data, PublicKey}
import org.scalatest.matchers.should.Matchers
import org.scalatest.funsuite.AnyFunSuite as FunSuite

class RumorStateSuite extends FunSuite with Matchers {

test("constructor from json works for RumorState") {
val state: Map[PublicKey, Int] = Map(
PublicKey(Base64Data.encode("1")) -> 1,
PublicKey(Base64Data.encode("2")) -> 2,
PublicKey(Base64Data.encode("3")) -> 3
)

val rumorState: RumorState = RumorState(state)

val encodedDecoded = RumorState.buildFromJson(rumorState.toJsonString)

encodedDecoded.state shouldBe state
}

test("difference from rumorState works well if second element as more rumors") {
val rumorState1: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 1,
PublicKey(Base64Data.encode("2")) -> 2
))

val rumorState2: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 3,
PublicKey(Base64Data.encode("2")) -> 5,
PublicKey(Base64Data.encode("3")) -> 3
))

val diffResult = rumorState1.isMissingRumorsFrom(rumorState2)

diffResult shouldBe Map(
PublicKey(Base64Data.encode("1")) -> List(2, 3),
PublicKey(Base64Data.encode("2")) -> List(3, 4, 5),
PublicKey(Base64Data.encode("3")) -> List(0, 1, 2, 3)
)
}

test("difference from rumorState works well if first element as more rumors") {
val rumorState1: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 3,
PublicKey(Base64Data.encode("2")) -> 5,
PublicKey(Base64Data.encode("3")) -> 3
))

val rumorState2: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 1,
PublicKey(Base64Data.encode("2")) -> 2
))

val diffResult = rumorState1.isMissingRumorsFrom(rumorState2)
diffResult shouldBe Map.empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,4 @@ class RumorSuite extends FunSuite with Matchers {
encodedDecoded.messages.values.zip(rumor.messages.values).foreach((arrMsg1, arrMsg2) => arrMsg1 shouldBe arrMsg2)
encodedDecoded.messages.keys shouldBe rumor.messages.keys
}

test("constructor from json works for RumorState") {
val state: Map[PublicKey, Int] = Map(
PublicKey(Base64Data.encode("1")) -> 1,
PublicKey(Base64Data.encode("2")) -> 2,
PublicKey(Base64Data.encode("3")) -> 3
)

val rumorState: RumorState = RumorState(state)

val encodedDecoded = RumorState.buildFromJson(rumorState.toJsonString)

encodedDecoded.state shouldBe state
}

Comment on lines -46 to -60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete that ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to RumorStateSuite apparently

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package ch.epfl.pop.pubsub.graph.handlers

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.AskableActorRef
import akka.testkit.TestKit
import ch.epfl.pop.pubsub.{AskPatternConstants, MessageRegistry, PubSubMediator}
import ch.epfl.pop.storage.{DbActor, InMemoryStorage}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuiteLike
import akka.pattern.ask
import akka.stream.scaladsl.{Flow, Sink, Source}
import ch.epfl.pop.IOHelper
import ch.epfl.pop.model.network.MethodType.rumor_state
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, ResultEmptyList, ResultObject, ResultRumor}
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.objects.{Base64Data, PublicKey}
import ch.epfl.pop.pubsub.graph.GraphMessage
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.storage.DbActor.DbActorAck
import org.scalatest.matchers.should.Matchers.{a, shouldBe}
import util.examples.Rumor.RumorExample

import scala.concurrent.Await

class RumorStateHandlerSuite extends TestKit(ActorSystem("RumorStateSuiteActorSystem")) with AnyFunSuiteLike with AskPatternConstants with BeforeAndAfterAll {

private var inMemoryStorage: InMemoryStorage = _
private var messageRegistry: MessageRegistry = _
private var pubSubMediatorRef: ActorRef = _
private var dbActorRef: AskableActorRef = _
private var rumorStateHandler: Flow[GraphMessage, GraphMessage, NotUsed] = _
private val rumorState = JsonRpcRequest.buildFromJson(IOHelper.readJsonFromPath("src/test/scala/util/examples/json/rumor_state/rumor_state.json"))

override def beforeAll(): Unit = {
inMemoryStorage = InMemoryStorage()
messageRegistry = MessageRegistry()
pubSubMediatorRef = system.actorOf(PubSubMediator.props, "pubSubRumorState")
dbActorRef = system.actorOf(Props(DbActor(pubSubMediatorRef, messageRegistry, inMemoryStorage)), "dbRumorState")
rumorStateHandler = ParamsHandler.rumorStateHandler(dbActorRef)
}

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

test("rumor state handler generate right type of response") {
val output = Source.single(Right(rumorState)).via(rumorStateHandler).runWith(Sink.head)

Await.result(output, duration) shouldBe a[Right[_, _]]
}

test("rumor state handler fails on wrong input") {
val rumorRpc = JsonRpcRequest.buildFromJson(IOHelper.readJsonFromPath("src/test/scala/util/examples/json/rumor/rumor.json"))
val outputRumor = Source.single(Right(rumorRpc)).via(rumorStateHandler).runWith(Sink.head)

Await.result(outputRumor, duration) shouldBe a[Left[_, _]]

val responseRpc = JsonRpcResponse(RpcValidator.JSON_RPC_VERSION, ResultObject(0), None)
val outputResponse = Source.single(Right(responseRpc)).via(rumorStateHandler).runWith(Sink.head)

Await.result(outputResponse, duration) shouldBe a[Left[_, _]]
}

test("rumor state handler should return the right list of rumors") {
val publicKey = PublicKey(Base64Data.encode("publicKey"))
val rumorList: List[Rumor] = (0 to 10).map(i => Rumor(publicKey, i, Map.empty)).toList

for (rumor <- rumorList)
val writeResult = dbActorRef ? DbActor.WriteRumor(rumor)
Await.result(writeResult, duration) shouldBe a[DbActorAck]

val rumorState = RumorState(Map(
publicKey -> 5
))

val rumorStateRpc = JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, rumor_state, rumorState, Some(0))
val output = Source.single(Right(rumorStateRpc)).via(rumorStateHandler).runWith(Sink.head)
val rumorListResult = rumorList.filter(_.rumorId > 5)
val resultObject = ResultObject(ResultRumor(rumorListResult))
Await.result(output, duration) shouldBe Right(JsonRpcResponse(RpcValidator.JSON_RPC_VERSION, resultObject, rumorStateRpc.id))

}

}
Loading
Loading