/*     
 * $RCSfile: nx_drv.c,v $
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 *
 * $Log: nx_drv.c,v $
 * Revision 1.3  1997/05/19 23:00:55  dorgival
 * Cleaned the code, simplified synchronization
 *
 * Revision 1.2  1997/05/15 20:23:06  dorgival
 * This version works correctly, but performance is bad.
 * Version 1.1 had errors in the timing routine and/or the synchronization,
 * so it only seemed faster than hippi_drv, but it wasn't.
 *
 * Revision 1.1  1997/05/13 17:27:22  dorgival
 * Initial revision
 *
 */
/*
 * x-kernel "NX-driver". Replaces the hippi_drv driver with an interface
 * using NX messages which communicate with another program in the application
 * space, running *in* the HiPPI I/O node. The idea is to remove NORMA from
 * the loop.
 */

#include <fcntl.h>
#include <netinet/in.h>
/*
 * #include <nx.h> Replaced by:
 */

extern void crecv( long, void *, long);
extern void crecvx( long, void *, long, long, long, long* );
extern void csend( long, void *, long, long, long);
extern long iprobe( long);
extern long iprobex( long, long, long, long *);
extern long isend( long, void *, long, long, long);
extern void msgwait( long);
extern long mynode( void);
extern long numnodes( void);
extern long myptype( void);
extern long msginfo[8];

int my_node, num_nodes;

#include "xk_thread.h"
#include "shepherd_pool.h"
#include "hippi_hdrs.h"
#include "xkernel.h"
#include "eth.h"
#include "eth_i.h"
#include "bind_wrapper.h"

#define ULP            (0x81)  /* In RFC 2067 it's 0x04, but for now use this.*/
#define PORT             (-1)
#define MAX_MTU       (65280)  /* See RFC 2067 */
#define HIPPI_FP_HDR      (8)
#define HIPPI_LE_HDR     (24)
#define LLC_SNAP_HDR      (8)
#define MAX_PACKET    (HIPPI_FP_HDR+HIPPI_LE_HDR+LLC_SNAP_HDR+MAX_MTU)

#define MIN_SHEPHERDS      2
#define MAX_SHEPHERDS      4
#define MIN_HOLDER         4
#define MAX_HOLDER         8

#define NX_TYPE_IOID    0
#define NX_TYPE_REG     1
#define NX_TYPE_FRGLST  2
#define NX_TYPE_FRGREQ  3   /* not used */
#define NX_TYPE_FRG     4
#define NX_TYPE_MSGLEN  5
#define NX_TYPE_MSGREQ  6   /* not used */
#define NX_TYPE_MSG     7
#define NX_TYPE_SYNC    8

int hippi_node = 0;

static int instantiated = 0;

typedef struct {
    char               devname[128];
    int                ihandle;
    int                mtu;
    int                shepherds;
    Protl              arp;
    ETHhost            myHost;          /* IEEE address for interface */
    Map                iMap;            /* IEEE address to I-Field */
    shepherd_pool_t    shepherd_pool;
    hippi_header_out_t outgoing_header;
    int                incoming_size;
} PState;

typedef struct {
    char* data;
    int   length;
} msg_frag_t;

typedef msg_frag_t frag_array_t[128];

int tracenx_drv;

int tracenx_drvp;

       void     nx_drv_init( Protl self );
static XkHandle nxPush( Sessn self, Msg* msg );
static XkReturn nxOpenEnable( Protl self, Protl hlp, Protl hlpType, Part* p);
static int      nxControl( Protl s, int op, char* buf, int len );
static void     nx_shepherd( Protl self, Msg* msg );
static void     hippi_print( hippi_header_any_t* hh, int outgoing );
static void     nx_hippi_filter( PState* ps );
static void     nx_loop( void* arg );
static void     main_nx_handler( Protl self );

static XkReturn readMTU(       Protl, char**, int, int, VOID* );
static XkReturn readShepherds( Protl, char**, int, int, VOID* );
static XkReturn readDevice(    Protl, char**, int, int, VOID* );
static XkReturn readImap(      Protl, char**, int, int, VOID* );

void nx_print( hippi_header_any_t* hh, int outgoing );

/**************************************************************************
 * This is just a hack to implement a simple synchronization among x-kernel
 * processes in a way similar to gsync(). The problem is that nx is not
 * thread safe and the process in the hippi node cannot call that function,
 * so we use it to synchronize the others. It is very basic, uses busy
 * waiting, and can only be used by one thread in each process. More than
 * one thread will cause unexpected behavior, but is not checked.
 *************************************************************************
 */

int gsync_requested = 0;
int gsync_achieved  = 0;

static ProtlRomOpt protlOpts[] = {
    { "mtu",       3, readMTU },
    { "shepherds", 3, readShepherds },
    { "device",    4, readDevice },       /* device <filename> <ieee802 addr> */
    { "imap",      4, readImap },         /* imap <ieee802 addr><i-field><ULP>*/
};

#define ROMARG1 (2)
#define ROMARG2 (3)

static XkReturn
readMTU( Protl self, char** str, int nFields, int line, VOID* arg )
{
    XkReturn ret;
    PState*  ps = (PState*)self->state;

    ret = sscanf(str[ROMARG1],"%d",&ps->mtu) >= 1? XK_SUCCESS: XK_FAILURE;

    xTrace1(nx_drv,TR_DETAILED,"readMTU: will use mtu %d",
            ps->mtu);
    return ret;
}

static XkReturn
readShepherds( Protl self, char** str, int nFields, int line, VOID* arg )
{
    XkReturn ret;
    PState*  ps = (PState*)self->state;

    ret = sscanf(str[ROMARG1],"%d",&ps->shepherds) >= 1? XK_SUCCESS:XK_FAILURE;

    xTrace1(nx_drv,TR_DETAILED,"readShepherds: will use %d shepherds",
            ps->shepherds);
    return ret;
}

static XkReturn
readDevice( Protl self, char** str, int nFields, int line, VOID* arg )
{
    PState* ps = (PState*)self->state;

    if (sscanf(str[ROMARG1],"%s",ps->devname) < 1) {
	xError("readDevice: error while reading device name");
	return XK_FAILURE;
    }
    if (str2ethHost(&ps->myHost,str[ROMARG2]) != XK_SUCCESS) {
	xError("readDevice: error while reading ieee address of interface");
	return XK_FAILURE;
    }
    xTrace2(nx_drv,TR_DETAILED,"readDevice: using device %s, address %s\n",
            ps->devname, ethHostStr(&ps->myHost) );
    return XK_SUCCESS;
}

static XkReturn
readImap( Protl self, char** str, int nFields, int line, VOID* arg )
{
    PState* ps = (PState*)self->state;
    ETHhost ieee_addr;
    int     ifield;

    if (str2ethHost(&ieee_addr,str[ROMARG1]) != XK_SUCCESS) {
	xError("readImap: error while reading ieee address");
	return XK_FAILURE;
    }
    if (sscanf(str[ROMARG2],"%x",&ifield) < 1) {
	xError("readImap: error while reading I-field");
	return XK_FAILURE;
    }

    if ( mapBind(ps->iMap,&ieee_addr,(void*)ifield) == ERR_BIND ){
	xError("readImap: unable to add new mapping");
	return XK_FAILURE;
    }
    xTrace2(nx_drv,TR_DETAILED,"readImap: bound %s to ifield 0x%x",
            ethHostStr(&ieee_addr), ifield );
    return XK_SUCCESS;
}

/*
 * main_nx_handler: main thread in compute nodes, interfaces with hippi node.
 */

static void
main_nx_handler( Protl self )
{
    PState*       ps    = (PState*) self->state;
    Msg*          msg;
    int           total_length;
    char*         buf;
    msg_holder_t* holder;

    while ( 1 ) {

	crecv( NX_TYPE_MSGLEN, &total_length, sizeof(total_length) );
	
	xTrace1(nx_drv,TR_FULL_TRACE,
		"main_nx_handler: incoming nx msg %d bytes", total_length );

	if ( ( total_length < 0 ) || ( total_length > MAX_PACKET ) ) {
	    xTrace1(nx_drv,TR_ERRORS,"main_nx_handler: msg with %d bytes",
		    total_length);
	    return;
	}

	holder = get_free_holder( &ps->shepherd_pool );

	xTrace1(nx_drv,TR_DETAILED,"main_nx_handler: got holder, msg 0x%x",
		&(holder->msg) );

	if ( !holder ) {
	    xTrace0(nx_drv,TR_ERRORS,"main_nx_handler: no holders");
	    holder = X_NEW(msg_holder_t);
	} else {
	    msgDestroy( &holder->msg );
	}

	xTrace0(nx_drv,TR_FULL_TRACE,"main_nx_handler: construct new msg");

	buf = msgConstructAllocate( &holder->msg, total_length );

	xTrace1(nx_drv,TR_FULL_TRACE, "main_nx_handler: wait for msg (%d)",
		NX_TYPE_MSG );
	/*
	csend( NX_TYPE_MSGREQ, &my_node,sizeof(my_node),hippi_node,myptype());
	*/
	crecv( NX_TYPE_MSG, buf, total_length );

	xTrace0(nx_drv,TR_FULL_TRACE,
	      "main_nx_handler: calling trigger_shepherd for msg handling");
	trigger_shepherd( &ps->shepherd_pool, holder );
	xTrace0(nx_drv,TR_FULL_TRACE,
	      "main_nx_handler: returned from trigger_shepherd");
    }
}

static void
nx_shepherd( Protl self, Msg* msg )
{
    ETHhdr             eth_hdr;
    hippi_header_in_t* hippi_header_in;
    xk_thread_t*       my_thread = threadSelf();

    xTrace3(nx_drv,TR_FULL_TRACE,"nx_shepherd(0x%x): msg 0x%x, %d bytes",
	    my_thread, msg, msgLength(msg) );
    if ( ! self->up ) {
	xTrace0(nx_drv, TR_ERRORS, "nx_shepherd: no upper protocol!");
	return;
    }

    /*
     * Build xkernel messages from the mach message, throwing away
     * the HiPPI headers
     */
    hippi_header_in = msgPop(msg, HIPPI_FP_HDR+HIPPI_LE_HDR+LLC_SNAP_HDR );
    if ( hippi_header_in == NULL ) {
	xTrace0(nx_drv, TR_SOFT_ERRORS,
		"nx_shepherd: incoming message too small!");
	return;
    }

    xIfTrace(nx_drv,TR_GROSS_EVENTS)
    {
	xIfTrace(nx_drv,TR_MAJOR_EVENTS) {
	} else {
	    hippi_print( (hippi_header_any_t*) hippi_header_in, 0 );
	}
    }

    bcopy(&hippi_header_in->le.src_ieee_address,&eth_hdr.src, sizeof(ETHhost));
    bcopy(&hippi_header_in->le.dst_ieee_address,&eth_hdr.dst, sizeof(ETHhost));
    eth_hdr.type = hippi_header_in->llc.ether_type;

    xTrace3(nx_drv,TR_FULL_TRACE,
            "nx_shepherd: building attr (src=%s,dst=%s,type=0x%x)",
	    ethHostStr(&eth_hdr.src), ethHostStr(&eth_hdr.dst), eth_hdr.type );

    msgSetAttr(msg, 0, &eth_hdr, sizeof(ETHhdr) );
    /*
     * find the self and upper protocols from some known structure
     */
    
    xTrace0(nx_drv,TR_FULL_TRACE,"nx_shepherd: calling xDemux");
    xDemux( xGetUp(self), (Sessn)self, msg );

}

void
nx_drv_init( Protl self )
{
    PState* ps;

    if ( instantiated ) {
	xError("Only one instance of nx_drv allowed at this point, sorry!");
	exit(1);
    } else {
	instantiated = 1;
    }

    ps = X_NEW(PState);
    bzero((char *)ps, sizeof(PState));
    self->state = (VOID *)ps;

    ps->mtu        = MAX_MTU;
    ps->shepherds  = MIN_SHEPHERDS;
    ps->devname[0] = '0';
    ps->iMap       = mapCreate(47, sizeof(ETHhost));

    xTrace1(nx_drv,TR_FULL_TRACE,"nx_drv_init(0x%x) entered",self);
    findProtlRomOpts(self, protlOpts, sizeof(protlOpts)/sizeof(ProtlRomOpt),0);

    my_node   = mynode();
    num_nodes = numnodes();

    if ( my_node == 0 ) {
	xTrace1(nx_drv,TR_FULL_TRACE,"node %d is hippi node, running filter",
	        my_node );
	nx_hippi_filter( ps );
	xTrace1(nx_drv,TR_ERRORS,"node %d returned from filter", my_node );
	/* Should never return */
	exit(0);
    }

    xTrace1(nx_drv,TR_FULL_TRACE,"node %d is compute node", my_node );

    self->push         = nxPush;
    self->controlprotl = nxControl;
    self->openenable   = nxOpenEnable;
    self->up           = NULL;

    /*
     * At this point we have myHost filled from the rom file, we must
     * adapt it to the real address based on this node's number:
     */

    shepherd_pool_init( &ps->shepherd_pool,
                        MIN_HOLDER, MAX_HOLDER,
			(shepherd_func_t)nx_shepherd, self,
			ps->shepherds, MAX_SHEPHERDS );

    /*
     * The dst_sw_address is a hack at this point. As a general solution,
     * a packet filter should be programmed on a per session basis
     * based on the TCP port and other fields. At this point we are
     * simplifying things by just looking at a single field at the
     * header, and dst_sw_address is used in this case. This restricts
     * connections from node n in one partition to reach only node n on
     * the other partition, but that will do for now.
     */
    bzero(&ps->outgoing_header,sizeof(hippi_header_out_t));
    ps->outgoing_header.fp.fields.ulp     = ULP;
    ps->outgoing_header.fp.fields.p       = 1;
    ps->outgoing_header.fp.fields.b       = 0;
    ps->outgoing_header.fp.fields.d1_size = 3;
    ps->outgoing_header.fp.fields.d2_off  = 0;
    ps->outgoing_header.le.fc             = 0;
    ps->outgoing_header.le.w              = 0;
    ps->outgoing_header.le.m_type         = 0;
    ps->outgoing_header.le.dst_sw_address = my_node; /* HACK!!! */
    ps->outgoing_header.le.dat            = 0;
    ps->outgoing_header.le.sat            = 0;
    ps->outgoing_header.le.src_sw_address = 0;
    ps->outgoing_header.llc.dsap          = 0xAA;
    ps->outgoing_header.llc.ssap          = 0xAA;
    ps->outgoing_header.llc.control       = 03;
    ps->outgoing_header.llc.ether_type    = 0x0800;

    xTrace0(nx_drv,TR_FULL_TRACE,"nx_drv_init: waiting for hippi_node id");
    crecv( NX_TYPE_IOID, &hippi_node, sizeof(hippi_node) );

    xTrace2(nx_drv,TR_FULL_TRACE,"nx_drv_init: hippi_node is %d, send REG %d",
            hippi_node, my_node );
    csend( NX_TYPE_REG, &my_node, sizeof(my_node), hippi_node, myptype() );

    xTrace0(nx_drv,TR_FULL_TRACE,"nx_drv_init: notifying main thread");

    PTHREAD_MUTEX_LOCK( &main_thread_desc.mutex );
    main_thread_desc.func = (void(*)(void*))main_nx_handler;
    main_thread_desc.arg  = self;
    PTHREAD_MUTEX_UNLOCK( &main_thread_desc.mutex );
    PTHREAD_COND_SIGNAL( &main_thread_desc.cond );
}

static XkHandle
nxPush( Sessn self, Msg* msg )
{
    ETHhdr*              hdr;
    PState*              ps;
    int                  hlp_msg_len;
    Msg*                 msg_copy;
    char*                ptr;
    int                  ifield;
    ETHhost              ethBcastHost;
    hippi_header_out_t   hippi_header_out;
    int                  frgreq;

    MsgWalk       walk;
    frag_array_t  frag_array;
    msg_frag_t*   frag;
    int           total_length;

    hdr = msgGetAttr(msg, 0);
    ps  = (PState*) (self->state);
    ethBcastHost.high = 0xffff;
    ethBcastHost.mid  = 0xffff;
    ethBcastHost.low  = 0xffff;

    xTrace2(nx_drv,TR_FUNCTIONAL_TRACE,"nxPush(0x%x,0x%x) called",
            self, msg );

    if ( ETH_ADS_EQUAL( (hdr->dst), ethBcastHost ) ) {
	xError("nxPush: not capable of broadcasts, yet!");
	return XMSG_NULL_HANDLE;
    }

    hlp_msg_len = msgLength(msg);

    xTrace2(nx_drv,TR_FUNCTIONAL_TRACE,"nxPush: msgLength(0x%x) = %d",
            msg, hlp_msg_len );
    /*
     * Fill in predefined fields in header, and then go on with 
     * the variable ones (i-field, src/dest addresses, length)
     */

    if ( mapResolve( ps->iMap, &(hdr->dst), (void**)&ifield ) != XK_SUCCESS){
	xTrace1(nx_drv,TR_ERRORS,
	        "nxPush: unable to map ieee address %s to i-field",
		ethHostStr(&(hdr->dst)) );
	return XMSG_NULL_HANDLE;
    } else {
	xTrace2(nx_drv,TR_FULL_TRACE,
	        "nxPush: ieee address %s mapped to i-field 0x%x",
		ethHostStr(&(hdr->dst)), ifield );
    }

    bcopy(&(ps->outgoing_header),&hippi_header_out,sizeof(hippi_header_out_t));

    hippi_header_out.cci.Ifield         = htonl(ifield);
    hippi_header_out.fp.words.w1        = htonl(hippi_header_out.fp.words.w1);
    hippi_header_out.fp.fields.d2_size  = htonl(hlp_msg_len+8);

    hippi_header_out.llc.ether_type     = hdr->type;
    bcopy(&(hdr->src),&hippi_header_out.le.src_ieee_address,sizeof(ETHhost));
    bcopy(&(hdr->dst),&hippi_header_out.le.dst_ieee_address,sizeof(ETHhost));

    ptr = msgPush(msg, sizeof(hippi_header_out_t));
    xAssert(ptr);
    bcopy(&hippi_header_out,ptr,sizeof(hippi_header_out_t));

    /* At this point the message is complete and ready to be sent. */

    /*
     * First, determine what the fragment sizes are and notify the hippi node
     */
    total_length = 0;
    frag = &(frag_array[1]);
    msgWalkInit( &walk, msg );
    while ((frag->data=msgWalkNext(&walk,&(frag->length))) != NULL ) {
	total_length += frag->length;
	frag++;
    }
    frag->length = 0;
    frag_array[0].length = total_length;
    msgWalkDone( &walk );

    csend(NX_TYPE_FRGLST, frag_array,sizeof(frag_array),hippi_node,myptype());

    /*
     * Now that the hippi node is ready to allocate space for the whole
     * message, send all fragments.
     */

    frag = &(frag_array[1]);
    while ( frag->length > 0 ) {
	/*
	crecv(NX_TYPE_FRGREQ, &frgreq, sizeof(frgreq) );
	*/
	xTrace2(nx_drv,TR_DETAILED,
		       "nxPush: send msg type %d to node %d",
		       NX_TYPE_FRG, hippi_node );
	csend(NX_TYPE_FRG,frag->data,frag->length,hippi_node,myptype());
	frag++;
    }

    xTrace0(nx_drv,TR_DETAILED,"nxPush: all fragments sent");

    return XMSG_NULL_HANDLE;
}

static XkReturn
nxOpenEnable( Protl self, Protl hlp, Protl hlpType, Part* p)
{
    if ( self->up ) {
	xError("hippiOpenEnable called multiple times!");
	return XK_FAILURE;
    }
    self->up = hlp;
    return XK_SUCCESS;
}

static int
nxControl( Protl s, int op, char* buf, int len )
{
    PState* ps = (PState*)s->state;

    switch (op) {
	/* NOTE: must also implement ARP registration. */

      case GETMYHOST:
	checkLen(len, sizeof(ETHhost));
	bcopy((char *) &ps->myHost, buf, sizeof(ETHhost));
	return (sizeof(ETHhost));
    
      case ETH_REGISTER_ARP:
        /*
         * ARP registers itself with us so we can ask it to perform
         * callbacks in order to simulate hardware broadcast.
         * (Not used at this point)
         */
        checkLen(len, sizeof(Protl));
        ps->arp = *(Protl *)buf;
        return 0;


      case GETMAXPACKET: case GETOPTPACKET:
	checkLen(len,sizeof(int));
	*(int*)buf = ps->mtu;
        return sizeof(int);

      default:
        return -1;
    }
}

void
hippi_print( hippi_header_any_t* hh, int outgoing )
{
    char                    copy[64*1024];
    union  hippi_fp_header* fpptr;
    struct hippi_le_header* leptr;
    struct llc*             llcptr;
    char*                   msg;
    int                     size;

    if ( outgoing ) {
	size = ntohl(hh->out.fp.fields.d2_size)+sizeof(hippi_header_out_t);
    } else {
	size = ntohl(hh->in.fp.fields.d2_size)+sizeof(hippi_header_in_t);
    }
    size -= 8; /* 8 bytes of llc are in d2_size byt are part of hippi_header */

    fprintf(stderr,"Message size: %d bytes\n", size);

    memcpy(&copy,hh,size);
    hh = (hippi_header_any_t*)copy;

    if ( outgoing ) {
	fpptr  = &(hh->out.fp);
	leptr  = &(hh->out.le);
	llcptr = &(hh->out.llc);
	msg    = copy+sizeof(hippi_header_out_t);
	fprintf(stderr,"Outgoing: I-field is 0x%02x\n", hh->out.cci.Ifield );
    } else {
	fpptr  = &(hh->in.fp);
	leptr  = &(hh->in.le);
	llcptr = &(hh->in.llc);
	msg    = copy+sizeof(hippi_header_in_t);
    }
    

    fpptr->words.w1 = ntohl(fpptr->words.w1);
    fpptr->words.w2 = ntohl(fpptr->words.w2);

    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|      %02x       |%d|%d|        %03x          |      %02x       |  %x  |\n",
    fpptr->fields.ulp&0xFF, fpptr->fields.p&0x01, fpptr->fields.b&0x01, fpptr->fields.res, fpptr->fields.d1_size, fpptr->fields.d2_off);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|                     d2_size = %3d                             |\n",
    fpptr->fields.d2_size);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|  %x  |%x|   %x   |                  %06x                       |\n",
    leptr->fc, leptr->w, leptr->m_type, leptr->dst_sw_address);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|   %x   |   %x   |                  %06x                       |\n",
    leptr->dat, leptr->sat, leptr->src_sw_address);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|              %x                |             %04x              |\n",
    leptr->res, leptr->dst_ieee_address.high);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|              %04x                           %04x              |\n",
    leptr->dst_ieee_address.mid, leptr->dst_ieee_address.low);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|              %x                |             %04x              |\n",
    leptr->le_local_admin, leptr->src_ieee_address.high);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|              %04x                           %04x              |\n",
    leptr->src_ieee_address.mid, leptr->src_ieee_address.low);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|       %02x      |       %02x      |       %02x      |       %02x      |\n",
    llcptr->ssap, llcptr->dsap, llcptr->control, llcptr->org_code[3]);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
    fprintf(stderr,
    "|       %02x      |       %02x      |             %04x              |\n",
    llcptr->org_code[2], llcptr->org_code[0], llcptr->ether_type);
    fprintf(stderr,
    "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n");
}

/*
 * nx_hippi_filter: thread responsible for the hippi_reads in the hippi node.
 */

void
nx_hippi_filter( PState* ps )
{
    int i;
    char* hippi_packet;
    int   length;

    hippi_header_in_t* hh;
    int                compute_node, node_req;

    xTrace1(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: on node %d",my_node);
    csend( NX_TYPE_IOID, &my_node, sizeof(my_node), -1, myptype() );
    
    xTrace1(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: waiting for %d nodes",
            numnodes() - 1 );
    
    /* Wait for other nodes to register. In a final version this should
     * initialize a map - and maybe this might have to be processed at
     * any time, not only at startup.
     */
    for (i=1;i<numnodes();++i) {
	crecv( NX_TYPE_REG, &compute_node, sizeof(compute_node) );
	xTrace1(nx_drv,TR_FULL_TRACE,"Node %d registered\n", compute_node );
    }
    xTrace1(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: got all nodes, open %s",
            ps->devname );

    if ( ( ps->ihandle = hippi_open( ps->devname,  HIPPI_RAW, O_RDWR ) ) < 0 ) {
	xError("nx_hippi_filter: unable to open device\n");
	exit(1);
    }

    if ( hippi_bind( ps->ihandle, ULP, PORT ) < 0 ) {
	xError("nx_hippi_filter: hippi_bind failed");
	exit(1);
    }

    xTrace0(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: main_thread is be nx_loop" );

    PTHREAD_MUTEX_LOCK( &main_thread_desc.mutex );
    main_thread_desc.func = (void(*)(void*))nx_loop;
    main_thread_desc.arg  = (void*) ps;
    PTHREAD_MUTEX_UNLOCK( &main_thread_desc.mutex );
    PTHREAD_COND_SIGNAL( &main_thread_desc.cond );

    xTrace0(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: loop forever" );

    while (1) {
	xTrace0(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: block on hippi_read" );
	length = hippi_read( ps->ihandle, &hippi_packet, MAX_PACKET );
	xTrace1(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: read %d bytes",
	        length );

        if ( ( length < 0 ) || ( length > MAX_PACKET ) ) {
            xTrace1(nx_drv,TR_ERRORS,
	            "nx_hippi_filter: hippi_read failed, len=%d", length );
            if ( length > 0 ) {
                hippi_memfree( ps->ihandle, hippi_packet, length, TRUE );
            }
            continue;
        }
	/*
	hippi_print( (hippi_header_any_t*) hippi_packet, 0 );
	*/
	xTrace1(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: append msg 0x%x",
	        hippi_packet);

	/* 
	 * New code to send nx message right from this thread:
	 */
	hh           = (hippi_header_in_t*)hippi_packet;
	compute_node = hh->le.dst_sw_address;
	
	xTrace2(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: msg for node %d, len %d",
		compute_node, length );

	csend(NX_TYPE_MSGLEN,&(length),sizeof(length),compute_node, myptype() );
	
	/*
	crecvx(NX_TYPE_MSGREQ,&node_req,sizeof(node_req),
	       compute_node, -1, msginfo );
	*/
	
	if ( compute_node != node_req ) {
	    xTrace2(nx_drv,TR_ERRORS,"compute node(%d) != reqnode(%d)",
		    compute_node, node_req );
	}

	xTrace1(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: call csend for buf 0x%x",
		hippi_packet );
	
	csend( NX_TYPE_MSG, hippi_packet, length, compute_node, myptype() );

	xTrace0(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: releasing buffer");

	hippi_memfree(ps->ihandle,hippi_packet,length,1);

	xTrace0(nx_drv,TR_FULL_TRACE,"nx_hippi_filter: buffer released");
    }
}

/*
 * nx_loop: main thread in the hippi node
 */

void
nx_loop( void* arg )
{
    PState* ps = (PState*) arg;
    char*   data;
    int     length;

    frag_array_t fragments;
    msg_frag_t*  frag;
    char*        nx_data_ptr;
    int          node_req;

    long nx_info[8];
    long  nx_node;
    long  gsync_cnt = 1;

    xTrace0(nx_drv,TR_FULL_TRACE,"nx_loop: started");


    while (1) {
	xTrace0(nx_drv,TR_FULL_TRACE,"nx_loop: wait for FRGLST" );

	crecvx( NX_TYPE_FRGLST, fragments, sizeof(fragments),-1,-1,nx_info);
	nx_node = nx_info[2];

	if ( fragments->length == 0 ) {
	    xTrace1(nx_drv,TR_ERRORS,"nx_loop: null message (%d)",
		    NX_TYPE_FRGLST );
	    continue;
	}

	length = LEN_ROUND8(fragments->length);
	data   = hippi_memget( ps->ihandle, length );

	xTrace3(nx_drv,TR_FULL_TRACE,
		"nx_loop: got request from node %d, %d byte msg, alloc %d",
		nx_node, fragments->length, length);

	frag = &(fragments[1]);
	nx_data_ptr = data;

	while ( frag->length > 0 ) {
	    /*
	    csend( NX_TYPE_FRGREQ,&my_node,sizeof(my_node),nx_node,myptype());
	    */
	    crecvx( NX_TYPE_FRG, nx_data_ptr, frag->length, nx_node,
		    -1, msginfo );
	    xTrace1(nx_drv,TR_FULL_TRACE,"nx_loop: got %d bytes of msg",
		    frag->length);
	    nx_data_ptr += frag->length;
	    frag++;
	}
	xTrace1(nx_drv,TR_FULL_TRACE,
		"nx_loop: got whole msg (%d bytes), call hippi_write",
		 length );
	/*
	hippi_print( (hippi_header_any_t*) data, 1 );
	*/

	if ( hippi_write( ps->ihandle, data, length ) < 0 ){
	    xError("nx_loop: hippi_write failed");
	}
	xTrace0(nx_drv,TR_FULL_TRACE,"nx_loop: hippi_write returned");
	hippi_memfree(ps->ihandle,data,length,FALSE);
    }
}
