\section{More Example Protocols}

This section shows code fragments from several different protocols,
with the goal of illustrating some of the more common protocol
algorithms. All of these examples are taken from protocols distributed
with the {\xk}. See the actual source code for the complete protocol
associated with each fragment.

\subsection{Fragmentation}

BLAST is an {\xk} protocol that fragments large messages into
MTU-sized packets at the sender, and reassembles the fragments back
into the complete message at the receiver. The following shows the
implementation of {\var blastPush}, which contains the {\var for} loop
that generates and transmits all the fragments in a message.  Notice
that it is not necessary to calculate how long the last fragment is
because the {\var msgBreak} operation automatically makes each
fragment the minimum of the specified size ({\var FRAGMENT\_SIZE}) and
how ever many bytes are left in the message.

\begin{verbatim}
static XkReturn
blastPush(Sessn s, Msg *msg)
{
    BlastState *state = s->state;
    BlastHdr   *hdr;
    int        num_frags, i;
    Msg        *fragment;
    char       *buf;

    /* get header template and set MID, incrementing last value used */
    hdr = state->hdr_template;
    if (state->mid == MAX_SEQ_NUM)
        state->mid = 0;
    hdr->MID = ++state->mid;

    /* determine number of fragments */
    if (msgLength(msg) <= FRAGMENT_SIZE)
        num_frags = 1;
    else
        num_frags = (msgLength(msg) + FRAGMENT_SIZE - 1)/FRAGMENT_SIZE;
    hdr->NumFrags = num_frags;

    /* create and transmit individual fragments */
    for (i = 1; i <= num_frags; i++) {
        /* carve a fragment off of original msg */
        msgConstructEmpty(fragment);
        msgBreak(msg, fragment, FRAGMENT_SIZE);

        /* fill in dynamic parts of header */
        hdr->len = msgLength(fragment);
        set_fragment_mask(hdr->mask, i);

        /* add header and send fragment */
        buf = msgPush(fragment, HDR_LEN);
        blast_hdr_store(hdr, buf, HDR_LEN, fragment);
        xPush(xGetDown(s, 0), fragment);

        /* save copy of fragment for future retransmit */
        save_for_retransmit(state->frag_list, fragment, i);
    }
    /* schedule DONE timer */
    state->event = evSchedule(giveup, 0, DONE);
}
\end{verbatim}

This routine also uses the event library to schedule a timeout.
We show a more complete example of how timeouts are implemented
in a later example.

\subsection{Reassembly}

This example shows how IP reassembles fragments that arrive over the
network back into a complete datagram. One reason we give this
particular piece of code is that it is representative of a large
fraction of networking code---it does little more than tedious and
unglamorous bookkeeping. In this case, the code has to keep track of
what fragments have and have not arrived.

First, we define the key data structure ({\var FragList}) that is used
to hold the individual fragments that arrive at the destination.
Incoming fragments are saved in this data structure until all the
fragments in the original datagram have arrived, at which time they
are reassembled into a complete datagram and passed up to some higher
level protocol. Note that each element in {\var FragList} contains
either a fragment or a hole.

\begin{verbatim}
#define FRAGOFFMASK      0x1fff
#define FRAGOFFSET(flag) ((fragflag) & FRAGOFFMASK)
#define INFINITE_OFFSET  0xffff

typedef struct fid {
    IpHost  source;
    IpHost  dest;
    u_char  prot;
    u_char  pad;
    u_short ident;
} FragId;

typedef struct hole {
    u_int first;
    u_int last;
} Hole;

#define HOLE  1
#define FRAG  2

typedef struct fragif {
    u_char type;
    union {
        Hole hole;
        Msg  frag;
    } u;
    struct fragif *next, *prev;
} FragInfo;

typedef struct FragList {
    u_short  nholes;
    FragInfo head;       /* dummy header node */
    Bind     binding;
    bool     gcMark;
} FragList;
\end{verbatim}

The reassembly routine, {\var ipReassemble}, takes the session to
which the datagram belongs ({\var s}), an incoming datagram ({\var
dg}), and the IP header for that datagram ({\var hdr}) as arguments.
The final argument ({\var fragMap}) is used to map the incoming
datagram into appropriate {\var FragList}. (Recall that the group of
fragments that are being reassembled together are uniquely identified
by several fields in the IP header, as defined by structure {\var
FragId} given above.)

The actual work done in {\var ipReassemble} is straightforward; as
stated above, it is mostly bookkeeping. First, the routine extracts
the fields from the IP header that uniquely identify the datagram
being reassembled, constructs a key from these fields, and looks this
key up in {\var fragMap} to find the appropriate {\var FragList}.
Next, it inserts the new fragment into this {\var FragList}.  This
involves comparing the sum of the offset and length of this fragment
with the offset of the next fragment in the list. Some of this work is
done in subroutine {\var hole\_create}, which is given below.  Finally,
{\var ipReassemble} checks to see if all the holes are filled. If all
the fragments are present, it calls the $x$-kernel routine {\var
msgJoin} to actually reassemble the fragments into a whole datagram
and then calls {\var xDemux} to pass this datagram up the protocol
graph.

\begin{verbatim}
static XkReturn
ipReassemble(Sessn s, Msg *dg, IpHdr *hdr, Map fragMap)
{
    FragId   fragid;
    FragList *list;
    FragInfo *fi, *prev;
    Hole     *hole;
    u_short  offset, len;

    /* extract fragmentation info from header and create id for this frag */
    offset = FRAGOFFSET(hdr->frag)*8;
    len = hdr->dlen - GET_HLEN(hdr) * 4;
    bzero((char *)&fragid, sizeof(FragId));
    fragid.source = hdr->source;
    fragid.dest   = hdr->dest;
    fragid.prot   = hdr->prot;
    fragid.ident  = hdr->ident;

    /* find reassembly list for this frag; create one if this none exists */
    if (mapResolve(fragMap, &fragid, (void **)&list) == XK_FAILURE) {
        list = X_NEW(FragList);
        list->binding = mapBind(fragMap, &fragid, list);

        /* initialize list with a single hole spanning the whole datagram */
        list->nholes     = 1;
        list->head.next  = fi = X_NEW(FragInfo);
        fi->next         = 0;
        fi->type         = HOLE;
        fi->u.hole.first = 0;
        fi->u.hole.last  = INFINITE_OFFSET;
    }
    list->gcMark = FALSE;

    prev = &list->head;
    for (fi = prev->next; fi != 0; prev = fi, fi = fi->next) {
        if (fi->type == FRAG)
            continue;
        hole = &fi->u.hole;
        if (offset < hole->last && offset + len > hole->first) {
            /* check to see if frag overlaps previously received frags */
            if (offset < hole->first) {
                /* truncate message from left */
                msgPop(dg, hole->first - offset);
                offset = hole->first;
            }
            if (offset + len > hole->last) {
                /* truncate message from right */
                msgTruncate(dg, hole->last - offset);
                len = hole->last - offset;
            }

            /* now check to see if new hole(s) need to be made */
            if (offset + len < hole->last && hdr->frag & MOREFRAGMENTS) {
                /* creating new hole above */
                hole_create(prev, fi, (offset+len), hole->last);
                list->nholes++;
            }
            if (offset > hole->first) {
                /* creating new hole below */
                hole_create(fi, fi->next, hole->first, (offset));
                list->nholes++;
            }

            /* change this FragInfo structure to be an FRAG */
            list->nholes--;
            fi->type = FRAG;
            msgConstructCopy(&fi->u.frag, dg);
            break;
        } /* if found a hole */
    } /* for loop */

    /* check to see if we're done, and if so, pass datagram up */
    if (list->nholes == 0)
    {
        Msg fullMsg;

        /* now have a full datagram */
        msgConstructEmpty(&fullMsg);
        for (fi = list->head.next; fi != 0; fi = fi->next)
            msgJoin(&fullMsg, &fi->u.frag, &fullMsg);
        mapRemoveBinding(fragMap, list->binding);
        ipFreeFragList(list);
        xDemux(xGetUp(s), s, &fullMsg);
        msgDestroy(&fullMsg);
    }
    return XK_SUCCESS;
}
\end{verbatim}

Subroutine {\var hole\_create} creates a new hole in the fragment list
that begins at offset {\var first} and continues to offset {\var
last}. It makes use of the $x$-kernel utility {\var X\_NEW}, which
creates an instance of the given structure.

\begin{verbatim}
static int
hole_create(FragInfo *prev, FragInfo *next, u_int first, u_int last)
{
    FragInfo *fi;

    /* creating new hole from first to last */
    fi = X_NEW(FragInfo);
    fi->type         = HOLE;
    fi->u.hole.first = first;
    fi->u.hole.last  = last;
    fi->next         = next;
    prev->next       = fi;
}
\end{verbatim}

Finally, note that these routines do not capture the entire picture of
reassembly. What is not shown is a background process ($x$-kernel
event) that periodically checks to see if there has been any recent
activity on this datagram (it looks at field {\var gcMark}), and if
not, deletes the corresponding {\var FragList}. IP does not attempt to
recover from the situation where one or more of the fragments does not
arrive; it simply gives up and reclaims the memory that was being used
for reassembly.

\subsection{Synchrony, Timeouts, and Blocking}

We now show an example of a hybrid protocol, named CHAN, that turns an
underlying asynchronous communication service into a synchronous
communication service. That is, CHAN supports the {\var
xPush/xDemux/xPop} from below, and the {\var
xCall/xCallDemux/xCallPop} from above. CHAN is also interesting
because it illustrates how timeouts are scheduled in the {\xk}, and
how how a protocol blocks, waiting for a reply message.

CHAN supports request/reply channels between a pair of hosts. The
client sends a request message on a channel and blocks waiting for a
reply message.  The server accepts the request message and responds
with a reply message. The protocol defines two key data structures:
{\var ChanHdr} and {\var ChanState}.  Both of these data structures
are defined in a private {\var .h} file (e.g., {\var
chan\_internal.h}), rather than the {\var chan.h} file included by
other protocols.

The fields in {\var ChanHdr} are fairly straightforward.  The {\var
Type} field specifies the type of the message; in this case, the
possible types are {\var REQ, REP, ACK} and {\var PROBE}. The {\var
ProtNum} field identifies the high-level protocol that depends on
CHAN.  The {\var CID} field uniquely identifies the logical channel to
which this message belongs.  This is a 16-bit field, meaning that CHAN
supports up to 64K concurrent request/reply transactions between any
pair of hosts.  The {\var MID} field uniquely identifies each
request/reply pair; the reply message has the same {\var MID} as the
request. Finally, the {\var BID} field gives the {\it boot id} for the
host. A machine's boot id is a number that is incremented each time
the machine reboots; this number is read from disk, incremented, and
written back to disk during the machine's startup procedure. This
number is then put in every message sent by that host.

The fields in {\var ChanState} will be explained by the code that
follows. The one thing to note at this point is that {\var ChanState}
includes a {\var hdr\_template} field, which is a copy of the CHAN
header. Many of the fields in the CHAN header remain the same for all
messages sent out over this channel.  These fields are filled in when
the channel is created (not shown); only the fields that change are
modified before a given message is transmitted.

\begin{verbatim}
typedef struct {
    u_short Type;       /* message type: REQ, REP, ACK, PROBE */
    u_short CID;        /* unique channel id */
    int     MID;        /* unique message id */
    int     BID;        /* unique boot id */
    int     Length;     /* length of message */
    int     ProtNum;    /* high-level protocol number */
} ChanHdr;

typedef struct {
    u_char    type;             /* type of session: CLIENT or SERVER */
    u_char    status;           /* status of session: BUSY or IDLE */
    Event     event;            /* place to save timeout event */
    int       retries;          /* number of times retransmitted */
    int       timeout;          /* timeout value */
    XkReturn  ret_val;          /* place to save return value */
    Msg       *request;         /* place to save request message */
    Msg       *reply;           /* place to save reply message */
    Semaphore reply_sem;        /* semaphore the client blocks on */
    int       mid;              /* message id for this channel */
    int       bid;              /* boot id for this channel */
    ChanHdr   hdr_template;     /* header template for this channel */
} ChanState;
\end{verbatim}

The CHAN-specific implementation of {\var xCall} is given by the
following routine, named {\var chanCall}. The first thing to notice is
that {\var ChanState} includes a field named {\var status} that
indicates whether or not this channel is being used. If the channel is
currently in use, then {\var chanCall} returns failure.

The next thing to notice about {\var chanCall} is that after filling
out the message header and transmitting the request message, the
calling process is blocked on a semaphore ({\var reply\_sem}); {\var
semWait} is the $x$-kernel semaphore operation introduced in
Section~\ref{sec:upi}.  When the reply message eventually arrives, it
is processed by CHAN's {\var xPop} routine (see below), which copies
the reply message into state variable {\var reply}, and signals this
blocked process. The process then returns. Should the reply message
not arrive, then timeout routine {\var retransmit} is called (see
below). This event is scheduled in the body of {\var chanCall}.

\begin{verbatim}
static XkReturn
chanCall(Sessn self, Msg *msg, Msg *rmsg)
{
    ChanState *state = (ChanState *)self->state;
    ChanHdr   *hdr;
    char      *buf;

    /* ensure only one transaction per channel */
    if (state->status != IDLE)
        return XK_FAILURE;
    state->status = BUSY;

    /* save a copy of request msg and pointer to reply msg*/
    msgConstructCopy(&state->request, msg);
    state->reply = rmsg;

    /* fill out header fields */
    hdr = state->hdr_template;
    hdr->Length = msgLength(msg);
    if (state->mid == MAX_MID)
        state->mid = 0;
    hdr->MID = ++state->mid;

    /* attach header to msg and send it */
    buf = msgPush(msg, HDR_LEN);
    chan_hdr_store(hdr, buf, HDR_LEN);
    xPush(xGetDown(self, 0), msg);

    /* schedule first timeout event */
    state->retries = 1;
    state->event   = evSchedule(retransmit, self, state->timeout);

    /* wait for the reply msg */
    semWait(&state->reply_sem);

    /* clean up state and return */
    flush_msg(state->request);
    state->status = IDLE;
    return state->ret_val;
}
\end{verbatim}

The next routine ({\var retransmit}) is called whenever the retransmit
timer fires. It is scheduled for the first time in {\var chanCall},
but each time it is called, it reschedules itself. Once the request
message has been retransmitted four times, CHAN gives up: it sets the
return value to {\var XK\_FAILURE} and wakes up the blocked client
process.  Finally, the reason {\var retransmit} first checks to see if
the event was cancelled is that there is a potential race condition
between when {\var evCancel} is invoked and when the event actually
executes.  Note that each time {\var retransmit} executes and sends
another copy of the request message, it needs to re-save the message
in state variable {\var request}. This is because each time a protocol
calls the {\var xPush} operation on a message, it loses its reference
to the message.

\begin{verbatim}
static void
retransmit(Event ev, int *arg)
{
    Sessn     s = (Sessn)arg;
    ChanState *state = (ChanState *)s->state;
    Msg       tmp;

    /* see if event was cancelled */
    if (evIsCancelled(ev))
        return;

    /* unblock the client process if we have retried 4 times */
    if (++state->retries > 4) {
       state->ret_val = XK_FAILURE;
       semSignal(state->rep_sem);
       return;
    }

    /* retransmit request message */
    msgConstructCopy(&tmp, &state->request);
    xPush(xGetDown(s, 0), &tmp);

    /* reschedule event with exponential backoff */
    evDetach(state->event);
    state->timeout = 2*state->timeout;
    state->event = evSchedule(retransmit, s, state->timeout);
}
\end{verbatim}

CHAN's {\var chanPop} routine is very simple. This is because CHAN is
an asymmetric protocol: the code that implements CHAN on the client
machine is completely distinct from the code that implements CHAN on
the server machine. In fact, any given CHAN session will always be a
purely client session or a purely server session, and this fact is
stored in a session state variable ({\var type}). Thus, all {\var
chanPop} does is check to see whether it is a server session (one that
expects {\var REQ} messages), or a client session (one that expects
{\var REP} messages), and calls the appropriate client- or
server-specific routine. In this case, we show only the
client-specific routine.

\begin{verbatim}
static XkReturn
chanPop(Sessn self, Sessn lls, Msg *msg, void *inHdr)
{
    /* see if this is a CLIENT or SERVER session */
    if (self->state->type == SERVER)
        return(chanServerPop(self, lls, msg, inHdr));
    else
        return(chanClientPop(self, lls, msg, inHdr));
}
\end{verbatim}

The client-specific pop routine ({\var chanClientPop}) is given below.
This routine first checks to see if it has received the expected
message, for example, that it has the right {\var MID}, the right
{\var BID}, and is of type {\var REP} or {\var ACK}. This check is
made in subroutine {\var clnt\_msg\_ok} (not shown). If it is a valid
acknowledgment message, then {\var chanClientPop} cancels the
retransmit timer and schedules the probe timer. The probe timer is not
shown, but would be similar to the retransmit timer given above. If
the message is a valid reply, then {\var chanClientPop} cancels the
retransmit timer, saves a copy of the reply message in state variable
{\var reply}, and wakes up the blocked client process. It is this
client process that actually returns the reply message to the
high-level protocol; the process that called {\var chanClientPop}
simply returns back down the protocol stack.

\begin{verbatim}
static XkReturn
chanClientPop(Sessn self, Sessn lls, Msg *msg, void *inHdr)
{
    ChanState *state = (ChanState *)self->state;
    ChanHdr   *hdr = (ChanHdr *)inHdr;

    /* verify correctness of msg header */
    if (!clnt_msg_ok(state, hdr))
        return XK_FAILURE;

    /* cancel retransmit timeout event */
    evCancel(state->event);

    /* if this is an ACK, then schedule PROBE timer and exit*/
    if (hdr->Type == ACK) {
       state->event = evSchedule(probe, s, PROBE);
       return XK_SUCCESS;
    }

    /* msg must be a REP, so save it and signal blocked client process */
    msgAssign(state->reply, msg);
    state->ret_val = XK_SUCCESS;
    semSignal(&state->reply_sem);

    return XK_SUCCESS;
}
\end{verbatim}

\subsection{Purely Synchronous Protocol}\label{sec:select}

We next show a purely sychronous protocol, called SELECT, that is
typically configured on top of CHAN. SELECT's job is to dispatch
request messages to the appropriate procedure.  What this means is
that on the client side, SELECT is given a procedure number that the
client wants to invoke, puts this number it its header, and invokes a
lower level request/reply protocol like CHAN. When this invocation
returns, SELECT merely lets the return pass on through to the client;
it has no real demultiplexing work to do.  On the server side, SELECT
uses the procedure number it finds in its header to select the right
local procedure to invoke. When this procedure returns, SELECT simply
returns to the low-level protocol that just invoked it.

The $x$-kernel code for {\var selectCall} and {\var selectCallPop} is
given below.  Notice that SELECT is like CHAN in that it is
asymmetric---each session is either a client session or a server
session.  Unlike CHAN, however, SELECT exports the synchronous
interface to both higher-level protocols ({\var xCall}) and
lower-level protocols ({\var xCallDemux} and {\var xCallPop}).

\begin{verbatim}
static XkReturn
selectCall(Sessn self, Msg *msg, Msg *rmsg)
{
    SelectState *state = (SelectState *)self->state;
    char        *buf;

    buf = msgPush(msg, HLEN);
    select_hdr_store(state->hdr, buf, HLEN);
    return xCall(xGetDown(self, 0), msg, rmsg);
}

static XkReturn
selectCallPop(Sessn s, Sessn lls, Msg *msg, Msg *rmsg, void *inHdr)
{
    return xCallDemux(xGetUp(s), s, msg, rmsg);
}
\end{verbatim}

\subsection{Virtual Protocols}\label{virtual}

In the $x$-kernel, we sometimes implement modules called {\it virtual
protocols}. Virtual protocols are configured into a protocol graph
just like any other protocol, but they are different from regular
protocols in that they do not add a header to messages; i.e., they do
not directly communicate with their peer on the other machine.
Instead, virtual protocols serve to ``route messages'' through the
protocol graph.  For example, the virtual protocol VSIZE is configured
on top of some number of low-level protocols (usually two).  VSIZE
takes a message from some high-level protocol, and based on how big
the message is (this can be determined using the $x$-kernel's {\var
msgLength} operation), decides which of the two low-level protocols to
pass the message on to. For example, VSIZE might be configured into
the protocol graph to route large messages (those greater than 1KB) to
BLAST, and small messages (those less than or equal to 1KB) to IP.
The advantage of using a protocol like VSIZE is that it allows
messages that are too small to need fragmenting from incurring the
overhead of yet another protocol layer.

The following shows VSIZE's open routine.  The main thing to notice
about {\var vsizeOpen} is that it passes the {\var hlpType} it was
given from above as the {\var hlpType} to the protocol below it,
rather than using {\var self}. This is because VSIZE wants to
intercept all of the higher-level protocol's messages, and it has no
header of its own to stick this protocol's demux key in.

\begin{verbatim}
static Sessn
vsizeOpen(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    Sessn  s;
    Sessn  lls[VSIZEMAXDOWN];
    Part   savedPart[3];
    PSTATE *pstate = (PSTATE *)self->state;
    int    plen;
    int    i,j;

    /*
     * Save the original participants before opening since we need to
     * use it in both opens and it will get munged in the first open
     */
    plen = partLength(p) * sizeof(Part);
    bcopy((char *)p, (char *)savedPart, plen);

    for (i = 0; i < pstate->numdown; i++) {
        lls[i] = xOpen(self, hlpType, xGetProtlDown(self, i), p);
        if (lls[i] == ERR_SESSN) {
            /* could not open session */
            for (j = 0; j < i; j++)
                xClose(lls[j]);
            return ERR_SESSN;
        }
        bcopy((char *)savedPart, (char *)p, plen);
    }
    if (mapResolve(pstate->activeMap, lls, (void **)&s) == XK_SUCCESS) {
        /* found an existing session */
        for (i = 0; i < pstate->numdown; i++)
            xClose(lls[i]);
    }
    else {
        /* creating a new session */
        if ((s = vsizeCreateSessn(self, hlp, hlpType, lls)) == ERR_SESSN) {
            for (i = 0; i < pstate->numdown; i++)
                xClose(lls[i]);
        }
    }
    return s;
}
\end{verbatim}

VSIZE's push routine is very simple---it checks the size of the
message pushed to it, and calls the appropriate session below it.
The code for {\var vsizePush} follows.

\begin{verbatim}
static XkHandle
vsizePush(Sessn s, Msg *msg)
{
    SSTATE *sstate;
    int    i;

    sstate = (SSTATE *)s->state;
    for (i = 0; i < sstate->numdown-1; i++) {
        if (msgLength(msg) <= sstate->cutoff[i])
            return xPush(xGetSessnDown(s, i), msg);
    }
    return xPush(xGetSessnDown(s, sstate->numdown-1), msg);
}
\end{verbatim}
