/*     
 *     x-kernel v3.3
 *
 *    Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 */

#include "swp_internal.h"

void
swp_init(Protl self)
{
    ProtlState *pstate;
    Protl      llp;
    Part       part;

    getproc_protl(self);

    /* create and initialize protocol state */
    pstate = X_NEW(ProtlState);
    memset((char *)pstate, 0, sizeof(ProtlState));
    self->state = (void *)pstate;
    pstate->activemap  = mapCreate(ACTIVE_MAP_SIZE, sizeof(ActiveId));
    pstate->passivemap = mapCreate(PASSIVE_MAP_SIZE, sizeof(PassiveId));

    /* find lower level protocol and do a passive open on it */
    llp = xGetProtlDown(self, 0);
    if (!xIsProtl(llp))
        Kabort("SWP could not get lower protocol");
    partInit(&part, 1);
    partPush(part, ANY_HOST, 0);
    if (xOpenEnable(self, self, llp, &part) == XK_FAILURE) {
        xTrace0(swpp, TR_ALWAYS,
		"swp_init: openenable on lower protocol failed");
        xFree((char *) pstate);
        exit(1);
        return;
    }
}

static void
getproc_protl(Protl p)
{
    /* fill in the function pointers to implement protocol operations */
    p->open         = swpOpen;
    p->openenable   = swpOpenEnable;
    p->demux        = swpDemux;
    p->controlprotl = swpControlProtl;
}


static Sessn
swpOpen(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    ActiveId   key;
    Sessn      swp_s, lls;
    ProtlState *pstate = (ProtlState *)self->state;

    memset((char *)&key, 0, sizeof(key));

    /* high level protocol must specify both local and remote SWP port */
    key.remoteport = *((SWPport *)partPop(p[0]));
    key.localport  = *((SWPport *)partPop(p[1]));

    /* attempt to open session on protocol below this one */
    lls = xOpen(self, self, xGetProtlDown(self, 0), p);
    if (lls != ERR_SESSN) {
        key.lls = lls;
	/* check for this session in the active map */
        if (mapResolve(pstate->activemap, &key, (void **)&swp_s) == XK_FAILURE){
	    /* session wasn't already in map, so initialize it */
            swp_s = swpCreateSessn(self, hlp, hlpType, &key);
            if (swp_s != ERR_SESSN)    /* A successful open! */
                return swp_s;
        }
        /* if control makes it this far, an error has occurred */
        xClose(lls);
    }
    return ERR_SESSN;
}

static XkReturn
swpOpenEnable(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    PassiveId  key;
    ProtlState *pstate = (ProtlState *)self->state;
    Enable     *e;

    key = *((SWPport *)partPop(*p));

    /* check if this port has already been openenabled */
    if (mapResolve(pstate->passivemap, &key, (void **)&e) != XK_FAILURE) {
        if (e->hlp == hlp) {
	    /* this port was openenabled previously by the same hlp */
            e->rcnt++;
            return XK_SUCCESS;
        }
	/* this port was openenabled previously by a different hlp - error */
        return XK_FAILURE;
    }

    /* this will be a new enabling, so create and initialize Enable object, */
    /* and enter the binding of port/enable object in the passive map */
    e = X_NEW(Enable);
    e->hlp     = hlp;
    e->hlpType = hlpType;
    e->rcnt    = 1;
    e->binding = mapBind(pstate->passivemap, &key, e);
    if (e->binding == ERR_BIND) {
        xFree((char *)e);
        return XK_FAILURE;
    }
    return XK_SUCCESS;
}

/* 
 * set the window size of a session 
 * 
 * Must not be called with an invalid window size or after
 * data is in transit.
 */
static void
setWindowSize(SessnState *sstate, int windowsize)
{
    int seqsize;

    /* free any previous values */
    if(sstate->sendq)
        xFree((char *)sstate->sendq);
    if(sstate->recvq)
        xFree((char *)sstate->recvq);

    /* pick a liberal size for the sequence space */
    seqsize = windowsize * 4;
    semInit(&sstate->canSend, windowsize);
    sstate->windowsize = windowsize;
    sstate->seqsize = seqsize;
    sstate->hdr.windowsize = windowsize;
    xTrace2(swpp, TR_MAJOR_EVENTS,
	    "setWindowSize: set window %d, seqsize %d", windowsize,
	    sstate->seqsize);

    /* allocate queues */
    sstate->sendq = (sendqelem *) xMalloc(seqsize * sizeof(sendqelem));
    memset(sstate->sendq, 0, seqsize * sizeof(sendqelem));
    sstate->recvq = (recvqelem *) xMalloc(seqsize * sizeof(recvqelem));
    memset(sstate->recvq, 0, seqsize * sizeof(recvqelem));
}

static Sessn
swpCreateSessn(Protl self, Protl hlp, Protl hlpType, ActiveId *key)
{
    Sessn      s;
    ProtlState *pstate = (ProtlState *)self->state;
    SessnState *sstate;
    SWPhdr     *swph;

    /* create the session object and initialize it */
    s = xCreateSessn(getproc_sessn, hlp, hlpType, self, 1, &key->lls);
    s->binding = mapBind(pstate->activemap, key, s);
    sstate = X_NEW(SessnState);
    memset(sstate, 0, sizeof(*sstate));
    s->state = (char *)sstate;

    /* create an SWP header template */
    swph = &(sstate->hdr);
    swph->sport = key->localport;
    swph->dport = key->remoteport;

    setWindowSize(sstate, SWP_DEF_WINDOW);

    /* schedule timeout handler to run periodically (it rescheds itself) */
    sstate->event = evSchedule(swpTimeout, sstate, SWP_TICK);

    return s;
}

static void
getproc_sessn(Sessn s)
{
    /* fill in the function pointers to implement session operations */
    s->push            = swpPush;
    s->pop             = swpPop;
    s->controlsessn    = swpControlSessn;
    s->getparticipants = swpGetParticipants;
    s->close           = swpClose;
}

static XkReturn
swpDemux(Protl self, Sessn lls, Msg *dg)
{
    char       *buf;
    SWPhdr     h;
    ActiveId   activeid;
    ProtlState *pstate = (ProtlState *)self->state;
    Sessn      s;
    PassiveId  passiveid;
    Enable     *e;

    /* extract the header from the message */
    buf = msgPop(dg, HLEN);
    if (buf == NULL)
        return XK_FAILURE;
    swpHdrLoad(&h, buf, HLEN);

    /* construct a demux key from the header */
    bzero((char *)&activeid, sizeof(activeid));
    activeid.lls        = lls;
    activeid.localport  = h.dport;
    activeid.remoteport = h.sport;

    /* see if demux key is in the active map */
    if (mapResolve(pstate->activemap, &activeid, (void **)&s) == XK_FAILURE) {
	/* didn't find an active session, so check passive map */
        passiveid = h.dport;
        if (mapResolve(pstate->passivemap, &passiveid, (void **)&e) ==
	        XK_FAILURE) {
            /* drop the message */
            return XK_SUCCESS;
        }

        /* port was enabled, so create a new session and inform hlp */
        s = swpCreateSessn(self, e->hlp, e->hlpType, &activeid);
        if (s == ERR_SESSN)
            return XK_SUCCESS;
        xDuplicate(lls);
        xOpenDone(e->hlp, self, s);
    }

    /* found (or created) an appropriate session, so pop to it */
    return xPop(s, lls, dg, &h);
}

static long
swpHdrLoad(SWPhdr *hdr, char *src, long len)
{
    /* copy from src to hdr, then convert network byte order to host order */
    memcpy(hdr, src, HLEN);
    hdr->ulen  = ntohs(hdr->ulen);
    hdr->sport = ntohs(hdr->sport);
    hdr->dport = ntohs(hdr->dport);
    hdr->seq = ntohs(hdr->seq);
    hdr->windowsize = ntohs(hdr->windowsize);
    hdr->flags = ntohs(hdr->flags);
    return HLEN;
}

static void
swpHdrStore(SWPhdr *hdr, char *dst, long len)
{
    /* convert host byte order to network order, then copy from hdr to dst */
    /* (note: argument 'hdr' is changed by the following code) */
    hdr->ulen  = htons(hdr->ulen);
    hdr->sport = htons(hdr->sport);
    hdr->dport = htons(hdr->dport);
    hdr->seq = htons(hdr->seq);
    hdr->windowsize = htons(hdr->windowsize);
    hdr->flags = htons(hdr->flags);
    memcpy(dst, hdr, HLEN);
}

static void
sendqEnqueue(sendqelem *queue, Sessn lls, Msg *packet, SWPsws seq)
{
    sendqelem *elem = &queue[seq];

    xAssert(!elem->valid);
    elem->valid = 1;
    xGetTime(&elem->last_sent);
    elem->lls = lls;
    msgConstructCopy(&elem->packet, packet);
}

static void
sendqDequeue(sendqelem *queue, SWPsws seq)
{
    sendqelem *elem = &queue[seq];

    xAssert(elem->valid);
    elem->valid = 0;
    msgDestroy(&elem->packet);
}

static void
recvqEnqueue(recvqelem *queue, Msg *packet, SWPsws seq)
{
    recvqelem *elem = &queue[seq];

    xAssert(!elem->valid);
    elem->valid = 1;
    msgConstructCopy(&elem->packet, packet);
}

static void
recvqDequeue(recvqelem *queue, SWPsws seq)
{
    recvqelem *elem = &queue[seq];

    xAssert(elem->valid);
    elem->valid = 0;
    msgDestroy(&elem->packet);
}

static XkHandle swpPush(Sessn self, Msg *msg)
{
    Sessn lls;
    SessnState *sstate = (SessnState *)self->state;
    SWPhdr     hdr;
    char       *buf;

    /* wait till we are allowed to send */
    semWait(&sstate->canSend);

    /* create a header from the template and place it on the packet */
    hdr = sstate->hdr;
    hdr.ulen = msgLength(msg) + HLEN;
    hdr.flags = 0;
    hdr.seq = sstate->next_seq;
    buf = msgPush(msg, HLEN);
    swpHdrStore(&hdr, buf, HLEN);

    /* send packet and save a copy for retransmission */
    xTrace1(swpp, TR_MAJOR_EVENTS,
	    "swpPush: send sequence %d", sstate->next_seq);
    lls = xGetSessnDown(self, 0);
    sendqEnqueue(sstate->sendq, lls, msg, sstate->next_seq);

    /* update state */
    sstate->inuse = 1;
    sstate->next_seq = (sstate->next_seq + 1) % sstate->seqsize;

    return xPush(lls, msg);
}

/* return s1 - s2 */
static int
sequenceDiff(SWPsws s1, SWPsws s2, int size)
{
    int ret;

    ret = (s1 - s2) % size;
    if(ret < 0)
        ret += size;
    return ret;
}

static void
sendAck(SessnState *sstate, Sessn lls, int seq)
{
    Msg ack_msg;
    SWPhdr ackhdr;
    char *buf;
    
    xTrace1(swpp, TR_MAJOR_EVENTS, "senAck: ack %d", seq);
    msgConstructEmpty(&ack_msg);
    ackhdr = sstate->hdr;
    ackhdr.ulen = HLEN;
    ackhdr.flags = SWP_FLAG_ACK;
    ackhdr.seq = seq;
    buf = msgPush(&ack_msg, HLEN);
    swpHdrStore(&ackhdr, buf, HLEN);

    xPush(lls, &ack_msg);
}

static XkReturn
swpPop(Sessn self, Sessn lls, Msg *msg, void *hdr)
{
    Protl      hlp;
    SWPhdr *h = (SWPhdr *)hdr;
    SessnState *sstate = (SessnState *)self->state;
    int seqdiff, seq, lastseq, unackdiff;

    if(h->windowsize < 0 || h->windowsize > SWP_MAX_WINDOW) {
        /* bad window size, drop */
        xTrace0(swpp, TR_MAJOR_EVENTS, "swpPop: bad window");
        return XK_FAILURE;
    }

    /* set window size according to receivers window on first packet */
    if(!sstate->inuse) {
        xTrace0(swpp, TR_MAJOR_EVENTS, 
		"swpPop: receiver using senders windowsize");
        setWindowSize(sstate, h->windowsize);
        sstate->inuse = 1;
    }

    if(sstate->windowsize != h->windowsize) {
        /* window size changed, drop */
        xTrace0(swpp, TR_MAJOR_EVENTS, "swpPop: window changed");
        return XK_FAILURE;
    }
    if(h->seq < 0 || h->seq >= sstate->seqsize) {
        /* sequence number invalid, drop */
        xTrace0(swpp, TR_MAJOR_EVENTS, "swpPop: invalid seq_num");
        return XK_FAILURE;
    }

    if(h->flags & SWP_FLAG_ACK) {
        /* Ack Packet */
        xTrace1(swpp, TR_MAJOR_EVENTS, "swpPop: got ack %d", h->seq);

        /* in order if: lowest_unack <= seq < next_seq */
        seqdiff = sequenceDiff(h->seq, sstate->lowest_unack, sstate->seqsize);
        unackdiff = sequenceDiff(sstate->next_seq, sstate->lowest_unack,
                                 sstate->seqsize);
        if(seqdiff >= unackdiff) {
            /* out of our send window, drop */
	    xTrace4(swpp, TR_MAJOR_EVENTS, 
		    "swpPop: not in window %d %d (%d: %d)", 
		    sstate->lowest_unack, sstate->next_seq, unackdiff, 
		    seqdiff);
            return XK_FAILURE;
        }

        /* cancel all retransmissions for acknowledged packets */
        for(seq = sstate->lowest_unack; ; seq = (seq + 1) % sstate->seqsize) {
            xTrace1(swpp, TR_MAJOR_EVENTS, "swpPop: cancel retrans for %d", 
		    seq);
            sendqDequeue(sstate->sendq, seq);
            semSignal(&sstate->canSend);
            if(seq == h->seq)
                break;
        }
        seq = (seq + 1) % sstate->seqsize;
        sstate->lowest_unack = seq;
    } else {
        /* Data Packet */
        xTrace1(swpp, TR_MAJOR_EVENTS, "swpPop: data packet %d", h->seq);

        /* in order if: next_ack <= seq < next_ack + windowsize */
        seqdiff = sequenceDiff(h->seq, sstate->next_ack, sstate->seqsize);
        if(seqdiff >= sstate->windowsize) {
            /*
             * out of our receive window, drop but resend our
             * last acknowledgement (next_ack - 1)
             */
            seqdiff = sequenceDiff(sstate->next_ack, 1, sstate->seqsize);
            sendAck(sstate, lls, seqdiff);
            xTrace0(swpp, TR_MAJOR_EVENTS, "swpPop: not in window");
            return XK_FAILURE;
        }

        /* truncate message to length shown in header */
        if (h->ulen - HLEN < msgLength(msg))
            msgTruncate(msg, (int)h->ulen);

        /* enqueue packet */
        recvqEnqueue(sstate->recvq, msg, h->seq);

        /* deliver all in-order packets */
        hlp = xGetUp(self);
        lastseq = 0;
        for(seq = sstate->next_ack; sstate->recvq[seq].valid; 
                                    seq = (seq + 1) % sstate->seqsize) {
            xTrace1(swpp, TR_MAJOR_EVENTS, "swpPop: deliver %d", seq);
            xDemux(hlp, self, &sstate->recvq[seq].packet);
            recvqDequeue(sstate->recvq, seq);
            lastseq = seq;
        }

        if(seq != sstate->next_ack) {
            /*
             * We delivered packets to the user, acknowledge them
             * to the sender
             */
            sstate->next_ack = seq;
            xTrace1(swpp, TR_MAJOR_EVENTS, "swpPop: nextack %d", 
		    sstate->next_ack);
            sendAck(sstate, lls, lastseq);
        }
    }

    return XK_SUCCESS;
}

static void
swpTimeout(Event e, void *arg)
{
    SessnState *sstate = (SessnState *)arg;
    XTime now, diff;
    Msg retrans_msg;
    int i;

    /* reschedule the timout function */
    sstate->event = evSchedule(swpTimeout, arg, SWP_TICK);

    xGetTime(&now);

    /* search sendq for packets needing retransmission */
    for(i = 0; i < sstate->seqsize; i++) {
        if(sstate->sendq[i].valid) {
            xSubTime(&diff, now, sstate->sendq[i].last_sent);
            if(diff.sec > 0 || diff.usec > SWP_TIMEOUT) {
                /* retransmit packet */
                xTrace1(swpp, TR_MAJOR_EVENTS, "swpTimeout: retransmit %d", i);
                msgConstructCopy(&retrans_msg, &sstate->sendq[i].packet);
                sstate->sendq[i].last_sent = now;
                xPush(sstate->sendq[i].lls, &retrans_msg);
            }
        }
    }
}

static int
swpControlProtl(Protl self, int opcode, char *buf, int len)
{
    switch (opcode) {
        case GETMAXPACKET:
        case GETOPTPACKET:
            checkLen(len, sizeof(int));
            if (xControlProtl(xGetProtlDown(self, 0), opcode, buf, len) <
		    sizeof(int))
                return -1;
            *(int *)buf -= HLEN;
            return sizeof(int);
        default:
            return xControlProtl(xGetProtlDown(self, 0), opcode, buf, len);
    }
}

static int
swpControlSessn(Sessn self, int opcode, char *buf, int len)
{
    SessnState *sstate = (SessnState *)self->state;
    SWPhdr     *hdr;
    SWPsws windowsize;

    hdr = &(sstate->hdr);
    switch (opcode) {
        case SWP_SET_SWS:
	    if(sstate->inuse)
                return -1;
            checkLen(len, sizeof(SWPsws));
            windowsize = *(SWPsws *)buf;
            if(windowsize > SWP_MAX_WINDOW)
                return -1;

            setWindowSize(sstate, windowsize);
            return windowsize;

        case GETMYPROTO:
            checkLen(len, sizeof(long));
            *(long *)buf = sstate->hdr.sport;
            return sizeof(long);
        case GETPEERPROTO:
            checkLen(len, sizeof(long));
            *(long *)buf = sstate->hdr.dport;
            return sizeof(long);
        case GETMAXPACKET:
        case GETOPTPACKET:
            checkLen(len, sizeof(int));
            if (xControlSessn(xGetSessnDown(self, 0), opcode, buf, len) <
		    sizeof(int))
                return -1;
            *(int *)buf -= HLEN;
            return sizeof(int);
        default:
            return xControlSessn(xGetSessnDown(self, 0), opcode, buf, len);
    }
}

static Part *
swpGetParticipants(Sessn self)
{
    Part       *p;
    int        numParts;
    SessnState *sstate = (SessnState *)self->state;
    long       localPort, remotePort;

    p = xGetParticipants(xGetSessnDown(self, 0));
    if (!p)
        return NULL;
    numParts = partLength(p);
    if (numParts > 0 && numParts <= 2) {
        if (numParts == 2) {
            localPort = (long)sstate->hdr.sport;
            partPush(p[1], (void *)&localPort, sizeof(long));
        }
        remotePort = (long)sstate->hdr.dport;
        partPush(p[0], (void *)&remotePort, sizeof(long));
	return p;
    }
    else    /* Bad number of participants */
        return NULL;
}

static XkReturn
swpClose(Sessn s)
{
     ProtlState *pstate = (ProtlState *)xMyProtl(s)->state;
     SessnState *sstate = (SessnState *)s->state;

     /* cancel the timeout event */
     evCancel(sstate->event);

     /* remove this session from the active map */
     mapRemoveBinding(pstate->activemap, s->binding);

     /* close the lower level session on which it depends */
     xClose(xGetSessnDown(s, 0));

     /* de-allocate the session object itself */
     xFree((char *)sstate->sendq);
     xFree((char *)sstate->recvq);
     xFree(s->state);
     xDestroySessn(s);

     return XK_SUCCESS;
}
