-
Notifications
You must be signed in to change notification settings - Fork 1
Communication
This document is intended to describe the communication primitives that are used in the HGDB peer system.
The general principles the communication is built upon are:
- All communication is done asynchronously.
- Peers do not control each other, but each peer should be implemented so that it maximizes the output of the system. Workers should actually compete for jobs instead of clients competing for resources
It is possible for an action to affect atoms on more than one peer and, as such, the problem of consistency is of actuality. Lets call all actions that affect the state of the atoms in a database "write actions". Given the design of our system we have the following requirements for the consistency implementations:
- Must be asynchronous. In database terminology we distinguish between eager and lasy propagation of write actions (eager propagates write actions within the scope of the transaction while lasy allows a transaction to commit locally and then the change is propagated in the system). HGDB should use lasy propagation.
- Must make a compromise between consistency and assumptions on the delivery order of the messages. We can not rely on the fact that group communication ensures a total ordering on all messages (that is if e1 happens before e2, it is not guaranteed that all peers will observe e1 before they observe e2)
- Must allow selective replication. That means that only a part of the atoms in the peer database are replicated to another peer (non-disjunctive sets of atoms can be replicated to different peers). Ideally the sets should be defined in a declarative manner by each peer (ex. peer A will inform peer B that it is interested of all atoms stored with peer B for which a certain expression evaluates to true).
- Assume that no peer is designated as holding the primary copy of an atom.
A good paradigm to are the epidemic algorithms for database replication (see http://csdl2.computer.org/persagen/DLAbsToc.jsp?resourcePath=/dl/trans/tk/&toc=comp/trans/tk/2003/05/k5toc.xml&DOI=10.1109/TKDE.2003.1232274) and http://www.cs.umd.edu/~keleher/papers/monet99.pdf .
It is important to define what ordering can be imposed on the messages that are exchanged between peers. An important order is the causality relation between transactions (if transaction B was executed after observing transaction A, all peers must execute transaction B after transaction A). While this is important, there are systems that might require a more optimistic approach and might not be willing to pay the price of constant election mechanisms (elections assume independence of peer failures and not all systems have this property).
As a first implementation we can ignore the causality order guarantee, and go with a very optimistic approach (assume that the probability of the same atom being updated on two peers before the second peer observes the update made by the first is zero). On the medium term a currency approach would be interesting. It basically attaches a currency of 1 to an atom and then split it according to some algorithm when new replicas are created. By requiring that voters with a certain amount of currency need to agree before committing a transaction, one can control the type and magnitude of the elections).
Another thing to consider (and probably of greater immediate importance) is a mechanism by which peers can catch up with modifications after being disconnected. The choice is between a push technique (where updates are constantly pushed to peers until they come back online) and a pull technique (where the peer queries for updates when it comes back online).
It is proven that pull techniques have a higher convergence rate (stale peers get up to date faster). The solution is to have each peer, on start-up, present its state to the other peers and then receive all updates.
The main issue here is how will peer B, based on the description of the state of peer A, figure out what to send to peer A and in which order. Since we allow for partial replication, we can not assume that peer B has all the data that peer A need to come up to date with. Another use case here is load balancing - a snapshot of a peer database can be copied to another node, started up, and should automatically come up to date and participate in the group operations.
A good compromise would be to ensure that all transactions generated by one peer are applied in the same order at all peers. This would ensure consistency for scenarios where the same atom is not modified by two peers in the same time, which is a common scenario in a distributed environment (especially in one that does not make a point from hiding the distributed nature of data from the client).
This can be achieved by assigning a version number to each message. A peer will not apply a message until all messages with smaller numbers have been applied.
Because peers that have announced their interest in a certain event (an operation on an atom) might not be online and available at the time the event happens, the system must guarantee that the event will eventually reach the peer (we assume down times are not long).
Each peer maintains a log of events. The log will impose the ordering of the events (it is important that at all peers events from the same source are applied in the same order). Each event is given a version number (might be a time stamp).
The log order is a stronger constraint then the causality relation between two transactions, so using the log order is ok, although not optimal.
One aspect to consider here is that, due to partitions, not all peers will see all events, in other words, if i<k<j, it is possible for a peer to apply event j after event i (if he is not interested in event k). In order for peers to know what events are required in order to apply an event j, at the moment the event is created in the local log, it is also computed what partitions it belongs to and who is the previous event from that partition.
Log records are required until all interested peers have received them and the originating peer knows that they have been received. Each peer will have a vector Tik = v, where i is the current peer, k is a known peer and v is the last event that originated from i and i knows k received it. It is obvious that a log record v is required as long as there is at least one k such that Tik < v.
As a future improvement, we could allow log records to be replicated on a few peers that could then take over the identity of the originating peer in case it fails.
Every time a peer i signals an event to another peer j, it will also send the version number of the event and the version number of the previous event that should be at j. j will decide based on the its knowledge about the last event received from i if it is up to date and can apply the event or not. If yes, j sends a confirm message to peer i which will update his knowledge about the knowledge of j. If not, it will send a request for all the events that were missed by j.
When peer j will start it will first publish all his interests and then start an "catch up" phase in which it sends each peer i the version number of the last event received from i and request any available updates.
- Peer i sends a call-for-proposal to peer j.
(cfp
:sender (peer i)
:receiver (peer j)
:on-behalf-of (peer i)
:content
(action remember actionID1 advertisement)
:ontology ontology_name
)
on-behalf-of allows peers to forward cfps to other known peers based on their knowledge of the network topology
advertisement is a representation of the atom that allows peers to take an informed decision about their ability to send a proposal in response to this call-for-proposal. The form and meaning of the advertisement is described by the ontology and should be domain specific.
- Peer j receives the call for proposal and decides to do one of the following: propose to peer i to store the atom or forward the message and send an inform message to peer i (this can be used by peer i to determine if the request is still propagating or must be resent)
The propose message has the following form:
(propose
:sender (peer j)
:receiver (peer i)
:in-reply-to actionID1
:content
(action remember actionID2 advertisement)
:ontology ontology_name
)
in-reply-to identifies the call for proposal that is answered
advertisement contains information about the proposal value in the defined ontology. Multiple peers may decide to compete on this call for proposal and peer i needs to take a decision.
- Peer i receives the proposal and can decide to accept or reject it. In the first case (accept) the message sent to peer j will also contain the data required by peer j to complete the operation. In the second case (reject) a message is sent to peer j containing the reason for rejection (this reason can be used by peer j to improve his knowledge on the network, and make better proposals next time)
One of the following messages is sent:
(accept-proposal
:sender (peer i)
:receiver (peer j)
:in-reply-to actionID2
:content
(action remember missing_data)
)
(reject-proposal
:sender (peer i)
:receiver (peer j)
:in-reply-to actionID2
:content
(reason)
:ontology ontology_name
)
When peer i sends the accept-proposal message, peer j might fail to complete the task for various reasons. In this case peer i will try to accept other proposals or store the atom in local cache and retry at a later time.
Given a handle one can make no assumption on the location of the data. One reason for this is that we will eventually offer a load balancing mechanism that will move atoms between peers and, as such, the handle will be a location independent identifier for an atom. This can be a scalability issue if a search of the entire network is performed every time a handle is requested, but the network will have to organize such that handles that are likely to be requested by peer i are very "close" to peer i (ideally on peer i).
The mechanism to retrieve an atom is similar to that of adding one: a request-for-proposal is published, in this case the action is return and the advertisement is the handle.
Usually an atom should be on a single peer but we should not assume that as various failure scenarios may introduce split brain conditions or other inconsistent state. A peer that has the atom will send a proposal that will include as advertisement a version, a summary of the atom or the atom it self. Peer j might choose to accept or reject (and provide a reason) the proposal.
A scenario to consider here is the one in which peer i queries for an atom that does not exist (might have been deleted, might have never existed, or the peer hosting it may no longer be running). In this case no one would respond to the call-for-proposal. For the time being we can assume a timeout for each query, but more complex behavior might be worth considering.
The implementation uses two base concepts: tasks and conversations. Both are activities - basically finite state machines.
A task manages multiple conversations and the transitions between states are triggered by changes in the states of the conversations. A conversation is established between exactly two peers and the transitions between states is done by either having one peer explicitly calling a method (the "say" transitions) or by receiving a message from the other peer (the "hear" transitions).
Characteristics of conversations:
- All messages need to arrive in the "proper" order. If any out-of-order messages are received a "do-not-understand" reply is generated.
- They are not strictly related to an action. The conversation only imposes the ordering of the performatives that are being exchanged. That being said, there can be special purpose conversation types that can be used only for a certain action.
- Conversations should be designed with composability in mind, but even composed conversations will assume a strict order of the messages.
Characteristics of tasks:
- The transition of a task are determined by changes in the states of the conversations handled by the task. These changes can happen when the activity is not in an appropriate state to handle them. The task must make sure that these changes are available if the state changes.
- It is the responsibility of the conversations to ensure that no state change happens while the conversation is in a wait state at a task. This is easily ensured by the strict ordering presumption made for conversations.
- Implementors of the Task base class will see a serialized version of the changes that happened in the conversations (they will not observe a change in a conversation while handling a change in another conversation). There is no specific need for ordering the events, but eventually it would be useful for implementors to declare a priority based ordering of the conversations state changes (in the first use case above, the propose message is more important than the inform message).