bolerio edited this page Jul 10, 2015 · 1 revision

Introduction

This document gives an example of how the workflow-based framework can be used to create new tasks.

A task is a unit of work that a peer needs to complete. To complete the task, the peer may engage in multiple conversations with its neighbors.

Implementing a new task

Problem

Consider a ring structure (its purpose does not matter here; replication is one example). The structure requires each peer to have a next and a previous neighbor, and it also requires that the last peer is followed by the first (and the first is preceded by the last).
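The ring requirement can be made concrete with a small in-memory sketch. `RingPeer`, `link` and `isConsistentRing` are hypothetical names introduced here for illustration; they are not part of the framework, and no networking is involved.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical ring member used only for illustration. */
final class RingPeer {
    final String name;
    RingPeer next;      // successor in the ring
    RingPeer previous;  // predecessor in the ring

    RingPeer(String name) {
        this.name = name;
        // A single peer is a ring on its own: it is both first and last.
        this.next = this;
        this.previous = this;
    }

    /** Links the peers into one ring, in list order. */
    static void link(List<RingPeer> peers) {
        int n = peers.size();
        for (int i = 0; i < n; i++) {
            RingPeer current = peers.get(i);
            current.next = peers.get((i + 1) % n);          // last wraps to first
            current.previous = peers.get((i + n - 1) % n);  // first wraps to last
        }
    }

    /** True if next/previous agree everywhere and the ring closes after exactly size steps. */
    static boolean isConsistentRing(RingPeer start, int size) {
        RingPeer current = start;
        for (int i = 0; i < size; i++) {
            if (current.next.previous != current) return false;  // links disagree
            current = current.next;
            if (current == start && i < size - 1) return false;  // ring closed too early
        }
        return current == start;
    }

    public static void main(String[] args) {
        RingPeer a = new RingPeer("a");
        RingPeer b = new RingPeer("b");
        RingPeer c = new RingPeer("c");
        link(Arrays.asList(a, b, c));
        System.out.println("ring consistent: " + isConsistentRing(a, 3));
    }
}
```

A check like this is mainly useful in tests: after every join, the ring should still be consistent with one more member than before.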

The problem is how to implement a join mechanism - a peer wants "in" and, as such, must negotiate with two consecutive peers.

Algorithm

The new peer (call it P) will send out a message announcing that it wants to join the ring. Peers that are in the ring and are interested will reply with a proposal that also contains the address of their successor. If peer Pi replies and states that peer Pj is its successor, and P decides to accept the proposal, the following happens:

  1. P sends an acknowledgment to Pi asking it not to accept any other requests for a given period of time
  2. P sends a proposal to peer Pj asking for permission to join as a predecessor
  3. If Pj accepts, P will send acknowledgments to both Pi and Pj and everybody updates their data
  4. If Pj rejects the request or fails to answer in a given time, P will send a disconfirmation to Pi and go back to waiting for proposals from other peers.
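The four steps above can be sketched as a synchronous, in-memory simulation. This is only an illustration under simplifying assumptions: `Member`, `Reply` and `JoinSketch` are hypothetical names, the answer of Pj is passed in as a parameter instead of arriving over the network, and the blocking period is reduced to a boolean flag.

```java
/** Hypothetical in-memory stand-in for a ring member. */
final class Member {
    final String name;
    Member next = this;
    Member previous = this;
    boolean blocked;  // true while the member holds off other join requests

    Member(String name) { this.name = name; }
}

/** What Pj answers to the proposal of P (TIMEOUT models a missing answer). */
enum Reply { ACCEPT, REJECT, TIMEOUT }

final class JoinSketch {
    /**
     * Runs steps 1 to 4 for joiner p after pi made a proposal.
     * Returns true if p ended up between pi and its successor.
     */
    static boolean join(Member p, Member pi, Reply pjReply) {
        pi.blocked = true;    // step 1: pi stops accepting other requests
        Member pj = pi.next;  // successor address carried by the proposal of pi
        // step 2: p proposes to pj to become its predecessor; pjReply is the answer
        if (pjReply == Reply.ACCEPT) {
            // step 3: both peers are confirmed and everybody updates their links
            p.previous = pi;
            p.next = pj;
            pi.next = p;
            pj.previous = p;
            pi.blocked = false;
            return true;
        }
        // step 4: rejection or timeout; pi is released and p waits for other proposals
        pi.blocked = false;
        return false;
    }

    public static void main(String[] args) {
        Member a = new Member("a");
        Member b = new Member("b");
        a.next = b; a.previous = b;
        b.next = a; b.previous = a;
        Member p = new Member("p");
        System.out.println("joined: " + join(p, a, Reply.ACCEPT));
        System.out.println(a.name + " -> " + a.next.name + " -> " + a.next.next.name);
    }
}
```

In the real protocol the three peers run independently, which is why each of them needs its own task with its own states, as described next.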

Work flow on peer P

The states of the task at peer P are:

Started: the peer is waiting for proposals to join the ring
FirstAccepted: the peer accepted a proposal from Pi and started contacting Pj
Done: Pj accepted, both Pi and Pj are acknowledged, and the task is finished

The states of the task at peer Pi are:

Started: the peer received a call for proposal
Proposed: peer P was sent a proposal
Accepted: peer P accepted the proposal and requested to block for a given time
Confirmed: peer P managed to contact Pj
Disconfirmed: peer P did not manage to contact Pj or Pj did not agree
Done: the task is finished

The states at peer Pj are:

Started: the peer received a proposal from P
Accepted: the peer accepted the proposal from P
Confirmed / Disconfirmed: P confirmed / disconfirmed the join
Done: the task is finished
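The state lists can be written down as transition tables. The sketch below is an interpretation, not framework code: the state names come from the lists above, but the allowed transitions are inferred from the algorithm description, and `JoinStates`, `P_NEXT`, `PI_NEXT` and `canMove` are hypothetical names. The table for Pj would have the same shape as the one for Pi, without the Proposed state.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class JoinStates {
    enum PState { Started, FirstAccepted, Done }
    enum PiState { Started, Proposed, Accepted, Confirmed, Disconfirmed, Done }

    // Allowed next states per current state (inferred, see the lead-in).
    static final Map<PState, Set<PState>> P_NEXT = new EnumMap<>(PState.class);
    static final Map<PiState, Set<PiState>> PI_NEXT = new EnumMap<>(PiState.class);

    static {
        P_NEXT.put(PState.Started, EnumSet.of(PState.FirstAccepted));
        // From FirstAccepted, Pj either accepts (Done) or rejects (back to Started).
        P_NEXT.put(PState.FirstAccepted, EnumSet.of(PState.Done, PState.Started));

        PI_NEXT.put(PiState.Started, EnumSet.of(PiState.Proposed));
        PI_NEXT.put(PiState.Proposed, EnumSet.of(PiState.Accepted));
        PI_NEXT.put(PiState.Accepted, EnumSet.of(PiState.Confirmed, PiState.Disconfirmed));
        PI_NEXT.put(PiState.Confirmed, EnumSet.of(PiState.Done));
        PI_NEXT.put(PiState.Disconfirmed, EnumSet.of(PiState.Done));
    }

    /** True if the table lists a transition from one state to the other. */
    static <S> boolean canMove(Map<S, Set<S>> table, S from, S to) {
        Set<S> allowed = table.get(from);
        return allowed != null && allowed.contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canMove(P_NEXT, PState.FirstAccepted, PState.Started));
        System.out.println(canMove(PI_NEXT, PiState.Done, PiState.Started));
    }
}
```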

Implementation

Because the steps in the conversations are already implemented, we will use ProposalConversation.

For the task on peer P the following need to be done:

  1. Extend TaskActivity

  2. Override the startTask function. The implementation has to declare when and how conversation messages are received, and it needs to send call-for-proposal messages to neighbors.

To declare workflow transitions based on conversation updates the registerConversationHandler function has to be called:

registerConversationHandler(State.Started, ProposalConversation.State.Proposed, "handleProposal", State.Working);

The line above states that whenever the task is in state Started and a conversation reports a Proposed message, the handleProposal function is called. Before the call, the current state of the task is set to Working, which ensures that no other messages are processed while the function executes. Note that if a Proposed message arrives while the task is not in the Started state, the message is stored and handled if the task ever reaches the Started state again.
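The dispatch rule described above (set an interim state, call the handler by name, keep messages that cannot be handled yet) can be illustrated with a toy model. This is not the framework's TaskActivity and makes no claim about how it is implemented; `ToyTask`, `register`, `deliver` and `pendingCount` are hypothetical names, and the `Event` enum stands in for ProposalConversation.State.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of state-based handler dispatch with deferred messages. */
final class ToyTask {
    enum State { Started, Working, FirstAccepted, Done }
    enum Event { Proposed, Accepted, Rejected }

    private static final class Rule {
        final String method;
        final State interim;
        Rule(String method, State interim) { this.method = method; this.interim = interim; }
    }

    private final Map<String, Rule> rules = new HashMap<>();
    private final List<Event> pending = new ArrayList<>();
    final List<String> log = new ArrayList<>();
    State state = State.Started;

    void register(State when, Event on, String method, State interim) {
        rules.put(when + "/" + on, new Rule(method, interim));
    }

    int pendingCount() { return pending.size(); }

    /** Stores the event, then handles every stored event that matches the current state. */
    void deliver(Event event) {
        pending.add(event);
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (int i = 0; i < pending.size(); i++) {
                Rule rule = rules.get(state + "/" + pending.get(i));
                if (rule == null) continue;  // not handled in this state: keep it stored
                Event next = pending.remove(i);
                state = rule.interim;        // interim state is set before the handler runs
                state = invoke(rule.method, next);
                progressed = true;
                break;                       // state changed, rescan the stored events
            }
        }
    }

    private State invoke(String method, Event event) {
        try {
            Method m = ToyTask.class.getDeclaredMethod(method, Event.class);
            return (State) m.invoke(this, event);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot call handler " + method, e);
        }
    }

    // Handlers record the state they observe and return the next task state.
    State handleProposal(Event e) { log.add("proposal@" + state); return State.FirstAccepted; }
    State handleReject(Event e) { log.add("reject@" + state); return State.Started; }

    public static void main(String[] args) {
        ToyTask task = new ToyTask();
        task.register(State.Started, Event.Proposed, "handleProposal", State.Working);
        task.register(State.FirstAccepted, Event.Rejected, "handleReject", State.Working);
        task.deliver(Event.Proposed);  // handled immediately
        task.deliver(Event.Proposed);  // stored: task is in FirstAccepted
        task.deliver(Event.Rejected);  // back to Started, stored proposal is handled
        System.out.println(task.log + " final state: " + task.state);
    }
}
```

In this model a second proposal that arrives while the task is in FirstAccepted stays in the pending list and is picked up only after a rejection returns the task to Started, which mirrors the behavior described above.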

The handleProposal function will do the following:

public State handleProposal(AbstractActivity<?> fromActivity)
{
	ProposalConversation conversation = (ProposalConversation)fromActivity;
	
	//create reply message
	Object message = conversation.getMessage();
	Object reply = getReply(message);

	//ask to block for 1 second
	combine(reply, struct(BLOCK, 1000));
	conversation.accept(reply);
	
	//get the address of peer j from the message and send a proposal

	//return appropriate state
	return State.FirstAccepted;
}
   

At this point the task will accept Accept and Reject messages from Pj.

Add these lines to the startTask function:

registerConversationHandler(State.FirstAccepted, ProposalConversation.State.Accepted, "handleAccept", State.Working);

registerConversationHandler(State.FirstAccepted, ProposalConversation.State.Rejected, "handleReject", State.Working);

public State handleAccept(AbstractActivity<?> fromActivity)
{
	ProposalConversation conversation = (ProposalConversation)fromActivity;

	//send confirm messages to Pi and Pj

	//update internal state

	//the task has ended
	return State.Done;
}

public State handleReject(AbstractActivity<?> fromActivity)
{
	ProposalConversation conversation = (ProposalConversation)fromActivity;

	//send disconfirm messages to Pi

	//back to the beginning...
	return State.Started;
}
