Fine-tune Decentralized communications #1604

Merged
merged 6 commits into master on Jun 1, 2023

Conversation

@K1li4nL (Contributor) commented May 25, 2023

Two things in this PR:

GetMessagesByIdResponseHandler:
The validation has changed so that instead of retrying a certain number of times per channel, each message now has its own retry counter (a sketch of the idea follows below).

ConnectionMediator:
Prevent it from connecting to the same URLs when the config changes. The behavior is not perfect yet, as for now there is no link between a clientActorRef and its URLs; GreetServer will be helpful in that regard.
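A minimal sketch of the per-message retry idea from the first point (placeholder message type and a hypothetical validate helper, not the actual handler code):

import scala.annotation.tailrec

object RetrySketch {
  type GraphMessage = String // placeholder for the real GraphMessage type

  // Stand-in for the validation pipeline: Right on success, Left on failure
  def validate(msg: GraphMessage): Either[String, GraphMessage] =
    if (msg.nonEmpty) Right(msg) else Left("empty message")

  // Each message carries its own retry counter instead of sharing one per channel
  @tailrec
  def passThroughPipeline(messages: List[(GraphMessage, Int)]): Unit = {
    if (messages.nonEmpty) {
      var failed: List[(GraphMessage, Int)] = Nil
      messages.foreach { case (msg, retry) =>
        if (retry > 0) {
          validate(msg) match {
            case Left(_)  => failed = (msg, retry - 1) :: failed // keep it, one attempt fewer
            case Right(_) => () // validated, done with this message
          }
        }
      }
      passThroughPipeline(failed)
    }
  }
}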

@K1li4nL K1li4nL added be2-scala decentralized Decentralized Communication Project labels May 25, 2023
@K1li4nL K1li4nL requested a review from a team as a code owner May 25, 2023 15:42
@K1li4nL K1li4nL force-pushed the work-be2-lauener-Fix-get_messages_by_id-handler branch from d3f8a27 to 1707365 on May 27, 2023 10:27
@K1li4nL K1li4nL force-pushed the work-be2-lauener-Fix-get_messages_by_id-handler branch from 1707365 to d5dcc91 on May 27, 2023 12:19
@K1li4nL K1li4nL changed the title from "Fix GetMessagesByIdResponseHandler to work on large result messages" to "Fine tune Decentralized communications" on May 27, 2023
@K1li4nL K1li4nL changed the title from "Fine tune Decentralized communications" to "Fine-tune Decentralized communications" on May 27, 2023
@Ajkunas (Contributor) left a comment

Most of the comments are nitpicks or questions. Otherwise all good :)

}

// Push a set of message corresponding to one channel through the pipeline
// Push a message corresponding through the pipeline
Contributor:

Suggested change
// Push a message corresponding through the pipeline
// Push a message through the pipeline

}
}
}

passThroughPipeline(failedMessages, validatorFlow, remainingAttempts - 1)
passThroughPipeline(failedMessages.reverse, validatorFlow)
Contributor:
Why do you need to reverse your list of messages?

@K1li4nL (Author) commented May 29, 2023:
I unfortunately don't have any data to back this up, but since we order the channels by length, I wanted to keep that order (given that we prepend each failed graphMessage to the top of the failed list, the order ends up reversed).

I don't know whether one way is better than the other performance-wise, though...

Contributor:

Is there really a need to keep that ordering all the way through the simulation pipeline?

Contributor (Author):

I removed it, since besides being confusing its utility is questionable.

@Jharaxus (Contributor) left a comment

Nice code, it indeed makes more sense to have a retry per message!

Comment on lines 123 to 124
var publishedList = map.foldLeft(List.empty[JsonRpcRequest])((acc, elem) =>
acc ++ elem._2.map(msg =>
Contributor:
I think you could use a flatMap instead of foldLeft + an accumulator
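A hedged sketch of that suggestion (simplified types and a hypothetical buildRequest helper, not the PR's actual code), flatMapping over the map entries instead of folding with an accumulator:

def toPublishList[Channel, Msg, Req](
    resultMap: Map[Channel, Set[Msg]]
)(buildRequest: (Channel, Msg) => Req): List[Req] =
  resultMap.toList.flatMap { case (channel, messages) =>
    messages.toList.map(msg => buildRequest(channel, msg)) // one request per message
  }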

)
}
)
publishedList = publishedList.sortWith((e1, e2) => e1.getParamsChannel.channel.length <= e2.getParamsChannel.channel.length)
Contributor:
Is this ordering part of the decentralized specification? I find an ordering by the channel name's length quite strange... Is there some specific intent behind that?

Contributor:

On the Scala side, you could avoid using a var for publishedList by simply returning

publishedList.sortWith(...)
.map(...)

Contributor (Author):

Absolutely not in the protocol. The intent is as follows:
In general, shorter channel names are less specific and so should be treated first.

/root: should be treated first, as it contains the LAO creation messages
/root/lao_id: should be treated right after, since it contains the roll calls and so on.

It might not be very elegant and there are exceptions, but the retries seem to be enough to deal with them.
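A hedged illustration of both points above (hypothetical helper, simplified types): sorting by channel-name length keeps less specific channels such as /root first, and returning the sorted result directly also avoids the var.

// Not the PR's actual code: order publish requests so that shorter
// (less specific) channel names are handled first.
def orderByChannelSpecificity[Req](requests: List[Req])(channelOf: Req => String): List[Req] =
  requests.sortBy(req => channelOf(req).length)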

if (retry > 0) {
messagesThroughPipeline(validatorFlow, graphMessage) match {
case Left(err) =>
failedMessages = (graphMessage, retry - 1) :: failedMessages
Contributor:

Nitpick: you could probably use the operator ::= that you made me discover a couple of lines above :)
On a side note, maybe using a mutable collection from scala.collection.mutable would be better than using a var, for performance purposes.
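A hedged sketch of the two alternatives mentioned above (placeholder element type, not the PR's actual code):

import scala.collection.mutable.ListBuffer

object FailedMessagesSketch {
  // 1) ::= on a var: desugars to failedMessages = entry :: failedMessages
  var failedMessages: List[(String, Int)] = Nil
  val entry = ("some message", 2)
  failedMessages ::= entry

  // 2) A mutable buffer from scala.collection.mutable instead of reassigning an immutable List
  val failedBuffer = ListBuffer.empty[(String, Int)]
  failedBuffer.prepend(("some message", 2))
}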

)(implicit system: ActorSystem): Boolean = {
if (receivedResponse.isEmpty || remainingAttempts <= 0) {
if (receivedResponse.isEmpty || receivedResponse.forall(elem => elem._2 <= 0)) {
Contributor:

The second check receivedResponse.forall(elem => elem._2 <= 0) is actually not really necessary, as messages with retry == 0 are ignored in the following foreach(...). So if all the received responses have 0 retries left, the next passThroughPipeline will get an empty list of responses and will therefore still return (this would avoid iterating over the entire list of responses twice per call instead of only once).

def responseHandler(messageRegistry: MessageRegistry)(implicit system: ActorSystem): Flow[GraphMessage, GraphMessage, NotUsed] = Flow[GraphMessage].map {
case Right(JsonRpcResponse(_, Some(resultObject), None, _)) =>
resultObject.resultMap match {
case Some(resultMap) =>
val receivedResponse: Map[Channel, Set[GraphMessage]] = wrapMsgInPublish(resultMap)
val receivedResponse = wrapMsgInPublish(resultMap)
Contributor:

Nitpick: maybe you could pass the number of retries as a parameter to wrapMsgInPublish, as you used to do with passThroughPipeline. Otherwise, I feel like it kind of comes out of nowhere, since the name wrapMsgInPublish doesn't really carry the intent of allocating a number of retries to each message...
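A hedged sketch of that suggestion (hypothetical signature and a stand-in request type, not the PR's actual code): the retry budget becomes an explicit parameter instead of appearing implicitly inside the helper.

final case class PublishRequest(channel: String, payload: String) // stand-in for the real JsonRpcRequest

def wrapMsgInPublish(
    resultMap: Map[String, Set[String]],
    retriesPerMessage: Int
): List[(PublishRequest, Int)] =
  resultMap.toList.flatMap { case (channel, messages) =>
    messages.toList.map(msg => (PublishRequest(channel, msg), retriesPerMessage))
  }

// Call site makes the budget explicit, e.g.:
// val receivedResponse = wrapMsgInPublish(resultMap, retriesPerMessage = 3)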

sonarcloud bot commented May 31, 2023

[PoP - Be1-Go] Kudos, SonarCloud Quality Gate passed!
0 Bugs, 0 Vulnerabilities, 0 Security Hotspots, 0 Code Smells
No coverage information, no duplication information

sonarcloud bot commented May 31, 2023

[PoP - Be2-Scala] Kudos, SonarCloud Quality Gate passed!
0 Bugs, 0 Vulnerabilities, 0 Security Hotspots, 0 Code Smells
90.0% Coverage, 0.0% Duplication

sonarcloud bot commented May 31, 2023

[PoP - Fe1-Web] Kudos, SonarCloud Quality Gate passed!
0 Bugs, 0 Vulnerabilities, 0 Security Hotspots, 0 Code Smells
No coverage information, no duplication information

@Ajkunas (Contributor) left a comment

LGTM

sonarcloud bot commented Jun 1, 2023

[PoP - Fe2-Android] Kudos, SonarCloud Quality Gate passed!
0 Bugs, 0 Vulnerabilities, 0 Security Hotspots, 0 Code Smells
No coverage information, no duplication information

@K1li4nL K1li4nL dismissed Jharaxus’s stale review June 1, 2023 14:10

Comments taken into account, thanks Hugo!

@K1li4nL K1li4nL merged commit 787b884 into master Jun 1, 2023
@K1li4nL K1li4nL deleted the work-be2-lauener-Fix-get_messages_by_id-handler branch June 1, 2023 14:10