/*
 * @COPYRIGHT@
 * @COPYRIGHT@
 *
 * $RCSfile: msg.c,v $
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1996,1993,1991,1990  Arizona Board of Regents
 *
 * $Log: msg.c,v $
 * Revision 1.6  1997/04/23 18:06:30  rrp
 * another bug fix from scout in next_frag.  Guard for case (b)
 * should be >=, not >
 *
 * Revision 1.5  1997/01/07  22:42:19  rrp
 * various bug fixes from scout.
 *
 * Revision 1.4  1996/06/03  18:42:22  slm
 * Added ifdef/endif around linearize function.
 *
 * Revision 1.3  1996/05/21  17:57:24  slm
 * Fixed bug in use of msgWalkDone reported by David.
 *
 * Revision 1.2  1996/01/29  19:47:00  slm
 * New Scout message tool.
 *
 */
/*
 * The msg tool package supports tree structured message management.
 * The algorithms were originally developed by Peter Druschel in C++
 * (no kidding!).
 */
/* #include <scout_config.h> */

#include <alloca.h>

#include "msg.h"
/* #include "system.h" */
#include "trace.h"

/*
 * MAX_HDR_STACK_SIZE is the maximum length of a stack of protocol headers.
 * This doesn't affect correctness, but improves performance: pushing headers
 * will lead to at most one push overflow when this parameter is correctly
 * chosen.
 *
 * MSG_PAGE_SIZE is the unit in which memory is allocated (to reduce internal
 * memory fragmentation and/or to make effective use of pages with page-based
 * allocators).  This parameter is not necessarily related to the platform's
 * virtual memory page size.
 *
 * Both parameters have to be an integer power of 2.
 */
/* Header-stack capacity and allocation unit, in bytes. */
#define MAX_HDR_STACK_SIZE  128
#define MSG_PAGE_SIZE       (MAX_HDR_STACK_SIZE)

/*
 * Number of MSG_PAGE_SIZE units needed to hold LEN bytes of data plus
 * MAX_HDR_STACK_SIZE bytes of extra room.
 */
#define NUM_MSG_PAGES(len)  \
    (((len) + MAX_HDR_STACK_SIZE + MSG_PAGE_SIZE - 1) / MSG_PAGE_SIZE)

/* Allocation size of each node kind; the suffix is pasted by CONCAT(). */
#define PAIR_NODE_SIZE      (sizeof(struct MsgNodePair))
#define LEAF_NODE_SIZE      (sizeof(struct MsgNodeLeaf))
#define PAGE_NODE_SIZE      (MSG_PAGE_SIZE)
#define BUF_NODE_SIZE       (sizeof(struct MsgNodeBuf))

/* Bytes of data available in a single-page page node (page minus header). */
#define PAGE_NODE_DATA_SIZE \
    (PAGE_NODE_SIZE - (long)&((MsgNodePage)0)->page.buffer[0])

/*
 * Round LEN up to the next multiple of ALIGN (ALIGN must be a power of 2).
 * Both arguments are parenthesized so that expressions such as
 * ROUNDUP(n, 1 << k) expand correctly.
 */
#define ROUNDUP(len,align)  (((len) + (align) - 1) & ~((align) - 1))

/* Paste two tokens together. */
#define CONCAT(a,b)         a##b

#ifdef OPTION_MSG_STATISTICS

/*
 * Account for one newly allocated node of type KIND (PAIR, PAGE or BUF):
 * bump the per-kind counter and the total memory counter and update the
 * corresponding high-water marks.
 *
 * The body is wrapped in do/while(0) so that the macro expands to exactly
 * one statement and can safely be used as the body of an unbraced if/else.
 */
#define RECORD_USAGE(kind) do {                                   \
    ++stats.CONCAT(kind,s_in_use);                                \
    if (stats.CONCAT(kind,s_in_use) > stats.CONCAT(kind,s_hiwat)) \
        stats.CONCAT(kind,s_hiwat) = stats.CONCAT(kind,s_in_use); \
    stats.mem_in_use += CONCAT(kind,_NODE_SIZE);                  \
    if (stats.mem_in_use > stats.mem_hiwat)                       \
        stats.mem_hiwat = stats.mem_in_use;                       \
} while (0)

/* Account for the release of one node of type KIND. */
#define RECORD_RELEASE(kind) do {                \
    --stats.CONCAT(kind,s_in_use);               \
    stats.mem_in_use -= CONCAT(kind,_NODE_SIZE); \
} while (0)

/* Allocation counters reported by msgStats(). */
static struct {
    /*
     * These fields have funny case because the prefix must match the
     * spelling in enum NodeType.
     */
    int PAIRs_in_use;
    int PAIRs_hiwat;
    int PAGEs_in_use;
    int PAGEs_hiwat;
    int BUFs_in_use;
    int BUFs_hiwat;
    int mem_in_use;
    int mem_hiwat;
} stats;

#else /* !OPTION_MSG_STATISTICS */

/* Statistics are compiled out: both macros expand to nothing. */
#define RECORD_USAGE(prefix)
#define RECORD_RELEASE(prefix)

#endif /* OPTION_MSG_STATISTICS */

/*
 * Allocate a node and store it in N.
 *
 *   kind1  - type suffix used for the cast (Pair, Page, Buf)
 *   kind2  - upper-case kind used for the size, type tag and statistics
 *            (PAIR, PAGE, BUF)
 *   n      - lvalue receiving the new node
 *   pages  - multiplier applied to the node size
 *
 * The node starts with a reference count of 1.  Arguments are parenthesized
 * and the body is wrapped in do/while(0) so that the macro acts as a single
 * statement regardless of the expressions passed in.
 */
#define GET_MSG_NODE(kind1,kind2,n,pages) do {                                \
    (n) = (CONCAT(MsgNode,kind1))xMalloc(CONCAT(kind2,_NODE_SIZE) * (pages)); \
    (n)->c.refCnt = 1;                                                        \
    (n)->c.type = CONCAT(MSG_NODE_,kind2);                                    \
    RECORD_USAGE(kind2);                                                      \
    xTrace1(msg, TR_MAJOR_EVENTS, "msg.GET_MSG_NODE()=%p", (n));              \
} while (0)

/*
 * Release node N of the given KIND (PAIR, PAGE, BUF) and update the
 * statistics.  Wrapped in do/while(0) so that it expands to exactly one
 * statement.
 */
#define FREE_MSG_NODE(kind,n) do {                               \
    xTrace1(msg, TR_MAJOR_EVENTS, "msg.FREE_MSG_NODE(%p)", (n)); \
    xFree((void *)(n));                                          \
    RECORD_RELEASE(kind);                                        \
    xTrace0(msg, TR_FUNCTIONAL_TRACE, "msg.FREE_MSG_NODE done"); \
} while (0)

typedef struct MsgStackEl *MsgStackEl;

/* One element of a singly linked LIFO stack of opaque pointers. */
struct MsgStackEl {
    MsgStackEl next;    /* element below this one, NULL at the bottom */
    void       *val;    /* value supplied to push() */
};

/*
 * Push V onto the stack whose top-of-stack pointer is *SP.
 *
 * This has to be a macro because we want to use alloca() for fast
 * stack allocation: the element is carved out of the frame of the
 * function that invokes push() and stays valid until that function
 * returns.
 *
 * The body is wrapped in do/while(0) so that push() behaves as a single
 * statement (e.g. "if (c) push(..); else ..." compiles as expected).
 */
#define push(sp,v) do {                  \
    MsgStackEl _n = alloca(sizeof(*_n)); \
    _n->next = *(sp);                    \
    _n->val = (v);                       \
    *(sp) = _n;                          \
} while (0)

/*
 * Remove the top element of the stack *SP and return its value.
 * Returns NULL when the stack is empty.  The element's memory is not
 * released here; it belongs to the frame that called push().
 */
static inline void *
pop(MsgStackEl *sp)
{
    MsgStackEl top = *sp;

    if (!top)
        return NULL;
    *sp = top->next;
    return top->val;
}

/*
 * Trace level variable of this module.
 * NOTE(review): presumably picked up by the xTrace*(msg, ...) macros via
 * the "msg" tag -- confirm against trace.h.
 */
int tracemsg = 0;

/*
 * Statically allocated leaf that represents the data of every empty
 * message (see msgInit() with len == 0).  It has size 0 and no buffer;
 * msgToughDestroy() never frees nodes of type MSG_NODE_EMPTY.
 */
static struct MsgNodeLeaf emptyLeaf = {
    {
	MSG_NODE_EMPTY,	/* type */
	1,		/* refCnt */
    },
    {
	0,		/* size */
	0		/* buf */
    }
};

#ifdef OPTION_MSG_CONTROL_RESOURCES
/*
 * Limit for number of nodes allocated to a single msg: msgJoin()
 * linearizes the result once its node count exceeds this value.
 */
static int hard_node_limit = 1000;
#endif

/*
 * Node count above which msgCleanUp() linearizes a message (only used
 * when OPTION_MSG_CONTROL_RESOURCES is defined).
 */
static const int soft_node_limit = 100;

/* Utility functions: */

/*
 * Push the fragment (TREE, HEAD, TAIL) onto fragment stack S.  When the
 * stack is full it is first enlarged by 16 entries with xRealloc().
 */
static inline void
push_frag(MsgFragStack s, MsgNode tree, int head, int tail)
{
    MsgFrag slot;

    if (s->tos >= s->limit PREDICT_FALSE) {
        /* out of room: grow the array and rebase all three pointers */
        long    used     = s->tos - s->bottom;
        int     capacity = (s->limit - s->bottom) + 16;
        MsgFrag grown    = (MsgFrag)xRealloc((void *)s->bottom,
                                             capacity * sizeof(struct MsgFrag));

        s->bottom = grown;
        s->tos    = grown + used;
        s->limit  = grown + capacity;
    }

    slot = s->tos++;
    slot->tree = tree;
    slot->head = head;
    slot->tail = tail;
}

/*
 * Pop the top fragment of stack S into *TREE, *HEAD and *TAIL.  If the
 * stack is empty only *TREE is written (set to 0); *HEAD and *TAIL are
 * left untouched.
 */
static inline void
pop_frag(MsgFragStack s, MsgNode *tree, int *head, int *tail)
{
    MsgFrag top;

    if (s->tos <= s->bottom PREDICT_TRUE) {
        *tree = 0;
        return;
    }

    top = --s->tos;
    *tree = top->tree;
    *tail = top->tail;
    *head = top->head;
}

/*
 * Given the state in W, determine the next fragment in the message.  
 * Conceptually, W->F contains the next fragment to be processed.  However,
 * in the case where W->F is a pair and the left child is a leaf, the pair
 * node gets skipped entirely and the left and right child are processed
 * directly.  Repeated invocation of NEXT_FRAG() will traverse the message
 * tree in depth-first-search order.  Don't change this capriciously as
 * msgWalkToughNext() is performance sensitive.
 *
 * On return *F holds the fragment to process now and W->F the fragment to
 * process next (W->F.tree is 0 once a leaf was returned and the fragment
 * stack is empty).  W->F.tree is dereferenced unconditionally, so it must
 * not be NULL on entry.
 */
static inline void
next_frag(MsgWalk *w, MsgFrag f)
{
    MsgNode node;
    MsgFrag l, r;
    int     l_len;

    /* by default the caller gets the current fragment unchanged */
    *f = w->f;
    node = w->f.tree;
    switch (node->c.type) {
        case MSG_NODE_PAIR:
	    /*
	     * There are three cases: (a) all data is in the left
	     * child, (b) all data is in the right child, and (c)
	     * there is an overlap.
	     */
	    l = &((MsgNodePair)node)->pair.l;
	    r = &((MsgNodePair)node)->pair.r;
	    l_len = l->tail;
	    /*
	     * Notice: This code is taking advantage of the fact that
	     * l->head == 0.
	     */
	    xAssert(l->head == 0);

	    if (w->f.tail <= l_len) {
	        /* case (a): need to visit left child only: */
	        w->f.tree = l->tree;
	    }
	    else {
	        /* tail of the walked range expressed in right-child offsets */
	        int wr_tail = w->f.tail - l_len + r->head;

	        if (w->f.head >= l_len) {
		    /* case (b): need to visit right child only: */
		    w->f.tree = r->tree;
		    w->f.head = w->f.head - l_len + r->head;
		    w->f.tail = wr_tail;
	        }
		else {
		    /* case (c): visit left, then right child: */
		    /*
		     * Optimize for the common case where the left child is a
		     * leaf:
		     */
		    if (l->tree->c.type != MSG_NODE_PAIR PREDICT_TRUE) {
		        /* cool: l->tree is a leaf: skip pair node */
		        f->tree = l->tree;
		        f->tail = l->tail;
		        w->f.tree = r->tree;
		        w->f.head = r->head;
		        w->f.tail = wr_tail;
		    }
		    else {
		        /* really have to do it the lame way: */
		        w->f.tree = l->tree;
		        w->f.tail = l->tail;
		        /* remember the right part for after the left subtree */
		        push_frag(&w->stack, r->tree, r->head, wr_tail);
		    }
	        }
	    }
	    break;

        case MSG_NODE_PAGE:
        case MSG_NODE_BUF:
        case MSG_NODE_EMPTY:
	    /* a leaf: continue with whatever was deferred on the stack */
	    pop_frag(&w->stack, &w->f.tree, &w->f.head, &w->f.tail);
	    break;

        default:
	    /* unknown node type: only traced, W is left unchanged */
	    xTrace1(msg, TR_ERRORS, "msg.next_frag: bad node type %d",
		    node->c.type);
	    break;
    }
}

/*
 * Take one additional reference on node N.  N must be non-NULL and must
 * not be of type MSG_NODE_JUNK (checked with xAssert).
 */
static inline void
inc_ref(MsgNode n)
{
    xAssert(n && n->c.type != MSG_NODE_JUNK);

    /* the trace shows the count *before* the increment */
    xTrace2(msg, TR_MAJOR_EVENTS, "msg.inc_ref(node=%p,refCnt=%d)",
            n, n->c.refCnt);
    n->c.refCnt += 1;

    xTrace0(msg, TR_MAJOR_EVENTS, "msg.inc_ref: returning");
}

/*
 * Allocate a pair node joining two subtrees.
 *
 *   loff, llen, ln - offset, length and root of the left subtree;
 *                    LOFF must be 0 (asserted)
 *   roff, rlen, rn - offset, length and root of the right subtree
 *
 * The references to LN and RN are taken over as-is; no reference counts
 * are changed here.  The new node has a reference count of 1.
 */
static MsgNodePair
new_pair_node(int loff, int llen, MsgNode ln, int roff, int rlen, MsgNode rn)
{
    MsgNodePair pair_node;

    xTrace0(msg, TR_FUNCTIONAL_TRACE, "msg.new_pair_node");
    GET_MSG_NODE(Pair, PAIR, pair_node, 1);

    /* the message tool relies on left fragments starting at offset 0 */
    xAssert(loff == 0);

    /* left fragment: [0, llen) */
    pair_node->pair.l.tree = ln;
    pair_node->pair.l.head = 0;
    pair_node->pair.l.tail = llen;

    /* right fragment: [roff, roff + rlen) */
    pair_node->pair.r.tree = rn;
    pair_node->pair.r.head = roff;
    pair_node->pair.r.tail = roff + rlen;

    return pair_node;
}

/*
 * Allocate a page node made of NPAGES contiguous pages of MSG_PAGE_SIZE
 * bytes.  The leaf buffer starts right after the node header and spans
 * the remainder of the allocation.
 */
static MsgNodePage
new_page_node(int npages)
{
    MsgNodePage node;
    int         extra_pages = npages - 1;

    xTrace1(msg, TR_FUNCTIONAL_TRACE, "msg.new_page_node(npages=%d)", npages);

#ifdef OPTION_MSG_STATISTICS
    /*
     * GET_MSG_NODE only accounts for a single page; charge the pages
     * beyond the first one here, before the high-water mark is updated.
     */
    if (extra_pages > 0) {
        stats.mem_in_use += extra_pages * MSG_PAGE_SIZE;
    }
#endif
    GET_MSG_NODE(Page, PAGE, node, npages);

    node->leaf.buf  = &node->page.buffer[0];
    node->leaf.size = PAGE_NODE_DATA_SIZE + (extra_pages * MSG_PAGE_SIZE);
    return node;
}

/*
 * Allocate a buffer node that refers to the caller-supplied BUFFER of
 * SIZE bytes.  The data is not copied; DEALLOCATOR is stored in the node
 * and is invoked by msgToughDestroy() when the node is released.
 */
static MsgNodeBuf
new_buf_node(MsgDeallocator deallocator, char *buffer, int size)
{
    MsgNodeBuf n;

    /* SIZE is an int, so it is printed with %d (was %ld) */
    xTrace3(msg, TR_FUNCTIONAL_TRACE,
            "msg.new_buf_node(free=%p,buf=%p,size=%d)",
            deallocator, buffer, size);

    GET_MSG_NODE(Buf, BUF, n, 1);

    n->leaf.size = size;
    n->leaf.buf  = buffer;
    n->buf.free  = deallocator;
    return n;
}

/*
 * A msgPush overflowed a buffer: create a new page node and join them
 * with a new pair node.
 *
 * The new leaf becomes the left child (and the message's first leaf, owned
 * by M); the old tree becomes the right child.  Returns a pointer to the
 * PUSH_LEN bytes reserved at the end of the new leaf.
 */
static void *
push_overflow(Msg *m, int push_len)
{
    int head = m->f.head;
    int old_len = msgLength(m);
    MsgNodeLeaf leaf;

    /* PUSH_LEN is an int, so it is printed with %d (was %ld) */
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msg.push_overflow(m=%p,push_len=%d)",
            m, push_len);

    /* create a new root node and a new leaf: */
    leaf = (MsgNodeLeaf)new_page_node(NUM_MSG_PAGES(push_len));
    xAssert(leaf);

    m->f.tree = (MsgNode)new_pair_node(0, leaf->leaf.size, (MsgNode)leaf, head,
                                       old_len, m->f.tree);

    /* the header occupies the last PUSH_LEN bytes of the new leaf */
    m->f.head       = leaf->leaf.size - push_len;
    m->f.tail       = leaf->leaf.size + old_len;
    m->first.leaf   = leaf;
    m->first.head   = 0;
    m->first.tail   = leaf->leaf.size;
    m->first.isMine = TRUE;
#ifdef OPTION_MSG_CONTROL_RESOURCES
    /* one page node plus one pair node */
    m->numNodes    += 2;
#endif
    return leaf->leaf.buf + m->f.head;
}

/*
 * When the first node of a message has too little data to satisfy a
 * msgPeek request, the message is restructured to make the requested
 * data reside in one buffer.
 *
 * Returns a pointer to the beginning of the data.
 *
 * This is a SLOW operation that should be rarely called.
 */
static char *
peek_underflow(Msg *m, int peek_len)
{
    int         len, old_len, remaining;
    char        *super_buf, *buf;
    MsgNodeLeaf super;
    MsgWalk     w;

    xTrace0(msg, TR_SOFT_ERRORS, "msg.peek_underflow");

    old_len = msgLength(m);
    if (m->f.head >= m->first.tail) {
        /*
         * The first leaf has no data left, so there is a chance that
         * the next buffer contains at least LEN bytes, in which case
         * no copying needs to be done.  So lets traverse the tree
         * to find the second leaf:
         */
        MsgNode node = m->f.tree;
        int     head, tail;

        head = m->f.head;
        xAssert(node->c.type == MSG_NODE_PAIR);
        do {
            MsgNodePair p = (MsgNodePair)node;
            MsgFrag     f;
            int         l_len;

            xAssert(p->pair.l.head == 0);
            l_len = p->pair.l.tail;
            if (head < l_len)
                f = &p->pair.l;
            else {
                /* translate HEAD into the right child's offsets */
                f = &p->pair.r;
                head = (head - l_len) + f->head;
            }
            tail = f->tail;
            node = f->tree;
        } while (node->c.type == MSG_NODE_PAIR);

        /*
         * Number of bytes available in that leaf starting at HEAD.
         * (This used to be computed as head - tail, which is never
         * positive and therefore disabled the no-copy path below.)
         */
        len = tail - head;

        xTrace1(msg, TR_FULL_TRACE,
                "msg.peek_underflow: second leaf of size %d", len);

        if (len >= peek_len) {
            /*
             * The second leaf contains all the data we need.  So we
             * simply discard the first leaf and make the node
             * containing the data the first leaf.  Notice that the
             * reference to the first leaf is not lost as it is still
             * being referenced through the message tree.  Cleaning up
             * that reference is not worth the trouble.
             */
            m->first.leaf   = (MsgNodeLeaf)node;
            m->first.head   = m->f.head - head;
            m->first.tail   = m->first.head + tail;
            m->first.isMine = FALSE;
            return m->first.leaf->leaf.buf + head;
        }
    }
    xTrace0(msg, TR_SOFT_ERRORS, ("msg.peek_underflow: have to copy"));
    /*
     * The strategy here is to create a new super leaf that contains
     * at least PEEK_LEN bytes of data.
     */
    super = (MsgNodeLeaf)new_page_node(NUM_MSG_PAGES(peek_len));
    xAssert(super);
    super_buf = super->leaf.buf + super->leaf.size - peek_len;

    /* gather the first PEEK_LEN bytes of the message into the super leaf */
    remaining = peek_len;
    msgWalkInit(&w, m);
    while ((buf = msgWalkNext(&w, &len)) != NULL) {
        if (remaining < len)
            len = remaining;
        memcpy(super_buf, buf, len);
        super_buf += len;
        remaining -= len;
        if (!remaining)
            break;
    }
    msgWalkDone(&w);

    /*
     * Now substitute the superleaf for the ittybitty leaf(s).  We do
     * this by creating a new pair node whose left node is the super
     * leaf and whose right node is the old message tree.
     *
     * Notice how we setup the left and right fragment: the left
     * fragment spans the entire superleaf.  The current message size
     * will be correct because m->f.head is set accordingly.
     * To skip over the ittybitty leaves, we set the offset of the
     * right subtree to M->F.HEAD + PEEK_LEN because that's the offset
     * of the first data byte in the original tree.
     */
    m->f.tree = (MsgNode)new_pair_node(0, super->leaf.size, (MsgNode)super,
                                       m->f.head + peek_len,
                                       old_len - peek_len, m->f.tree);
    m->first.leaf   = (MsgNodeLeaf)super;
    m->first.head   = 0;
    m->first.tail   = m->first.leaf->leaf.size;
    m->first.isMine = TRUE;
    m->f.head       = m->first.leaf->leaf.size - peek_len;
    m->f.tail       = m->f.head + old_len;
#ifdef OPTION_MSG_CONTROL_RESOURCES
    m->numNodes    += 2;
#endif
    return m->first.leaf->leaf.buf + m->f.head;
}

/* Msg operations: */

/*
 * Initialize message M and return a pointer to a buffer that is LEN
 * bytes long.
 *
 *   len == 0            M is empty (it shares the static empty leaf) and
 *                       a NULL pointer is returned.
 *   deallocator == NULL Storage is allocated from the heap.  If BUF is
 *                       non-NULL its LEN bytes are copied into the
 *                       message, otherwise the contents are undefined.
 *   deallocator != NULL The message uses BUF directly (no copy) and is
 *                       not writable by the message tool.  DEALLOCATOR
 *                       is invoked with the values passed in BUF and LEN
 *                       once the buffer is no longer needed.
 *
 * Attributes are cleared.
 */
void *
msgInit(Msg *m, int len, void *buf, MsgDeallocator deallocator)
{
    xTrace4(msg, TR_DETAILED, "msgInit(m=%p,len=%d,buf=%p,freefunc=%p)",
            m, len, buf, deallocator);

    if (len == 0) {
        /* empty message: share the static empty leaf */
        m->first.leaf = (MsgNodeLeaf)&emptyLeaf;
        m->first.isMine = FALSE;
        inc_ref((MsgNode)m->first.leaf);/* make sure msgPush stays away */
        m->f.head = m->f.tail = 0;
    }
    else if (deallocator) {
        /* caller-owned storage: wrap it, never write to it */
        m->first.leaf = (MsgNodeLeaf)new_buf_node(deallocator, buf, len);
        m->first.isMine = FALSE;
        m->f.head = 0;
        m->f.tail = m->first.leaf->leaf.size;
    }
    else {
        /* heap storage: data sits at the end of the page node */
        m->first.leaf = (MsgNodeLeaf)new_page_node(NUM_MSG_PAGES(len));
        m->first.isMine = TRUE;
        m->f.head = m->first.leaf->leaf.size - len;
        if (buf) {
            memcpy(m->first.leaf->leaf.buf + m->f.head, buf, len);
        }
        m->f.tail = m->first.leaf->leaf.size;
    }

    /* the first leaf is also the whole tree */
    m->f.tree = (MsgNode)m->first.leaf;
    m->first.head = 0;
    m->first.tail = m->first.leaf->leaf.size;

    m->attrLen = 0;
    m->attrs = NULL;
#ifdef OPTION_MSG_CONTROL_RESOURCES
    m->numNodes = 1;
#endif
    xTrace1(msg, TR_DETAILED, "msgInit: returning %p\n",
            m->first.leaf->leaf.buf + m->f.head);
    return m->first.leaf->leaf.buf + m->f.head;
}

/*
 * Initialize M as a copy of TEMPLATE.  Both messages share the same
 * tree (one extra reference is taken); the copy never owns the first
 * leaf.  Any previous contents of M are overwritten without being
 * released.
 */
inline void
msgInitWithMsg(Msg *m, Msg *template)
{
    xTrace0(msg, TR_FUNCTIONAL_TRACE, "msgInitWithMsg");

    /* one more handle on the shared tree */
    inc_ref(template->f.tree);

    /* structure copy, then give up write ownership in the copy */
    *m = *template;
    m->first.isMine = FALSE;
}

/*
 * Reset M to an uninitialized message of LEN bytes (LEN > 0) and return
 * a pointer to its data.
 *
 * The existing storage is reused when M consists of a single, unshared
 * page node that is large enough; otherwise M is destroyed and
 * initialized from scratch with msgInit().  Attributes are cleared.
 */
void *
msgRefresh(Msg *m, int len)
{
    /* LEN is an int, so it is printed with %d (was %ld) */
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msgRefresh(m=%p,len=%d)", m, len);

    xAssert(len > 0);
    if ((MsgNode)m->first.leaf != m->f.tree || len > m->first.leaf->leaf.size ||
            m->f.tree->c.type != MSG_NODE_PAGE || m->f.tree->c.refCnt > 1) {
        /* cannot recycle: tree has several nodes, is too small or shared */
        msgDestroy(m);
        return msgInit(m, len, 0, 0);
    }

    /* recycle the page node: data goes to the end of the buffer */
    m->f.tail       = m->first.leaf->leaf.size;
    m->f.head       = m->f.tail - len;
    m->first.head   = 0;
    m->first.tail   = m->first.leaf->leaf.size;
    m->first.isMine = TRUE;
    m->attrs        = NULL;
    m->attrLen      = 0;
#ifdef OPTION_MSG_CONTROL_RESOURCES
    m->numNodes     = 1;
#endif
    return m->first.leaf->leaf.buf + m->f.head;
}

/*
 * Release node N and, transitively, every descendant whose reference
 * count drops to zero as a result.
 *
 * The tree is traversed iteratively: the left subtree is followed
 * directly and right subtrees that also need freeing are remembered on
 * an alloca()-backed stack.  The reference count of N itself is not
 * examined here.
 * NOTE(review): presumably the caller (msgDestroy) has already dropped
 * N's count to zero -- confirm against msg.h.
 */
void
msgToughDestroy(MsgNode n)
{
    MsgStackEl  stack = 0;
    MsgNode     left, right;
    MsgNodePage npage;
    MsgNodePair npair;
    MsgNodeBuf  nbuf;
    
    xTrace3(msg, TR_MAJOR_EVENTS, "msgToughDestroy(n=%p,refCnt=%d,type=%d)",
	    n, n->c.refCnt, n->c.type);
    do {
	switch(n->c.type) {
	    case MSG_NODE_EMPTY:
	        /* empty nodes are never freed */
	        break;

	    case MSG_NODE_PAGE:
	        npage = (MsgNodePage)n;
#ifdef OPTION_MSG_STATISTICS
	        if (npage->leaf.size > PAGE_NODE_DATA_SIZE) {
		    /*
		     * Remove allocation for any extra pages attached to
		     * this node.
		     */
		    stats.mem_in_use -= npage->leaf.size - PAGE_NODE_DATA_SIZE;
	        }
#endif
	        FREE_MSG_NODE(PAGE, npage);
	        break;

	    case MSG_NODE_BUF:
	        nbuf = (MsgNodeBuf)n;
	        xTrace3(msg, TR_FULL_TRACE,
			"msgToughDestroy: MSG_NODE_BUF %p %p %d",
		        nbuf->buf.free, nbuf->leaf.buf, nbuf->leaf.size);
	        /* hand the user buffer back before freeing the node itself */
	        nbuf->buf.free(nbuf->leaf.buf, nbuf->leaf.size);
	        FREE_MSG_NODE(BUF, n);
	        break;

	    case MSG_NODE_PAIR:
	        npair = (MsgNodePair)n;
	        /* fetch the children before the pair node is freed */
	        left  = npair->pair.l.tree;
	        right = npair->pair.r.tree;
	        FREE_MSG_NODE(PAIR, npair);
	        --left->c.refCnt;
	        --right->c.refCnt;
                xAssert(left->c.refCnt >= 0 && right->c.refCnt >= 0);
	        if (left->c.refCnt == 0) {
		    /* descend left; defer the right child if it dies too */
		    n = left;
		    if (left != right && right->c.refCnt == 0)
		        push(&stack, right);
		    continue;
	        }
		else if (right->c.refCnt == 0) {
		    n = right;
		    continue;
	        }
	        break;

	    default:
	        xTrace2(msg, TR_ERRORS,
		        "msgToughDestroy: wrong type %p %d", n, n->c.type);
	        Kabort("msgToughDestroy: wrong type");
	}
	/* current subtree done: resume with a deferred node, if any */
	n = (MsgNode)pop(&stack);
    } while (n);
    xTrace1(msg, TR_FUNCTIONAL_TRACE, "msgToughDestroy: done %p", n);
}

/*
 * Discard at most LEN header-bytes from M.  If LEN exceeds the message
 * length, the whole message is discarded.  Only the head offset moves;
 * no data is released.  Nothing is returned.
 */
void
msgDiscard(Msg *m, int len)
{
    /* LEN is an int, so it is printed with %d (was %ld) */
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msgDiscard(m=%p,len=%d)", m, len);

    if (len > msgLength(m))
        len = msgLength(m);
    m->f.head += len;
    m->first.isMine = FALSE;    /* enforce write-once rule */
}

/*
 * Truncate M to length LEN.  LEN must be <= msgLength(m); a larger LEN
 * leaves M unchanged.  Bytes are removed from the tail of the message.
 */
void
msgTruncate(Msg *m, int len)
{
    int delta;

    /* LEN is an int, so it is printed with %d (was %ld) */
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msgTruncate(m=%p,len=%d)", m, len);

    delta = msgLength(m) - len;
    if (delta > 0) {
        m->f.tail -= delta;
        /* keep the first-leaf window inside the shortened message */
        if (m->f.tail < m->first.tail) {
            m->first.tail = m->f.tail;
        }
    }
}

/*
 * Make M refer to the same data as VALUE, releasing whatever M referred
 * to before.  Assigning a message to itself is a no-op.
 */
inline void
msgAssign(Msg *m, Msg *value)
{
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msgAssign(m=%p,value=%p)", m, value);

    if (m == value) {
        return;
    }

    /* drop the old binding, then share VALUE's tree */
    msgDestroy(m);
    msgInitWithMsg(m, value);
}

#ifdef OPTION_MSG_CONTROL_RESOURCES
/*
 * Copy M into contiguous storage.
 *
 * An empty message is re-initialized as empty (releasing its tree); a
 * message that already consists of a single leaf is left alone.
 * Otherwise the data is copied into a freshly allocated single-leaf
 * message which then replaces M.
 */
static void
linearize(Msg *m)
{
    MsgWalk w;
    Msg     tmp;
    char    *new_buf;
    void    *buf;
    int     len;

    xTrace0(msg, TR_FUNCTIONAL_TRACE, "msg.linearize");

    len = msgLength(m);
    if (!len) {
        /* free the resources */
        msgDestroy(m);
        msgInit(m, 0, 0, 0);
        return;
    }

    if ((MsgNode)m->first.leaf == m->f.tree) {
        /* this msg is contiguous already */
        return;
    }

    /* create a new msg with a contiguous buffer and copy data into it: */
    new_buf = msgInit(&tmp, len, 0, 0);
    msgWalkInit(&w, m);
    while ((buf = msgWalkNext(&w, &len)) != NULL) {
        memcpy(new_buf, buf, len);
        new_buf += len;
    }
    msgWalkDone(&w);

    /*
     * msgAssign() takes its own reference on the new tree, so the
     * reference held by TMP has to be dropped before TMP goes out of
     * scope; otherwise the new page node could never be freed.
     */
    msgAssign(m, &tmp);
    msgDestroy(&tmp);
}
#endif /* OPTION_MSG_CONTROL_RESOURCES */

/*
 * Assign to M the concatenation of LEFT and RIGHT.
 *
 * If either operand is empty, M simply becomes a copy of the other one.
 * Otherwise a new pair node referencing both trees becomes M's root.
 * M may be the same object as LEFT or RIGHT: the operands' trees are
 * referenced before M's old binding is released.
 */
void
msgJoin(Msg *m, Msg *left, Msg *right)
{
    int left_len = msgLength(left);
    int right_len = msgLength(right);

    xTrace3(msg, TR_FUNCTIONAL_TRACE, "msgJoin(res=%p,left=%p,right=%p)",
            m, left, right);

    /* trivial cases: one side contributes nothing */
    if (left_len == 0) {
        msgAssign(m, right);
        return;
    }
    if (right_len == 0) {
        msgAssign(m, left);
        return;
    }

    /* the new pair node will hold one reference on each subtree */
    inc_ref(left->f.tree);
    inc_ref(right->f.tree);
    msgDestroy(m);      /* cleanup my old binding */

    /* create a new pair node: */
    m->f.tree = (MsgNode)new_pair_node(0, left->f.tail, left->f.tree,
                                       right->f.head, right_len,
                                       right->f.tree);
    xAssert(m->f.tree);

    /* m->tail = m->head + len(m) */
    m->f.head = left->f.head;
    m->f.tail = left->f.tail + right_len;

    /* the first leaf is inherited from LEFT, ownership only if M is LEFT */
    m->first = left->first;
    if (m != left) {
        m->first.isMine = FALSE;
    }
#ifdef OPTION_MSG_CONTROL_RESOURCES
    m->numNodes = left->numNodes + right->numNodes + 1;
    if (m->numNodes > hard_node_limit)
        linearize(m);
#endif
}

/*
 * Remove a chunk of length LEN from the head of M and assign it to FRAGM.
 * LEN is clamped to the length of M.  FRAGM's previous contents are
 * released.  If M and FRAGM are the same object, the message is simply
 * cut down to its first LEN bytes.
 */
void
msgBreak(Msg *m, Msg *fragM, int len)
{
    /* LEN is an int, so it is printed with %d (was %ld) */
    xTrace3(msg, TR_FUNCTIONAL_TRACE, "msgBreak(m=%p,fragM=%p,len=%d)",
            m, fragM, len);

    if (len > msgLength(m))
        len = msgLength(m);

    if (m != fragM) {
        /* FRAGM shares M's tree: take the reference before releasing */
        inc_ref(m->f.tree);
        msgDestroy(fragM);
        fragM->f.tree = m->f.tree;
        fragM->f.head = m->f.head;
        fragM->first  = m->first;

        /* ensure at most one message owns first leaf: */
        m->first.isMine = FALSE;

        /* pop off the header */
        m->f.head += len;
    }
    /* set fragment length: */
    fragM->f.tail = fragM->f.head + len;
}

/*
 * Prepare for pushing a header of length LEN onto M.  Returns pointer
 * to header memory of length LEN.
 *
 * The header is placed directly in front of the current data when the
 * first leaf has enough room and M owns it (or can cheaply take
 * ownership because the leaf is the whole, unshared tree).  Otherwise a
 * new leaf is allocated via push_overflow().
 */
void *
msgPush(Msg *m, int len)
{
    /* bytes left in front of the header inside the first leaf */
    int leftover = m->f.head - m->first.head - len;

    /* LEN is an int, so it is printed with %d (was %ld) */
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msgPush(m=%p,len=%d)", m, len);

    if (leftover >= 0 && m->f.head <= m->first.tail) {
        /* see if we can cheaply own the leaf */
        if (!m->first.isMine &&
            m->f.tree == (MsgNode) m->first.leaf &&
            m->f.tree->c.refCnt == 1) {
            m->first.isMine = TRUE;
        }
        if (m->first.isMine) {
            m->f.head -= len;
            return m->first.leaf->leaf.buf + leftover;
        }
    }
    /* need a new leaf */
    return push_overflow(m, len);
}

/*
 * Peek at header in M.  Returns pointer to contiguous buffer of length LEN
 * if successful or 0 if M is shorter than LEN.  The message length is not
 * changed, but the tree may be restructured (peek_underflow()) when the
 * first leaf does not hold LEN contiguous bytes, and M gives up write
 * ownership of its first leaf.
 */
inline void *
msgPeek(Msg *m, int len)
{
    int available;

    /* LEN is an int, so it is printed with %d (was %ld) */
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msgPeek(msg=%p,len=%d)", m, len);

    if (len > msgLength(m) PREDICT_FALSE) {
        return 0;
    }

    /* msg is long enough; is the first leaf long enough, too? */
    available = m->first.tail - m->f.head;
    if (len > available) {
        return peek_underflow(m, len);
    }

    m->first.isMine = FALSE;
    return m->first.leaf->leaf.buf + (m->f.head - m->first.head);
}

/*
 * Pop a header off of M.  Returns pointer to contiguous buffer of
 * length LEN if successful or 0 if not (M is shorter than LEN, in which
 * case M is left unchanged).
 */
void *
msgPop(Msg *m, int len)
{
    void *where;

    /* LEN is an int, so it is printed with %d (was %ld) */
    xTrace2(msg, TR_FUNCTIONAL_TRACE, "msgPop(msg=%p,len=%d)", m, len);

    where = msgPeek(m, len);
    if (where) {
        /* the header stays in memory; only the head offset advances */
        m->f.head += len;
    }
    return where;
}

/*
 * Message walk functions.
 *
 * Routines for processing the message data; hides the message tree
 * structure and lengths of node buffers.
 */
/*
 * Prepare walk state W for a traversal of message M: the walk starts at
 * M's root fragment with an empty (unallocated) fragment stack.
 */
void
msgWalkInit(MsgWalk *w, Msg *m)
{
    /* no stack storage yet; push_frag() allocates on demand */
    w->stack.bottom = 0;
    w->stack.limit = 0;
    w->stack.tos = 0;

    /* next frag to be processed: */
    w->f = m->f;
}

/*
 * Advance walk W to the next leaf fragment.  Returns a pointer to the
 * fragment's data and stores its length in *LENP, or returns 0 (leaving
 * *LENP untouched) when next_frag() yields no fragment.
 */
void *
msgWalkToughNext(MsgWalk *w, int *lenp)
{
    struct MsgFrag cur;

    /* skip over pair nodes until a leaf fragment shows up */
    for (;;) {
        next_frag(w, &cur);
        if (!cur.tree)
            return 0;
        if (cur.tree->c.type != MSG_NODE_PAIR)
            break;
    }

    *lenp = cur.tail - cur.head;
    return ((MsgNodeLeaf)cur.tree)->leaf.buf + cur.head;
}

/*
 * Associate an attribute with a message.
 *
 * Stores the pointer ATTRIBUTE and its LENGTH in MESSAGE; the attribute
 * data itself is not copied.  Returns XK_SUCCESS, or XK_FAILURE when NAME
 * is anything but 0.
 */
XkReturn
msgSetAttr(Msg *message, int name, void *attribute, int length)
{
    /* Only the default attribute (name == 0) is supported at this time. */
    if (name != 0 PREDICT_FALSE) {
        xTrace1(msg, TR_SOFT_ERRORS,
                "msgSetAttr called with unsupported name %d", name);
        return XK_FAILURE;
    }

#ifdef XK_DEBUG
    /* replacing an existing attribute is allowed but worth a trace */
    if (message->attrs) {
        xTrace0(msg, TR_SOFT_ERRORS,
                "msgSetAttr overriding previous non-null attribute");
    }
#endif

    message->attrLen = length;
    message->attrs   = attribute;
    return XK_SUCCESS;
}

/*
 * Retrieve an attribute associated with a message.
 *
 * Returns the pointer stored by msgSetAttr() (NULL if none), or NULL when
 * NAME is anything but 0.
 */
void *
msgGetAttr(Msg *message, int name)
{
    void *attribute;

    /* Only the default attribute (name == 0) is supported at this time. */
    if (name != 0 PREDICT_FALSE) {
        xTrace1(msg, TR_SOFT_ERRORS,
                "msgGetAttr called with unsupported name %d", name);
        return NULL;
    }

    attribute = message->attrs;
    return attribute;
}

/*
 * Free up unnecessary resources allocated to M.
 *
 * With OPTION_MSG_CONTROL_RESOURCES: an empty message is re-initialized
 * (releasing its tree) and a non-empty message with more than
 * soft_node_limit nodes is linearized.  Without that option the function
 * only prints a notice.
 */
void
msgCleanUp(Msg *m)
{
#ifdef OPTION_MSG_CONTROL_RESOURCES
    if (msgLength(m) <= 0) {
        /* nothing worth keeping: start over with an empty message */
        msgDestroy(m);
        msgInit(m, 0, 0, 0);
    }
    else if (m->numNodes > soft_node_limit) {
        linearize(m);
    }
#else
    printf("msgCleanUp: "
           "file was compiled without OPTION_MSG_CONTROL_RESOURCES\n");
#endif
}

#ifndef NDEBUG

#include <stdio.h>

/*
 * Run of spaces that show_pair() and show_leaf() print a prefix of (via
 * "%.*s") to indent their output; its length caps the indentation.
 */
static char blanks[] = "                                ";

/*
 * Print one line describing the pair node referenced by fragment F,
 * indented by INDENT spaces (capped at the length of blanks).
 */
static void
show_pair(MsgFrag f, int indent)
{
    MsgNodePair pair_node = (MsgNodePair)f->tree;
    MsgFrag     lhs = &pair_node->pair.l;
    MsgFrag     rhs = &pair_node->pair.r;

    /* never ask printf for more blanks than we have */
    if (indent >= sizeof(blanks)) {
        indent = sizeof(blanks) - 1;
    }

    printf("%.*sPair: f=(%4d,%4d), refCnt=%2d, l=(%4d,%4d), r=(%4d,%4d)\n",
           indent, blanks,
           f->head, f->tail,
           pair_node->c.refCnt,
           lhs->head, lhs->tail,
           rhs->head, rhs->tail);
}

/*
 * Print one line describing the leaf node referenced by fragment F,
 * indented by INDENT spaces (capped at the length of blanks), followed
 * by a hex dump of at most the first 8 data bytes of the fragment.
 */
static void
show_leaf(MsgFrag f, int indent)
{
    MsgNodeLeaf l = (MsgNodeLeaf)f->tree;
    u_char *cp;
    int i;

    if (indent >= sizeof(blanks))
        indent = sizeof(blanks) - 1;
    printf("%.*sLeaf: f=(%4d,%4d), refCnt=%2d, buf=%p, size=%4d, data=<",
           indent, blanks, f->head, f->tail, l->c.refCnt,
           l->leaf.buf, l->leaf.size);

    /*
     * Display at most the first 8 characters of data.  The bytes are
     * read as unsigned so that they print as two hex digits; the
     * explicit cast avoids an incompatible pointer assignment.
     */
    cp = (u_char *)(l->leaf.buf + f->head);
    for (i = 0; i < f->tail - f->head; ++i) {
        if (i >= 8) {
            printf(",..");
            break;
        }
        if (i > 0)
            printf(",");
        printf("%.2x", *cp++);
    }
    printf(">\n");
}

#endif /* !NDEBUG */

/*
 * Print a description of message M and of every node visited by a walk
 * over its tree, one line per node, indented by nesting level.
 * When compiled with NDEBUG only a notice is printed.
 *
 * The nesting levels are tracked on an alloca()-backed stack: each pair
 * node pushes the level of its two children, each visited node pops its
 * own level.
 */
void
msgShow(Msg *m)
{
#ifndef NDEBUG
    MsgStackEl     stack = 0;
    MsgWalk        w;
    struct MsgFrag f;
    MsgNode        expected;
    long           level = 0;

    xTrace1(msg, TR_FUNCTIONAL_TRACE, "msgShow(m=%p)", m);

    /* note: the value printed as "offset" is m->first.tail */
    printf("Msg: f=(%4d,%4d), first=(buf=%p, size==%d, offset=%d, isMine=%d), "
	   "numNodes=%d\n",
	   m->f.head, m->f.tail, m->first.leaf->leaf.buf, 
           m->first.leaf->leaf.size, m->first.tail, m->first.isMine,
#ifdef OPTION_MSG_CONTROL_RESOURCES
	   m->numNodes
#else
	   0
#endif
	   );

    /* level of the root node */
    push(&stack, (void *)level);
    msgWalkInit(&w, m);
    while (w.f.tree) {
	expected = w.f.tree;
	next_frag(&w, &f);
	if (f.tree != expected) {
	    /* a pair node got skipped */
	    /* account for the two children the skipped pair would have pushed */
	    push(&stack, (void *)level);
	    push(&stack, (void *)level);
	}
	level = (long)pop(&stack);
	if (f.tree->c.type == MSG_NODE_PAIR) {
	    show_pair(&f, level + 1);
	    /* both children are one level deeper */
	    push(&stack, (void *)(level + 1));
	    push(&stack, (void *)(level + 1));
	}
	else
	    show_leaf(&f, level + 1);
    }
    msgWalkDone(&w);
#else
    printf("msgShow: file was compiled with NDEBUG\n");
#endif
}

/*
 * Print the node and memory usage counters (current value and high-water
 * mark) collected when the file is compiled with OPTION_MSG_STATISTICS;
 * otherwise print a notice that statistics are unavailable.
 */
void
msgStats(void)
{
#ifdef OPTION_MSG_STATISTICS
    printf("message statistics (current, high water):\n");
    printf("    Pair nodes:      %6d, %6d\n",
           stats.PAIRs_in_use, stats.PAIRs_hiwat);
    printf("    Page nodes:      %6d, %6d\n",
           stats.PAGEs_in_use, stats.PAGEs_hiwat);
    printf("    Buffer nodes:    %6d, %6d\n",
           stats.BUFs_in_use, stats.BUFs_hiwat);
    printf("Total memory used:   %9d, %9d\n",
           stats.mem_in_use, stats.mem_hiwat);
#else
    printf("msgStats: file was compiled without OPTION_MSG_STATISTICS\n");
#endif
}
