diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala index ed6cea31ff..49673e4db6 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/ConnectionMediator.scala @@ -4,7 +4,6 @@ import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props} import akka.http.scaladsl.Http import akka.http.scaladsl.model.ws.WebSocketRequest import akka.pattern.AskableActorRef - import ch.epfl.pop.model.network.method.{Heartbeat, ParamsWithMap} import ch.epfl.pop.model.network.{JsonRpcRequest, MethodType} import ch.epfl.pop.pubsub.ClientActor.ClientAnswer @@ -20,6 +19,7 @@ final case class ConnectionMediator( // List of servers connected private var serverSet: Set[ActorRef] = Set() + private var previousUrlList: List[String] = Nil // Ping Monitor to inform it of our ActorRef monitorRef ! ConnectionMediator.Ping() @@ -28,7 +28,9 @@ final case class ConnectionMediator( // Connect to some servers case ConnectionMediator.ConnectTo(urlList) => - urlList.map(url => + val urlDiff = urlList.diff(previousUrlList) + previousUrlList :::= urlDiff + urlDiff.map(url => Http().singleWebSocketRequest( WebSocketRequest(url), PublishSubscribe.buildGraph( @@ -53,8 +55,10 @@ final case class ConnectionMediator( log.info("Server left") serverSet -= serverRef // Tell monitor to stop scheduling heartbeats since there is no one to receive them - if (serverSet.isEmpty) + if (serverSet.isEmpty) { + previousUrlList = Nil monitorRef ! Monitor.NoServerConnected + } case Heartbeat(map) => log.info("Sending a heartbeat to the servers") diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/GetMessagesByIdResponseHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/GetMessagesByIdResponseHandler.scala index 4aabdf95ef..cd499ade0a 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/GetMessagesByIdResponseHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/GetMessagesByIdResponseHandler.scala @@ -12,23 +12,32 @@ import ch.epfl.pop.pubsub.graph.{ErrorCodes, GraphMessage, PipelineError, pretty import ch.epfl.pop.pubsub.{AskPatternConstants, MessageRegistry, PublishSubscribe} import scala.annotation.tailrec +import scala.util.Success import scala.concurrent.Await //This object's job is to handle responses it receives from other servers after sending a heartbeat. // When receiving the missing messages, the server's job is to write them on the database. object GetMessagesByIdResponseHandler extends AskPatternConstants { - private final val MAX_RETRY = 5 - private final val SUCCESS = 0 + private val MAX_RETRY_PER_MESSAGE = 10 + private val SUCCESS = 0 - // This function packs each message into a publish before pushing them into the pipeline + /** Validate and replay on the system each message received in the get_messages_by_id result + * + * @param messageRegistry + * The system message registry to use in the validation pipeline + * @param system + * Implicit actor system to use the given validator + * @return + * Left if some messages couldn't be validated after MAX_RETRY_PER_MESSAGE times, Right for success + */ def responseHandler(messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map { case Right(JsonRpcResponse(_, Some(resultObject), None, _)) => resultObject.resultMap match { case Some(resultMap) => - val receivedResponse: Map[Channel, Set[GraphMessage]] = wrapMsgInPublish(resultMap) + val receivedResponse = wrapMsgInPublish(resultMap, MAX_RETRY_PER_MESSAGE) val validator = PublishSubscribe.validateRequests(ActorRef.noSender, messageRegistry) - val success: Boolean = passThroughPipeline(receivedResponse, validator, MAX_RETRY) + val success: Boolean = passThroughPipeline(receivedResponse, validator) if (success) { Right(JsonRpcResponse(RpcValidator.JSON_RPC_VERSION, new ResultObject(SUCCESS), None)) } else { @@ -42,66 +51,98 @@ object GetMessagesByIdResponseHandler extends AskPatternConstants { case value @ _ => value } - // This function will try to digest the set of messages for an entire channel - // It will go channel by channel until it finished processing messages or MAX_RETRY is reached + /** Will try to digest each GraphMessage until their retry-counter reaches 0 or they all get validated + * + * @param receivedResponse + * The wrapped messages with their counter + * @param validatorFlow + * The validator to push the messages through + * @param logListFailure + * The list of accumulated error logs, default to Nil + * @param system + * Implicit actor system to use the given validator + * @return + * true if all messages could be validated, otherwise false + */ @tailrec private def passThroughPipeline( - receivedResponse: Map[Channel, Set[GraphMessage]], + receivedResponse: List[(GraphMessage, Int)], validatorFlow: Flow[GraphMessage, GraphMessage, NotUsed], - remainingAttempts: Int + logListFailure: List[String] = Nil )(implicit system: ActorSystem): Boolean = { - if (receivedResponse.isEmpty || remainingAttempts <= 0) { - return receivedResponse.isEmpty + if (receivedResponse.isEmpty) { + if (logListFailure.nonEmpty) { + logListFailure.foreach(log => println(log)) + } + return logListFailure.isEmpty } - var failedMessages: Map[Channel, Set[GraphMessage]] = Map.empty + var failedMessages: List[(GraphMessage, Int)] = Nil + var logs: List[String] = logListFailure receivedResponse.foreach { - case (channel, messagesSet) => - val failedMsgErrorMap = messagesThroughPipeline(validatorFlow, messagesSet) - if (failedMsgErrorMap.nonEmpty) { - failedMessages += channel -> failedMsgErrorMap.keySet + case (graphMessage, retry) => + if (retry > 0) { + messagesThroughPipeline(validatorFlow, graphMessage) match { + case Left(err) => + if (retry == 1) { + // only log a during last attempt + logs ::= "Errors: " + err.toString + "\n On:\n" + prettyPrinter(graphMessage) + } else { + failedMessages ::= graphMessage -> (retry - 1) + } - // only log a during last attempt - if (remainingAttempts == 1) { - println("Errors: " + failedMsgErrorMap.map { - case (msg, error) => error.toString + "\n On:\n" + prettyPrinter(msg) - }) + case _ => /* DO NOTHING */ } } } - passThroughPipeline(failedMessages, validatorFlow, remainingAttempts - 1) + passThroughPipeline(failedMessages, validatorFlow, logs) } - // Push a set of message corresponding to one channel through the pipeline + /** Push a message through the given validator + * + * @param validator + * The pipeline verifying the message + * @param message + * The message to verify + * @param system + * Implicit actor system to use the given validator + * @return + * The result of the validator on the given message, Left for failure, Right for success + */ private def messagesThroughPipeline( validator: Flow[GraphMessage, GraphMessage, NotUsed], - messageSet: Set[GraphMessage] - )(implicit system: ActorSystem): Map[GraphMessage, PipelineError] = { - var failedGraphMessages: Map[GraphMessage, PipelineError] = Map.empty - messageSet.foreach { message => - val singleRun = Source.single(message).via(validator).runWith(Sink.foreach { - case Left(err: PipelineError) => - failedGraphMessages += message -> err - - case _ => /* Nothing to do */ - }) - Await.ready(singleRun, duration) + message: GraphMessage + )(implicit system: ActorSystem): GraphMessage = { + val output = Source.single(message).via(validator).runWith(Sink.head) + Await.ready(output, duration).value.get match { + case Success(res @ _) => res + case err @ _ => Left(PipelineError(ErrorCodes.SERVER_ERROR.id, err.toString, None)) } - - failedGraphMessages } - private def wrapMsgInPublish(map: Map[Channel, Set[Message]]): Map[Channel, Set[GraphMessage]] = { - map.map { case (channel, set) => - channel -> set.map(msg => - Right(JsonRpcRequest( - RpcValidator.JSON_RPC_VERSION, - MethodType.PUBLISH, - new Publish(channel, msg), - Some(0) - )) - ) - } + /** Wrap messages in Publish RPCs along with a counter + * + * @param map + * A map of channels to their set of messages + * @return + * A list of GraphMessage - Int pairs where the GraphMessage are sorted in increasing order of their channel length and the Int is the number of retry left for each message + */ + private def wrapMsgInPublish(map: Map[Channel, Set[Message]], retryPerMessage: Int): List[(GraphMessage, Int)] = { + val publishedList: List[JsonRpcRequest] = map.flatMap { + case (channel, set) => + set.map(msg => + JsonRpcRequest( + RpcValidator.JSON_RPC_VERSION, + MethodType.PUBLISH, + new Publish(channel, msg), + Some(0) + ) + ) + }.toList + + publishedList + .sortWith((jsonMsg1, jsonMsg2) => jsonMsg1.getParamsChannel.channel.length <= jsonMsg2.getParamsChannel.channel.length) + .map(js => (Right(js), retryPerMessage)) } }