/*
 * $RCSfile: blast_output.c,v $
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 *
 * $Log: blast_output.c,v $
 * Revision 1.2  1996/02/02 15:51:03  slm
 * Updated copyright and version.
 *
 * Revision 1.1  1995/07/28  22:08:26  slm
 * Initial revision
 *
 * Revision 1.40.2.1.1.2  1994/12/02  18:08:43  hkaram
 * Changed to msgPoke
 * Changed to new mapResolve interface
 *
 * Revision 1.40.2.1.1.1  1994/11/23  17:29:08  hkaram
 * New branch
 *
 * Revision 1.40.2.1  1994/04/25  21:38:25  menze
 * Return msg handle in push
 * field name spelling change
 *
 * Revision 1.40  1993/12/16  01:24:25  menze
 * Removed inappropriate references to Semaphore internals.
 * Fixed mask references to compile with strict ANSI restrictions
 *
 * Revision 1.39  1993/12/13  22:34:45  menze
 * Modifications from UMass:
 *
 *   [ 93/11/12          yates ]
 *   Changed casting of Map manager calls so that the header file does it all.
 */

#include "blast_internal.h"

/*
 * Forward declarations for the file-local (static) helpers.
 *
 * With an ANSI compiler (__STDC__) full prototypes are given so argument
 * types are checked.  Otherwise old-style declarations are used, and only
 * for the helpers that are referenced before their definition in this
 * file (push_fragment, retransmit and sendShortMsg are defined ahead of
 * their first use, so they need no declaration in that branch).
 */
#ifdef __STDC__

static void	blastSendTimeout( Event, void * );
static void	freeMstate( MSG_STATE * );
static void	push_fragment( Sessn, Msg *, BLAST_HDR * );
static void	retransmit( int, MSG_STATE * );
static XkHandle	sendShortMsg( Msg *, SState * );
static void	unmapMstate( MSG_STATE * );

#else

/* Old-style (pre-ANSI) declarations: return type only, no argument list */
static void	blastSendTimeout();
static void	freeMstate();
static void	unmapMstate();

#endif /* __STDC__ */

/* 
 * mstate sending reference counts:
 *
 * Reference counts represent both:
 *	presence in the senders map (<= 1 ref count)
 *	retransmission requests (>= 0 ref counts)
 *
 * When a sending mstate is removed from the senders map (through
 * either a timeout or user notification), the reference count
 * associated with the map binding is freed.  After this has happened,
 * the mstate will be deallocated once all active retransmission
 * requests complete.
 */



static XkHandle
sendShortMsg(m, state)
    Msg *m;
    SState *state;  
{
    void *buf;

    state->short_hdr.len = msgLength(m);
    xIfTrace(blastp, TR_DETAILED) {
	blast_phdr(&state->short_hdr);
    }
    xTrace1(blastp, TR_DETAILED, "in blast_push2 down_s = %lx",
	    (long) xGetSessnDown(state->self, 0));

    buf = msgPush(m, BLASTHLEN);
    xAssert(buf);
    blastHdrStore(&state->short_hdr, buf, BLASTHLEN, 0);

    xIfTrace(blastp,TR_DETAILED) {
	xTrace0(blastp, TR_ALWAYS, "message in header");
	blast_phdr(&state->short_hdr);
    }
    xTrace1(blastp, TR_DETAILED, "in blast_push2 msg len = %d", msgLength(m));
    if (xPush(xGetSessnDown(state->self, 0), m) ==  -1) {
	xTrace0(blastp, TR_ERRORS, "blast_push: can't send message");
	return XMSG_ERR_HANDLE;
    }
    return XMSG_NULL_HANDLE;
}


/*
 * push_fragment -- send one fragment with BLAST header 'h' on session 's'.
 *
 * The fragment itself is left untouched: a copy is constructed, the
 * header is stored into BLASTHLEN bytes pushed onto the copy, the copy is
 * pushed to 's', and the copy is then destroyed.  A failed xPush (-1) is
 * only traced; nothing is reported to the caller.
 */
static void
push_fragment(s, frag, h)
    Sessn s;
    Msg *frag;
    BLAST_HDR *h;
{
    Msg outgoing;
    void *hdrSpace;

    xIfTrace(blastp, TR_DETAILED) {
        xTrace0(blastp, TR_ALWAYS, "Outgoing message ");
        blast_phdr(h);
    }

    /* Work on a copy so the stored fragment never carries a header */
    msgConstructCopy(&outgoing, frag);
    hdrSpace = msgPush(&outgoing, BLASTHLEN);
    xAssert(hdrSpace);
    blastHdrStore(h, hdrSpace, BLASTHLEN, 0);

    if (xPush(s, &outgoing) ==  -1) {
        xTrace0(blastp, TR_ERRORS, "blast_push: can't send fragment");
    }

    msgDestroy(&outgoing);
}

    	
/*
 * blastPush -- send 'msg' on BLAST session 's'.
 *
 * Messages shorter than the session's fragment size are sent directly
 * through sendShortMsg.  Longer messages get a new sequence number and a
 * sending mstate bound to that number in the protocol's send_map; the
 * message is then split into fragments of at most fragmentSize bytes,
 * each of which is kept in the mstate and pushed to the lower session.
 * Finally a blastSendTimeout event is scheduled for the mstate.
 *
 * If the message carries an attribute (read as a chan_info_t), it is
 * filled in with this protocol, the sequence number as ticket,
 * reliable = 0 and expensive = 1.
 *
 * Returns:
 *	XMSG_ERR_HANDLE	if the message is longer than
 *			fragmentSize * BLAST_MAX_FRAGS, or if no mstate
 *			could be obtained or bound to the sequence number;
 *	the result of sendShortMsg for short messages;
 *	the sequence number of the message otherwise.
 */
XkHandle
blastPush(s, msg)
    Sessn s;
    Msg *msg;
{
    SState	*state;
    PState 	*pstate;
    MSG_STATE  	*mstate;
    BLAST_HDR 	*hdr;
    int 	seq;
    int 	num_frags;
    int 	i;
    Sessn	lls;
    chan_info_t *info;
    
    xTrace0(blastp, TR_EVENTS, "in blast_push");
    xAssert(xIsSessn(s));

    pstate = (PState *) s->myprotl->state;
    state = (SState *) s->state;
    /* state is a pointer: print it as a long, like the other traces here */
    xTrace1(blastp, TR_MORE_EVENTS, "blast_push: state = %lx", (long) state);
    xTrace1(blastp, TR_MORE_EVENTS,
	    "blast_push: outgoing msg len (no blast hdr): %d",
	    msgLength(msg));

    if (msgLength(msg) > state->fragmentSize * BLAST_MAX_FRAGS ) {
	xTrace2(blastp, TR_SOFT_ERRORS,
		"blast_push: message length %d > max size %d",
	       msgLength(msg), state->fragmentSize * BLAST_MAX_FRAGS);
	return XMSG_ERR_HANDLE;
    }
    
    /* if message is short, by-pass everything */
    if ( msgLength(msg) < state->fragmentSize ) {
	return sendShortMsg(msg, state);
    }

    xTrace0(blastp, TR_DETAILED, "in blast_push3");

    /* get chan info (may be null; checked before use below) */
    info = (chan_info_t *) msgGetAttr(msg,0);
    

    /* get new sequence number, wrapping back to 1 after BLAST_MAX_SEQ */
    if ( pstate->max_seq == BLAST_MAX_SEQ ) {
	xTraceS0(s, TR_MAJOR_EVENTS, "sequence number wraps");
	pstate->max_seq = 0;
    }
    seq = ++pstate->max_seq;
    
    /* need new message state */
    mstate = blastNewMstate(s);
    if ( mstate == 0 ) {
	/*
	 * blastNewMstate is defined outside this file; don't assume it
	 * cannot fail.
	 */
	xTrace0(blastp, TR_ERRORS, "blast_push: can't get message state");
	return XMSG_ERR_HANDLE;
    }
    
    /* bind mstate to sequence number */
    mstate->binding = mapBind(pstate->send_map, &seq, mstate);
    if (mstate->binding == ERR_BIND) {
	xTrace0(blastp, TR_ERRORS, "blast_push: can't bind seqence number");
	/*
	 * NOTE(review): the mstate obtained above is not released on this
	 * path (an xFree here was previously compiled out).  Confirm how
	 * blastNewMstate expects an unused mstate to be returned.
	 */
	return XMSG_ERR_HANDLE;
    } 
    /* 
     * Add a reference count for this message.  This is done only once
     * the mstate is bound: the count is dropped again by freeMstate,
     * which can never run for an mstate that failed to bind, so counting
     * earlier would leave the session's count permanently raised.
     */
    state->ircnt++;
    mstate->state = state;
    
    /* fill in header: start from the session's short header */
    hdr = &mstate->hdr;
    *hdr = state->short_hdr;
    hdr->seq = seq;
    xIfTrace(blastp, TR_DETAILED) {
	xTrace0(blastp, TR_ALWAYS, "blasts_push: static header:");
	blast_phdr(hdr);
    }
    
    /* number of fragments, rounding up */
    num_frags = (msgLength(msg) + state->fragmentSize - 1)/(state->fragmentSize);
    hdr->num_frag = num_frags;
    /* the mask identifies the fragment being sent; start with fragment 1 */
    BLAST_MASK_SET_BIT(&hdr->mask, 1);
    
    /* send off fragments */
    semWait(&pstate->outstanding_messages);
    mstate->wait = SEND_CONST;
    lls = xGetSessnDown(s, 0);
    for ( i=1; i <= num_frags; i++ ) {
	/* fragments are stored 1-based in the mstate for retransmission */
	Msg *packet = &mstate->frags[i];
	msgConstructEmpty(packet);
	msgBreak(msg, packet, state->fragmentSize);
	xTrace2(blastp, TR_MORE_EVENTS,
		"fragment: len[%d] = %d", i, msgLength(packet));
	/* fill in dynamic parts of header */
	hdr->len = msgLength(packet);
	xTrace1(blastp, TR_MORE_EVENTS, "blast_push: sending frag %d", i);
	push_fragment(lls, packet, hdr);
	/* advance the mask to the next fragment's bit */
	BLAST_MASK_SHIFT_LEFT(hdr->mask, 1);
    }
    /* set timer to free message if no NACK is received */
    if ( ! mstate->event ) {
	xTrace1(blastp, TR_MORE_EVENTS,
		"scheduling blastSendTimeout for mstate %lx", (long) mstate);
	mstate->event = evSchedule(blastSendTimeout, mstate, mstate->wait);
    }
    if (info) {
	info->transport = s->myprotl;
	info->ticket = seq;
	info->reliable = 0;
	info->expensive = 1;
    }
    return seq;
}


/*
 * Retransmit the message referenced by mstate.  1's in 'mask' indicate
 * fragments received, 0's indicate fragments that need to be retransmitted.
 */
static void
retransmit(mask, mstate)
    int		mask;
    MSG_STATE 	*mstate;
{
    int 	i;
    SState	*state;
    BLAST_HDR 	*hdr;
    Sessn	lls;
    
    xTrace2(blastp, REXMIT_T, "blast retransmit: called seq = %d mask = %s",
	    mstate->hdr.seq, blastShowMask(mask));
    if (mstate == 0) {
	xTrace0(blastp, REXMIT_T, "retransmit: no message state");
	return;
    }
    if ((state = (SState *)mstate->state) == 0) {
	xTrace0(blastp, TR_ERRORS, "retransmit: no state");
	return;
    }
    mstate->nacks_received++;
    hdr = &(mstate->hdr);
    hdr->op = BLAST_RETRANSMIT;
    /* send off fragments */
    lls = xGetSessnDown(mstate->state->self, 0);
    for ( i=1; i<=hdr->num_frag; i++ ) {
	if ( ! BLAST_MASK_IS_BIT_SET(&mask, i) ) {
	    Msg *packet = &mstate->frags[i];
	    xTrace2(blastp, REXMIT_T,
		    "retransmit: retransmitting fragment %d, msgptr %lx",
		    i, (long) packet);
	    /* fill in dynamic parts of header */
	    BLAST_MASK_CLEAR(hdr->mask);
	    BLAST_MASK_SET_BIT(&hdr->mask, i);
	    hdr->len = msgLength(packet);
	    push_fragment(lls, packet, hdr);
	}
    }
}


/* 
 * Removes the mstate from the senders map, kills the timeout event,
 * and removes the mstate reference count associated with the map
 * binding.
 */
static void
unmapMstate( ms )
    MSG_STATE	*ms;
{
#ifdef XK_DEBUG
    Sessn	sessn;
#endif
    SState	*ss;
    PState	*ps;

    ss = ms->state;
#ifdef XK_DEBUG    
    sessn = ss->self;
#endif
    xTraceS0(sessn, TR_MORE_EVENTS, "unmapping mstate");
    ps = (PState *)xMyProtl(ss->self)->state;
    if ( ms->event ) {
	evCancel(ms->event);
	ms->event = 0;
    }
    if (ms->binding) {
	if ( mapRemoveBinding(ps->send_map, ms->binding) == XK_SUCCESS ) {
	    /* 
	     * Release reference count for the map binding
	     */
	    freeMstate(ms);
	} else {
	    xTraceS0(sessn, TR_ERRORS, "unmapMstate: can't unbind!");
	}
    } else {
	xTraceS0(sessn, TR_ERRORS, "unmapMstate: no binding!");
    }
}


/*
 * Free the message and state associated with the outgoing message referenced
 * by mstate.
 */
static void
freeMstate(mstate)
    MSG_STATE *mstate;
{
    PState	*pstate;
    int		i;
    Sessn	sessn;

    xTrace3(blastp, TR_MORE_EVENTS,
	    "blast output freeMstate, seq == %d, mstate == %lx, rcnt == %d",
	    mstate->hdr.seq, (long)mstate, mstate->rcnt);
    xAssert(mstate->rcnt > 0);
    mstate->rcnt--;
    if ( mstate->rcnt > 0 ) {
	return;
    }
    /* 
     * if rcnt == 0, unmapMstate should have been called.
     */
    xAssert(mstate->event == 0);
    sessn = ((SState *)mstate->state)->self;
    xTraceS0(sessn, TR_MORE_EVENTS, "truly deallocating sending mstate");
    pstate = (PState *)xMyProtl(sessn)->state;
    xTrace1(blastp, TR_MORE_EVENTS, "blast freeMstate: num_frags = %d",
	    mstate->hdr.num_frag);
    for ( i = 1; i <= mstate->hdr.num_frag; i++ ) {
	msgDestroy(&mstate->frags[i]);
    }
    if ( stackPush(pstate->mstateStack, (VOID *)mstate) ) {
	xTrace1(blastp, TR_MORE_EVENTS,
		"free_send_seq: mstate stack is full, freeing %lx",
		(long) mstate);
#if 0
	xFree((char *)mstate);
#endif
    }
    semSignal(&pstate->outstanding_messages);
    /* 
     * Remove ref count corresponding to this message
     */
    blastDecIrc(sessn);
}

/*
 * blast_Retransmit -- are we making progress on message 'seq'?
 *
 * Looks up the sending mstate for 'seq' in the protocol's send_map.
 *
 * Returns:
 *	-1	no mstate is bound to 'seq';
 *	 0	NACKs were recorded since the last call (the counter is
 *		reset to zero);
 *	 1	no NACKs were recorded.
 */
int
blast_Retransmit(pstate, seq)
    PState *pstate;
    int seq;
{
    MSG_STATE   *ms;

    xTrace1(blastp, TR_MORE_EVENTS, "blast_Retransmit: begin seq = %d", seq);

    if (mapResolve(pstate->send_map, &seq, (void **)&ms) == XK_FAILURE) {
        xTrace0(blastp, TR_SOFT_ERRORS, "blast_Retransmit: no mstate");
        return -1;
    }

    if ( ! ms->nacks_received ) {
        return 1;
    }
    /* Consume the recorded NACKs so the next call starts from zero */
    ms->nacks_received = 0;
    return 0;
}


/* 
 * blast_freeSendSeq -- release the outgoing message state for 'seq'.
 *
 * Resolves 'seq' in the protocol's send_map and hands the mstate to
 * unmapMstate, which cancels its timeout event and removes the binding.
 *
 * Returns 0 if an mstate was found, -1 if nothing is bound to 'seq'.
 */
int
blast_freeSendSeq(pstate, seq)
    PState *pstate;
    int seq;
{
    MSG_STATE	*ms;

    xTrace1(blastp, TR_MORE_EVENTS, "free_send_msg: begin seq = %d", seq);

    if (mapResolve(pstate->send_map, &seq, (void **)&ms) != XK_FAILURE) {
        unmapMstate(ms);
        return 0;
    }

    xTrace0(blastp, TR_SOFT_ERRORS, "free_send_seq: nothing to free");
    return -1;
}


/*
 * blastSendTimeout -- event handler that garbage collects the storage
 * associated with an outgoing message.  Since blast uses nacks, a timer
 * is the only way it has of getting rid of that storage.
 *
 * 'arg' is the MSG_STATE the event was scheduled for; 'ev' is not used.
 */
static void
blastSendTimeout(ev, arg)
    Event	ev;
    VOID 	*arg;
{
    xTrace0(blastp, REXMIT_T, "blast: send_timeout: timeout!");
    unmapMstate((MSG_STATE *)arg);
}

    
/* 
 * blastSenderPop -- process a retransmission request from out peer
 */
XkReturn
blastSenderPop(s, dg, hdr)
    Sessn s;
    Msg *dg;
    BLAST_HDR *hdr;
{
    PState	*pstate = (PState *)s->myprotl->state;
    MSG_STATE 	*mstate;
    
    xTrace0(blastp, TR_EVENTS, "BLAST_pop");
    
    /*
     * look for message state
     */
    if (mapResolve(pstate->send_map, &hdr->seq, (void **)&mstate) ==
	XK_FAILURE) {
	xTraceS0(s, TR_EVENTS, "senderPop: unmatched nack");
	return XK_SUCCESS;
    }
    xAssert(mstate->rcnt > 0);
    mstate->rcnt++;
    /* 
     * Cancel and restart timeout event
     */
    if ( mstate->event ) {
	xTraceS1(s, TR_MORE_EVENTS,
		"rescheduling send timeout for mstate %lx", (long) mstate);
	evCancel(mstate->event);
	mstate->event = evSchedule(blastSendTimeout, mstate, mstate->wait);
	retransmit(hdr->mask, mstate);
    } else {
	/* 
	 * If the sender timeout event hasn't been scheduled yet, either:
	 *
	 *	not all fragments have been sent yet ...
	 *  or
	 *	we're already processing a retransmisstion request
	 */
	xTraceS0(s, TR_SOFT_ERRORS, "retransmission request ignored");
    }
    freeMstate(mstate);	/* decrease ref count */
    return XK_SUCCESS;
} 


