Fine-tune Decentralized communications #1604
Conversation
Force-pushed d3f8a27 to 1707365, then 1707365 to d5dcc91.
Most of the comments are nitpicks or questions. Otherwise all good :)
```diff
 }
-// Push a set of message corresponding to one channel through the pipeline
+// Push a message corresponding through the pipeline
```
Suggested change:
```diff
-// Push a message corresponding through the pipeline
+// Push a message through the pipeline
```
Two resolved review comments (outdated) on be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/GetMessagesByIdResponseHandler.scala
```diff
 }
-passThroughPipeline(failedMessages, validatorFlow, remainingAttempts - 1)
+passThroughPipeline(failedMessages.reverse, validatorFlow)
```
Why do you need to reverse your list of messages?
I unfortunately don't have any data to back this up, but: since we are ordering the channels based on their length, I wanted to keep their order (given that we add the failed graphMessage on top of the failed list, the order is reversed). I don't know, performance-wise, if one way is better than the other though...
Is there really a need to keep that ordering all the way through the simulation pipeline?
I removed it, since its utility, besides being confusing, is questionable.
Nice code, it indeed makes more sense to have a retry per message!
```scala
var publishedList = map.foldLeft(List.empty[JsonRpcRequest])((acc, elem) =>
  acc ++ elem._2.map(msg =>
```
I think you could use a flatMap instead of foldLeft + an accumulator.
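A minimal sketch of the two styles side by side, with Int payloads standing in for the real JsonRpcRequest messages (the names FlatMapSketch, folded and flattened are illustrative, not from the PR):

```scala
object FlatMapSketch {
  // Stand-in data: channel name -> payloads (the real code maps channels to messages).
  val map: Map[String, List[Int]] = Map("a" -> List(1, 2), "b" -> List(3))

  // Original style: foldLeft + an accumulator.
  val folded: List[Int] =
    map.foldLeft(List.empty[Int])((acc, elem) => acc ++ elem._2.map(_ * 10))

  // Suggested style: flatMap does the same flattening in one step.
  val flattened: List[Int] =
    map.toList.flatMap { case (_, msgs) => msgs.map(_ * 10) }
}
```

Besides dropping the accumulator, flatMap also avoids the repeated `acc ++ ...` list concatenation.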
```scala
    )
  }
)
publishedList = publishedList.sortWith((e1, e2) => e1.getParamsChannel.channel.length <= e2.getParamsChannel.channel.length)
```
Is this ordering part of the decentralized specification? I find an ordering by the channel name's length quite strange... Is there some specific intent behind that?
On the Scala side, you could avoid using var for publishedList by simply returning publishedList.sortWith(...).map(...)
Absolutely not in the protocol. The intent is as follows: in general, shorter channel names are less specific and so should be treated first.
- /root should be treated first as it contains the lao creation messages
- /root/lao_id should be treated right after, since it contains the rollcalls and all

It might not be very elegant and there are exceptions, but the retries seem to be enough to deal with them.
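A sketch of the var-free version suggested above, using plain strings in place of the real channel objects (SortSketch and its values are illustrative only):

```scala
object SortSketch {
  // Stand-in channel names; in the PR these come from getParamsChannel.channel.
  val channels = List("/root/lao_id", "/root", "/root/lao_id/social")

  // One expression, no var: shorter (less specific) channels come first.
  // sortBy(_.length) is stable and avoids the non-strict `<=` comparator
  // that sortWith was given in the original snippet.
  val ordered: List[String] = channels.sortBy(_.length)
}
```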
```scala
if (retry > 0) {
  messagesThroughPipeline(validatorFlow, graphMessage) match {
    case Left(err) =>
      failedMessages = (graphMessage, retry - 1) :: failedMessages
```
Nitpick: you could probably use the operator ::= that you made me discover a couple of lines above :) On a side note, maybe using a mutable collection from scala.collection.mutable would be better than using a var, for performance purposes.
```diff
 )(implicit system: ActorSystem): Boolean = {
-  if (receivedResponse.isEmpty || remainingAttempts <= 0) {
+  if (receivedResponse.isEmpty || receivedResponse.forall(elem => elem._2 <= 0)) {
```
The second check, receivedResponse.forall(elem => elem._2 <= 0), is actually not really necessary, as messages with retry == 0 are ignored in the following foreach(...). So if all the responses received have 0 retries left, the next passThroughPipeline will have an empty list of responses and will therefore still return (this would avoid iterating over the entire list of responses twice per call instead of once).
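A toy model of that argument, not the PR's actual handler: process and validate below are hypothetical stand-ins for passThroughPipeline and the validator flow. Because entries with an exhausted budget are dropped before recursing, isEmpty alone terminates the recursion:

```scala
object RetryLoopSketch {
  // msgs: (message, retriesLeft); validate stands in for the real pipeline.
  def process(msgs: List[(Int, Int)], validate: Int => Boolean): List[Int] =
    if (msgs.isEmpty) Nil // the single base case the reviewer describes
    else {
      val (passed, failed) = msgs.partition { case (m, _) => validate(m) }
      // Exhausted entries are dropped here, so the recursive call can
      // never receive a non-empty list where every retry count is 0.
      val retryable = failed.collect { case (m, r) if r - 1 > 0 => (m, r - 1) }
      passed.map(_._1) ++ process(retryable, validate)
    }
}
```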
```diff
 def responseHandler(messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
   case Right(JsonRpcResponse(_, Some(resultObject), None, _)) =>
     resultObject.resultMap match {
       case Some(resultMap) =>
-        val receivedResponse: Map[Channel, Set[GraphMessage]] = wrapMsgInPublish(resultMap)
+        val receivedResponse = wrapMsgInPublish(resultMap)
```
Nitpick: maybe you could pass the number of retries as a parameter to wrapMsgInPublish, as you used to do with passThroughPipeline. Otherwise, I feel like it kind of comes from nowhere, as wrapMsgInPublish as a name doesn't really carry the intent of allocating a number of retries to each message...
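A sketch of the suggested signature, with a simplified stand-in Publish case class instead of the real JsonRpcRequest types (WrapSketch and its types are assumptions for illustration):

```scala
object WrapSketch {
  // Simplified stand-in for the real Publish request type.
  final case class Publish(channel: String, payload: String)

  // The retry budget is an explicit parameter instead of a constant buried
  // in the body, so the call site shows where each message's retries come from.
  def wrapMsgInPublish(resultMap: Map[String, Set[String]], retries: Int): List[(Publish, Int)] =
    resultMap.toList.flatMap { case (channel, msgs) =>
      msgs.toList.map(m => (Publish(channel, m), retries))
    }
}
```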
[PoP - Be1-Go] Kudos, SonarCloud Quality Gate passed!
[PoP - Be2-Scala] Kudos, SonarCloud Quality Gate passed!
[PoP - Fe1-Web] Kudos, SonarCloud Quality Gate passed!
LGTM
[PoP - Fe2-Android] Kudos, SonarCloud Quality Gate passed!
Comments taken into account, thanks Hugo!
Two things in this PR:
- GetMessagesByIdResponseHandler: the validation has changed so that instead of retrying a certain number of times per channel, each message has its own counter.
- ConnectionMediator: prevent it from connecting to the same URLs upon a change in the config. The behavior is not perfect, as for now there is no link between a clientActorRef and its URL; GreetServer will be helpful in that regard.
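The ConnectionMediator change described above can be sketched roughly as follows. This is an assumption about the deduplication logic only, not the actual actor code; MediatorSketch, connected and connect are all hypothetical names:

```scala
object MediatorSketch {
  // URLs we have already opened a connection to.
  var connected: Set[String] = Set.empty

  // On a config reload, returns only the URLs that still need a connection;
  // already-connected URLs are filtered out instead of being redialed.
  def connect(urls: List[String]): List[String] = {
    val fresh = urls.filterNot(connected.contains)
    connected ++= fresh
    fresh
  }
}
```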