/*
 * shepherd_pool.c - check shepherd_pool.h for description
 */

#include <stdio.h>
#include <values.h>
#include "msg.h"
#include "xk_thread.h"
#include "xk_fifo.h"
#include "shepherd_pool.h"
#include "xkernel.h"

static void shepherd_stub( shepherd_pool_t* pool );

int traceshepherd;

/*
 * shepherd_stub: it runs in an infinite loop, removing messages from
 * the pool and calling the real shepherd function associated with that pool.
 * The main point is that it must manage the lock to the x-kernel for
 * the shepherd function, using the lock queue for shepherds.
 */

static void
shepherd_stub( shepherd_pool_t* pool )
{
    msg_holder_t* holder;
    xk_thread_t*  this_thread = threadSelf();

    xTrace2(shepherd,TR_EVENTS,"shepherd_stub(pool 0x%x) runs as thread 0x%x",
            pool, this_thread );

    while (1) {

	xTrace1(shepherd,TR_EVENTS,
	        "shepherd_stub: thread 0x%x calling xkFifoRemove",this_thread);

	/* First, block until a message is available to be shepherded */
	holder = xkFifoRemove( &pool->shepherd_queue ); 

	xTrace2(shepherd,TR_MAJOR_EVENTS,"shepherd_stub: thread 0x%x got holder 0x%x",
		this_thread, holder );

	xTrace1(shepherd,TR_DETAILED,
	        "shepherd_stub: thread 0x%x calling xk_shepherd_lock",
		this_thread);

	/* Now, block until exclusive access to the x-kernel is granted */
	xk_shepherd_lock();

	xTrace2(shepherd,TR_EVENTS, "shepherd_stub: thread 0x%x calling 0x%x",
		this_thread, pool->shepherd );

	/* Execute the real shepherd function */
	pool->shepherd( pool->arg, &holder->msg );

	xTrace2(shepherd,TR_MAJOR_EVENTS,
	        "shepherd_stub: thread 0x%x returned from function 0x%x",
		this_thread, pool->shepherd );

	xTrace1(shepherd,TR_DETAILED,
	        "shepherd_stub: thread 0x%x calling xk_all_unlock",
		this_thread);
	
	xk_all_unlock();

	/*
	 * Finally, return the holder to the free list. Note that
	 * holder->msg still exists. We are leaving the task of 
	 * destroying it to the device driver main loop.
	 */

	xTrace3(shepherd,TR_EVENTS,
	        "shepherd_stub: thread 0x%x calling xkFifoAppend(q0x%x,e0x%x)",
		this_thread, pool, holder );

	xkFifoAppend( &pool->free_holders, holder );

	xTrace1(shepherd,TR_MORE_EVENTS,
	        "shepherd_stub: thread 0x%x restarting main shepherd_stub loop",
		this_thread );
    }
    xTrace0(shepherd,TR_ERRORS,
	    "shepherd_stub: exiting --- this should never happen!!!");
}

shepherd_pool_t*
shepherd_pool_init( shepherd_pool_t* pool,
	          int holder_low_limit, int holder_high_limit,
		  shepherd_func_t shepherd, void* arg,
		  int shepherd_low_limit, int shepherd_high_limit )
{
    int           i;
    msg_holder_t* holder;

    xTrace3(shepherd,TR_FUNCTIONAL_TRACE,
            "shepherd_pool_init: pool 0x%x, shepherd function 0x%x, arg 0x%x",
	    pool, shepherd, arg );

    if (!pool) {
	pool = X_NEW(shepherd_pool_t);
	xTrace1(shepherd,TR_MORE_EVENTS,"shepherd_pool_init: alloc'd 0x%x",pool);
    }
    
    if ( !holder_high_limit ) {
	holder_high_limit = MAXINT;
    } else if ( holder_low_limit > holder_high_limit ) {
	/* Maybe we should add a warning message here */
	holder_high_limit = holder_low_limit;
    }
    if ( !shepherd_high_limit ) {
	shepherd_high_limit = MAXINT;
    } else if ( shepherd_low_limit > shepherd_high_limit ) {
	/* Maybe we should add a warning message here */
	shepherd_high_limit = shepherd_low_limit;
    }
    xTrace2(shepherd,TR_MORE_EVENTS,"shepherd_pool_init: holders min %d, max %d",
	    holder_low_limit, holder_high_limit );
    xTrace2(shepherd,TR_MORE_EVENTS,"shepherd_pool_init: shepherd min %d,max %d",
	    shepherd_low_limit, shepherd_high_limit );

    xkFifoInit( &pool->free_holders );
    xkFifoInit( &pool->shepherd_queue );
    pool->holder_low_limit    = holder_low_limit;
    pool->holder_high_limit   = holder_high_limit;
    pool->holder_cnt          = 0;
    pool->shepherd            = shepherd;
    pool->arg                 = arg;
    pool->shepherd_low_limit  = shepherd_low_limit;
    pool->shepherd_high_limit = shepherd_high_limit;
    pool->shepherd_cnt        = 0;

    xTrace1(shepherd,TR_EVENTS,"shepherd_pool_init: appending %d empty holders",
	    holder_low_limit );

    for (i=0;i<holder_low_limit;++i) {
	holder = X_NEW(msg_holder_t);
	msgConstructEmpty( &holder->msg );
    
	xTrace2(shepherd,TR_MORE_EVENTS,
	        "shepherd_pool_init: calling xkFifoAppend(0x%x,0x%x)",
		&pool->free_holders, holder );

	xkFifoAppend( &pool->free_holders, holder );
	pool->holder_cnt++;
    }

    xTrace1(shepherd,TR_EVENTS,"shepherd_pool_init:creating %d shepherd threads",
	    shepherd_low_limit );

    for (i=0;i<shepherd_low_limit;++i) {
    
	xTrace1(shepherd,TR_MORE_EVENTS,
	        "shepherd_pool_init: calling pThreadCreate(0x%x)", pool );

	pThreadCreate( (ThreadFunc)shepherd_stub, pool, "shepherd_stub" );
	pool->shepherd_cnt++;
    }
    xTrace1(shepherd,TR_FUNCTIONAL_TRACE,"shepherd_pool_init: returning 0x%x",
	    pool );
    return pool;
}

msg_holder_t*
get_free_holder( shepherd_pool_t* pool )
{
    msg_holder_t* holder;
    xk_thread_t*  this_thread = threadSelf();

    xTrace2(shepherd,TR_FUNCTIONAL_TRACE,
            "get_free_holder(0x%x) called by thread 0x%x", pool, this_thread );

    xTrace2(shepherd,TR_EVENTS,
            "get_free_holder: thread 0x%x calling xkFifoNBRemove(pool 0x%x)",
	    this_thread,pool);

    holder = xkFifoNBRemove( &pool->free_holders );

    if ( !holder ) {
	if ( pool->holder_cnt < pool->holder_high_limit ) {
	    holder = X_NEW(msg_holder_t);
	    msgConstructEmpty(&holder->msg);
	    pool->holder_cnt++;
	    xTrace2(shepherd,TR_MORE_EVENTS,
	            "get_free_holder: create new holder(0x%x) in pool 0x%x",
		    holder, pool );
	} else {
	    xTrace0(shepherd,TR_MORE_EVENTS,
	            "get_free_holder: couldn't get any holder, returning NULL");
	    /* 
	     * There are two options here: block the thread
	     *     (holder = xkFifoRemove( &pool->free_holders );)
	     * or return NULL and let the caller deal with it.
	     * Blocking does not sound like a good idea, since
	     * it would prevent the device driver from receiving
	     * other packets.
	     */
	    return NULL;
	}
    }
    xTrace2(shepherd,TR_EVENTS,"get_free_holder(0x%x): returning 0x%x",pool,holder);
    return holder;
}

void
trigger_shepherd( shepherd_pool_t* pool, msg_holder_t* holder )
{
    xk_thread_t*  this_thread = threadSelf();

    xTrace3(shepherd,TR_FUNCTIONAL_TRACE,
            "trigger_shepherd(pool 0x%x, holder 0x%x): in thread 0x%x",
	    pool, holder, this_thread );

    if ( xkFifoAppend( &pool->shepherd_queue, holder  ) == 0 ) {
        xTrace2(shepherd,TR_MORE_EVENTS,
	        "trigger_shepherd(0x%x,0x%x): msg not handled at once",
		pool, holder );
	if ( pool->shepherd_cnt < pool->shepherd_high_limit ) {
	    xTrace2(shepherd,TR_MORE_EVENTS,
		    "trigger_shepherd(0x%x,0x%x): creating a new thread",
		    pool, holder );
	    pool->shepherd_cnt++;
	    pThreadCreate( (ThreadFunc)shepherd_stub, pool, "shepherd_stub" );
	}
    }
    xTrace3(shepherd,TR_FUNCTIONAL_TRACE,
            "trigger_shepherd(pool 0x%x, holder 0x%x): thread 0x%x returning",
	    pool, holder, this_thread );
}
