CIS 307: Mutual Exclusion and Elections in Distributed Systems

Pages 265 - 269 and 262 - 263 in Tanenbaum-vanSteen.

Mutual Exclusion with a Manager

We are given n sites P1 ... Pn that want to use a resource R in mutual exclusion. The requirements of mutual exclusion, progress, fairness, etc. apply now as they did when we were in a shared memory system. But now there is no shared memory, and thus no support for spinlocks and semaphores. One easy way of dealing with the problem is to have a Manager Process [or Coordinator, or Leader] M to whom clients send requests and from whom they receive permission.

Here is how a client would operate:

    ..........
    Send request to M
    Wait for reply from M
    Use resource R
    Send release notice to M
    ..........
This means that each use of the resource carries an overhead of 3 messages: the request, the reply, and the release. If W is the time we use the resource and T is the time taken by each message, then the total time spent for a critical region is (W+3T), and the maximum rate of use of the resource for a single user is 1/(W+3T). Requests from other users can arrive and be queued while the resource is in use, so with multiple users only the release and the next reply separate consecutive uses, giving a utilization of W/(W+2T).
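As a numeric sanity check of these formulas (the timings W and T below are hypothetical, chosen only for illustration):

```python
# Hypothetical timings: W = time in the critical region, T = one message delay.
W = 10.0   # ms using the resource
T = 1.0    # ms per message

# A single user pays 3 messages per use: request, reply, release.
cycle = W + 3 * T
max_rate_single_user = 1.0 / cycle   # uses per ms for one user

# With queued requests, only the release and the next reply
# separate two consecutive uses of the resource.
utilization = W / (W + 2 * T)

print(max_rate_single_user)   # 1/13, about 0.077 uses per ms
print(utilization)            # 10/12, about 0.83
```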

Now we look at what is done by the manager M:

    It uses two variables:
      resourceIsFree, a boolean indicating state of resource, initially true;
      rqueue, a FIFO queue containing pending requests, initially empty;

    loop {
      wait for a message;
      if the message is a request {
        if resourceIsFree {
          allocate resource to requestor (i.e. send it a reply);
          set resourceIsFree to false;
        } else
          insert request in rqueue;
      } else {            /* it is a release */
        if rqueue is empty
          set resourceIsFree to true;
        else {
          remove a request from rqueue;
          send reply to that requestor;
        }
      }
    }
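The manager loop above can be sketched as a small state machine in Python. Message transport is abstracted away: events come in as tuples, and a grant is modeled as the returned process id (None means no grant is sent).

```python
from collections import deque

def make_manager():
    """State machine for the manager pseudocode: feed it
    ('request', pid) and ('release', pid) events; it returns the pid
    to which a grant (reply) should be sent, or None."""
    state = {"free": True, "rqueue": deque()}  # resourceIsFree, FIFO queue

    def handle(msg):
        kind, pid = msg
        if kind == "request":
            if state["free"]:
                state["free"] = False
                return pid                   # allocate resource to requestor
            state["rqueue"].append(pid)      # requestor must wait
            return None
        # it is a release
        if state["rqueue"]:
            return state["rqueue"].popleft() # hand resource to next waiter
        state["free"] = True
        return None

    return handle
```

For example, if P1 requests (granted at once), then P2 requests (queued), P1's release hands the resource to P2.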
Of course we are in trouble if the Manager dies. First, we do not know its current state (i.e., the state of the resource and who is waiting); second, we no longer have a coordinator. For the second problem we hold an election to select a new manager. For the first problem, each process has to recognize the failure and adjust accordingly [each process knows its own state at the time of the failure and can communicate it to the new manager].
By the way, how do we recognize that a leader (or other participant) is dead? We need some form of Heartbeat Protocol to monitor the "aliveness" of the nodes. [At regular intervals, say every few seconds, a participant will send a message that says "I am alive", or send a message asking "are you alive?".]
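The bookkeeping side of such a heartbeat protocol can be sketched as follows (the class and its timeout value are illustrative, not part of any standard protocol): record when each "I am alive" message was last heard, and consider a node dead once it has been silent longer than the timeout.

```python
import time

class HeartbeatMonitor:
    """Track 'I am alive' messages; a node is considered alive only if
    it was heard from within the last `timeout` time units."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.last_seen = {}          # node -> time of last heartbeat

    def beat(self, node, now=None):
        """Record a heartbeat from `node` (now overridable for testing)."""
        self.last_seen[node] = time.monotonic() if now is None else now

    def alive(self, now=None):
        """Return the set of nodes heard from within the timeout."""
        t = time.monotonic() if now is None else now
        return {n for n, seen in self.last_seen.items()
                if t - seen <= self.timeout}
```

A real implementation would also need the sending side (a periodic timer on each participant) and would trigger an election when the coordinator drops out of the alive set.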

Notice the program executed by the manager M: after initialization it consists just of "handlers" for the various possible message types. This is typical of "responsive systems", i.e. systems whose purpose is to be available to answer messages from clients. Notice also that while the protocol for mutual exclusion is going on, the protocol for heartbeat is also executing, and so may be other protocols as well.

No Manager: Ricart-Agrawala Algorithm

We now consider the problem of mutual exclusion in the fully distributed case where there is no Manager. We assume that we have n processes, possibly at different sites, P1, P2, ..., Pn. When a process needs the resource it sends a request to the other (n-1) processes; when it receives (n-1) grant replies, it goes on to use the resource. When a process is done with the resource it sends a reply to all the processes that are waiting for it. [This solution assumes a reliable mechanism for delivering messages.] Each use of the critical region thus requires 2*(n-1) messages: (n-1) requests and (n-1) replies.

Each process must be written to include two threads of control: one to deal with the communication activities, the other to carry out the computations of the process.

Variables

    self: the identity of this process (a number like 7);
    localClock: an integer representing a local logical clock (initially 0);
	used to record the time at which this node last made a request
    highestClock: an integer, the largest clock value seen in a request
	(initially 0);
    need: an integer representing number of replies needed before self
	can enter the critical region;
    pending: a boolean array with n entries; entry k (k!=self) is true 
        iff process Pk has made a request and process self has not responded;
	the self entry is true if process self wants to enter critical region;
    mutex: mutual exclusion semaphore used to protect shared variables.
    s: a blocking semaphore used to hold the processing thread while the
	communication thread takes care of synchronization.

Communication Thread

  loop {
    wait for a message;
    P(mutex);
    if the message is a request {
       let k be the identity of requestor and c the timestamp on message;
       set highestClock to max(localClock, c);
       if (pending[self] and ([self,localClock] < [k, c])){//[k,c] is less recent
		// [self,localClock] is less than [k,c] if localClock is 
		// less than c or they are equal and self is less than k
	  set pending[k] to true;
       } else { 
	  send response to k-th requestor;}
    } else {  // the message is a reply
       decrement need by 1;
       if need is 0 // Replies come only if we sent a request
           V(s);}   // Free the processing thread 
    V(mutex);
  }
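The pair comparison used above is just a lexicographic order with the clock as the major component and the process id as tie-breaker. In Python this is exactly tuple comparison (a small illustrative helper, not part of the original pseudocode):

```python
def has_priority(my_id, my_clock, their_id, their_clock):
    """True iff self's pending request wins: its logical clock is older,
    or the clocks are equal and self has the smaller id.
    This is lexicographic comparison with the clock first."""
    return (my_clock, my_id) < (their_clock, their_id)
```

Because every [id, clock] pair is distinct (ids break ties), this gives a total order on requests, which is what guarantees that the deferral decision is consistent at both processes.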

Processing Thread

    P(mutex);
    set pending[self] to true;
    set localClock to highestClock + 1;
    set need to n-1;
    V(mutex);
    send to all other processes a request stamped with [self, localClock];
    P(s); // Here we wait for release from communication thread
    CRITICAL REGION
    P(mutex);
    set pending[self] to false;
    for k from 1 to n {
      if pending[k] {
 	 send response to k;
         set pending[k] to false;}
    }
    V(mutex);
This algorithm, due to Ricart and Agrawala, has been extended by Raymond to the case where more than one process can be in the critical region at a time and to the case where requests are for multiple copies of a resource.
An improvement on Ricart-Agrawala is based on the observation that unanimity is not required: a majority will do. That is, where before we acquired the resource only after receiving approval from all other processes, now we do so after approval from a majority. This suffices provided that no process approves two consecutive requests without an intervening release. In any case, this is an algorithm that requires so many messages and is so vulnerable to failures of the participants that it is mainly of theoretical significance.
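The reason a majority suffices is that any two majorities of the n processes must share at least one member, and that shared member will not approve a second request before seeing a release. A quick exhaustive check of the intersection property for a small n:

```python
from itertools import combinations

n = 5
processes = set(range(n))
majority = n // 2 + 1   # smallest set size that is a strict majority

# Every pair of majority-sized quorums overlaps in at least one process:
# two disjoint sets of size majority would need 2*(n//2 + 1) > n members.
for q1 in combinations(processes, majority):
    for q2 in combinations(processes, majority):
        assert set(q1) & set(q2), "two majorities must intersect"

print("all", majority, "-out-of-", n, "quorum pairs intersect")
```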

Bully Election Algorithm (Garcia-Molina)

Election algorithms are used to choose coordinators in centralized distributed systems (i.e. systems where there is a central authority in decision making). Elections are required to select a new coordinator when the current coordinator has failed or when a node recovers from a previous failure.

The following algorithm, a variation on the Garcia-Molina algorithm, carries out elections in a semi-synchronous distributed system. (A distributed system is semi-synchronous if there are known deadlines on the delivery of messages.) It is assumed that each node has an identifier, a number, and that that number represents the priority of the node. The node with highest priority among the available nodes will become the new coordinator after an election.

The algorithm has a number of parts:

  1. What is done during an election procedure by the node that called the election
  2. What is done by each node in response to the messages it receives
In addition there is the code that decides whether to start a new election.

A node can be in one of the following states (these are the values taken by the STATUS variable in the code below):

    NORMAL: the node is operating normally under the current coordinator;
    ELECTION: the node knows that an election is in progress;
    REORGANIZATION: the node has accepted a new coordinator but has not
	yet received the recovery information;
    DOWN: the node has failed or has just recovered from a failure.

A number of kinds of messages are sent by the elector (i.e. the node starting the election):

    STATUS_POLL: asks a node for its identity and current status;
    ELECTION: tells a lower priority node that an election is in progress
	and that the sender is a candidate coordinator;
    COORDINATOR: announces that the sender is the new coordinator;
    NEW_STATE: distributes to the active nodes the information needed to
	recover from the failure.

The replies are messages tagged ANSWER that contain the identity and status of the respondent.

Each node keeps the following variables:

    SELF: the identity (and thus the priority) of this node;
    STATUS: the current state of this node, one of the states above;
    COORDINATOR: the node currently believed to be the coordinator;
    CANDIDATE: the node that started the election now in progress;
    ACTIVE: [at the elector/coordinator] the set of nodes known to be up.

Election Procedure

The election procedure is run whenever there is reason to believe that a previously failed node has recovered, or when a node recognizes the failure of the coordinator by timing out while waiting for a response to a service it requested from the coordinator. The election procedure is run by a specific node and goes through a number of phases.
PHASE 1: it checks if there are higher priority nodes that are active. If yes, the current node knows that it is not going to be the coordinator and stops its active involvement in the election. Otherwise it carries on tentatively as the new coordinator.
PHASE 2: it informs all lower priority nodes that an election is in progress, that it is the new coordinator, and collects the identity of the nodes that are active.
PHASE 3: it makes sure that all the active nodes now agree that SELF is the new coordinator. If they don't the election is restarted.
PHASE 4: it distributes to the active nodes information for recovery from failure; if there has been any change in the active nodes since phase 2, the election is restarted.
  Elect:
	//Phase 1
	For each node j > SELF
		Send STATUS_POLL message to j;
		Wait with timeout for reply;
		If there is a reply
		   return; // SELF cannot be the new coordinator
	// Phase 2
	Set ACTIVE to the empty set;
	For each node j < SELF
		Send ELECTION message to j
		Wait with timeout for reply;
		If there is a reply
		   add j to ACTIVE;
	// Phase 3
	Set ANSWERS to the empty set;
	For each node j < SELF
		Send COORDINATOR SELF message to j;
		Wait with timeout for reply;
		If there is a reply
		   add j to ANSWERS;
	If ANSWERS != ACTIVE
	    Goto Elect; // We restart things since the network has changed
	// Phase 4
	Set ANSWERS to empty;
	For each node j in ACTIVE
		Send NEW_STATE message and recovery information to j;
		Wait with timeout for reply;
		If there is a reply
		   add j to ANSWERS;
	If ANSWERS != ACTIVE
	   Goto Elect;
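Ignoring message transport and timeouts, the net effect of Phase 1 is that the highest-priority live node ends up as the tentative coordinator: every other live node finds a higher-priority node answering its STATUS_POLL and drops out. A toy sketch of just that logic (the `alive` set stands in for the nodes that answer within the deadline):

```python
def elect(self_id, alive):
    """Phase-1 logic only: if any live node outranks self, self drops
    out (returns None); otherwise self carries on as the tentative
    coordinator and returns its own id."""
    if any(j > self_id for j in alive):
        return None
    return self_id

def run_election(alive):
    """Let every live node run elect() against the other live nodes;
    exactly one node, the one with the highest id, survives."""
    winners = [elect(i, alive - {i}) for i in alive]
    return [w for w in winners if w is not None]
```

This is why the algorithm is called "bully": the highest-priority node simply overrides everyone else. Phases 2 through 4 then exist to make the outcome and the recovery information consistent at all the survivors.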

Responding to Election Messages

A dedicated thread is always ready to answer election messages, running the following code:
	while (1)
	{
		Wait for a message m;
		switch (m.messageType) {
	        case STATUS_POLL:
			Send SELF, STATUS;
			break; 
		case ELECTION(sender): 
			Set STATUS to ELECTION;
			Set CANDIDATE to sender;
			Stop processing on this node;
			Stop any other election that may be in progress at this time;
			Send SELF;
			break;
		case COORDINATOR(sender):
			If STATUS == ELECTION and sender == CANDIDATE
			   Set STATUS to REORGANIZATION;
			   Set COORDINATOR to sender;
			   Send SELF;
			break;
		case NEW_STATE(sender,RecoveryInfo):
			If STATUS == REORGANIZATION and sender == COORDINATOR
			   Set STATUS to NORMAL;
			   Use the Recovery Information;}
			
	}

Coordinator's Poll

The current coordinator will at regular intervals poll the nodes in the system to detect changes in their status (this is a heartbeat protocol), say, using a handler for a timer interrupt:
	If SELF == COORDINATOR and STATUS == NORMAL
	   For each node j
		Send STATUS_POLL message to j;
		Wait with timeout for reply(sender, status);
		If j is not in ACTIVE or returned status is not NORMAL 
		   Call the election procedure;
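The decision at the end of the poll amounts to comparing the freshly observed statuses against ACTIVE. A minimal sketch of that check (the function name and the status strings are illustrative; None stands for a node that did not reply within the timeout):

```python
def needs_election(active, poll_results):
    """poll_results maps node -> reported status (None = no reply).
    An election is needed if a node outside ACTIVE has reappeared,
    or a node in ACTIVE is no longer reporting NORMAL."""
    for node, status in poll_results.items():
        if status is not None and node not in active:
            return True    # a previously failed node has recovered
        if node in active and status != "NORMAL":
            return True    # an active node failed or is mid-reorganization
    return False
```

In the full protocol the coordinator would call the Elect procedure whenever this returns true, which then rebuilds ACTIVE from scratch in Phase 2.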

Code run by a node after recovery from Failure

	Set STATUS to DOWN;
	Call the election procedure;