Skip to content

Commit

Permalink
Merge pull request #1604 from dedis/work-be2-lauener-Fix-get_messages…
Browse files Browse the repository at this point in the history
…_by_id-handler

Fine-tune Decentralized communications
  • Loading branch information
K1li4nL authored Jun 1, 2023
2 parents 19574db + 2c31f10 commit 787b884
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.WebSocketRequest
import akka.pattern.AskableActorRef

import ch.epfl.pop.model.network.method.{Heartbeat, ParamsWithMap}
import ch.epfl.pop.model.network.{JsonRpcRequest, MethodType}
import ch.epfl.pop.pubsub.ClientActor.ClientAnswer
Expand All @@ -20,6 +19,7 @@ final case class ConnectionMediator(

// List of servers connected
private var serverSet: Set[ActorRef] = Set()
private var previousUrlList: List[String] = Nil

// Ping Monitor to inform it of our ActorRef
monitorRef ! ConnectionMediator.Ping()
Expand All @@ -28,7 +28,9 @@ final case class ConnectionMediator(

// Connect to some servers
case ConnectionMediator.ConnectTo(urlList) =>
urlList.map(url =>
val urlDiff = urlList.diff(previousUrlList)
previousUrlList :::= urlDiff
urlDiff.map(url =>
Http().singleWebSocketRequest(
WebSocketRequest(url),
PublishSubscribe.buildGraph(
Expand All @@ -53,8 +55,10 @@ final case class ConnectionMediator(
log.info("Server left")
serverSet -= serverRef
// Tell monitor to stop scheduling heartbeats since there is no one to receive them
if (serverSet.isEmpty)
if (serverSet.isEmpty) {
previousUrlList = Nil
monitorRef ! Monitor.NoServerConnected
}

case Heartbeat(map) =>
log.info("Sending a heartbeat to the servers")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,32 @@ import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError, pretty
import ch.epfl.pop.pubsub.{AskPatternConstants, MessageRegistry, PublishSubscribe}

import scala.annotation.tailrec
import scala.util.Success
import scala.concurrent.Await

//This object's job is to handle responses it receives from other servers after sending a heartbeat.
// When receiving the missing messages, the server's job is to write them on the database.
object GetMessagesByIdResponseHandler extends AskPatternConstants {

private final val MAX_RETRY = 5
private final val SUCCESS = 0
private val MAX_RETRY_PER_MESSAGE = 10
private val SUCCESS = 0

// This function packs each message into a publish before pushing them into the pipeline
/** Validate and replay on the system each message received in the get_messages_by_id result
*
* @param messageRegistry
* The system message registry to use in the validation pipeline
* @param system
* Implicit actor system to use the given validator
* @return
* Left if some messages couldn't be validated after MAX_RETRY_PER_MESSAGE times, Right for success
*/
def responseHandler(messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(JsonRpcResponse(_, Some(resultObject), None, _)) =>
resultObject.resultMap match {
case Some(resultMap) =>
val receivedResponse: Map[Channel, Set[GraphMessage]] = wrapMsgInPublish(resultMap)
val receivedResponse = wrapMsgInPublish(resultMap, MAX_RETRY_PER_MESSAGE)
val validator = PublishSubscribe.validateRequests(ActorRef.noSender, messageRegistry)
val success: Boolean = passThroughPipeline(receivedResponse, validator, MAX_RETRY)
val success: Boolean = passThroughPipeline(receivedResponse, validator)
if (success) {
Right(JsonRpcResponse(RpcValidator.JSON_RPC_VERSION, new ResultObject(SUCCESS), None))
} else {
Expand All @@ -42,66 +51,98 @@ object GetMessagesByIdResponseHandler extends AskPatternConstants {
case value @ _ => value
}

// This function will try to digest the set of messages for an entire channel
// It will go channel by channel until it finished processing messages or MAX_RETRY is reached
/** Will try to digest each GraphMessage until their retry-counter reaches 0 or they all get validated
*
* @param receivedResponse
* The wrapped messages with their counter
* @param validatorFlow
* The validator to push the messages through
* @param logListFailure
* The list of accumulated error logs, default to Nil
* @param system
* Implicit actor system to use the given validator
* @return
* true if all messages could be validated, otherwise false
*/
@tailrec
private def passThroughPipeline(
receivedResponse: Map[Channel, Set[GraphMessage]],
receivedResponse: List[(GraphMessage, Int)],
validatorFlow: Flow[GraphMessage, GraphMessage, NotUsed],
remainingAttempts: Int
logListFailure: List[String] = Nil
)(implicit system: ActorSystem): Boolean = {
if (receivedResponse.isEmpty || remainingAttempts <= 0) {
return receivedResponse.isEmpty
if (receivedResponse.isEmpty) {
if (logListFailure.nonEmpty) {
logListFailure.foreach(log => println(log))
}
return logListFailure.isEmpty
}

var failedMessages: Map[Channel, Set[GraphMessage]] = Map.empty
var failedMessages: List[(GraphMessage, Int)] = Nil
var logs: List[String] = logListFailure
receivedResponse.foreach {
case (channel, messagesSet) =>
val failedMsgErrorMap = messagesThroughPipeline(validatorFlow, messagesSet)
if (failedMsgErrorMap.nonEmpty) {
failedMessages += channel -> failedMsgErrorMap.keySet
case (graphMessage, retry) =>
if (retry > 0) {
messagesThroughPipeline(validatorFlow, graphMessage) match {
case Left(err) =>
if (retry == 1) {
// only log a during last attempt
logs ::= "Errors: " + err.toString + "\n On:\n" + prettyPrinter(graphMessage)
} else {
failedMessages ::= graphMessage -> (retry - 1)
}

// only log a during last attempt
if (remainingAttempts == 1) {
println("Errors: " + failedMsgErrorMap.map {
case (msg, error) => error.toString + "\n On:\n" + prettyPrinter(msg)
})
case _ => /* DO NOTHING */
}
}
}

passThroughPipeline(failedMessages, validatorFlow, remainingAttempts - 1)
passThroughPipeline(failedMessages, validatorFlow, logs)
}

// Push a set of message corresponding to one channel through the pipeline
/** Push a message through the given validator
*
* @param validator
* The pipeline verifying the message
* @param message
* The message to verify
* @param system
* Implicit actor system to use the given validator
* @return
* The result of the validator on the given message, Left for failure, Right for success
*/
private def messagesThroughPipeline(
validator: Flow[GraphMessage, GraphMessage, NotUsed],
messageSet: Set[GraphMessage]
)(implicit system: ActorSystem): Map[GraphMessage, PipelineError] = {
var failedGraphMessages: Map[GraphMessage, PipelineError] = Map.empty
messageSet.foreach { message =>
val singleRun = Source.single(message).via(validator).runWith(Sink.foreach {
case Left(err: PipelineError) =>
failedGraphMessages += message -> err

case _ => /* Nothing to do */
})
Await.ready(singleRun, duration)
message: GraphMessage
)(implicit system: ActorSystem): GraphMessage = {
val output = Source.single(message).via(validator).runWith(Sink.head)
Await.ready(output, duration).value.get match {
case Success(res @ _) => res
case err @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, err.toString, None))
}

failedGraphMessages
}

private def wrapMsgInPublish(map: Map[Channel, Set[Message]]): Map[Channel, Set[GraphMessage]] = {
map.map { case (channel, set) =>
channel -> set.map(msg =>
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
MethodType.PUBLISH,
new Publish(channel, msg),
Some(0)
))
)
}
/** Wrap messages in Publish RPCs along with a counter
*
* @param map
* A map of channels to their set of messages
* @return
* A list of GraphMessage - Int pairs where the GraphMessage are sorted in increasing order of their channel length and the Int is the number of retry left for each message
*/
private def wrapMsgInPublish(map: Map[Channel, Set[Message]], retryPerMessage: Int): List[(GraphMessage, Int)] = {
val publishedList: List[JsonRpcRequest] = map.flatMap {
case (channel, set) =>
set.map(msg =>
JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
MethodType.PUBLISH,
new Publish(channel, msg),
Some(0)
)
)
}.toList

publishedList
.sortWith((jsonMsg1, jsonMsg2) => jsonMsg1.getParamsChannel.channel.length <= jsonMsg2.getParamsChannel.channel.length)
.map(js => (Right(js), retryPerMessage))
}
}

0 comments on commit 787b884

Please sign in to comment.