/*
 * $RCSfile: common_test_async.c,v $
 *
 * x-kernel v3.3
 *
 * Copyright (c) 1993,1991,1990,1996  Arizona Board of Regents
 *
 * $Revision: 1.2 $
 * $Log: common_test_async.c,v $
 * Revision 1.2  1996/01/29 22:36:24  slm
 * Updated copyright and version.
 *
 * Revision 1.1  1995/07/28  22:18:04  slm
 * Initial revision
 *
 * Revision 1.5.2.3.1.2  1994/11/22  21:03:01  hkaram
 * Merged in David's cleanups
 */

static  int	close_done = 0;

#ifdef __STDC__

#  ifdef STREAM_TEST

/*static*/ XkReturn	clientStreamDemux( Protl, Sessn, Msg * );

#  else /* ! STREAM_TEST */

static XkReturn	clientDemux( Protl, Sessn, Msg * );

#  endif /* STREAM_TEST */

#  else /* __STDC__ */

#  ifdef STREAM_TEST

/*static*/ XkReturn	clientStreamDemux( );

#  else /* ! STREAM_TEST */

static XkReturn	clientDemux( );

#  endif /* STREAM_TEST */

#endif /* __STDC__ */


static XkReturn
clientCloseDone( lls )
     Sessn lls;
{
    xTrace2(prottest, TR_MAJOR_EVENTS, "%s test -- closedone (%x) called",
	    PROT_STRING, lls);
    ++close_done;
    return XK_SUCCESS;
}


static void
realmServerFuncs( self )
    Protl	self;
{
    self->demux = serverDemux;
}


static void
realmClientFuncs( self )
    Protl	self;
{
#ifdef STREAM_TEST
    self->demux = clientStreamDemux;
#else
    self->demux = clientDemux;
#endif
    self->closedone = clientCloseDone;
}


#ifdef CHECK_MESSAGE_CONTENTS

static void
print_message_contents(u_char *buf, int len)
{
#ifdef PRINT_MESSAGE_CONTENTS
    int m;

    printf("msg ");
    for (m = 0; m < len && m < 40; m++) {
	printf("%02x", buf[m]);
    } /* for */
    printf("\n");
    if (len >= 40) {
	printf("msg ");
	for (m = (len < 80) ? 40 : (len-40); m < len; m++) {
	    printf("%02x", buf[m]);
	} /* for */
	printf("\n");
    } /* if */
#endif
} /* print_message_contents */


static void
init_message_contents(u_char *buf, int len)
{
    int m;

    printf("Initializing message contents.\n");
    for (m = 0; m < len; m++) {
	buf[m] = 3*m+len;
    } /* for */
    print_message_contents(buf, len);
} /* init_message_contents */


static void
check_message_contents(Msg *dg, int msg_id)
{
    MsgWalk cxt;
    long clen;
    u_char *buf;
    int len, k, i;

    k = 0;
    len = msgLength(dg);

    msgWalkInit(&cxt, dg);
    while ((buf = msgWalkNext(&cxt, &clen)) != 0) {
	for (i = 0; i < clen; ++i, ++k) {
	    if (buf[i] != (u_char)(3*k+len)) {
		printf("Message compare failed! msg[%d]=%x (expected %x)\n"
		       "\tmsg#=%d len=%d.\n",
		       k, buf[k], (u_char)(3*k+len), msg_id, len);
		break;
	    } /* if */
	} /* for */
    } /* while */
    msgWalkDone(&cxt);

    if (k == len) {
	printf("Message # %d len %d compared ok.\n", msg_id, len);
    } /* if */
    print_message_contents(buf, len);
} /* check_message_contents */

#endif /* CHECK_MESSAGE_CONTENTS */


static
#if defined(__GNUC__)
inline
#endif
void checkHandle( h, str )
    XkHandle	h;
    char		*str;
{
    switch ( h ) {
      case XMSG_ERR_HANDLE:
      case XMSG_ERR_WOULDBLOCK:
	sprintf(errBuf, "%s returns error handle %d", str, h);
	Kabort(errBuf);
      default:
	;
    }
}


static int
defaultRunTest( self, len, testNumber )
    Protl	self;
    int 	len;
    int		testNumber;
{
    static Msg	msga[100];
    int 	test, m;
    u_char	*buf;
    static int	noResponseCount = 0;
    PState	*ps = (PState *)self->state;
#ifdef TIME
    XTime	now;
#endif

    xAssert(ps);
    buf = msgConstructAllocate(&ps->savedMsg, len);

#ifdef CHECK_MESSAGE_CONTENTS
    init_message_contents(buf, len);
#endif

    for (test = 0; test < TIMES; test++) {
	for (m=0; m<simul; m++) {msgConstructCopy(&msga[m], &ps->savedMsg);};
	printf("Sending (%d) ...\n", testNumber);
	printf("msg length: %d\n", msgLength(&msga[0]));
	ps->clientRcvd = 0;
	close_done = 0;
#ifdef PROFILE
	startProfile();
#endif
#ifdef TIME
	/* wait for timer tick to happen: */
	xGetTime(&now);
	do {
	    xGetTime(&starttime);
	} while (starttime.sec < now.sec + 1);
	xGetTime(&starttime);
#endif
#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(testdelay * 1000);
#endif /* XK_MEMORY_THROTTLE */
	for (m=0; m<simul; m++) {
	    ps->clientPushResult = xPush(ps->lls, &msga[m]);
	    checkHandle(ps->clientPushResult, "client push");
	}
	do {
	    ps->idle = 1;
	    Delay(testdelay * 1000);
#ifdef TCP_PUSH
	} while ( ps->clientRcvd < trips && !close_done );
#else
	} while ( ! ps->idle );
#endif
	if (close_done) {
	    printf("Peer closed connection after receiving %d packets\n",
		   ps->clientRcvd);
	    ps->lls = 0;
	    return 1;
	} /* if */
	if ( ps->clientRcvd < trips ) {
	    printf("Test failed after receiving %d packets\n", ps->clientRcvd);
#ifdef STREAM_TEST
	    ps->receivedLength = 0;
#endif
	} 
	if ( ps->clientRcvd == 0 ) {
	    if ( noResponseCount++ == FAILURE_LIMIT ) {
		printf("Server looks dead.  I'm outta here.\n");
		return 1;
	    }
	} else {
	    noResponseCount = 0;
	}
	for (m=0; m<simul; m++) { msgDestroy(&msga[m]); };
    }
    msgDestroy(&ps->savedMsg); 
    return 0;
}

/*static*/ XkReturn
defaultServerDemux( self, lls, dg )
    Protl self;
    Sessn lls;
    Msg *dg;
{
    PState	*ps = (PState *)self->state;
    static int c = 0;
    static Msg msga[100]; static int msgi = 0; int i;

    xIfTrace(prottest, TR_MAJOR_EVENTS) {
	putchar('.');
	if (! (c++ % 50)) {
	    putchar('\n');
	}
    }

#ifdef CHECK_MESSAGE_CONTENTS
    check_message_contents(dg, c);
#endif

#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(testdelay * 1000);
#endif /* XK_MEMORY_THROTTLE */
    /* if simul>1, save up a group of messages, then return them */
    /* if things get out of sync, they get really screwed up! */
    if (simul>1) {
      msgConstructCopy(&msga[msgi],dg); msgi++;
      if (msgi==simul) {
	for (i=0; i<simul; i++) {
	    ps->serverPushResult = xPush(lls, &msga[i]);
	    checkHandle(ps->serverPushResult, "server push");
	    msgDestroy(&msga[i]);
	}
	msgi=0;
      };
    } else {
	ps->serverPushResult = xPush(lls, dg);
	checkHandle(ps->serverPushResult, "server push");
    }
#ifdef CUSTOM_SERVER_DEMUX
    /*
     * For TCP this MUST come *after* pushing the message---xPush()
     * might block indefinitely.
     */
    customServerDemux(self, lls, 0);
#endif /* CUSTOM_SERVER_DEMUX */
    return XK_SUCCESS;
}





#ifdef STREAM_TEST


/*static*/ XkReturn
clientStreamDemux( self, lls, dg )
    Protl self;
    Sessn lls;
    Msg *dg;
{
#ifdef TIME
    XTime 	now, total;
#endif
    Msg		msgToPush;
    XkHandle	h;
    PState	*ps = (PState *)self->state;

    xAssert(ps);
    ps->idle = 0;
    xTrace1(prottest, TR_EVENTS, "R %d", msgLength(dg));
    ps->receivedLength += msgLength(dg);
    xTrace1(prottest, TR_DETAILED, "total length = %d", ps->receivedLength);
#ifdef CUSTOM_CLIENT_DEMUX
    customClientDemux(self, lls, dg);
#endif /* CUSTOM_CLIENT_DEMUX */
    if (ps->receivedLength == ps->sentMsgLength) {
	/*
	 * Entire response has been received.
	 * Send another message
	 */
	if (++ps->clientRcvd < trips) {
	    xIfTrace(prottest, TR_MAJOR_EVENTS) { 
		putchar('.');
		if (! (ps->clientRcvd % 50)) {
		    putchar('\n');
		}
	    }
	  if (ps->clientRcvd+simul <= trips) {
	    msgConstructCopy(&msgToPush, &ps->savedMsg);
	    ps->receivedLength = 0;
	    xTrace0(prottest, TR_EVENTS, "S");
#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(testdelay * 1000);
#endif /* XK_MEMORY_THROTTLE */
	    h = xPush(lls, &msgToPush);
	    checkHandle(h, "client push");
	    msgDestroy(&msgToPush);
	  } else { ps->receivedLength = 0; };
	} else {
#ifdef TIME
	    xGetTime(&now);
	    subtime(&starttime, &now, &total);
	    printf("\nlen = %4d, %d trips: %6d.%06d\n", 
		   ps->receivedLength, trips, total.sec, total.usec);
#else
	    printf("\nlen = %4d, %d trips\n", ps->receivedLength, trips);
#endif
	    ps->receivedLength = 0;
#ifdef PROFILE
	    endProfile();
#endif
	}
    }
    return XK_SUCCESS;
}


#else /* ! STREAM_TEST */



static XkReturn
clientDemux( self, lls, dg )
    Protl self;
    Sessn lls;
    Msg *dg;
{
    PState	*ps = (PState *)self->state;
#ifdef TIME
    XTime now, total;
#endif
    static Msg msga[100]; static int msgi = 0; int i;

#ifdef CHECK_MESSAGE_CONTENTS
    check_message_contents(dg, ps->clientRcvd);
#endif

/* note that customdemux is not called on final response message */
#ifdef CUSTOM_CLIENT_DEMUX
    customClientDemux(self, lls, dg);
#endif /* CUSTOM_CLIENT_DEMUX */
    ps->idle = 0;
    if ( ++ps->clientRcvd < trips ) {
	xIfTrace(prottest, TR_MAJOR_EVENTS) {
	    putchar('.');
	    if (! (ps->clientRcvd % 50)) {
		putchar('\n');
	    }
	}
#ifdef XK_MEMORY_THROTTLE
	while ( memory_unused < XK_INCOMING_MEMORY_MARK)
	  Delay(testdelay * 1000);
#endif /* XK_MEMORY_THROTTLE */
	if (simul>1) {
	  msgConstructCopy(&msga[msgi], &ps->savedMsg); msgi++;
	  if (msgi==simul) {
	    for (i=0; i<simul; i++) {
		ps->clientPushResult = xPush(lls, &msga[i]);
		checkHandle(ps->clientPushResult, "client push");
		msgDestroy(&msga[i]);
	    }
	    msgi=0;
	  };
	}
	else {
	    /* 
	     * We need to construct a copy of the original message
	     * rather than just loop back the incoming message to
	     * avoid an increasingly fragmented message structure in
	     * the case of loopback.
	     */
	    Msg	tmpMsg;

	    msgConstructCopy(&tmpMsg, &ps->savedMsg);
	    ps->clientPushResult = xPush(lls, &tmpMsg);
	    msgDestroy(&tmpMsg);
	    checkHandle(ps->clientPushResult, "client push");
      }
    } else {
        /* should clear out msga when simul>1, but skip it for now */
        msgi=0;
#ifdef TIME
	xGetTime(&now);
	subtime(&starttime, &now, &total);
	printf("\nlen = %4d, %d trips: %6d.%06d\n", 
		msgLength(dg), trips, total.sec, total.usec);
#else
	printf("\nlen = %4d, %d trips\n", msgLength(dg), trips);
#endif
	
#ifdef PROFILE
	endProfile();
#endif
    }
    return XK_SUCCESS;
}


#endif /* STREAM_TEST */
