Skip to content

Commit

Permalink
Merge pull request #1808 from dedis/work-be2-onsriahi-Reduce-heartbea…
Browse files Browse the repository at this point in the history
…tbuild-logic-duplication

Reduce heartbeatbuild logic duplication
  • Loading branch information
onsriahi14 authored Apr 19, 2024
2 parents 572c577 + 9396f41 commit c5d8af4
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 228 deletions.
5 changes: 2 additions & 3 deletions be2-scala/src/main/scala/ch/epfl/pop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.util.Timeout
import ch.epfl.pop.authentication.{GetRequestHandler, PopchaWebSocketResponseHandler}
import ch.epfl.pop.config.RuntimeEnvironment
import ch.epfl.pop.config.RuntimeEnvironment._
import ch.epfl.pop.decentralized.{ConnectionMediator, HeartbeatGenerator, Monitor}
import ch.epfl.pop.decentralized.{ConnectionMediator, Monitor}
import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe}
import ch.epfl.pop.storage.{DbActor, SecurityModuleActor}
import org.iq80.leveldb.Options
Expand Down Expand Up @@ -49,8 +49,7 @@ object Server {
val securityModuleActorRef: AskableActorRef = system.actorOf(Props(SecurityModuleActor(RuntimeEnvironment.securityPath)))

// Create necessary actors for server-server communications
val heartbeatGenRef: ActorRef = system.actorOf(HeartbeatGenerator.props(dbActorRef))
val monitorRef: ActorRef = system.actorOf(Monitor.props(heartbeatGenRef))
val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef))
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, messageRegistry))

// Setup routes
Expand Down

This file was deleted.

32 changes: 23 additions & 9 deletions be2-scala/src/main/scala/ch/epfl/pop/decentralized/Monitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,34 @@ package ch.epfl.pop.decentralized
import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
import akka.event.LoggingReceive
import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Sink
import ch.epfl.pop.config.RuntimeEnvironment.{readServerPeers, serverPeersListPath}
import ch.epfl.pop.decentralized.Monitor.TriggerHeartbeat
import ch.epfl.pop.model.network.JsonRpcRequest
import ch.epfl.pop.model.network.method.ParamsWithMap
import ch.epfl.pop.model.network.method.{Heartbeat, ParamsWithMap}
import ch.epfl.pop.model.objects.{Channel, Hash}
import ch.epfl.pop.pubsub.AskPatternConstants
import ch.epfl.pop.pubsub.graph.GraphMessage
import ch.epfl.pop.storage.DbActor

import java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY
import java.nio.file.{Path, WatchService}
import java.nio.file.StandardWatchEventKinds.{ENTRY_CREATE, ENTRY_MODIFY}
import scala.collection.immutable.HashMap
import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.Success

//This actor is tasked with scheduling heartbeats.
// To that end it sees every messages the system receives.
// To that end it sees every message the system receives.
// When a message is seen it schedule a heartbeat in the next heartbeatRate seconds.
// Periodic heartbeats are sent with a period of messageDelay seconds.
final case class Monitor(
heartbeatGenRef: ActorRef,
dbActorRef: AskableActorRef,
heartbeatRate: FiniteDuration,
messageDelay: FiniteDuration
) extends Actor with ActorLogging with Timers {
) extends Actor with ActorLogging with Timers with AskPatternConstants() {

// These keys are used to keep track of the timers states
private val periodicHbKey = 0
Expand Down Expand Up @@ -51,7 +58,15 @@ final case class Monitor(
case Monitor.TriggerHeartbeat =>
log.info("triggering a heartbeat")
timers.cancel(singleHbKey)
heartbeatGenRef ! Monitor.GenerateAndSendHeartbeat(connectionMediatorRef)

val askForHeartbeat = dbActorRef ? DbActor.GenerateHeartbeat()
val heartbeat: HashMap[Channel, Set[Hash]] =
Await.ready(askForHeartbeat, duration).value.get match
case Success(DbActor.DbActorGenerateHeartbeatAck(map)) => map
case _ => HashMap.empty[Channel, Set[Hash]] // Handle anything else

if (heartbeat.nonEmpty)
connectionMediatorRef ! Heartbeat(heartbeat)

case Right(jsonRpcMessage: JsonRpcRequest) =>
jsonRpcMessage.getParams match {
Expand All @@ -74,8 +89,8 @@ final case class Monitor(
}

object Monitor {
def props(heartbeatGenRef: ActorRef, heartbeatRate: FiniteDuration = 15.seconds, messageDelay: FiniteDuration = 1.seconds): Props =
Props(new Monitor(heartbeatGenRef, heartbeatRate, messageDelay))
def props(dbActorRef: AskableActorRef, heartbeatRate: FiniteDuration = 15.seconds, messageDelay: FiniteDuration = 1.seconds): Props =
Props(new Monitor(dbActorRef, heartbeatRate, messageDelay))

def sink(monitorRef: ActorRef): Sink[GraphMessage, NotUsed] = {
Sink.actorRef(
Expand All @@ -90,7 +105,6 @@ object Monitor {
sealed trait Event
final case class AtLeastOneServerConnected() extends Event
final case class NoServerConnected() extends Event
final case class GenerateAndSendHeartbeat(connectionMediatorRef: ActorRef) extends Event
final case class TriggerHeartbeat() extends Event
private final case class DoNothing() extends Event
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,22 @@ object ParamsWithMapHandler extends AskPatternConstants {
/** first step is to retrieve the received heartbeat from the jsonRpcRequest */
val receivedHeartBeat: Map[Channel, Set[Hash]] = jsonRpcMessage.getParams.asInstanceOf[Heartbeat].channelsToMessageIds

/** second step is to retrieve the local set of channels */
var setOfChannels: Set[Channel] = Set()
val ask = dbActorRef ? DbActor.GetAllChannels()
Await.ready(ask, duration).value match {
case Some(Success(DbActor.DbActorGetAllChannelsAck(channels))) =>
setOfChannels = channels
case Some(Failure(ex: DbActorNAckException)) =>
Left(PipelineError(ex.code, s"couldn't retrieve local set of channels", jsonRpcMessage.getId))
/** finally, we only keep from the received heartbeat the message ids that are not contained in the locally extracted heartbeat. */
val ask = dbActorRef ? DbActor.GenerateHeartbeat()
Await.ready(ask, duration).value.get match
case Success(DbActor.DbActorGenerateHeartbeatAck(map)) =>
var missingIdsMap: HashMap[Channel, Set[Hash]] = HashMap()
receivedHeartBeat.keys.foreach(channel => {
val missingIdsSet = receivedHeartBeat(channel).diff(map.getOrElse(channel, Set.empty))
if (missingIdsSet.nonEmpty)
missingIdsMap += (channel -> missingIdsSet)
})
Right(JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, MethodType.get_messages_by_id, GetMessagesById(missingIdsMap), Some(0)))

case Failure(ex: DbActorNAckException) =>
Left(PipelineError(ex.code, s"couldn't retrieve localHeartBeat", jsonRpcMessage.getId))
case reply =>
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"heartbeatHandler failed : unexpected DbActor reply '$reply'", jsonRpcMessage.getId))
}

/** third step is to ask the DB for the content of each channel in terms of message ids. */
val localHeartBeat: mutable.HashMap[Channel, Set[Hash]] = mutable.HashMap()
setOfChannels.foreach(channel => {
val ask = dbActorRef ? DbActor.ReadChannelData(channel)
Await.ready(ask, duration).value match {
case Some(Success(DbActor.DbActorReadChannelDataAck(channelData))) =>
val setOfIds = channelData.messages.toSet
localHeartBeat += (channel -> setOfIds)
case Some(Failure(ex: DbActorNAckException)) =>
Left(PipelineError(ex.code, s"couldn't readChannelData for local heartbeat", jsonRpcMessage.getId))
case reply =>
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"heartbeatHandler failed : unexpected DbActor reply '$reply'", jsonRpcMessage.getId))
}
})

/** finally, we only keep from the received heartbeat the message ids that are not contained in the locally extracted heartbeat. */
var missingIdsMap: HashMap[Channel, Set[Hash]] = HashMap()
receivedHeartBeat.keys.foreach(channel => {
val missingIdsSet = receivedHeartBeat(channel).diff(localHeartBeat.getOrElse(channel, Set.empty))
if (missingIdsSet.nonEmpty)
missingIdsMap += (channel -> missingIdsSet)
})
Right(JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, MethodType.get_messages_by_id, GetMessagesById(missingIdsMap), Some(0)))

case Right(jsonRpcMessage: JsonRpcResponse) =>
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "HeartbeatHandler received a 'JsonRpcResponse'", jsonRpcMessage.id))
Expand Down
44 changes: 38 additions & 6 deletions be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.lao.GreetLao
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
import ch.epfl.pop.model.objects.*
import ch.epfl.pop.model.objects.Channel.{LAO_DATA_LOCATION, ROOT_CHANNEL_PREFIX}
import ch.epfl.pop.model.objects._
import ch.epfl.pop.pubsub.graph.AnswerGenerator.timout
import ch.epfl.pop.pubsub.graph.{ErrorCodes, JsonString}
import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe}
import ch.epfl.pop.storage.DbActor._
import ch.epfl.pop.storage.DbActor.*
import com.google.crypto.tink.subtle.Ed25519Sign

import java.util.concurrent.TimeUnit
import scala.collection.immutable.HashMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -130,7 +131,7 @@ final case class DbActor(

@throws[DbActorNAckException]
private def readElectionData(laoId: Hash, electionId: Hash): ElectionData = {
Try(storage.read(storage.DATA_KEY + s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/private/${electionId.toString}")) match {
Try(storage.read(storage.DATA_KEY + s"$ROOT_CHANNEL_PREFIX${laoId.toString}/private/${electionId.toString}")) match {
case Success(Some(json)) => ElectionData.buildFromJson(json)
case Success(None) => throw DbActorNAckException(ErrorCodes.SERVER_ERROR.id, s"ElectionData for election $electionId not in the database")
case Failure(ex) => throw ex
Expand Down Expand Up @@ -243,7 +244,7 @@ final case class DbActor(

@throws[DbActorNAckException]
private def createElectionData(laoId: Hash, electionId: Hash, keyPair: KeyPair): Unit = {
val channel = Channel(s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/private/${electionId.toString}")
val channel = Channel(s"$ROOT_CHANNEL_PREFIX${laoId.toString}/private/${electionId.toString}")
if (!checkChannelExistence(channel)) {
val pair = (storage.DATA_KEY + channel.toString) -> ElectionData(electionId, keyPair).toJsonString
storage.write(pair)
Expand Down Expand Up @@ -299,7 +300,7 @@ final case class DbActor(
@throws[DbActorNAckException]
private def generateLaoDataKey(channel: Channel): String = {
channel.decodeChannelLaoId match {
case Some(data) => storage.DATA_KEY + s"${Channel.ROOT_CHANNEL_PREFIX}$data${LAO_DATA_LOCATION}"
case Some(data) => storage.DATA_KEY + s"${Channel.ROOT_CHANNEL_PREFIX}$data$LAO_DATA_LOCATION"
case None =>
log.error(s"Actor $self (db) encountered a problem while decoding LAO channel from '$channel'")
throw DbActorNAckException(ErrorCodes.SERVER_ERROR.id, s"Could not extract the LAO id for channel $channel")
Expand All @@ -308,7 +309,7 @@ final case class DbActor(

// generates the key of the RollCallData to store in the database
private def generateRollCallDataKey(laoId: Hash): String = {
storage.DATA_KEY + s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/rollcall"
storage.DATA_KEY + s"$ROOT_CHANNEL_PREFIX${laoId.toString}/rollcall"
}

@throws[DbActorNAckException]
Expand Down Expand Up @@ -369,6 +370,20 @@ final case class DbActor(
(publicKey, privateKey)
}

@throws[DbActorNAckException]
private def generateHeartbeat(): HashMap[Channel, Set[Hash]] = {
val setOfChannels = getAllChannels
if (setOfChannels.isEmpty) return HashMap()
val heartbeatMap: HashMap[Channel, Set[Hash]] = setOfChannels.foldLeft(HashMap.empty[Channel, Set[Hash]]) {
(acc, channel) =>
readChannelData(channel).messages.toSet match {
case setOfIds if setOfIds.nonEmpty => acc + (channel -> setOfIds)
case _ => acc
}
}
heartbeatMap
}

override def receive: Receive = LoggingReceive {
case Write(channel, message) =>
log.info(s"Actor $self (db) received a WRITE request on channel '$channel'")
Expand Down Expand Up @@ -543,6 +558,13 @@ final case class DbActor(
case failure => sender() ! failure.recover(Status.Failure(_))
}

case GenerateHeartbeat() =>
log.info(s"Actor $self (db) received a GenerateHeartbeat request")
Try(generateHeartbeat()) match {
case Success(heartbeat) => sender() ! DbActorGenerateHeartbeatAck(heartbeat)
case failure => sender() ! failure.recover(Status.Failure(_))
}

case m =>
log.info(s"Actor $self (db) received an unknown message")
sender() ! Status.Failure(DbActorNAckException(ErrorCodes.INVALID_ACTION.id, s"database actor received a message '$m' that it could not recognize"))
Expand Down Expand Up @@ -758,6 +780,9 @@ object DbActor {
*/
final case class ReadServerPrivateKey() extends Event

/** Request to generate a local heartbeat */
final case class GenerateHeartbeat() extends Event

// DbActor DbActorMessage correspond to messages the actor may emit
sealed trait DbActorMessage

Expand Down Expand Up @@ -832,6 +857,13 @@ object DbActor {
*/
final case class DbActorReadServerPrivateKeyAck(privateKey: PrivateKey) extends DbActorMessage

/** Response for a [[GenerateHeartbeat]] db request Receiving [[DbActorGenerateHeartbeatAck]] works as an acknowledgement that the request was successful
*
* @param heartbeatMap
* requested heartbeat as a map from the channels to message ids
*/
final case class DbActorGenerateHeartbeatAck(heartbeatMap: HashMap[Channel, Set[Hash]]) extends DbActorMessage

/** Response for a general db actor ACK
*/
final case class DbActorAck() extends DbActorMessage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package ch.epfl.pop.decentralized

import akka.actor.Actor
import ch.epfl.pop.model.objects.{Channel, DbActorNAckException}
import ch.epfl.pop.pubsub.graph.ErrorCodes.SERVER_ERROR
import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.model.objects.DbActorNAckException

class FailingToyDbActor extends Actor {
override def receive: Receive = {
case _ => {
case _ =>
sender() ! DbActorNAckException(0, "")
}
}

}
Loading

0 comments on commit c5d8af4

Please sign in to comment.