Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce heartbeatbuild logic duplication #1808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions be2-scala/src/main/scala/ch/epfl/pop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.util.Timeout
import ch.epfl.pop.authentication.{GetRequestHandler, PopchaWebSocketResponseHandler}
import ch.epfl.pop.config.RuntimeEnvironment
import ch.epfl.pop.config.RuntimeEnvironment._
import ch.epfl.pop.decentralized.{ConnectionMediator, HeartbeatGenerator, Monitor}
import ch.epfl.pop.decentralized.{ConnectionMediator, Monitor}
import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe}
import ch.epfl.pop.storage.{DbActor, SecurityModuleActor}
import org.iq80.leveldb.Options
Expand Down Expand Up @@ -49,8 +49,7 @@ object Server {
val securityModuleActorRef: AskableActorRef = system.actorOf(Props(SecurityModuleActor(RuntimeEnvironment.securityPath)))

// Create necessary actors for server-server communications
val heartbeatGenRef: ActorRef = system.actorOf(HeartbeatGenerator.props(dbActorRef))
val monitorRef: ActorRef = system.actorOf(Monitor.props(heartbeatGenRef))
val monitorRef: ActorRef = system.actorOf(Monitor.props(dbActorRef))
val connectionMediatorRef: ActorRef = system.actorOf(ConnectionMediator.props(monitorRef, pubSubMediatorRef, dbActorRef, securityModuleActorRef, messageRegistry))

// Setup routes
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,34 @@ package ch.epfl.pop.decentralized
import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
import akka.event.LoggingReceive
import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Sink
import ch.epfl.pop.config.RuntimeEnvironment.{readServerPeers, serverPeersListPath}
import ch.epfl.pop.decentralized.Monitor.TriggerHeartbeat
import ch.epfl.pop.model.network.JsonRpcRequest
import ch.epfl.pop.model.network.method.ParamsWithMap
import ch.epfl.pop.model.network.method.{Heartbeat, ParamsWithMap}
import ch.epfl.pop.model.objects.{Channel, Hash}
import ch.epfl.pop.pubsub.AskPatternConstants
import ch.epfl.pop.pubsub.graph.GraphMessage
import ch.epfl.pop.storage.DbActor

import java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY
import java.nio.file.{Path, WatchService}
import java.nio.file.StandardWatchEventKinds.{ENTRY_CREATE, ENTRY_MODIFY}
import scala.collection.immutable.HashMap
import scala.concurrent.Await
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.Success

//This actor is tasked with scheduling heartbeats.
// To that end it sees every messages the system receives.
// To that end it sees every message the system receives.
// When a message is seen it schedule a heartbeat in the next heartbeatRate seconds.
// Periodic heartbeats are sent with a period of messageDelay seconds.
final case class Monitor(
heartbeatGenRef: ActorRef,
dbActorRef: AskableActorRef,
heartbeatRate: FiniteDuration,
messageDelay: FiniteDuration
) extends Actor with ActorLogging with Timers {
) extends Actor with ActorLogging with Timers with AskPatternConstants() {

// These keys are used to keep track of the timers states
private val periodicHbKey = 0
Expand Down Expand Up @@ -51,7 +58,15 @@ final case class Monitor(
case Monitor.TriggerHeartbeat =>
log.info("triggering a heartbeat")
timers.cancel(singleHbKey)
heartbeatGenRef ! Monitor.GenerateAndSendHeartbeat(connectionMediatorRef)

val askForHeartbeat = dbActorRef ? DbActor.GenerateHeartbeat()
val heartbeat: HashMap[Channel, Set[Hash]] =
Await.ready(askForHeartbeat, duration).value.get match
case Success(DbActor.DbActorGenerateHeartbeatAck(map)) => map
case _ => HashMap.empty[Channel, Set[Hash]] // Handle anything else

if (heartbeat.nonEmpty)
connectionMediatorRef ! Heartbeat(heartbeat)

case Right(jsonRpcMessage: JsonRpcRequest) =>
jsonRpcMessage.getParams match {
Expand All @@ -74,8 +89,8 @@ final case class Monitor(
}

object Monitor {
def props(heartbeatGenRef: ActorRef, heartbeatRate: FiniteDuration = 15.seconds, messageDelay: FiniteDuration = 1.seconds): Props =
Props(new Monitor(heartbeatGenRef, heartbeatRate, messageDelay))
def props(dbActorRef: AskableActorRef, heartbeatRate: FiniteDuration = 15.seconds, messageDelay: FiniteDuration = 1.seconds): Props =
Props(new Monitor(dbActorRef, heartbeatRate, messageDelay))

def sink(monitorRef: ActorRef): Sink[GraphMessage, NotUsed] = {
Sink.actorRef(
Expand All @@ -90,7 +105,6 @@ object Monitor {
sealed trait Event
final case class AtLeastOneServerConnected() extends Event
final case class NoServerConnected() extends Event
final case class GenerateAndSendHeartbeat(connectionMediatorRef: ActorRef) extends Event
final case class TriggerHeartbeat() extends Event
private final case class DoNothing() extends Event
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,22 @@ object ParamsWithMapHandler extends AskPatternConstants {
/** first step is to retrieve the received heartbeat from the jsonRpcRequest */
val receivedHeartBeat: Map[Channel, Set[Hash]] = jsonRpcMessage.getParams.asInstanceOf[Heartbeat].channelsToMessageIds

/** second step is to retrieve the local set of channels */
var setOfChannels: Set[Channel] = Set()
val ask = dbActorRef ? DbActor.GetAllChannels()
Await.ready(ask, duration).value match {
case Some(Success(DbActor.DbActorGetAllChannelsAck(channels))) =>
setOfChannels = channels
case Some(Failure(ex: DbActorNAckException)) =>
Left(PipelineError(ex.code, s"couldn't retrieve local set of channels", jsonRpcMessage.getId))
/** finally, we only keep from the received heartbeat the message ids that are not contained in the locally extracted heartbeat. */
val ask = dbActorRef ? DbActor.GenerateHeartbeat()
Await.ready(ask, duration).value.get match
case Success(DbActor.DbActorGenerateHeartbeatAck(map)) =>
var missingIdsMap: HashMap[Channel, Set[Hash]] = HashMap()
receivedHeartBeat.keys.foreach(channel => {
val missingIdsSet = receivedHeartBeat(channel).diff(map.getOrElse(channel, Set.empty))
if (missingIdsSet.nonEmpty)
missingIdsMap += (channel -> missingIdsSet)
})
Right(JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, MethodType.get_messages_by_id, GetMessagesById(missingIdsMap), Some(0)))

case Failure(ex: DbActorNAckException) =>
Left(PipelineError(ex.code, s"couldn't retrieve localHeartBeat", jsonRpcMessage.getId))
case reply =>
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"heartbeatHandler failed : unexpected DbActor reply '$reply'", jsonRpcMessage.getId))
}

/** third step is to ask the DB for the content of each channel in terms of message ids. */
val localHeartBeat: mutable.HashMap[Channel, Set[Hash]] = mutable.HashMap()
setOfChannels.foreach(channel => {
val ask = dbActorRef ? DbActor.ReadChannelData(channel)
Await.ready(ask, duration).value match {
case Some(Success(DbActor.DbActorReadChannelDataAck(channelData))) =>
val setOfIds = channelData.messages.toSet
localHeartBeat += (channel -> setOfIds)
case Some(Failure(ex: DbActorNAckException)) =>
Left(PipelineError(ex.code, s"couldn't readChannelData for local heartbeat", jsonRpcMessage.getId))
case reply =>
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"heartbeatHandler failed : unexpected DbActor reply '$reply'", jsonRpcMessage.getId))
}
})

/** finally, we only keep from the received heartbeat the message ids that are not contained in the locally extracted heartbeat. */
var missingIdsMap: HashMap[Channel, Set[Hash]] = HashMap()
receivedHeartBeat.keys.foreach(channel => {
val missingIdsSet = receivedHeartBeat(channel).diff(localHeartBeat.getOrElse(channel, Set.empty))
if (missingIdsSet.nonEmpty)
missingIdsMap += (channel -> missingIdsSet)
})
Right(JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, MethodType.get_messages_by_id, GetMessagesById(missingIdsMap), Some(0)))

case Right(jsonRpcMessage: JsonRpcResponse) =>
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, "HeartbeatHandler received a 'JsonRpcResponse'", jsonRpcMessage.id))
Expand Down
44 changes: 38 additions & 6 deletions be2-scala/src/main/scala/ch/epfl/pop/storage/DbActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import ch.epfl.pop.json.MessageDataProtocol.GreetLaoFormat
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.lao.GreetLao
import ch.epfl.pop.model.network.method.message.data.{ActionType, ObjectType}
import ch.epfl.pop.model.objects.*
import ch.epfl.pop.model.objects.Channel.{LAO_DATA_LOCATION, ROOT_CHANNEL_PREFIX}
import ch.epfl.pop.model.objects._
import ch.epfl.pop.pubsub.graph.AnswerGenerator.timout
import ch.epfl.pop.pubsub.graph.{ErrorCodes, JsonString}
import ch.epfl.pop.pubsub.{MessageRegistry, PubSubMediator, PublishSubscribe}
import ch.epfl.pop.storage.DbActor._
import ch.epfl.pop.storage.DbActor.*
import com.google.crypto.tink.subtle.Ed25519Sign

import java.util.concurrent.TimeUnit
import scala.collection.immutable.HashMap
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -130,7 +131,7 @@ final case class DbActor(

@throws[DbActorNAckException]
private def readElectionData(laoId: Hash, electionId: Hash): ElectionData = {
Try(storage.read(storage.DATA_KEY + s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/private/${electionId.toString}")) match {
Try(storage.read(storage.DATA_KEY + s"$ROOT_CHANNEL_PREFIX${laoId.toString}/private/${electionId.toString}")) match {
case Success(Some(json)) => ElectionData.buildFromJson(json)
case Success(None) => throw DbActorNAckException(ErrorCodes.SERVER_ERROR.id, s"ElectionData for election $electionId not in the database")
case Failure(ex) => throw ex
Expand Down Expand Up @@ -243,7 +244,7 @@ final case class DbActor(

@throws[DbActorNAckException]
private def createElectionData(laoId: Hash, electionId: Hash, keyPair: KeyPair): Unit = {
val channel = Channel(s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/private/${electionId.toString}")
val channel = Channel(s"$ROOT_CHANNEL_PREFIX${laoId.toString}/private/${electionId.toString}")
if (!checkChannelExistence(channel)) {
val pair = (storage.DATA_KEY + channel.toString) -> ElectionData(electionId, keyPair).toJsonString
storage.write(pair)
Expand Down Expand Up @@ -299,7 +300,7 @@ final case class DbActor(
@throws[DbActorNAckException]
private def generateLaoDataKey(channel: Channel): String = {
channel.decodeChannelLaoId match {
case Some(data) => storage.DATA_KEY + s"${Channel.ROOT_CHANNEL_PREFIX}$data${LAO_DATA_LOCATION}"
case Some(data) => storage.DATA_KEY + s"${Channel.ROOT_CHANNEL_PREFIX}$data$LAO_DATA_LOCATION"
case None =>
log.error(s"Actor $self (db) encountered a problem while decoding LAO channel from '$channel'")
throw DbActorNAckException(ErrorCodes.SERVER_ERROR.id, s"Could not extract the LAO id for channel $channel")
Expand All @@ -308,7 +309,7 @@ final case class DbActor(

// generates the key of the RollCallData to store in the database
private def generateRollCallDataKey(laoId: Hash): String = {
storage.DATA_KEY + s"${ROOT_CHANNEL_PREFIX}${laoId.toString}/rollcall"
storage.DATA_KEY + s"$ROOT_CHANNEL_PREFIX${laoId.toString}/rollcall"
}

@throws[DbActorNAckException]
Expand Down Expand Up @@ -369,6 +370,20 @@ final case class DbActor(
(publicKey, privateKey)
}

@throws[DbActorNAckException]
private def generateHeartbeat(): HashMap[Channel, Set[Hash]] = {
val setOfChannels = getAllChannels
if (setOfChannels.isEmpty) return HashMap()
val heartbeatMap: HashMap[Channel, Set[Hash]] = setOfChannels.foldLeft(HashMap.empty[Channel, Set[Hash]]) {
(acc, channel) =>
readChannelData(channel).messages.toSet match {
case setOfIds if setOfIds.nonEmpty => acc + (channel -> setOfIds)
case _ => acc
}
}
heartbeatMap
}

override def receive: Receive = LoggingReceive {
case Write(channel, message) =>
log.info(s"Actor $self (db) received a WRITE request on channel '$channel'")
Expand Down Expand Up @@ -543,6 +558,13 @@ final case class DbActor(
case failure => sender() ! failure.recover(Status.Failure(_))
}

case GenerateHeartbeat() =>
log.info(s"Actor $self (db) received a GenerateHeartbeat request")
Try(generateHeartbeat()) match {
case Success(heartbeat) => sender() ! DbActorGenerateHeartbeatAck(heartbeat)
K1li4nL marked this conversation as resolved.
Show resolved Hide resolved
case failure => sender() ! failure.recover(Status.Failure(_))
}

case m =>
log.info(s"Actor $self (db) received an unknown message")
sender() ! Status.Failure(DbActorNAckException(ErrorCodes.INVALID_ACTION.id, s"database actor received a message '$m' that it could not recognize"))
Expand Down Expand Up @@ -758,6 +780,9 @@ object DbActor {
*/
final case class ReadServerPrivateKey() extends Event

/** Request to generate a local heartbeat */
final case class GenerateHeartbeat() extends Event

// DbActor DbActorMessage correspond to messages the actor may emit
sealed trait DbActorMessage

Expand Down Expand Up @@ -832,6 +857,13 @@ object DbActor {
*/
final case class DbActorReadServerPrivateKeyAck(privateKey: PrivateKey) extends DbActorMessage

/** Response for a [[GenerateHeartbeat]] db request Receiving [[DbActorGenerateHeartbeatAck]] works as an acknowledgement that the request was successful
*
* @param heartbeatMap
* requested heartbeat as a map from the channels to message ids
*/
final case class DbActorGenerateHeartbeatAck(heartbeatMap: HashMap[Channel, Set[Hash]]) extends DbActorMessage

/** Response for a general db actor ACK
*/
final case class DbActorAck() extends DbActorMessage
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package ch.epfl.pop.decentralized

import akka.actor.Actor
import ch.epfl.pop.model.objects.{Channel, DbActorNAckException}
import ch.epfl.pop.pubsub.graph.ErrorCodes.SERVER_ERROR
import ch.epfl.pop.storage.DbActor
import ch.epfl.pop.model.objects.DbActorNAckException

class FailingToyDbActor extends Actor {
override def receive: Receive = {
case _ => {
case _ =>
sender() ! DbActorNAckException(0, "")
}
}

}
Loading
Loading