CIS 307: Unix III

[Select], [Threads]

Select

At times a process needs to wait for messages from more than one source, say two sources. If the process blocks on one source, it is in no position to know if there is a message from the other source. If it checks each source without blocking, it performs a busy loop. Unix gives us a mechanism, select, for blocking on more than one source, waking up when there is information from any of them. In fact select allows us to wait at the same time for messages, events, or write operations.

Select is a complex service:

    #include <sys/types.h>
    #include <sys/time.h>
    int select(
          int nfds,          /* Highest descriptor in any set, plus 1 */
          fd_set *readfds,   /* Set of file descriptors open for reading*/
          fd_set *writefds,  /* Set of file descript. open for writing*/
          fd_set *exceptfds, /* Set of file descript. with exception pending*/
          struct timeval *timout); /* Structure specifying timeout */
        If timout is null, then we block until one of the fd objects
        becomes ready. If timout points to a structure with timeout 
        equal to 0, then we do not block at all. Otherwise we will block
        up to the time specified by timout.
    FD_ZERO(&fdset)   /* fdset becomes the empty set */
    FD_SET(fd,&fdset) /* fd is added to fdset */
    FD_CLR(fd,&fdset) /* fd is removed from fdset */
    FD_ISSET(fd,&fdset) /* it determines if fd is in fdset */
    The structure timeval, defined in sys/time.h is
	struct timeval { int tv_sec;    /* second */
			 int tv_usec;}  /* microseconds */

Here are three examples of use of select. The first is a simple program from Stevens for determining the maximum size for a pipe. [When run on my alpha the program prints that the size is 65536.]

    #include	<sys/types.h>
    #include	<sys/time.h>

    /*
     * Measure how many bytes a pipe accepts before select() stops
     * reporting its write end as ready, and print that count.
     * Nothing ever reads from the pipe, so each 1-byte write fills it
     * a little more.
     */
    int main(void)
    {
        int             pipefd[2];
        int             written = 0;    /* bytes written so far */
        int             ready;          /* result of select() */
        fd_set          wset;
        struct timeval  no_wait;

        if (pipe(pipefd) < 0) {
            perror("pipe");
            exit(1);
        }
        FD_ZERO(&wset);

        while (1) {
            /* select() overwrites the set, so re-add the write end each time */
            FD_SET(pipefd[1], &wset);

            /* a zero timeout turns select() into a non-blocking poll */
            no_wait.tv_usec = 0;
            no_wait.tv_sec = 0;

            ready = select(pipefd[1] + 1, NULL, &wset, NULL, &no_wait);
            if (ready < 0) {
                perror("select");
                exit(1);
            }
            if (ready == 0) {
                /* write end not ready: the pipe is full */
                break;
            }

            if (write(pipefd[1], "a", 1) != 1) {
                perror("write error");
                exit(1);
            }
            written++;
        }

        printf("pipe capacity = %d\n", written);
        exit(0);
    }
The second is a program also from Stevens for waiting on a timer.
    #include	<sys/types.h>
    #include	<sys/time.h>

    /*
     * timer: block for the interval given on the command line.
     *
     *   argv[1]  number of seconds
     *   argv[2]  number of microseconds
     *
     * Exits with status 0 once the interval has elapsed, or 1 on a usage
     * error or if select() fails.
     */
    int main(int argc, char *argv[])
    {
        /* local prototype so the calls below are typed correctly */
        long                    atol(const char *);
        static struct timeval   timeout;

        if (argc != 3) {
            printf("usage: timer <#seconds> <#microseconds>\n");
            exit(1);
        }
        timeout.tv_sec  = atol(argv[1]);
        timeout.tv_usec = atol(argv[2]);

        /*
         * With no descriptor sets to watch, select blocks until the
         * timeout expires.
         */
        if (select(0, (fd_set *) 0, (fd_set *) 0, (fd_set *) 0, &timeout) < 0) {
            perror("select error");
            exit(1);
        }
        exit(0);
    }

The third is a simple program where a process reads fixed size messages from two pipes. You should terminate the program from the terminal with a CONTROL-C.

    #include <sys/types.h>
    #include <sys/time.h>
    #include <unistd.h>
    #include <stdio.h>
    #define MAXLINE 12

    /*
     * Child body: write the first MAXLINE bytes of msg to wfd, pause for
     * sec seconds plus usec microseconds, and repeat forever.
     * The pause uses select() with no descriptor sets because sleep()
     * only accepts whole seconds. Exits with status 1 if a write fails.
     */
    static void send_forever(int wfd, const char *msg, long sec, long usec)
    {
        struct timeval delay;

        while (1) {
            if (write(wfd, msg, MAXLINE) != MAXLINE) {
                perror("write");
                exit(1);
            }
            /* reset every time: select() may modify the timeout */
            delay.tv_sec = sec;
            delay.tv_usec = usec;
            select(0, NULL, NULL, NULL, &delay);
        }
    }

    /*
     * Copy one message of at most MAXLINE bytes from fd to standard output.
     * Returns 1 if data was forwarded, 0 if read() reported end-of-file.
     * Exits with status 1 on a read or write error.
     */
    static int forward_message(int fd)
    {
        char    line[MAXLINE];
        ssize_t n;

        n = read(fd, line, MAXLINE);
        if (n < 0) {
            perror("read");
            exit(1);
        }
        if (n == 0)
            return 0;
        if (write(STDOUT_FILENO, line, n) != n) {
            perror("write");
            exit(1);
        }
        return 1;
    }

    /*
     * Fork two children that each send a fixed-size message over their
     * own pipe (child1 every second, child2 every 1.3 seconds). The parent
     * uses select() to wait on both pipes at once and echoes whatever
     * arrives to standard output. The parent exits with status 0 once both
     * pipes have reached end-of-file; normally the program is stopped from
     * the terminal.
     */
    int main(void)
    {
        int     m;
        int     fd1[2];   /* pipe for communications from child1 to parent */
        int     fd2[2];   /* pipe for communications from child2 to parent */
        int     open1 = 1, open2 = 1;   /* cleared when a pipe hits EOF */
        pid_t   pid;
        fd_set  readset;

        if (pipe(fd1) < 0) {
            perror("pipe1");
            exit(1);
        }
        if (pipe(fd2) < 0) {
            perror("pipe2");
            exit(1);
        }

        /* child1 */
        if ((pid = fork()) < 0) {
            perror("fork1");
            exit(1);
        } else if (pid == 0) {
            /* keep only the write end of pipe 1 */
            close(fd1[0]);
            close(fd2[0]);
            close(fd2[1]);
            send_forever(fd1[1], "from child1\n", 1L, 0L);
            exit(0);
        }

        /* child2 */
        if ((pid = fork()) < 0) {
            perror("fork2");
            exit(1);
        } else if (pid == 0) {
            /* keep only the write end of pipe 2 */
            close(fd2[0]);
            close(fd1[0]);
            close(fd1[1]);
            /* 1.3 seconds: sleep(1.3) would be truncated to sleep(1) */
            send_forever(fd2[1], "from child2\n", 1L, 300000L);
            exit(0);
        }

        /* parent: keep only the read ends */
        close(fd1[1]);
        close(fd2[1]);

        /* first argument of select: highest descriptor watched, plus 1 */
        m = 1 + ((fd1[0] < fd2[0]) ? fd2[0] : fd1[0]);

        while (open1 || open2) {
            /* select() overwrites the set, so rebuild it on every pass */
            FD_ZERO(&readset);
            if (open1)
                FD_SET(fd1[0], &readset);
            if (open2)
                FD_SET(fd2[0], &readset);

            if (select(m, &readset, NULL, NULL, NULL) < 0) {
                perror("select");
                exit(1);
            }

            /* stop watching a pipe once its writer is gone */
            if (open1 && FD_ISSET(fd1[0], &readset))
                open1 = forward_message(fd1[0]);
            if (open2 && FD_ISSET(fd2[0], &readset))
                open2 = forward_message(fd2[0]);
        }
        exit(0);
    }
Note that in a more realistic program we will deal with messages of variable size and with more than two pipes. (Think about how you would deal with many file descriptors without having to increase the size of your program. Can you also think of ways to be fair to the message sources?)

Threads

We will now consider the use of threads, as they are provided on OSF/1 on our alphas. Note that the commands are specific to OSF/1. On my Sun workstation Solaris uses different thread commands.
Before I forget, remember to compile programs that use threads in one of the ways shown below

cc sourcename.c -o execname -lpthreads

cc sourcename.c -o execname -threads

As you remember, threads defined within a process share the address space of that process. This is very convenient since each thread can communicate with the others through memory. It is also very dangerous because we get into problems of mutual exclusion and synchronization.

OSF/1 has a large number of functions for managing threads, using mutexes, using condition variables, etc. Check these services with the command

man pthread

In the programs below we use the functions pthread_create, pthread_delay_np, pthread_mutex_init, pthread_mutex_lock, and pthread_mutex_unlock:

    #include <pthread.h>
    int pthread_create(
	pthread_t *thread,	/*The thread that is created */
	pthread_attr_t attr,    /*attributes for thread, pthread_attr_default*/
	pthread_startroutine_t start_routine, /*function executed by thread*/
	pthread_addr_t arg); /*address of argument passed to startroutine*/
        /* Returns 0 iff successful */

    #include <pthread.h>
    int pthread_delay_np(
	struct timespec *interval); /* delay thread for specified time */
    where
	struct timespec {
	    time_t tv_sec;   /* seconds */
	    long   tv_nsec;} /* nanoseconds */

    #include <pthread.h>
    int pthread_mutex_init(
	pthread_mutex_t *mutex,    /* The mutex being created */
	pthread_mutexattr_t attr); /* usually pthread_mutexattr_default */
    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    
    There are three kinds of mutexes: MUTEX_FAST_NP (the default), to be used
    in the standard lock..unlock protocol; MUTEX_RECURSIVE_NP: which allows
    one thread to do things like "lock .. lock .. unlock .. unlock";
    MUTEX_NONRECURSIVE_NP is like the fast lock, but with better debugging
    facilities.

The availability of mutexes and condition variables makes the creation of monitors very easy. Of course, these monitors will operate only within a single Unix process.

Here is an example of a program where two threads are created and execute concurrently within the process that created them.

   #include    <sys/types.h>
   #include    <pthread.h>

   /* Handles of the two threads that main creates */
   pthread_t t1, t2;
   /* Thread body; a points to the number the thread prints to identify itself */
   void moo(int * a);

   /*
    * Create two threads that both run moo, each with its own
    * heap-allocated identifying number (1 and 2), then delay the main
    * thread for 16 seconds before returning.
    */
   int main(void)
   {
      struct timespec lifetime;
      int *first_id;
      int *second_id;

      /* First thread: identified as 1 */
      first_id = (int *)malloc(sizeof(int));
      if (first_id == NULL) {
         perror("malloc");
         exit(1);
      }
      *first_id = 1;
      if (pthread_create(&t1, pthread_attr_default, (void *)moo, first_id) != 0) {
         perror("pthread_create");
         exit(1);
      }

      /* Second thread: identified as 2, with a separate allocation */
      second_id = (int *)malloc(sizeof(int));
      if (second_id == NULL) {
         perror("malloc");
         exit(1);
      }
      *second_id = 2;
      if (pthread_create(&t2, pthread_attr_default, (void *)moo, second_id) != 0) {
         perror("pthread_create");
         exit(1);
      }

      /* Wait a while then exit: threads will die */
      lifetime.tv_nsec = 0;
      lifetime.tv_sec = 16;
      pthread_delay_np(&lifetime);
   }

   /*
    * Thread body: forever print a "before sleep" line, delay for two
    * seconds, then print an "after sleep" line. *a is the number used
    * to identify the thread in the output.
    */
   void moo(int * a) {
      struct timespec nap;

      for (;;) {
         printf("I am thread %d before sleep\n", *a);

         /* two-second delay, set up anew on every pass */
         nap.tv_nsec = 0;
         nap.tv_sec = 2;
         pthread_delay_np(&nap);

         printf("I am thread %d after sleep\n", *a);
      }
   }

And here is a similar example, but now using locks to share a resource.

   #include	<sys/types.h>
   #include        <pthread.h>

   /* Handles of the two threads that main creates */
   pthread_t t1, t2;
   /* Lock that moo takes before reading or writing foo */
   pthread_mutex_t mutex;

   /* Both fields are set to the number of the thread that last held the lock */
   struct { int x;
         int y;} foo;     /*This is a global data structure shared by threads*/

   /* Thread body; a points to the number the thread prints to identify itself */
   void moo(int * a);

   /*
    * Initialise the global mutex, create two threads that both run moo
    * (each with its own heap-allocated identifying number, 1 and 2),
    * then delay the main thread for 16 seconds before returning.
    */
   int main(void)
   {
      struct timespec lifetime;
      int *first_id;
      int *second_id;

      /* Create a mutex */
      if (pthread_mutex_init(&mutex, pthread_mutexattr_default) != 0) {
         perror("pthread_mutex_init");
         exit(1);
      }

      /* First thread: identified as 1 */
      first_id = (int *)malloc(sizeof(int));
      if (first_id == NULL) {
         perror("malloc");
         exit(1);
      }
      *first_id = 1;
      if (pthread_create(&t1, pthread_attr_default, (void *)moo, first_id) != 0) {
         perror("pthread_create");
         exit(1);
      }

      /* Second thread: identified as 2, with a separate allocation */
      second_id = (int *)malloc(sizeof(int));
      if (second_id == NULL) {
         perror("malloc");
         exit(1);
      }
      *second_id = 2;
      if (pthread_create(&t2, pthread_attr_default, (void *)moo, second_id) != 0) {
         perror("pthread_create");
         exit(1);
      }

      /* Wait a while then exit: threads will die */
      lifetime.tv_nsec = 0;
      lifetime.tv_sec = 16;
      pthread_delay_np(&lifetime);
   }

   /*
    * Thread body: forever take the global mutex, print foo, stamp both
    * fields of foo with this thread's number, delay two seconds while
    * still holding the lock, print foo again, release the lock, and
    * pause briefly. *a is the number used to identify the thread.
    */
   void moo(int * a) {
      struct timespec nap;

      for (;;) {
         if (pthread_mutex_lock(&mutex) != 0) {
            perror("pthread_mutex_lock");
            exit(1);
         }

         printf("I am thread %d before sleep; x=%d, y=%d\n", *a, foo.x, foo.y);
         foo.y = *a;
         foo.x = *a;

         /* two-second delay with the lock held */
         nap.tv_nsec = 0;
         nap.tv_sec = 2;
         pthread_delay_np(&nap);

         printf("I am thread %d after sleep; x=%d, y=%d\n", *a, foo.x, foo.y);

         if (pthread_mutex_unlock(&mutex) != 0) {
            perror("pthread_mutex_unlock");
            exit(1);
         }

         /* Here is a small delay to give the other thread a chance to run */
         nap.tv_nsec = 1000000;
         nap.tv_sec = 0;
         pthread_delay_np(&nap);
      }
   }

There will be other occasions to see threads, and select, in action.

ingargiola.cis.temple.edu