Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config file watch service #1570

Merged
merged 4 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ final case class ConnectionMediator(
// List of servers connected
private var serverSet: Set[ActorRef] = Set()

// Ping Monitor to inform it of our ActorRef
monitorRef ! ConnectionMediator.Ping()

override def receive: Receive = {

// Connect to some servers
Expand Down Expand Up @@ -77,4 +80,5 @@ object ConnectionMediator {
final case class ConnectTo(urlList: List[String]) extends Event
final case class NewServerConnected(serverRef: ActorRef) extends Event
final case class ServerLeft(serverRef: ActorRef) extends Event
final case class Ping() extends Event
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
import akka.event.LoggingReceive
import akka.stream.scaladsl.Sink
import ch.epfl.pop.config.RuntimeEnvironment.{readServerPeers, serverPeersListPath}
import ch.epfl.pop.decentralized.Monitor.TriggerHeartbeat
import ch.epfl.pop.model.network.JsonRpcRequest
import ch.epfl.pop.model.network.method.ParamsWithMap
import ch.epfl.pop.pubsub.graph.GraphMessage

import java.nio.file.{Path, WatchService}
import java.nio.file.StandardWatchEventKinds.{ENTRY_CREATE, ENTRY_MODIFY}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters.CollectionHasAsScala

//This actor is tasked with scheduling heartbeats.
// To that end it sees every messages the system receives.
Expand All @@ -35,7 +39,6 @@ final case class Monitor(
override def receive: Receive = LoggingReceive {

case Monitor.AtLeastOneServerConnected =>
connectionMediatorRef = sender()
if (!someServerConnected) {
timers.startTimerWithFixedDelay(periodicHbKey, TriggerHeartbeat, heartbeatRate)
someServerConnected = true
Expand All @@ -61,6 +64,11 @@ final case class Monitor(
}
}

case ConnectionMediator.Ping() =>
log.info("Received ConnectionMediator ping")
connectionMediatorRef = sender()
new Thread(new FileMonitor(connectionMediatorRef)).start()

case _ => /* DO NOTHING */
}
}
Expand All @@ -86,3 +94,35 @@ object Monitor {
final case class TriggerHeartbeat() extends Event
private final case class DoNothing() extends Event
}

// This class watch the list of server peers config file and upon changes
// tells connectionMediator about it
private class FileMonitor(mediatorRef: ActorRef) extends Runnable {

// getParent to exclude the filename from the path, i.e get the config directory path
private val directory: Path = Path.of(serverPeersListPath).getParent
private val watchService: WatchService = directory.getFileSystem.newWatchService()
directory.register(watchService, ENTRY_CREATE, ENTRY_MODIFY)

override def run(): Unit = {
try {
while (!Thread.currentThread().isInterrupted) {
// Blocks until an event happen
val watchKey = watchService.take()

// For any event, read the file and send it
for (event <- watchKey.pollEvents().asScala.toList) {
if (serverPeersListPath.endsWith(event.context().toString)) {
mediatorRef ! ConnectionMediator.ConnectTo(readServerPeers())
}
}
watchKey.reset()
}
} catch {
case _: InterruptedException =>
println("File watch service interrupted")
} finally {
watchService.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class ConnectionMediatorSuite extends TestKit(ActorSystem("ConnectionMediatorSui
ConnectionMediator.props(mockMonitor.ref, ActorRef.noSender, ActorRef.noSender, MessageRegistry())
)

mockMonitor.expectMsg(timeout, ConnectionMediator.Ping())

// Register server
connectionMediatorRef ! ConnectionMediator.NewServerConnected(server.ref)

Expand All @@ -48,6 +50,8 @@ class ConnectionMediatorSuite extends TestKit(ActorSystem("ConnectionMediatorSui
ConnectionMediator.props(mockMonitor.ref, ActorRef.noSender, ActorRef.noSender, MessageRegistry())
)

mockMonitor.expectMsg(timeout, ConnectionMediator.Ping())

// Register servers
connectionMediatorRef ! ConnectionMediator.NewServerConnected(server1.ref)
connectionMediatorRef ! ConnectionMediator.NewServerConnected(server2.ref)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package ch.epfl.pop.decentralized

import akka.actor.ActorSystem
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.Source
import akka.testkit.{TestKit, TestProbe}
import ch.epfl.pop.config.RuntimeEnvironment.serverPeersListPath
import ch.epfl.pop.config.RuntimeEnvironmentTestingHelper.testWriteToServerPeersConfig
import ch.epfl.pop.model.network.method.ParamsWithMap
import ch.epfl.pop.model.network.{JsonRpcRequest, MethodType}
import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.{AnyFunSuiteLike => FunSuiteLike}
import org.scalatest.matchers.should.Matchers
import util.examples.JsonRpcRequestExample

import java.io.{File, PrintWriter}
import java.nio.file.Path
import scala.concurrent.duration.DurationInt

class MonitorSuite extends TestKit(ActorSystem("MonitorSuiteActorSystem")) with FunSuiteLike with Matchers with BeforeAndAfterAll {
Expand Down Expand Up @@ -99,4 +104,39 @@ class MonitorSuite extends TestKit(ActorSystem("MonitorSuiteActorSystem")) with
testProbe.send(monitorRef, Monitor.AtLeastOneServerConnected)
testProbe.expectMsgType[Monitor.GenerateAndSendHeartbeat](timeout)
}

test("monitor should send ConnectTo() requests to ConnectionMediator upon relevant config file change") {
val mockConnectionMediator = TestProbe()
val monitorRef = system.actorOf(Monitor.props(ActorRef.noSender))

// Ping monitor to inform it of ConnectionMediatorRef
mockConnectionMediator.send(monitorRef, ConnectionMediator.Ping())

// Expect no message as long as the server peers list is untouched
mockConnectionMediator.expectNoMessage(timeout)

val newContent = List("some", "strings")
testWriteToServerPeersConfig(newContent)

mockConnectionMediator.expectMsgType[ConnectionMediator.ConnectTo](timeout)
}

test("monitor should not react upon non relevant events in config directory") {
val mockConnectionMediator = TestProbe()
val monitorRef = system.actorOf(Monitor.props(ActorRef.noSender))

// Ping monitor to inform it of ConnectionMediatorRef
mockConnectionMediator.send(monitorRef, ConnectionMediator.Ping())

// Create new file in the directory
val filePath = Path.of(serverPeersListPath).getParent.toString + File.separator + "DELETE_ME"
val file = new PrintWriter(filePath)
file.write("Hello")
file.close()

// Set the file we created to delete itself after the jvm shutdown
new File(filePath).deleteOnExit()

mockConnectionMediator.expectNoMessage(timeout)
}
}