/*
 * common_test.c
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 *
 * $Revision: 1.2 $
 * $Date: 1996/02/01 15:36:43 $
 */

/*
 * This code implements a ping-pong test for another protocol.
 * The same protocol code is used for both the client and the server.  
 *
 * It expects the definitions of following macros which describe
 * the lower protocol:
 *
 *	HOST_TYPE (e.g., ETHhost)
 *	INIT_FUNC (e.g., ethtest_init)
 *	TRACE_VAR (e.g., ethtestp)
 *	PROT_STRING (e.g., "eth")
 *
 * It also needs declarations for Client and Server addresses, e.g.:
 *	HOST_TYPE ServerAddr = { 0xC00C, 0x4558, 0x04d2 };  
 *	HOST_TYPE ClientAddr = { 0xC00C, 0x4558, 0x2694 };  
 *
 * It also needs definitions for the following macros controlling the test:
 *	TRIPS  (number of round trips to make)
 *	TIMES  (number of tests for each length)
 *	DELAY  (number of seconds to delay between each test
 *		(and timeout for declaring failure))
 *
 * Define the macro TIME to do timing calculations.
 *
 * *Finally*, define an array of integers 'lens[]' with the lengths for the
 * tests:
 *
 *	static int lens[] = { 
 *		  1, 200, 400, 600, 800, 1000, 1200
 *	};
 *
 */

/* STREAM_TEST may not work if simul>1 */

#include "xkernel.h"
#ifndef XKMACHKERNEL
#include "x_stdio.h"
#else
#include "assert.h"
#endif ! XKMACHKERNEL
#ifdef XK_MEMORY_THROTTLE
#include "xk_malloc.h"
#endif XK_MEMORY_THROTTLE

#ifdef __STDC__

int	INIT_FUNC( Protl );

typedef struct vars {
  HOST_TYPE 	myHost;
  HOST_TYPE	serverAddr, clientAddr;
  Protl		myProtl;
  Sessn		clientDownSes;
  IPhost 	host;
  char 		*hostStr;
  int 		count;
  int		sentMsgLength;
  int		serverParam, clientParam, testsSpecified, myTestSpecified;
  char		*serverString;
  int		trips;
  int		simul;  /* number of simultaneous messages circulating */
/* if simul>1, the trips are randomly divided among the messages */
  int		clientTotal;   
  XTime 	starttime;
  int		receivedLength;
  int 		gotone;
  int		noResponseCount;
  Msg		msga[100];
  int		msgi;
} Vars;

static Vars *VarsArray[100];
static int  VarsArrayN=0;

static	void 	client( Event, void * );
static 	void 	server( Event, void * );
static 	int	isServerDefault( Vars *);
static 	int	isClientDefault( Vars *);
static 	int	(*isServer)( Vars *) = isServerDefault;
static 	int	(*isClient)( Vars *) = isClientDefault;
static 	int	defaultRunTest( Vars *, int);
static  void	testInit( void );



#else

static	void 	client();
static 	void 	server();
static 	int	isServerDefault();
static 	int	isClientDefault();
static 	int	(*isServer)() = isServerDefault;
static 	int	(*isClient)() = isClientDefault;
/* static 	int	asyncRunTest(); */
static 	int	defaultRunTest();
static  void	testInit();

#endif __STDC__

#ifdef RPCTEST

#ifdef __STDC__
static 	XkReturn	testCallDemux( Protl, Sessn, Msg *, Msg * );
#else
static 	XkReturn	testCallDemux();
#endif __STDC__

#else

static 	XkHandle	clientPushResult = XMSG_NULL_HANDLE;
static 	XkHandle	serverPushResult = XMSG_NULL_HANDLE;

#ifdef __STDC__
static 	XkReturn	test_clientdemux( Protl, Sessn, Msg * );
static  XkReturn	defaultServerDemux( Protl, Sessn, Msg * );
#else
static 	XkReturn	test_serverdemux();
static 	XkReturn	test_clientdemux();
static  XkReturn	defaultServerDemux();
#endif __STDC__

#endif RPCTEST

#ifndef STR2HOST
#  define STR2HOST str2ipHost
#endif


#ifdef USE_CONTROL_SESSN
static	Sessn	controlSessn;
#endif

#ifdef RPCTEST

#  ifdef __STDC__
static XkReturn	serverCallDemuxDefault( Protl, Sessn, Msg *,  Msg * );
static int		tryCallDefault( Sessn, int, int );
#  else
static XkReturn	serverCallDemuxDefault();
static int		tryCallDefault();
#  endif

static XkReturn	(* serverCallDemux)() = serverCallDemuxDefault;
static int		(* tryCall)() = tryCallDefault;

#else

static XDemuxFunc serverDemux = defaultServerDemux;

#endif RPCTEST

#ifdef TIME
#  ifndef RPCTEST
/* static	XTime 	starttime; */
#  endif
static 	void	subtime(
#ifdef __STDC__
			XTime *t1, XTime *t2, XTime *result
#endif
			);
#endif
#ifdef STREAM_TEST
/* static	int	receivedLength = 0; */
#endif

#define FAILURE_LIMIT 2



static Pfi	runTest = defaultRunTest;


#ifdef XKMACHKERNEL
static int
sscanf1(str, format, a1)
char *str, format;
int *a1;
{
  int n;

  *a1=0;
  while (*str >= '0' && *str <= '9')
    *a1 = 10*(*a1) + (*str++ - '0');
  return(1);
}
#else
#define sscanf1 sscanf
#endif XKMACHKERNEL


#ifdef __STDC__
#define DOUBLEQUOTEWRAP(x) #x
#define STRINGIFY(z) DOUBLEQUOTEWRAP(z)
#endif

static void
print_compile_options()
{   printf("\nCompiled with options:\n");
    printf(
#ifdef __STDC__
    "HOST_TYPE "  STRINGIFY(HOST_TYPE)
    "; INIT_FUNC " STRINGIFY(INIT_FUNC)
    "; TRACEVAR "  STRINGIFY(TRACEVAR)  "; "
#endif __STDC__
    "PROT_STRING %s\n", PROT_STRING);
    printf("TRIPS = %d  TIMES = %d  DELAY = %d\n", TRIPS, TIMES, DELAY);
    printf("__STDC__ %s  PROFILE %s  TIME %s  XKMACHKERNEL %s\n",
#ifdef __STDC__
	   "on",
#else
	   "off",
#endif
#ifdef PROFILE
	   "on",
#else
	   "off",
#endif
#ifdef TIME
	   "on",
#else
	   "off",
#endif
#ifdef XKMACHKERNEL
	   "on"
#else
	   "off"
#endif
	   );
    printf("XK_MEMORY_THROTTLE %s  RPCTEST %s  STREAM_TEST %s\n",
#ifdef XK_MEMORY_THROTTLE
	   "on",
#else
	   "off",
#endif
#ifdef RPCTEST
	   "on",
#else
	   "off",
#endif
#ifdef STREAM_TEST
	   "on"
#else
	   "off"
#endif
	   );
    printf("FAILURE_LIMIT %d  CONCURRENCY ", FAILURE_LIMIT);
#ifdef CONCURRENCY
    printf("%d",CONCURRENCY);
#else
    printf("off");
#endif
    printf("  XK_INCOMING_MEMORY_MARK ");
#ifdef XK_INCOMING_MEMORY_MARK
    printf("%d", XK_INCOMING_MEMORY_MARK);
#else
    printf("undefined");
#endif
    printf("\n");
#if (defined(CUSTOM_ASSIGN) || defined(CUSTOM_OPENDONE) || \
     defined(CUSTOM_SERVER_DEMUX) || defined(CUSTOM_CLIENT_DEMUX))
    printf("with");
#ifdef CUSTOM_ASSIGN
	printf(" CUSTOM_ASSIGN");
#endif
#ifdef CUSTOM_OPENDONE
	printf(" CUSTOM_OPENDONE");
#endif
#ifdef CUSTOM_SERVER_DEMUX
	printf(" CUSTOM_SERVER_DEMUX");
#endif
#ifdef CUSTOM_CLIENT_DEMUX
	printf(" CUSTOM_CLIENT_DEMUX");
#endif
    printf("\n");
#endif
#if !(defined(CUSTOM_ASSIGN) && defined(CUSTOM_OPENDONE) && \
      defined(CUSTOM_SERVER_DEMUX) && defined(CUSTOM_CLIENT_DEMUX))
    printf("without");
#ifndef CUSTOM_ASSIGN
	printf(" CUSTOM_ASSIGN");
#endif
#ifndef CUSTOM_OPENDONE
	printf(" CUSTOM_OPENDONE");
#endif
#ifndef CUSTOM_SERVER_DEMUX
	printf(" CUSTOM_SERVER_DEMUX");
#endif
#ifndef CUSTOM_CLIENT_DEMUX
	printf(" CUSTOM_CLIENT_DEMUX");
#endif
    printf("\n");
#endif
    printf("SAVE_SERVER_SESSN %s  CLIENT_OPENABLE %s\n",
#ifdef SAVE_SERVER_SESSN
	   "on",
#else
	   "off",
#endif
#ifdef CLIENT_OPENABLE
	   "on"
#else
	   "off"
#endif
	   );
    printf("USE_CHECKSUM %s  INF_LOOP %s  XK_TEST_ABORT %s\n",
#ifdef USE_CHECKSUM
	   "on",
#else
	   "off",
#endif
#ifdef INF_LOOP
	   "on",
#else
	   "off",
#endif
#ifdef XK_TEST_ABORT
	   "on"
#else
	   "off"
#endif
	   );
  }



static void
processOptions(v)

Vars *v;
{
    int		i, foundMyAddr=0;
    char 	*arg, *hostStr, str[100];
    IPhost	host;

#define argPrefix(str) (! strncmp(arg, str, strlen(str)))
#define argEq(str) (! strcmp(arg, str) )

#ifndef XNETSIM
    foundMyAddr = 1;
#endif
    sprintf(str, "%s/%s", v->myProtl->name, v->myProtl->instName);
    for (i=1; i < globalArgc; i++) {
	arg = globalArgv[i];
	if (argEq("-N")) {
/* 	  printf("Options %s (%d): %s %s %s\n", str, foundMyAddr, arg,  */
/* 		 globalArgv[i+1], globalArgv[i+2]); */
	  if (foundMyAddr)
	    break;
	  arg = globalArgv[++i];
	  if (strcmp(str, arg)) {
	    i++;
	    continue;
	  }
	  arg = globalArgv[++i];
	  str2ipHost(&host, arg);
	  v->host = host;
	  foundMyAddr = 1;
	  v->hostStr = hostStr = arg;
	  xsimDbg(TET_FLAG,
		  printf("[%s] %s found my address\n", hostStr, str));
	  continue;
	}
	else if (!foundMyAddr)
	  continue;
	else if ( argEq("-s") ) {
	    v->serverParam = 1;
	} else if ( argPrefix("-c") ) {
	    v->clientParam = 1;
	    v->serverString = arg + 2;
	} else if ( argPrefix("-test") ) {
	    v->testsSpecified = 1;
	    arg += strlen("-test");
	    if ( argEq(PROT_STRING) ) {
		v->myTestSpecified = 1;
	    }
	} else if ( argPrefix("-trips=") ) {
	    sscanf1(arg + strlen("-trips="), "%d", &v->trips);
	} else if ( argPrefix("-simul=") ) {
	    sscanf1(arg + strlen("-simul="), "%d", &v->simul);
	    if (v->simul > 100) {
	      xsimDbg(TET_FLAG,
		      printf("[%s] simul clipped to 100\n", v->hostStr)); 
	      v->simul = 100;
	    }
	    if (v->simul < 1) {
	      xsimDbg(TET_FLAG,
		      printf("[%s] simul increased to 1\n", v->hostStr)); 
	      v->simul = 1;
	    }
	}
    }
    if (v->trips%v->simul) {
      v->trips = v->simul * (1 + v->trips/v->simul);
      xsimDbg(TET_FLAG,
	      printf("[%s] Rounded trips up to %d, a multiple of simul(%d).\n",
		     hostStr, v->trips, v->simul));
    };
    xsimDbg(TET_FLAG,
	    printf("[%s] Running with trips=%d, simul=%d.\n", hostStr,
		   v->trips, v->simul));
    if (v->simul>1)
      xsimDbg(TET_FLAG,
	      printf("[%s] Check server has a simul value <= %d, %s %d.\n", 
		     hostStr, v->simul, "and is a divisor of", v->trips));
#undef argPrefix
#undef argEq    
}


int
INIT_FUNC( self )
    Protl self;
{
    Protl	llp;
    Vars	*v;

    xIfTrace(prottest, TR_ERRORS) { print_compile_options(); };
    v = (Vars *) xMalloc(sizeof(Vars));
    self->state = (char *)v;
    VarsArray[VarsArrayN++] = v;
    v->myProtl = self;
    v->trips = TRIPS;
    v->simul = 1;
    v->serverAddr = ServerAddr;
    v->clientAddr = ClientAddr;
    v->clientDownSes = 0;
    v->receivedLength = 0;
    v->noResponseCount = 0;
    v->msgi = 0;
    processOptions(v);
    if ( v->testsSpecified && ! v->myTestSpecified ) {
	xTrace1(prottest, TR_SOFT_ERRORS, 
		"Parameters indicate %s test should not run",
		PROT_STRING);
	return 0;
    }
    printf("[%s] %s timing test\n", v->hostStr, PROT_STRING);
/*     print_compile_options();  */
    llp = xGetProtlDown(self, 0);
    if (!xIsProtl(llp)) {
	Kabort("Test protocol has no lower protocol");
    }
    xControlProtl(xGetProtlDown(self, 0), GETMYHOST, (char *)&v->myHost, 
	     sizeof(HOST_TYPE));
    /* 
     * Call the per-test initialization function which gives the test
     * the opportunity to override the default functions
     */
    testInit();
    if ((*isServer)(v)) {
	evDetach( evSchedule(server, (void *) v, 0) );
    } else if ( (*isClient)(v)) {
#ifdef CONCURRENCY	
	int	i;

	for ( i=0; i < CONCURRENCY; i++ )
#endif
	    evDetach( evSchedule(client, (void *) v, 0) );
    } else {
      printf("[%s] %stest: I am neither server nor client\n", 
	     v->hostStr, PROT_STRING);
    }
    return 0;
}


#ifndef RPCTEST

static void
checkHandle( v, h, str )
    Vars		*v;
    XkHandle	h;
    char		*str;
{
    switch ( h ) {
      case XMSG_ERR_HANDLE:
      case XMSG_ERR_WOULDBLOCK:
	sprintf(errBuf, "[%s] %s returns error handle %d", v->hostStr, str, h);
	Kabort(errBuf);
      default:
	;
    }
}

#endif


static int
isServerDefault(v)
	Vars *v;
{
    if ( v->serverParam ) {
	return TRUE;
    }
    return ! bcmp((char *)&v->myHost, (char *)&v->serverAddr, 
		  sizeof(HOST_TYPE));
}


static int
isClientDefault(v)
	Vars *v;
{
    
    if ( v->clientParam ) {
	STR2HOST(&v->serverAddr, v->serverString);
	v->clientAddr = v->myHost;
	return TRUE;
    }
    return ! bcmp((char *)&v->myHost, (char *)&v->clientAddr, 
		  sizeof(HOST_TYPE));
}


#ifndef CUSTOM_ASSIGN

static void
clientSetPart( v, p )
    Vars *v;
    Part *p;
{
    partInit(p, 1);
    partPush(*p, &v->serverAddr, sizeof(IPhost));
    if (sizeof(HOST_TYPE) == sizeof(ETHhost))
      printf("[%s] Client sending to: %s\n", v->hostStr, 
	     ethHostStr(&v->serverAddr));
    else
      printf("[%s] Client sending to: %s\n", v->hostStr, 
	     ipHostStr(&v->serverAddr));
}

static void
serverSetPart( v, p )
    Vars *v;
    Part *p;
{
    partInit(p, 1);
    partPush(*p, ANY_HOST, 0);
    printf("[%s] Server responding to: %s\n", v->hostStr, "ANY_HOST");
}

#endif ! CUSTOM_ASSIGN


#ifdef SAVE_SERVER_SESSN
#  ifdef __STDC__
static XkReturn	saveServerSessn( Protl, Protl, Sessn, Protl );
#  endif

static XkReturn
saveServerSessn( self, llp, s, hlpType )
    Protl self, llp, hlpType;
    Sessn s;
{
    xTrace1(prottest, TR_MAJOR_EVENTS,
	    "%s test program duplicates lower server session",
	    PROT_STRING);
    xDuplicate(s);
    return XK_SUCCESS;
}
#endif SAVE_SERVER_SESSN


static XkReturn
closeDone( v, lls )
    Vars	*v;
    Sessn	lls;
{
    xTrace2(prottest, TR_MAJOR_EVENTS, "%s test -- closedone (%x) called",
	    PROT_STRING, lls);
#ifdef SAVE_SERVER_SESSION
    if ( (*isServer)(v) ) {
	xClose(lls);
    }
#endif
    return XK_SUCCESS;
}


static void
server( ev, foo )
    Event	ev;
    VOID 	*foo;
{
    Part p;
    Vars *v=(Vars *)foo;
    
    printf("[%s] I am the  server\n", v->hostStr);
#ifdef RPCTEST
    v->myProtl->calldemux = testCallDemux;
#else
    v->myProtl->demux = serverDemux;
#endif
#ifdef SAVE_SERVER_SESSN
    v->myProtl->opendone = saveServerSessn;
#endif 
#ifdef CUSTOM_OPENDONE
    v->myProtl->opendone = customOpenDone;
#endif 
    v->myProtl->closedone = closeDone;
    serverSetPart(v, &p);
    if ( xOpenEnable(v->myProtl, v->myProtl, xGetProtlDown(v->myProtl, 0), &p)
		== XK_FAILURE ) {
      printf("[%s] %s test server can't openenable lower protocol\n",
	     v->hostStr, PROT_STRING);
    } else {
      xsimDbg(TET_FLAG,
	      printf("[%s] %s test server done with xopenenable\n", 
		     v->hostStr, PROT_STRING));
    }
    return;
}


static void
client( ev, foo )
    Event	ev;
    VOID 	*foo;
{
    Part	p[2];
    Vars	*v=(Vars *)foo;
    int 	lenindex;
    
#ifndef RPCTEST
    v->myProtl->demux = test_clientdemux;
#endif

    printf("[%s] I am the client\n", v->hostStr);
#ifdef CLIENT_OPENENABLE
    serverSetPart(v, p);
    if ( xOpenEnable(v->myProtl, v->myProtl, xGetProtlDown(v->myProtl, 0), p)
		== XK_FAILURE ) {
      printf("[%s] %s test client can't openenable lower protocol\n",
	     v->hostStr, PROT_STRING);
	return;
    } else {
      xsimDbg(TET_FLAG,
	      printf("[%s] %s test client done with xopenenable\n", 
		     v->hostStr, PROT_STRING));
    }
#endif
    clientSetPart(v, p);
    if ( v->clientDownSes == 0 ) {
    	v->clientDownSes = xOpen(v->myProtl, v->myProtl, 
				 xGetProtlDown(v->myProtl, 0), p);
	if ( v->clientDownSes == ERR_SESSN ) {
	  printf("[%s] %s test: open failed!\n", v->hostStr, 
		 PROT_STRING);
	    Kabort("End of test");
	    return;
	}
	xsimDbg(TET_FLAG,
		printf("[%s] test client has opened lower protocol\n", 
		       v->hostStr));
    }
    else {
      xsimDbg(TET_FLAG,
	      printf("[%s] clientDownSes != 0t\n", v->hostStr));
    }
#ifdef CLIENT_OPENDONE
    v->myProtl->opendone(v->myProtl, xGetProtlDown(v->myProtl, 0),
			 v->clientDownSes, v->myProtl);
#endif    
#ifdef USE_CONTROL_SESSN
    clientSetPart(v, p);
    if (xIsProtl(xGetProtlDown(v->myProtl, 1))) {
	controlSessn = xOpen(v->myProtl, v->myProtl, 
			     xGetProtlDown(v->myProtl, 1), p);
	if (!xIsSessn(controlSessn)) {
	    xError("test protl could not open control sessn");
	}
    } else {
	xTrace1(prottest, TR_EVENTS,
		"%s test client not opening control session",
		PROT_STRING);
    }
#endif USE_CONTROL_SESSN

#ifdef USE_CHECKSUM
    xControlSessn(v->clientDownSes, UDP_ENABLE_CHECKSUM, NULL, 0);
#endif
#ifdef INF_LOOP
    for (lenindex = 0; ; lenindex++) {
	if (lenindex >= sizeof(lens)/sizeof(long)) {
	    lenindex = 0;
	}
#else
    for (lenindex = 0; lenindex < sizeof(lens)/sizeof(long); lenindex++) {
#endif INF_LOOP	
	v->sentMsgLength = lens[lenindex];
	if ( runTest(v, lens[lenindex]) ) {
	    break;
	}
    }
	printf("[%s] End of test\n", v->hostStr);
#ifdef TCP_SAVE_TRACE
	{
	  char str[100];
	  sprintf(str, "%s-%s.td", v->myProtl->down[0]->name, 
		  v->myProtl->down[0]->instName);
	  xControlProtl(v->myProtl->down[0], TCP_DUMPTRACE, str, strlen(str));
	}
#endif
    xClose(v->clientDownSes);
    v->clientDownSes = 0;
#ifdef XK_TEST_ABORT
    Kabort("test client aborts at end of test");
#endif
    xTrace0(prottest, TR_FULL_TRACE, "test client returns");
}




#ifdef RPCTEST


static int
tryCallDefault( sessn, times, length )
  Sessn sessn;
  int times;
  int length;
{
    XkReturn ret_val;
    int i;
    Msg	savedMsg, request, reply;
    char *buf;
    int c = 0;
    
    buf = msgConstructAllocate(&savedMsg, length);
    msgConstructEmpty(&reply);
    msgConstructEmpty(&request);
    for (i=0; i<times; i++) {
	msgAssign(&request, &savedMsg);
	ret_val = xCall(sessn, &request, &reply);
	xIfTrace(prottest, TR_MAJOR_EVENTS) {
	    putchar('.');
	    if (! (++c % 50)) {
		putchar('\n');
	    }
	}
	if( ret_val == XK_FAILURE ) {
	    printf( "RPC call error %d\n" , ret_val );
	    goto abort;
	}
	if (msgLength(&reply) != length) {
	    printf("Bizarre reply length.  Expected %d, received %d\n",
		   length, msgLength(&reply));
	    goto abort;
	}
	msgTruncate(&reply, 0);
    }
#ifdef USE_CONTROL_SESSN
    if (xIsSessn(controlSessn)) {
	Msg		m;
	char	*strbuf;

	strbuf = msgConstructAllocate(&m, 80);
	strbuf[0] = '\n';
	sprintf(strbuf+1, "End of test %d", clientTotal);
	
	xCall(controlSessn, &m, 0);
	msgDestroy(&m);

    }
#endif USE_CONTROL_SESSN

abort:
    msgDestroy(&savedMsg);
    msgDestroy(&reply);
    msgDestroy(&request);
    return i;
}


/* 
 * RPC runTest
 */
static int
defaultRunTest( v, len )
    Vars *v;
    int  len;
{
    int 	test;
#ifdef TIME    
    XTime 	startTime, endTime, netTime;
#endif

    for (test = 0; test < TIMES; test++) {
      xsimDbg(TET_FLAG,
	      printf("[%s] Sending (%d) ...\n", v->hostStr, ++clientTotal));
	v->count = 0;
#ifdef PROFILE
	startProfile();
#endif
#ifdef TIME
	xGetTime(&v->startTime);
#endif
	if ( (v->count = tryCall(v->clientDownSes, trips, len)) == v->trips ) {
#ifdef TIME
	    xGetTime(&endTime);
	    subtime(&startTime, &endTime, &netTime);
	    printf("\n[%s] len = %4d, %d trips: %6d.%06d\n", 
		   v->strHost,  
		   len, trips, netTime.sec, netTime.usec);
#else
	    printf("\n[%s] len = %4d, %d trips\n", v->strHost, 
		   len, trips);
#endif
	}
	if ( v->count == 0 ) {
	    if ( v->noResponseCount++ == FAILURE_LIMIT ) {
	      printf("[%s] Server looks dead.  I'm outta here.\n",
		     v->hostStr);
		return 1;
	    }
	} else {
	    v->noResponseCount = 0;
	}
	Delay(DELAY * 1000);
#ifdef XK_MEMORY_THROTTLE
	{
	  extern int max_thread_pool_used, min_memory_avail,
	             max_xk_threads_inuse;
	  xsimDbg(TET_FLAG,
		  printf("[%s] used %d incoming threads; %d regular threads; max %d bytes memory",
		 v->strHost, 
		 max_thread_pool_used,
		 max_xk_threads_inuse,
		 min_memory_avail));
	}
#endif XK_MEMORY_THROTTLE
    }
    return 0;
}


static XkReturn
serverCallDemuxDefault(self, lls, dg, rMsg)
    Protl self;
    Sessn lls;
    Msg *dg, *rMsg;
{
  xsimDbg(TET_FLAG,
	  printf("[%s] serverCallDemuxDefault\n", 
		 ((Vars *)self->state)->hostStr));
    msgAssign(rMsg, dg);
    return XK_SUCCESS;
}


static XkReturn
testCallDemux( self, lls, dg, rMsg )
    Protl self;
    Sessn lls;
    Msg *dg, *rMsg;
{
    static int c = 0;
    
    xsimDbg(TET_FLAG,
	    printf("[%s] testCallDemux\n", ((Vars *)self->state)v->hostStr));
    xIfTrace(prottest, TR_MAJOR_EVENTS) {
	putchar('.');
	if (! (++c % 50)) {
	    putchar('\n');
	}
    }
    msgAssign(rMsg, dg);
    return serverCallDemux(self, lls, dg, rMsg);
}


#else	/* ! RPCTEST */



static int
defaultRunTest( v, len )
    Vars *v;
    int  len;
{
    Msg		savedMsg;
    int 	test, m;
    char	*buf;
    
    buf = msgConstructAllocate(&savedMsg, len);
    for (test = 0; test < TIMES; test++) {
	for (m=0; m<v->simul; m++) {msgConstructCopy(&v->msga[m], &savedMsg);};
	xsimDbg(TET_FLAG,
		printf("[%s] Sending (%d) ...\n", v->hostStr, 
		       ++v->clientTotal));
	xsimDbg(TET_FLAG,
	     printf("[%s] msg length: %d\n", v->hostStr, msgLength(&v->msga[0])));
	v->count = 0;
#ifdef PROFILE
	startProfile();
#endif
#ifdef TIME
	xGetTime(&v->starttime);
#endif
#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(DELAY * 1000);
#endif XK_MEMORY_THROTTLE
	for (m=0; m<v->simul; m++) {
	    clientPushResult = xPush(v->clientDownSes, &v->msga[m]);
	    checkHandle(v, clientPushResult, "client push");
	}
	do {
	    v->gotone = 0;
	    Delay(DELAY * 1000);
	} while (v->gotone);
	if (v->count < v->trips) {
	  xsimDbg(TET_FLAG,
		  printf("[s%] Test failed after receiving %d packets\n", 
			 v->hostStr, v->count));
#ifdef STREAM_TEST
	    v->receivedLength = 0;
#endif
	} 
	if ( v->count == 0 ) {
	    if ( v->noResponseCount++ == FAILURE_LIMIT ) {
	      xsimDbg(TET_FLAG,
		      printf("[%s] Server looks dead.  I'm outta here.\n",
			     v->hostStr));
		return 1;
	    }
	} else {
	    v->noResponseCount = 0;
	}
	for (m=0; m<v->simul; m++) { msgDestroy(&v->msga[m]); };
    }
    msgDestroy(&savedMsg); 
    return 0;
}

Vars *getMyVar( self )
    Protl self;
{
  char str[100];
  int  k;

  for (k=0; k<VarsArrayN; k++) {
    if (self == VarsArray[k]->myProtl)
      return VarsArray[k];
  }
  
  sprintf(str, "getMyVar fails for Protl: %x\n", self);
  Kabort(str);
  return NULL;
}

static XkReturn
defaultServerDemux( self, lls, dg )
    Protl self;
    Sessn lls;
    Msg *dg;
{
    int  i;
    Vars *v;
    static int c = 0;

    xIfTrace(prottest, TR_MAJOR_EVENTS) {
	putchar('.');
	if (! (c++ % 50)) {
	    putchar('\n');
	}
    }
    v = getMyVar(self);
    xsimDbg(TET_FLAG,
	    printf("[%s] defaultServerDemux\n", v->hostStr));
#ifdef CUSTOM_SERVER_DEMUX
    customServerDemux(self, lls, dg);
#endif CUSTOM_SERVER_DEMUX
#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(DELAY * 1000);
#endif XK_MEMORY_THROTTLE
    /* if simul>1, save up a group of messages, then return them */
    /* if things get out of sync, they get really screwed up! */
    if (v->simul>1) {
      msgConstructCopy(&v->msga[v->msgi],dg); v->msgi++;
      if (v->msgi==v->simul) {
	for (i=0; i<v->simul; i++) {
	    serverPushResult = xPush(lls, &v->msga[i]);
	    checkHandle(v, serverPushResult, "server push");
	    msgDestroy(&v->msga[i]);
	}
	v->msgi=0;
      };
    } else {
	serverPushResult = xPush(lls, dg);
	checkHandle(v, serverPushResult, "server push");
    }
    return XK_SUCCESS;
}



#  ifdef STREAM_TEST


static XkReturn
test_clientdemux( self, lls, dg )
    Protl self;
    Sessn lls;
    Msg *dg;
{
#ifdef TIME
    XTime 	now, total;
#endif
    char		*buf;
    Msg			msgToPush;
    XkHandle	h;
    Vars		*v;
    
    v = getMyVar(self);
    v->gotone = 1;

    xsimDbg(TET_FLAG,
	    printf("[%s] test_clientDemux\n", v->hostStr));
    xTrace1(prottest, TR_EVENTS, "R %d", msgLength(dg));
    v->receivedLength += msgLength(dg);
    xTrace1(prottest, TR_DETAILED, "total length = %d", v->receivedLength);
#ifdef CUSTOM_CLIENT_DEMUX
    customClientDemux(self, lls, dg);
#endif CUSTOM_CLIENT_DEMUX
    if (v->receivedLength == v->sentMsgLength) {
	/*
	 * Entire response has been received.
	 * Send another message
	 */
	if (++v->count < v->trips) {
	    xIfTrace(prottest, TR_MAJOR_EVENTS) { 
		putchar('.');
		if (! (v->count % 50)) {
		    putchar('\n');
		}
	    }
	  if (v->count+v->simul <= v->trips) {
	    buf = msgConstructAllocate(&msgToPush, v->receivedLength);
	    v->receivedLength = 0;
	    xTrace0(prottest, TR_EVENTS, "S");
#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(DELAY * 1000);
#endif XK_MEMORY_THROTTLE
	    h = xPush(lls, &msgToPush);
	    checkHandle(v, h, "client push");
	    msgDestroy(&msgToPush);
	  } else { v->receivedLength = 0; };
	} else {
#ifdef TIME
	    xGetTime(&now);
	    subtime(&v->starttime, &now, &total);
	    printf("\n[%s] len = %4d, %d trips: %6d.%06d\n", 
		   v->hostStr, v->receivedLength, v->trips, 
		   total.sec, total.usec);
#else
	    printf("\n[%s] len = %4d, %d trips\n", v->hostStr, 
		   v->receivedLength, v->trips);
#endif
	    v->receivedLength = 0;
#ifdef PROFILE
	    endProfile();
#endif
	}
    }
    return XK_SUCCESS;
}


#  else /* ! STREAM_TEST */


static XkReturn
test_clientdemux( self, lls, dg )
    Protl self;
    Sessn lls;
    Msg *dg;
{
#ifdef TIME
    XTime 	now, total;
#endif
    Vars   	*v;
    int 	i;

    v = getMyVar(self);
    xsimDbg(TET_FLAG,
	    printf("[%s] test_clientdemux\n", v->hostStr));
/* note that customdemux is not called on final response message */
#ifdef CUSTOM_CLIENT_DEMUX
    customClientDemux(self, lls, dg);
#endif CUSTOM_CLIENT_DEMUX
    v->gotone = 1;
    if ( ++v->count < v->trips ) {
	xIfTrace(prottest, TR_MAJOR_EVENTS) {
	    putchar('.');
	    if (! (v->count % 50)) {
		putchar('\n');
	    }
	}
#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(DELAY * 1000);
#endif XK_MEMORY_THROTTLE
	if (v->simul>1) {
	  msgConstructCopy(&v->msga[v->msgi],dg); v->msgi++;
	  if (v->msgi==v->simul) {
	    for (i=0; i<v->simul; i++) {
		clientPushResult = xPush(v->clientDownSes, &v->msga[i]);
		checkHandle(v, clientPushResult, "client push");
		msgDestroy(&v->msga[i]);
	    }
	    v->msgi=0;
	  };
	}
	else {
	  /* if (v->count+v->simul<=v->trips) */
	  clientPushResult = xPush(v->clientDownSes, dg);
	  checkHandle(v, clientPushResult, "client push");
      }
    } else {
        /* should clear out msga when simul>1, but skip it for now */
        v->msgi=0;
#ifdef TIME
	xGetTime(&now);
	subtime(&v->starttime, &now, &total);
	printf("\n[%s] len = %4d, %d trips: %6d.%06d\n", v->hostStr,
	       msgLength(dg), v->trips, total.sec, total.usec);
#else
	printf("\n[%s] len = %4d, %d trips\n", v->hostStr, 
	       msgLength(dg), v->trips);
#endif
	
#ifdef PROFILE
	endProfile();
#endif
    }
    return XK_SUCCESS;
}


#  endif STREAM_TEST



#endif RPCTEST




#ifdef TIME
static void
subtime(t1, t2, t3)
    XTime *t1, *t2, *t3;
{
    t3->sec = t2->sec - t1->sec;
    t3->usec = t2->usec - t1->usec;
    if (t3->usec < 0) {
	t3->usec += 1000000;
	t3->sec -= 1;
    }
}
#endif


