/*     
 *     x-kernel v3.3
 *
 *     Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 */

#include <stdio.h>
#include "msp_internal.h"

/* timeout, in microseconds */
static int timeout = MSP_TIMEOUT;

void
msp_init(Protl self)
{
    ProtlState *pstate;
    Protl      llp;
    Part       part;

    getproc_protl(self);

    /* create and initialize protocol state */
    pstate = X_NEW(ProtlState);
    memset((char *)pstate, 0, sizeof(ProtlState));
    self->state = (void *)pstate;
    pstate->activemap  = mapCreate(ACTIVE_MAP_SIZE, sizeof(ActiveId));
    pstate->passivemap = mapCreate(PASSIVE_MAP_SIZE, sizeof(PassiveId));

    /* find lower level protocol and do a passive open on it */
    llp = xGetProtlDown(self, 0);
    if (!xIsProtl(llp))
        Kabort("MSP could not get lower protocol");
    partInit(&part, 1);
    partPush(part, ANY_HOST, 0);
    if (xOpenEnable(self, self, llp, &part) == XK_FAILURE) {
        xTrace0(mspp, TR_ALWAYS,
		"msp_init: openenable on lower protocol failed");
        xFree((char *) pstate);
        exit(1);
    }
}

static void
getproc_protl(Protl p)
{
    /* fill in the function pointers to implement protocol operations */
    p->open         = mspOpen;
    p->openenable   = mspOpenEnable;
    p->demux        = mspDemux;
    p->controlprotl = mspControlProtl;
}

static void
activeOpen(Sessn s)
{
    SessnState *sstate = (SessnState *)s->state;

    /* send SYN, go to SYNSENT, block until ESTABLISHED is reached */
    sendPacket(s, MSP_FLAG_SYN, 0);
    sstate->state = MSP_STATE_SYNSENT;

    /* 
     * make an extra reference to the session so it doesn't
     * go away while we're sleeping
     */
    xDuplicate(s);
    semWait(&sstate->wait);

    /* 
     * bring the reference back down to its natural value 
     * (there should be a better way to do this)
     */
    s->rcnt --;
}

static Sessn
mspOpen(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    ActiveId   key;
    Sessn      msp_s, lls;
    ProtlState *pstate = (ProtlState *)self->state;

    memset((char *)&key, 0, sizeof(key));

    /* high level protocol must specify both local and remote MSP port */
    key.remoteport = *((MSPport *)partPop(p[0]));
    key.localport  = *((MSPport *)partPop(p[1]));

    /* attempt to open session on protocol below this one */
    lls = xOpen(self, self, xGetProtlDown(self, 0), p);
    if (lls != ERR_SESSN) {
        key.lls = lls;
	/* check for this session in the active map */
        if (mapResolve(pstate->activemap, &key, (void **)&msp_s) == XK_FAILURE){
	    /* session wasn't already in map, so initialize it */
            msp_s = mspCreateSessn(self, hlp, hlpType, &key);
            if (msp_s != ERR_SESSN) {
                activeOpen(msp_s);
                return msp_s;
            }
        }
        /* if control makes it this far, an error has occurred */
        xClose(lls);
    }
    return ERR_SESSN;
}

static XkReturn
mspOpenEnable(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    PassiveId  key;
    ProtlState *pstate = (ProtlState *)self->state;
    Enable     *e;

    key = *((MSPport *)partPop(*p));

    /* check if this port has already been openenabled */
    if (mapResolve(pstate->passivemap, &key, (void **)&e) != XK_FAILURE) {
        if (e->hlp == hlp) {
	    /* this port was openenabled previously by the same hlp */
            e->rcnt++;
            return XK_SUCCESS;
        }
	/* this port was openenabled previously by a different hlp - error */
        return XK_FAILURE;
    }

    /* this will be a new enabling, so create and initialize Enable object, */
    /* and enter the binding of port/enable object in the passive map */
    e = X_NEW(Enable);
    e->hlp     = hlp;
    e->hlpType = hlpType;
    e->rcnt    = 1;
    e->binding = mapBind(pstate->passivemap, &key, e);
    if (e->binding == ERR_BIND) {
        xFree((char *)e);
        return XK_FAILURE;
    }
    return XK_SUCCESS;
}


static Sessn
mspCreateSessn(Protl self, Protl hlp, Protl hlpType, ActiveId *key)
{
    Sessn      s;
    ProtlState *pstate = (ProtlState *)self->state;
    SessnState *sstate;
    MSPhdr     *msph;
    XTime      now;

    /* create the session object and initialize it */
    s = xCreateSessn(getproc_sessn, hlp, hlpType, self, 1, &key->lls);
    s->binding = mapBind(pstate->activemap, key, s);
    sstate = X_NEW(SessnState);
    memset(sstate, 0, sizeof(*sstate));
    s->state = (char *)sstate;

    /* create an MSP header template */
    msph = &(sstate->hdr);
    msph->sport = key->localport;
    msph->dport = key->remoteport;

    /* schedule timeout handler to run periodically (it rescheds itself) */
    sstate->event = evSchedule(mspTimeout, s, MSP_TICK);
    sstate->recvbufsize = MSP_DEF_RECVBUFSIZE;
    sstate->state = MSP_STATE_CLOSED;
    semInit(&sstate->wait, 0);

    /* pick an initial sequence number sort of randomly */
    xGetTime(&now);
    sstate->next_send_seq = (now.sec ^ now.usec);
    sstate->lowest_unack = sstate->next_send_seq;
    sstate->effective_window = 0;
    semInit(&sstate->sleep, 0);

    return s;
}

static void
getproc_sessn(Sessn s)
{
    /* fill in the function pointers to implement session operations */
    s->push            = mspPush;
    s->pop             = mspPop;
    s->controlsessn    = mspControlSessn;
    s->getparticipants = mspGetParticipants;
    s->close           = mspClose;
}

static XkReturn
mspDemux(Protl self, Sessn lls, Msg *dg)
{
    char       *buf;
    MSPhdr     h;
    ActiveId   activeid;
    ProtlState *pstate = (ProtlState *)self->state;
    Sessn      s;
    SessnState *sstate;
    PassiveId  passiveid;
    Enable     *e;

    /* extract the header from the message */
    buf = msgPop(dg, HLEN);
    if (buf == NULL)
        return XK_FAILURE;
    mspHdrLoad(&h, buf, HLEN);

    /* construct a demux key from the header */
    bzero((char *)&activeid, sizeof(activeid));
    activeid.lls        = lls;
    activeid.localport  = h.dport;
    activeid.remoteport = h.sport;

    /* see if demux key is in the active map */
    if (mapResolve(pstate->activemap, &activeid, (void **)&s) == XK_FAILURE) {
	/* didn't find an active session, so check passive map */
        passiveid = h.dport;
        if (mapResolve(pstate->passivemap, &passiveid, (void **)&e) ==
	        XK_FAILURE) {
            /* drop the message */
            return XK_SUCCESS;
        }

        /* port was enabled, so create a new session and inform hlp */
        s = mspCreateSessn(self, e->hlp, e->hlpType, &activeid);
        if (s == ERR_SESSN)
            return XK_SUCCESS;
        xDuplicate(lls);

        /* we haven't yet processed the SYN, we're in LISTEN state */
        sstate = (SessnState *)s->state;
        sstate->state = MSP_STATE_LISTEN;
        sstate->passiveEnable = e;
    }

    /* found (or created) an appropriate session, so pop to it */
    return xPop(s, lls, dg, &h);
}

static long
mspHdrLoad(MSPhdr *hdr, char *src, long len)
{
    /* copy from src to hdr, then convert network byte order to host order */
    memcpy(hdr, src, HLEN);
    hdr->sport = ntohs(hdr->sport);
    hdr->dport = ntohs(hdr->dport);
    hdr->flags = ntohl(hdr->flags);
    hdr->len  = ntohs(hdr->len);
    hdr->advwindow = ntohs(hdr->advwindow);
    hdr->seq = ntohs(hdr->seq);
    hdr->ackseq = ntohs(hdr->ackseq);
    return HLEN;
}

static void
mspHdrStore(MSPhdr *hdr, char *dst, long len)
{
    /* convert host byte order to network order, then copy from hdr to dst */
    /* (note: argument 'hdr' is changed by the following code) */
    hdr->sport = htons(hdr->sport);
    hdr->dport = htons(hdr->dport);
    hdr->flags = htonl(hdr->flags);
    hdr->len  = htons(hdr->len);
    hdr->advwindow = htons(hdr->advwindow);
    hdr->seq = htons(hdr->seq);
    hdr->ackseq = htons(hdr->ackseq);
    memcpy(dst, hdr, HLEN);
}

/* enqueue a sent packet for possible retransmission */
static void
sendqEnqueue(sendqelem *queue, Msg *packet, MSPsws seq)
{
    sendqelem *elem = &queue[QIDX(seq)];

    xTrace2(mspp, TR_MAJOR_EVENTS, "sendqEnqueue: seq (%d) slot (%d)",
	seq, QIDX(seq));
    xAssert(!elem->valid);
    elem->valid = 1;
    xGetTime(&elem->last_sent);
    msgConstructCopy(&elem->packet, packet);
}

static void
sendqDequeue(sendqelem *queue, MSPsws seq)
{
    sendqelem *elem = &queue[QIDX(seq)];
    
    xTrace2(mspp, TR_MAJOR_EVENTS, "sendqDequeue: seq (%d) slot (%d)",
	seq, QIDX(seq));
    xAssert(elem->valid);
    elem->valid = 0;
    msgDestroy(&elem->packet);
}

/* enqueue a received packet for reordering */
static void
recvqEnqueue(recvqelem *queue, Msg *packet, MSPsws seq, int fin)
{
    recvqelem *elem = &queue[QIDX(seq)];

    xAssert(!elem->valid);
    elem->valid = 1;
    elem->fin = fin;
    msgConstructCopy(&elem->packet, packet);
}

static void
recvqDequeue(recvqelem *queue, MSPsws seq)
{
    recvqelem *elem = &queue[QIDX(seq)];
    
    xAssert(elem->valid);
    elem->valid = 0;
    msgDestroy(&elem->packet);
}


static XkHandle 
mspPush(Sessn self, Msg *msg)
{
    SessnState *sstate = (SessnState *)self->state;

    /* can only send when we're opened */
    if(sstate->state != MSP_STATE_ESTABLISHED)
        return XK_FAILURE;

    /* wait till we are allowed to send then send */
    if(sstate->effective_window <= 0) {
        sstate->sleepers ++;
        semWait(&sstate->sleep);
    }
    sstate->effective_window --;
    sendPacket(self, MSP_FLAG_DATA, msg);
    return XK_SUCCESS;
}

/*
 * return the diff between two sequence numbers (s1 - s2) 
 * as an unsigned quantity
 */
static int
seqDiff(MSPsws s1, MSPsws s2)
{
    /* make sure the result is unsigned */
    return (s1 - s2) & 0xffff;
}

/* compute the advertised window */
static MSPsws
computeAdvertisedWindow(SessnState *sstate)
{
    int window;

    /* 
     * window = enough sequence numbers to allow
     * 1k packets to fill up recvbuf, minus the amount
     * being consumed by the re-ordering buffers
     */
    window = sstate->recvbufsize / MSP_PACKETSIZE;
    window -= seqDiff(1 + sstate->max_seq_seen, sstate->next_deliver);
    if(window < 0)
        window = 0;
    return (MSPsws) window;    
}

/*
 * send out a packet with the current seq and ackseq for this
 * session, with the given data and flags
 */
static void
sendPacket(Sessn s, int flags, Msg *data)
{
    SessnState *sstate = (SessnState *)s->state;
    Msg packet;
    MSPhdr hdr;


    if(data == 0) {
        msgConstructEmpty(&packet);
    } else {
        msgConstructCopy(&packet, data);
    } 

    hdr = sstate->hdr;
    hdr.len = HLEN + msgLength(&packet);
    hdr.advwindow = computeAdvertisedWindow(sstate);
    hdr.flags = flags;
    hdr.seq = sstate->next_send_seq;
    hdr.ackseq = sstate->next_deliver;

    /* 
     * SYN, FIN and data packets consume sequence space and can
     * be retransmitted.  enqueue them and up the seq number
     */
    mspHdrStore(&hdr, msgPush(&packet, HLEN), HLEN);
    if(data || (flags & (MSP_FLAG_SYN | MSP_FLAG_FIN))) {
        sendqEnqueue(sstate->sendq, &packet, sstate->next_send_seq);
        sstate->next_send_seq ++;
    }

    /* send it on its way */
    xTrace0(mspp, TR_MAJOR_EVENTS, "send packet"); 
    xPush(xGetSessnDown(s, 0), &packet);
}


/* send a reset packet,  kill the session and return success */
static XkReturn
reset(Sessn s)
{
    xTrace1(mspp, TR_MAJOR_EVENTS, "reset: in state %d, sending RST",
	    ((SessnState *)s->state)->state);
    sendPacket(s, MSP_FLAG_RST, 0);
    shut_down(s);
    return XK_SUCCESS;
}

static void
opened(Sessn s)
{
    SessnState *sstate = (SessnState *)s->state;

    if(sstate->passiveEnable) {
        /* passive open, upcall to notify open is complete */
        s->rcnt --;  /* undo previous xDuplicate */
        xOpenDone(sstate->passiveEnable->hlp, xMyProtl(s), s);
        sstate->passiveEnable = 0;
    }

    /* awake anyone waiting for the open to complete */
    semSignal(&sstate->wait);
}

static XkReturn
mspPop(Sessn self, Sessn lls, Msg *msg, void *hdr)
{
    MSPhdr *h = (MSPhdr *)hdr;
    SessnState *sstate = (SessnState *)self->state;

    xTrace0(mspp, TR_MAJOR_EVENTS, "msgPop: got packet");

    /* first check if this packet is a reset */
    if(h->flags & MSP_FLAG_RST) {
        /* shut down immediately, dont send a response */
        shut_down(self);
        return XK_SUCCESS;
    }

    /* per state processing minus reset handling */
    switch(sstate->state) {
    case MSP_STATE_CLOSED:
        /* error - no opened connection */
        return reset(self);

    case MSP_STATE_LISTEN:
        if(h->flags == MSP_FLAG_SYN) {
            /* got SYN, send SYN+ACK and go to SYNRECVD */
            xDuplicate(self);
            sstate->max_seq_seen = h->seq;
            sstate->next_deliver = h->seq + 1;
            sstate->state = MSP_STATE_SYNRECVD;
            sendPacket(self, MSP_FLAG_SYN | MSP_FLAG_ACK, 0);
        } else {
            return reset(self);
        }
        break;

    case MSP_STATE_SYNSENT:
        if(h->flags == (MSP_FLAG_SYN | MSP_FLAG_ACK)) {
            /* got SYN+ACK, ACK and go to ESTABLISHED */
            if(h->ackseq != sstate->next_send_seq) {
                xTrace0(mspp, TR_MAJOR_EVENTS, "mspPop: Bad SYN+ACK sequence");
                return reset(self);
            }
            sstate->max_seq_seen = h->seq;
            sstate->next_deliver = h->seq + 1;
            processAck(self, h->ackseq, h->advwindow);
            sstate->state = MSP_STATE_ESTABLISHED;
            opened(self);
            sendPacket(self, MSP_FLAG_ACK, 0);
        } else {
            return reset(self);
        }
        break;

    case MSP_STATE_SYNRECVD:
        if(h->flags == MSP_FLAG_ACK) {
            /* got ACK, go to ESTABLISHED */
            if(h->ackseq != sstate->next_send_seq) {
                xTrace0(mspp, TR_MAJOR_EVENTS, 
			"mspPop: Bad ACK sequence for SYN");
                return reset(self);
            }
            processAck(self, h->ackseq, h->advwindow);
            sstate->state = MSP_STATE_ESTABLISHED;
            opened(self);
        } else {
            /* ignore */
        }
        break;

    case MSP_STATE_ESTABLISHED:
        if(h->flags == MSP_FLAG_ACK) {
            /* ack data */
            processAck(self, h->ackseq, h->advwindow);
        } else if(h->flags == MSP_FLAG_PERS) {
            /* got PERS, send an up-to-date ack */
            sendPacket(self, MSP_FLAG_ACK, 0);
        } else if(h->flags == MSP_FLAG_DATA || h->flags == MSP_FLAG_FIN) {
            /* data or FIN */
            processData(self, h, msg);
        } else {
            /* ignore */
            xTrace1(mspp, TR_MAJOR_EVENTS, 
		    "mspPop: bad flags in established (%x); ignoring", 
		    h->flags);
        }
        break;

    case MSP_STATE_FIN1:
        if(h->flags == MSP_FLAG_ACK) {
            /* data or FIN ack, if FIN ack go to FINWAIT2 */
            processAck(self, h->ackseq, h->advwindow);
            if(h->ackseq == sstate->fin_seq) {
                sstate->state = MSP_STATE_FIN2;
            }
        } else if(h->flags == MSP_FLAG_DATA || h->flags == MSP_FLAG_FIN) {
            /* data or FIN */
            processData(self, h, msg);
        } else {
            return reset(self);
        }
        break;

    case MSP_STATE_FIN2:
        if(h->flags == MSP_FLAG_DATA || h->flags == MSP_FLAG_FIN) {
            /* data or FIN */
            processData(self, h, msg);
        } else {
            return reset(self);
        }
        break;

    case MSP_STATE_CLOSING:
        if(h->flags == MSP_FLAG_ACK) {
            /* ack, if acks our FIN then go to timewait */
            processAck(self, h->ackseq, h->advwindow);
            if(h->ackseq == sstate->fin_seq) {
                sstate->timer = MSP_TIMEWAIT_TIMER;
                sstate->state = MSP_STATE_TIMEWAIT;
            }
        } else {
            return reset(self);
        }
        break;

    case MSP_STATE_CLOSEWAIT:
        if(h->flags == MSP_FLAG_ACK) {
            /* data acks */
            processAck(self, h->ackseq, h->advwindow);
        } else {
            return reset(self);
        }
        break;

    case MSP_STATE_LASTACK:
        if(h->flags == MSP_FLAG_ACK) {
            /* ack, if acks our FIN then go to closed */
            processAck(self, h->ackseq, h->advwindow);
            if(h->ackseq == sstate->fin_seq) {
                semSignal(&sstate->wait);
                sstate->state = MSP_STATE_CLOSED;
            }
        } else {
            return reset(self);
        }
        break;

    case MSP_STATE_TIMEWAIT:
        /* illegal packet after close */
        return reset(self);
        break;

    default:
        xAssert("Bad State!" == 0);
    }

    return XK_SUCCESS;
}


static void
processAck(Sessn s, MSPsws ackseq, MSPsws advwindow)
{
    SessnState *sstate = (SessnState *)s->state;
    int seqdiff, unackdiff, x;
    MSPsws seq;

    /* in order if: lowest_unack <= ackseq <= next_send_seq */
    seqdiff = seqDiff(ackseq, sstate->lowest_unack);
    unackdiff = seqDiff(sstate->next_send_seq, sstate->lowest_unack);

    if(seqdiff > unackdiff) {
        /* out of our send window, drop */
        xTrace4(mspp, TR_MAJOR_EVENTS, 
		"processACK: Ack not in window: %d %d (%d: %d)", 
		sstate->lowest_unack, sstate->next_send_seq, unackdiff, 
		seqdiff);
        return;
    }

    /* cancel all retransmissions for acknowledged packets */
    for(seq = sstate->lowest_unack; ; seq ++) {
        if(seq == ackseq)
            break;
        xTrace1(mspp, TR_MAJOR_EVENTS, "processACK: cancel retrans for %d",
		seq); 
        sendqDequeue(sstate->sendq, seq);
    }
    sstate->lowest_unack = ackseq;

    /* compute effective_window from advwindow */
    sstate->effective_window = advwindow - 
        seqDiff(sstate->next_send_seq, sstate->lowest_unack);
    if(sstate->effective_window > 0) {
        /* awake anyone waiting to send */
        x = (sstate->sleepers < sstate->effective_window) ?
            sstate->sleepers : sstate->effective_window;
        while(x--)
            semSignal(&sstate->sleep);
    }
}


/* handle non-ACK packets in ESTABLISHED state (data and FINs) */
static void
processData(Sessn s, MSPhdr *h, Msg *msg)
{
    SessnState *sstate = (SessnState *)s->state;
    int seqdiff, maxdiff;
    MSPsws seq;

    /* 
     * in window if: 
     *  next_deliver <= seq <= next_deliver + (recvbuf / MSP_PACKETSIZE) 
     */
    seqdiff = seqDiff(h->seq, sstate->next_deliver);
    maxdiff = sstate->recvbufsize / MSP_PACKETSIZE;
    if(seqdiff > maxdiff) {
        /* out of window, drop but still send an ack w/ prev ack seq */
        xTrace0(mspp, TR_MAJOR_EVENTS, "processData: data not in window");
        sendPacket(s, MSP_FLAG_ACK, 0);
        return;
    }

    /* see if this is our new max_seq_seen */
    seqdiff = seqDiff(h->seq, sstate->max_seq_seen);
    if(seqdiff < MSP_MAX_WINDOW)
        sstate->max_seq_seen = h->seq;

    /* truncate message to length shown in header */
    if (h->len - HLEN < msgLength(msg))
        msgTruncate(msg, (int)(h->len - HLEN));

    /* deliver all in-order packets */
    recvqEnqueue(sstate->recvq, msg, h->seq, 
                 (h->flags & MSP_FLAG_FIN) ? 1 : 0);
    for(seq = sstate->next_deliver; RECVQ(sstate, seq).valid; seq++) {
        if(RECVQ(sstate, seq).fin) {
            if(sstate->state == MSP_STATE_ESTABLISHED) {
                sstate->state = MSP_STATE_CLOSEWAIT;
            } else if(sstate->state == MSP_STATE_FIN1) {
                sstate->state = MSP_STATE_CLOSING;
            } else if(sstate->state == MSP_STATE_FIN2) {
                sstate->timer = MSP_TIMEWAIT_TIMER;
                sstate->state = MSP_STATE_TIMEWAIT;
            } else
                xAssert("got fin in illegal state" == 0);
        } else {
            xTrace2(mspp, TR_MAJOR_EVENTS, 
		    "processData: deliver %d (queue %d)", seq, QIDX(seq));

            /* by passing the packet up we are consuming recv buffer */
            sstate->recvbufsize -= MSP_PACKETSIZE;
            xDemux(xGetUp(s), s, &RECVQ(sstate, seq).packet);
        }
        recvqDequeue(sstate->recvq, seq);
    }
    sstate->next_deliver = seq;

    /* send back an ACK, reguardless if anything new was delivered */
    sendPacket(s, MSP_FLAG_ACK, 0);
}


static int
mspControlProtl(Protl self, int opcode, char *buf, int len)
{
    switch (opcode) {
        case GETMAXPACKET:
        case GETOPTPACKET:
            checkLen(len, sizeof(int));
            if (xControlProtl(xGetProtlDown(self, 0), opcode, buf, len) <
		    sizeof(int))
                return -1;
            *(int *)buf -= HLEN;
            return sizeof(int);
        default:
            return xControlProtl(xGetProtlDown(self, 0), opcode, buf, len);
    }
}

static int
mspControlSessn(Sessn self, int opcode, char *buf, int len)
{
    SessnState *sstate = (SessnState *)self->state;
    MSPhdr     *hdr;

    hdr = &(sstate->hdr);
    switch (opcode) {

        case MSP_SETRCVBUFSIZE:
	    checkLen(len, sizeof(MSPrcvbufsize));
	    sstate->recvbufsize = *(MSPrcvbufsize *)buf;
	    xAssert(sstate->recvbufsize <= MSP_DEF_RECVBUFSIZE);
	    return sstate->recvbufsize;

        case GETMYPROTO:
            checkLen(len, sizeof(long));
            *(long *)buf = sstate->hdr.sport;
            return sizeof(long);
        case GETPEERPROTO:
            checkLen(len, sizeof(long));
            *(long *)buf = sstate->hdr.dport;
            return sizeof(long);
        case GETMAXPACKET:
        case GETOPTPACKET:
            checkLen(len, sizeof(int));
            if (xControlSessn(xGetSessnDown(self, 0), opcode, buf, len) <
		    sizeof(int))
                return -1;
            *(int *)buf -= HLEN;
            return sizeof(int);
        default:
            return xControlSessn(xGetSessnDown(self, 0), opcode, buf, len);
    }
}

static Part *
mspGetParticipants(Sessn self)
{
    Part       *p;
    int        numParts;
    SessnState *sstate = (SessnState *)self->state;
    long       localPort, remotePort;

    p = xGetParticipants(xGetSessnDown(self, 0));
    if (!p)
        return NULL;
    numParts = partLength(p);
    if (numParts > 0 && numParts <= 2) {
        if (numParts == 2) {
            localPort = (long)sstate->hdr.sport;
            partPush(p[1], (void *)&localPort, sizeof(long));
        }
        remotePort = (long)sstate->hdr.dport;
        partPush(p[0], (void *)&remotePort, sizeof(long));
	return p;
    }
    else    /* Bad number of participants */
        return NULL;
}

/* free up any resources allocated to a session */
static void
shut_down(Sessn s)
{
     ProtlState *pstate = (ProtlState *)xMyProtl(s)->state;
     SessnState *sstate = (SessnState *)s->state;

     xTrace0(mspp, TR_MAJOR_EVENTS, "shut_down!");

     /* cancel the timeout event */
     evCancel(sstate->event);

     /* remove this session from the active map */
     mapRemoveBinding(pstate->activemap, s->binding);

     /* close the lower level session on which it depends */
     xClose(xGetSessnDown(s, 0));

     /* de-allocate the session object itself */
     xFree(s->state);
     xDestroySessn(s);
}

static XkReturn
mspClose(Sessn s)
{
    SessnState *sstate = (SessnState *)s->state;

   
    xTrace1(mspp, TR_MAJOR_EVENTS, "mspClose: close session in state %d", 
	    sstate->state);
    switch(sstate->state) {
    case MSP_STATE_CLOSED:
        /* close for real */
        shut_down(s);
        return XK_SUCCESS;

    case MSP_STATE_ESTABLISHED:
        sendPacket(s, MSP_FLAG_FIN, 0);
        sstate->fin_seq = sstate->next_send_seq;
        sstate->state = MSP_STATE_FIN1;
        xTrace1(mspp, TR_MAJOR_EVENTS, "mspClose: fin seq %d", 
		(unsigned short)sstate->fin_seq);
        break;
    case MSP_STATE_CLOSEWAIT:
        sendPacket(s, MSP_FLAG_FIN, 0);
        sstate->fin_seq = sstate->next_send_seq;
        sstate->state = MSP_STATE_LASTACK; 
        xTrace1(mspp, TR_MAJOR_EVENTS, "mspClose: fin seq %d", 
		(unsigned short)sstate->fin_seq);
        break;
    default:
        xTrace1(mspp, TR_MAJOR_EVENTS, 
		"mspClose: tried to close in bad state: %d", 
		sstate->state);
        return XK_SUCCESS;
    }

    /* we're not really closing yet.  wait till we really close */
    xDuplicate(s);
    xTrace0(mspp, TR_MAJOR_EVENTS, "mspClose: waiting for real close");
    semWait(&sstate->wait);
    return XK_SUCCESS;
}


/*
 * timer function
 *	handles both retransmissions and persists
 */
static void
mspTimeout(Event e, void *arg)
{
    static int ptick = MSP_PERSTICKS;
    Sessn s = (Sessn)arg;
    SessnState *sstate = (SessnState *)s->state;
    XTime now, diff;
    Msg retrans_msg;
    int i;
    MSPsws seq;

    /* reschedule the timout function */
    sstate->event = evSchedule(mspTimeout, arg, MSP_TICK);

    xGetTime(&now);

    /* 
     * search sendq (from lowest_unack to next_send_seq)
     * for packets needing retransmission 
     */
    for(seq = sstate->lowest_unack; seq != sstate->next_send_seq; seq++) {
        i = QIDX(seq);
        if(SENDQ(sstate, i).valid) {
            xSubTime(&diff, now, SENDQ(sstate, i).last_sent);
            if(diff.sec > 0 || diff.usec > timeout) {
                /* retransmit packet */
                xTrace2(mspp, TR_MAJOR_EVENTS, "mspTimeout: diff time %d:%d", 
			diff.sec, diff.usec);
                xTrace1(mspp, TR_MAJOR_EVENTS, "mspTimeout: retransmit %d", i); 
                msgConstructCopy(&retrans_msg, &SENDQ(sstate, i).packet);
                SENDQ(sstate, i).last_sent = now;
                xPush(xGetSessnDown(s, 0), &retrans_msg);
            }
        }
    }

    if(sstate->state == MSP_STATE_ESTABLISHED && ptick-- <= 0) {
        /* time to check if a persist packet is needed */
        ptick = MSP_PERSTICKS;
        if(sstate->effective_window <= 0) {
            /*
             * we cant send any new data - send a persist to make the
             * remote generate window-size information.
             */
            sendPacket(s, MSP_FLAG_PERS, 0);
        }
    }

    /* timeout TIMEWAIT sessions and transition to CLOSED */
    if(sstate->state == MSP_STATE_TIMEWAIT && sstate->timer-- <= 0) {
        xTrace0(mspp, TR_MAJOR_EVENTS, 
		"mspTimeout: TIMEWAIT timer expired - go to CLOSED");
        sstate->state = MSP_STATE_CLOSED;
        semSignal(&sstate->wait);
    }
}

