/*     
 * $RCSfile: book-swp.c,v $
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 */

/*
 * Sliding Window Protocol.
 *
 * This protocol expects exactly one protocol to be configured above
 * it and exactly one below it (i.e., SWP does NOT perform any
 * demultiplexing).  Both protocols must be in the asynchronous domain
 * and record-oriented.  SWP implements reliable, ordered message
 * delivery and uses a sliding window protocol to improve throughput.
 *
 * A real-world protocol would also have to:
 *	- convert headers to/from network byte order
 *	- do more error checking (in particular of received headers)
 *	- piggy-back ACKs on data messages
 */
#include "xkernel.h"

#define SWS	128		/* send window size; also the number of txq[] slots */
#define RWS	128		/* receive window size; also the number of rxq[] slots */

/*
 * Header flag bits (tested against SWPHdr.flags in swpPop):
 */
#define FLAG_ACK_VALID	0x01	/* the ackno field carries an acknowledgment */
#define FLAG_HAS_DATA	0x02	/* the header is followed by data; seqno applies */

/*
 * Sequence number type.  Presumably an unsigned 8-bit integer (going by
 * the type name), in which case sequence arithmetic wraps modulo 256;
 * swpInWindow relies on that wrap-around -- TODO confirm against the
 * xk_u_int8 definition.
 */
typedef xk_u_int8	SWPSeqno;

/*
 * SWP header that swpPush prepends to every outgoing message and
 * swpPop strips from every incoming one.  It is copied to and from
 * the message verbatim with bcopy (no byte-order conversion).
 */
typedef struct {
    SWPSeqno	seqno;		/* sequence number of this packet */
    SWPSeqno	ackno;		/* allows window sizes of up to 128 */
    xk_u_int16	flags;		/* up to 16 bits worth of flags */
} SWPHdr;

/*
 * Per-protocol state, allocated in swp_init.
 */
typedef struct {
    Map		activemap;	/* maps a lower-level session to its SWP session */
} PState;

/*
 * Per-session state, allocated and initialized in swpCreateSessn.
 */
typedef struct {
    Binding	binding;	/* result of binding this session in activemap */

    /* sender side state: */
    SWPSeqno	LAR;		/* seqno of last ACK received */
    SWPSeqno	LFS;		/* last frame sent */
    Semaphore	sendWindowNotFull; /* initialized to SWS; swpPush waits on it,
				    * swpPop signals it once per acked frame */
    SWPHdr	hdr;		/* pre-initialized header */
    struct txq_slot {
	Event	timeout;	/* event associated with send-timeout */
	Sessn	lls;		/* session to resend message on */
	Msg	msg;		/* copy of the frame kept for retransmission */
    } txq[SWS];			/* indexed by seqno % SWS */

    /* receiver side state: */
    SWPSeqno	NFE;		/* seqno of next frame expected */
    struct rxq_slot {
	int	received;	/* is msg valid? */
	Msg	msg;		/* buffered frame awaiting in-order delivery */
    } rxq[RWS];			/* indexed by seqno % RWS */
} SState;

#define SEC			(1000*1000)	/* this many usecs/sec */
#define SWP_SEND_TIMEOUT	(5*SEC)		/* delay passed to evSchedule for retransmissions */


void swp_init(Protl);	/* prototype for global function */


/* Trace level for this protocol; presumably what the xTraceN(swpp, ...)
 * calls below consult -- TODO confirm against the xTrace macros. */
int	traceswpp = 0;

/* Set by swpOpen and swpOpenEnable; used by swpCreateSessn when it
 * creates a session. */
static Protl	swpHlp;		/* pointer to (only) higher-level protocol */


/*
 * swpInWindow -- is seqno inside the window [min..max]?
 *
 * Both quantities are computed relative to min and stored back into
 * SWPSeqno, so the comparison stays valid when the window straddles
 * the point where sequence numbers wrap around.  An empty window
 * (max == min - 1) has width 0 and therefore contains nothing.
 */
static bool
swpInWindow(SWPSeqno seqno, SWPSeqno min, SWPSeqno max)
{
    SWPSeqno offset = seqno - min;	/* distance of seqno from window start */
    SWPSeqno width  = max - min + 1;	/* number of seqnos in the window */

    return width > offset;
}


/*
 * swpTimeout -- retransmission event handler.
 *
 * arg is the txq slot whose timer fired.  A copy of the saved frame is
 * pushed down the slot's lower-level session again, and a fresh
 * timeout is scheduled for the same slot.  The ev argument is unused.
 */
static void
swpTimeout(Event ev, void *arg)
{
    struct txq_slot *pending = arg;
    Msg resend;

    xTrace1(swpp, TR_EVENTS, "swp: retransmitting slot %p", pending);

    /* push a copy so the saved frame stays intact for later retries */
    msgConstructCopy(&resend, &pending->msg);
    xPush(pending->lls, &resend);
    msgDestroy(&resend);

    pending->timeout = evSchedule(swpTimeout, arg, SWP_SEND_TIMEOUT);
}


/*
 * swpPush -- send a message from the higher-level protocol.
 *
 * Waits on sendWindowNotFull, assigns the next sequence number,
 * prepends the SWP header, keeps a copy of the framed message in the
 * matching txq slot, arms the retransmission timer for that slot and
 * finally pushes the message to the lower-level session.  Returns
 * whatever the lower-level xPush returns.
 */
static XkHandle
swpPush(Sessn s, Msg *msg)
{
    SState *st = s->state;
    struct txq_slot *entry;
    SWPSeqno seq;

    semWait(&st->sendWindowNotFull);

    /* claim the next sequence number and its transmit slot */
    seq = ++st->LFS;
    st->hdr.seqno = seq;
    entry = &st->txq[seq % SWS];

    /* prepend the header, then save the framed message for retransmission */
    bcopy(&st->hdr, msgPush(msg, sizeof(SWPHdr)), sizeof(SWPHdr));
    msgConstructCopy(&entry->msg, msg);

    entry->lls = xGetSessnDown(s, 0);
    entry->timeout = evSchedule(swpTimeout, entry, SWP_SEND_TIMEOUT);

    xTrace2(swpp, TR_EVENTS, "swp: sending seqno %u in slot %p",
	    st->hdr.seqno, entry);
    return xPush(entry->lls, msg);
}


static XkReturn
swpPop(Sessn s, Sessn ds, Msg *dg, VOID *arg)
{
    SState *sstate = (SState *)s->state;
    SWPHdr hdr;

    bcopy(msgPop(dg, sizeof(SWPHdr)), &hdr, sizeof(SWPHdr));
    if (hdr.flags & FLAG_ACK_VALID) {
	/* received an acknowledgment---do SENDER-side */
	if (swpInWindow(hdr.ackno, sstate->LAR + 1, sstate->LFS)) {
	    xTrace3(swpp, TR_EVENTS,
		    "swp: got ack for seqno %u (LAR=%u, LFS=%u)",
		    hdr.ackno, sstate->LAR, sstate->LFS);
	    do {
		struct txq_slot *slot;

		slot = &sstate->txq[++sstate->LAR % SWS];
		evCancel(slot->timeout);
		msgDestroy(&slot->msg);
		semSignal(&sstate->sendWindowNotFull);
	    } while (sstate->LAR != hdr.ackno);
	} else {
	    xTrace3(swpp, TR_EVENTS,
		    "swp: ignoring ack %u (LAR=%u, LFS=%u)",
		    hdr.ackno, sstate->LAR, sstate->LFS);
	}
    }
    if (hdr.flags & FLAG_HAS_DATA) {
	struct rxq_slot *slot;

	/* received data packet---do RECEIVER side */
	slot = &sstate->rxq[hdr.seqno % RWS];
	if (!swpInWindow(hdr.seqno, sstate->NFE, sstate->NFE + RWS - 1)) {
	    /* drop the message */
	    xTrace3(swpp, TR_EVENTS,
		    "swp: dropping seqno %u (NFE=%u, NFE+RWS-1=%u)",
		    hdr.seqno, sstate->NFE, sstate->NFE + RWS - 1);
	    return XK_SUCCESS;
	}
	msgConstructCopy(&slot->msg, dg);
	slot->received = TRUE;
	if (hdr.seqno == sstate->NFE) {
	    SWPHdr ackhdr;
	    Msg m;

	    while (slot->received) {
		xTrace1(swpp, TR_EVENTS,
			"swp: delivering seqno %u to hlp", sstate->NFE);
		xDemux(xGetUp(s), s, &slot->msg);
		msgDestroy(&slot->msg);
		slot->received = FALSE;
		slot = &sstate->rxq[++sstate->NFE % RWS];
	    }
	    /* send ACK: */
	    xTrace1(swpp, TR_EVENTS,
		    "swp: acking seqno %u to peer", sstate->NFE - 1);
	    ackhdr.ackno = sstate->NFE - 1;
	    ackhdr.flags = FLAG_ACK_VALID;
	    msgConstructBuffer(&m, (char *)&ackhdr, sizeof(ackhdr));
	    xPush(ds, &m);
	    msgDestroy(&m);
	} else {
	    xTrace1(swpp, TR_EVENTS,
		    "swp: queued out-of-order seqno %u for later delivery\n",
		    hdr.seqno);
	}
    }
    return XK_SUCCESS;
}


/*
 * swpControlSessn -- session control operation.
 *
 * SWP handles no opcodes itself; the request is forwarded unchanged
 * to the lower-level session and that session's result is returned.
 */
static int
swpControlSessn(Sessn s, int opcode, char *buf, int len)
{
    Sessn below = xGetSessnDown(s, 0);

    return xControlSessn(below, opcode, buf, len);
}


/*
 * swpControlProtl -- protocol control operation.
 *
 * SWP handles no opcodes itself; the request is forwarded unchanged
 * to the lower-level protocol and that protocol's result is returned.
 */
static int
swpControlProtl(Protl s, int opcode, char *buf, int len)
{
    Protl below = xGetProtlDown(s, 0);

    return xControlProtl(below, opcode, buf, len);
}


/*
 * swpCloseSessn -- tear down an SWP session.
 *
 * Before the session is destroyed, everything that still refers to its
 * state is released:
 *  - for every unacknowledged frame (LAR+1 .. LFS) the retransmission
 *    timer is cancelled and the saved copy destroyed; otherwise
 *    swpTimeout would later run against a slot of a destroyed session
 *    and push on a lower-level session that has been closed;
 *  - every frame still buffered in rxq is destroyed.
 * Then the lower-level session is closed and the session destroyed.
 * Always returns XK_SUCCESS.
 *
 * NOTE(review): the activemap binding made in swpCreateSessn
 * (sstate->binding) is not removed anywhere in this file, so a later
 * swpDemux for the same lower-level session would resolve to the
 * destroyed session.  Remove the binding here with the map library's
 * unbind call -- confirm the exact API before adding it.
 */
static XkReturn
swpCloseSessn(Sessn s)
{
    SState *sstate = (SState *)s->state;
    int i;

    if (sstate) {
	/* sender side: drop all frames still awaiting an ACK */
	while (sstate->LAR != sstate->LFS) {
	    struct txq_slot *slot;

	    slot = &sstate->txq[++sstate->LAR % SWS];
	    evCancel(slot->timeout);
	    msgDestroy(&slot->msg);
	}
	/* receiver side: drop frames buffered for in-order delivery */
	for (i = 0; i < RWS; i++) {
	    if (sstate->rxq[i].received) {
		msgDestroy(&sstate->rxq[i].msg);
		sstate->rxq[i].received = FALSE;
	    }
	}
    }
    xClose(xGetSessnDown(s, 0));
    xDestroySessn(s);
    return XK_SUCCESS;
}


/*
 * swpCreateSessn -- create the SWP session that sits on top of lls.
 *
 * self is the SWP protocol object and lls the lower-level session,
 * which also serves as the key in the protocol's activemap.  The
 * session is created with swpHlp as its higher-level protocol, its
 * state is allocated and initialized, and the session handlers are
 * installed.
 *
 * Returns the new session, or ERR_SESSN if xCreateSessn fails.  In the
 * failure case nothing is allocated or bound, and no reference to lls
 * is taken.
 */
static Sessn
swpCreateSessn(Protl self, Sessn lls)
{
    PState *pstate = (PState *)self->state;
    SState *sstate;
    Sessn s;

    /* hlp and hlpType are (only) higher-level protocol, lls is key */

    s = xCreateSessn(0, swpHlp, swpHlp, self, 1, &lls);
    if (s == ERR_SESSN) {
	/* callers (see swpDemux) test for ERR_SESSN; never touch s here */
	return ERR_SESSN;
    }
    /* reference lls only once a session exists that will release it */
    xDuplicate(lls);

    sstate = xMalloc(sizeof(SState));
    bzero((char *)sstate, sizeof(SState));
    sstate->binding = mapBind(pstate->activemap, &lls, s);

    /* initialize sender-side state: */
    sstate->LAR = -1;		/* wraps, so the first frame sent is seqno 0 */
    sstate->LFS = -1;
    semInit(&sstate->sendWindowNotFull, SWS);
    sstate->hdr.seqno = 0;
    sstate->hdr.ackno = 0;
    sstate->hdr.flags = FLAG_HAS_DATA;

    /* initialize receiver-side state: */
    sstate->NFE = 0;

    s->state   = sstate;
    s->push    = swpPush;
    s->pop     = swpPop;
    s->controlsessn = swpControlSessn;
    s->close   = swpCloseSessn;
    return s;
}


/*
 * swpDemux -- entry point for messages coming up from below.
 *
 * Looks up the SWP session bound to the lower-level session lls.  If
 * there is none yet, one is created and lls is duplicated once more;
 * if creation fails the message is silently discarded (XK_SUCCESS is
 * still returned).  Otherwise the message is handed to the session
 * via xPop and the result of xPop is returned.
 */
static XkReturn
swpDemux(Protl self, Sessn lls, Msg *dg)
{
    PState *pstate = (PState *)self->state;
    Sessn s;

    /* fast path: a session for this lls already exists */
    if (mapResolve(pstate->activemap, &lls, (void **)&s) == XK_SUCCESS)
	return xPop(s, lls, dg, 0);

    /* first message on this lls: set up a session for it */
    s = swpCreateSessn(self, lls);
    if (s == ERR_SESSN)
	return XK_SUCCESS;
    xDuplicate(lls);

    return xPop(s, lls, dg, 0);
}


/*
 * swpOpen -- actively open an SWP session for the participants in p.
 *
 * Records hlp as the (only) higher-level protocol, opens a session on
 * the lower-level protocol and creates the SWP session on top of it.
 * The hlpType argument is unused.
 *
 * Returns the new SWP session, or ERR_SESSN if the lower-level open or
 * the session creation fails.
 */
static Sessn
swpOpen(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    Sessn lls;

    swpHlp = hlp;
    lls = xOpen(self, self, xGetProtlDown(self, 0), p);
    if (lls == ERR_SESSN) {
	/* no lower-level session: there is nothing to build a session on */
	return ERR_SESSN;
    }
    return swpCreateSessn(self, lls);
}


/*
 * swpOpenEnable -- passive open.
 *
 * Records hlp as the (only) higher-level protocol and forwards the
 * enable request to the lower-level protocol with SWP itself as the
 * requesting protocol.  Returns the lower-level result.  The hlpType
 * argument is unused.
 */
static XkReturn
swpOpenEnable(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    Protl below;

    swpHlp = hlp;
    below = xGetProtlDown(self, 0);
    return xOpenEnable(self, self, below, p);
}


/*
 * swpOpenDisable -- undo a passive open.
 *
 * Forwards the disable request to the lower-level protocol with SWP
 * itself as the requesting protocol and returns the lower-level
 * result.  The hlp and hlpType arguments are unused.
 */
static XkReturn
swpOpenDisable(Protl self, Protl hlp, Protl hlpType, Part *p)
{
    Protl below = xGetProtlDown(self, 0);

    return xOpenDisable(self, self, below, p);
}


/*
 * swp_init -- initialize the SWP protocol object.
 *
 * Allocates the zeroed per-protocol state, creates the map of active
 * sessions (keyed by a lower-level Sessn) and installs the protocol's
 * operation handlers on self.
 */
void
swp_init(Protl self)
{
    PState *pstate = xMalloc(sizeof(PState));

    bzero((char *)pstate, sizeof(PState));
    /* 71 is the first mapCreate argument -- presumably the table size */
    pstate->activemap = mapCreate(71, sizeof(Sessn));

    self->state	      = pstate;

    /* operation handlers */
    self->open	      = swpOpen;
    self->openenable  = swpOpenEnable;
    self->opendisable = swpOpenDisable;
    self->demux	      = swpDemux;
    self->controlprotl = swpControlProtl;
}
