/*     
 * $RCSfile: inputProcess.c,v $
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 *
 * $Revision: 1.2 $
 * $Date: 1996/01/31 16:25:17 $
 */


/* 
 * Input buffer pools
 */

#include "msg.h"
#include "xk_debug.h"
#include "event.h"
#include "inputProcess.h"
#include "x_libc.h"


/*
 * Values stored in an InputBuffer's 'ref' field.
 */
#define NOREF 0			/* block is free and may be handed out */
#define INUSE 1			/* block has been claimed by a caller */


/*
 * Trace variable for this file.  The xTrace calls below pass the name
 * "input"; the alias that follows makes 'traceinput' resolve to
 * 'traceshepherd' (presumably the name the trace macros construct --
 * confirm against xk_debug.h).
 */
int	traceshepherd;
#define traceinput	traceshepherd


/*
 * Body of the shepherd thread attached to a single input block.
 *
 * 'arg' is the InputBuffer this thread serves; 'ev' is not used.
 * The routine never returns.  On every pass it:
 *   1. blocks on the buffer's semaphore,
 *   2. passes the buffer to its demux routine,
 *   3. calls msgRefresh on the buffer's message with maxBufSize and
 *      stores the result in 'data',
 *   4. marks the buffer NOREF.
 */
static void
block_handler(Event ev, VOID *arg)
{
     InputBuffer *buf = (InputBuffer *)arg;

     xTrace1(input, TR_EVENTS, "input thread block handler runs, bp %x",
	     (u_int)buf);
     while (1) {
	 /* sleep until the semaphore is signalled (incoming data) */
	 semWait(&buf->sem);
	 xTrace1(input, TR_MORE_EVENTS, "shepherd thread for block %x runs",
		 (int)buf);
	 buf->demux(buf);
	 /*
	  * Refresh the message before clearing the flag, so the block is
	  * only seen as free once 'data' has been replaced.
	  */
	 buf->data = msgRefresh(&buf->msg, buf->maxBufSize);
	 buf->ref = NOREF;
     }
}




/*
 * Build a new pool of 'numBlocks' input blocks and return a handle to it.
 *
 * Each block gets a message allocated with 'msgSize', records 'demux'
 * as its demux routine, starts out NOREF with its semaphore initialised
 * to 0, and has a detached block_handler event scheduled for it (one
 * shepherd thread per block).  Scanning for free blocks starts at
 * index 0.
 */
BufferPool *
xkBufferPoolInit( int numBlocks, int msgSize, BlockDemuxFunc demux )
{
    BufferPool	*pool;
    InputBuffer	*blk;
    int		n;

    xTrace1(input, TR_MAJOR_EVENTS,
	    "bufferPoolInit starts %d shepherd threads", numBlocks);

    pool = X_NEW(BufferPool);
    pool->total_blocks = numBlocks;
    pool->next_block = 0;

    /* zero-filled array holding every block of the pool */
    pool->blocks = (InputBuffer *)xMalloc(numBlocks * sizeof(InputBuffer));
    bzero((char *)pool->blocks, numBlocks * sizeof(InputBuffer));

    for ( n = 0; n < numBlocks; n++ ) {
	blk = &pool->blocks[n];

	blk->id = n;
	blk->maxBufSize = msgSize;
	blk->demux = demux;
	blk->data = msgConstructAllocate(&blk->msg, msgSize);
	blk->ref = NOREF;
	semInit(&blk->sem, 0);

	/* schedule this block's handler and drop the event handle */
	evDetach( evSchedule(block_handler, blk, 0));
    }
    return pool;
}


/*
 * Claim the next free block of 'pool'.
 *
 * The blocks are scanned circularly, starting at pool->next_block, and
 * the scan stops at the first block whose 'ref' flag is not INUSE.
 * That block is marked INUSE, pool->next_block is moved to the slot
 * following it, and the block is returned.
 *
 * Returns NULL when the pool contains no blocks or when every block
 * is currently INUSE.
 */
InputBuffer *
xkBufferPoolNextBlock( BufferPool *pool )
{
    InputBuffer *bp;
    int		i;

    /*
     * A pool without blocks has nothing to hand out.  Bail out here:
     * the modulo below would otherwise divide by zero.
     */
    if ( pool->total_blocks <= 0 ) {
	xTrace1(input, TR_SOFT_ERRORS, "Pool %x has no blocks",
		(u_int)pool);
	return NULL;
    }

    i = pool->next_block;
    do {
	bp = &pool->blocks[i];
	i = (i + 1) % pool->total_blocks;
	/* stop on a free block, or once we are back where we started */
    } while ( bp->ref == INUSE && i != pool->next_block );

    if ( bp->ref == INUSE ) {
	xTrace1(input, TR_SOFT_ERRORS, "Pool %x, all threads are busy",
		(u_int)pool);
	return NULL;
    }
    /* report the block's own id; 'i' already refers to the following slot */
    xTrace2(input, TR_MORE_EVENTS, "nextBlock returns block %x (# %d)",
	    (u_int)bp, bp->id);
    bp->ref = INUSE;
    pool->next_block = i;
    return bp;
}


/*
 * Print the state of every block of 'pool' on one line of standard
 * output -- "R" for a block flagged INUSE, "I" for any other block --
 * followed by a line showing pool->next_block.
 */
void
xkBufferPoolDump( BufferPool *pool )
{
    int	idx = 0;

    while ( idx < pool->total_blocks ) {
	const char *state = "I";

	if ( pool->blocks[idx].ref == INUSE ) {
	    state = "R";
	}
	printf("%d: %s  ", idx, state);
	idx++;
    }
    printf("\n");
    printf("next_block == %d\n", pool->next_block);
}


/*
 * Mark block 'bp' as free again by resetting its 'ref' flag to NOREF,
 * so that a later xkBufferPoolNextBlock scan may return it.  Nothing
 * else in the block (message, semaphore) is touched.
 */
void
xkBufferPoolReleaseBlock( InputBuffer *bp )
{
    bp->ref = NOREF;
}
