/*     
 * $RCSfile: xk_fifo.c,v $
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 *
 * $Log: xk_fifo.c,v $
 * Revision 1.3  1997/03/17 01:30:45  dorgival
 * Added macros around pthread_mutex/cond calls
 *
 * Revision 1.2  1997/03/16 18:25:39  dorgival
 * Increased tracing info with this_thread->status
 *
 * Revision 1.1  1997/03/11 20:47:08  dorgival
 * Initial revision
 *
 *
 */

/*
 * Concurrent single-reader/multi-writer FIFO queues.
 */
#include <stdlib.h>
#include <assert.h>

#include "trace.h"
#include "xk_debug.h"
#include "xk_thread.h"
#include "xk_fifo.h"
#include "x_util.h"

int tracefifo;

void
xkFifoInit (xk_fifo_t* q)
{
    xTrace1(fifo,TR_FUNCTIONAL_TRACE,"xkFifoInit: initializing fifo 0x%x",q);
    q->head = q->tail = (void *) 0;
    q->nblocked = 0;
    q->nqueued  = 0;
    pthread_mutex_init( &(q->mutex), pthread_mutexattr_default );
    pthread_cond_init( &(q->cond), pthread_condattr_default );
    xTrace1(fifo,TR_FUNCTIONAL_TRACE,"xkFifoInit(0x%x) returning",q);
}

int
xkFifoAppend (xk_fifo_t* q, void * element)
{
    int          handled;
    xk_thread_t* this_thread = threadSelf();
    FIFOEl       el = element;

    xTrace2(fifo,TR_FUNCTIONAL_TRACE,"xkFifoAppend(fifo 0x%x,elem 0x%x)",
            q, element );

    assert(element);

    el->next = NULL;

    xTrace2(fifo,TR_EVENTS,
            "xkFifoAppend: thread 0x%x trying to lock pmutex for queue 0x%x",
	    this_thread, q);

    PTHREAD_MUTEX_LOCK( &(q->mutex) );

    xTrace2(fifo,TR_EVENTS,
            "xkFifoAppend: thread 0x%x got lock of pmutex for queue 0x%x",
	    this_thread, q);

    if (q->head) {
	q->tail->next = el;
    } else {
	q->head = el;
    }
    q->tail = el;
    q->nqueued++;

    handled = (q->nblocked >= q->nqueued);

    xTrace2(fifo,TR_MORE_EVENTS,
            "There are %d queued messages, and %d blocked threads",
	    q->nqueued, q->nblocked );

    if ( handled ) {
	xTrace1(fifo,TR_MORE_EVENTS,
		"There are msgs to be handled, signalling pcond for queue 0x%s",
		q );
	pthread_cond_signal( &(q->cond) );
    }

    PTHREAD_MUTEX_UNLOCK( &(q->mutex) );

    xTrace2(fifo,TR_EVENTS,
            "xkFifoAppend: thread 0x%x released lock of pmutex for queue 0x%x",
	    this_thread, q);

    xTrace1(fifo,TR_FUNCTIONAL_TRACE,"xkFifoAppend(0x%x) returning",q);

    return handled;
}


void *
xkFifoRemove (xk_fifo_t* q)
{
    FIFOEl       el = NULL;
    xk_thread_t* this_thread = threadSelf();

    xTrace1(fifo,TR_FUNCTIONAL_TRACE,"xkFifoRemove(fifo 0x%x)", q );

    xTrace2(fifo,TR_EVENTS,
            "xkFifoRemove: thread 0x%x trying to lock pmutex for queue 0x%x",
	    this_thread, q);

    PTHREAD_MUTEX_LOCK( &(q->mutex) );

    xTrace2(fifo,TR_EVENTS,
            "xkFifoRemove: thread 0x%x got lock of pmutex for queue 0x%x",
	    this_thread, q);

    while (!q->head) {
	/* cannot remove from empty fifo: */
	q->nblocked++;

	xTrace2(fifo,TR_EVENTS,
		"xkFifoRemove: thread 0x%x waiting on pcond for queue 0x%x",
		this_thread, q);

	PTHREAD_COND_WAIT(&(q->cond),&(q->mutex));

	xTrace2(fifo,TR_EVENTS,
		"xkFifoRemove: thread 0x%x signalled on pcond for queue 0x%x",
		this_thread, q);

	q->nblocked--;
    }

    el = q->head;
    q->head = el->next;
    q->nqueued--;

    PTHREAD_MUTEX_UNLOCK( &(q->mutex) );

    xTrace3(fifo,TR_EVENTS,
	    "xkFifoRemove: thread 0x%x got element 0x%x, unlocked queue 0x%x",
	    this_thread, el, q);

    return el;
}

void *
xkFifoNBRemove (xk_fifo_t* q)
{
    FIFOEl       el = NULL;
    xk_thread_t* this_thread = threadSelf();

    xTrace1(fifo,TR_FUNCTIONAL_TRACE,"xkFifoNBRemove(fifo 0x%x)", q );

    xTrace2(fifo,TR_EVENTS,
            "xkFifoNBRemove: thread 0x%x trying to lock pmutex for queue 0x%x",
	    this_thread, q);

    PTHREAD_MUTEX_LOCK( &(q->mutex) );

    xTrace2(fifo,TR_EVENTS,
            "xkFifoNBRemove: thread 0x%x got lock to pmutex for queue 0x%x",
	    this_thread, q);

    if (!q->head) {
	/* cannot remove from empty fifo: */
	PTHREAD_MUTEX_UNLOCK( &(q->mutex) );
	xTrace2(fifo,TR_EVENTS,
	        "xkFifoNBRemove: thread 0x%x unlocked queue 0x%x, returning 0",
		this_thread, q );
	return NULL;
    }

    el = q->head;
    q->head = el->next;
    q->nqueued--;

    PTHREAD_MUTEX_UNLOCK( &(q->mutex) );

    xTrace3(fifo,TR_EVENTS,
	    "xkFifoNBRemove: thread 0x%x got element 0x%x, unlocked queue 0x%x",
	    this_thread, el, q);

    return el;
}
