CIS 307: Pthreads

[Pthreads], [Threadsafe Functions], [Locks for Pthreads]

Threads

We will now consider the use of POSIX threads, as they are provided on OSF/1 on our alphas with the pthreads package. Note that these commands may not be available on all Unix systems.

Good sources of information on using threads are the books:

S. Kleiman, D. Shah, B. Smaalders: Programming with Threads, Prentice-Hall, 1996

B. Nichols, D. Buttlar, J. Proulx: Pthreads Programming, O'Reilly, 1996

Before I forget, remember to compile programs that use threads as follows

cc sourcename.c -o execname -lpthread

As you remember, threads that are defined within a process share the address space of that process. This is very convenient since each thread can communicate with the other threads in that process through memory. It is also very dangerous because we get into problems of mutual exclusion and synchronization.

POSIX has a large number of functions for managing threads, using mutexes, using condition variables, etc. Check these services with the command

apropos pthread

Threads are not easy to understand well. There are all sorts of complexities hidden by names such as user threads, kernel threads, light-weight threads. There are obvious questions we will not answer. For example:

When using threads, aim for simplicity and avoid situations where one has to answer questions such as the above.

An interesting fact is that in recent thread packages it is possible to have different threads of a single process execute on different processors at the same time. That is, within a process two or more threads may be executing with true concurrency.

We use the functions pthread_create and pthread_delay_np in a program just to see an example of concurrency. Here are the specifications of pthread_create and pthread_delay_np.

    #include <pthread.h>
    int pthread_create(
	pthread_t *thread,	/* The thread that is created */
	const pthread_attr_t *attr, /* attributes for thread; if NULL it
                                 uses pthread_attr_default */
	void * (*start_routine)(void *), /* function executed by thread */
	void * arg); /* address of argument passed to start_routine */
        /* Returns 0 iff successful */

    The created thread is ready as soon as created and
    inherits scheduling discipline and signal mask from its creator.
    The definition of  pthread_attr_t on my alpha is:

    typedef struct __pthread_attr_t {
    long			_Pfield(valid);
    __pthreadLongString_t	_Pfield(name);
    __pthreadLongUint_t		_Pfield(arg);
    __pthreadLongUint_t		_Pfield(reserved)[19];
    } pthread_attr_t;

    Not exactly easy to use. It is best to go with the defaults (i.e. NULL)
    or to use functions to set particular aspects of the attribute.
    Check the meaning of such functions with the Unix shell command:

       apropos pthread_attr

    #include <pthread.h>
    int pthread_delay_np(
	struct timespec *interval); /* delay thread for specified time */
    where	struct timespec {
	    time_t tv_sec;   /* seconds */
	    long   tv_nsec;  /* nanoseconds */
	};

And here is the example program. We run three concurrent threads, and this number can be easily changed. Later on in the course we will explore ways to synchronize threads. In the program we use rand_r instead of rand to generate random numbers because rand_r is re-entrant (i.e. thread-safe).

The use of rand_r instead of rand should give you an idea of what is
needed for threadsafe code [see below]. Here are the random 
functions we used to use:
       void srand(unsigned int seed);
       int rand(void);
srand sets some global variable to seed and each call to rand updates and
returns the value of the global variable. Of course if srand and rand
are called by more than one thread the global location holding the seed
is clobbered and the threads get unpredictable [unrepeatable] sequences
of integers. When using threads we use:
       int rand_r(unsigned int *seedptr);
and pass to it the address of a seed local to the calling thread [i.e.
different threads use different seed variables and initialize them
directly with an assignment, not srand].

Notice that in the example program each thread writes only to its own locations, so that there are no race conditions. The exception is the variable shrd, which is deliberately write-shared to demonstrate that concurrent threads actually share the same address space.

   /* threadz.c -- compile with "cc threadz.c -o threadz -threads"*/

   #include  <sys/types.h>
   #include  <sys/timers.h>
   #include  <pthread.h>
   #include  <stdio.h>
   #include  <stdlib.h>
   #include  <time.h>
   #include  <unistd.h>

   #define THREADSCOUNT 3
   #define TOTALRUN   16
   #define TMIN 1
   #define TMAX 3
   #define TIMLEN 60

   int shrd;               /* Deliberately write-shared by all threads */
   struct state {
     pthread_t t;          /* A thread */
     int who;              /* It identifies a thread */
     unsigned int seed;    /* The seed used for random number generator */
     char buffer[TIMLEN];  /* String representing current time */
   } states[THREADSCOUNT];

   void *moo(void *arg);


   int main(void)
   {
      int    i;
      struct timespec maintime;
      pid_t  pid;

      /* Initialize states */
      pid = getpid();
      for (i=0; i < THREADSCOUNT; i++) {
        states[i].seed = (unsigned int)(i + (int)pid);
        states[i].who = i;
        if (pthread_create(&(states[i].t), NULL, moo, &(states[i])) != 0){
	     perror("pthread_create");
	     exit(1);}}
      /* Wait a while then exit: all existing threads will die */
      maintime.tv_sec = TOTALRUN;
      maintime.tv_nsec =0;
      pthread_delay_np(&maintime);
      return 0;
    }

   void getTime(struct timespec *ts, char buffer[], int len)
   /* It places the current time in ts and writes it as a string */
   /* into buffer (of length len) */
   {
     getclock((timer_t)TIMEOFDAY, ts);
     ctime_r(&(ts->tv_sec), buffer, len);
     /* Overwrite the newline that ctime_r leaves at position 24 */
     sprintf(&buffer[24], " and %ld microseconds", (long)(ts->tv_nsec)/1000);
   }

   void *moo(void *arg) {
     /* We assume that a thread sleeps in each loop, from a minimum of */
     /* TMIN to a maximum of TMAX seconds, at random.                  */

     struct state *s = (struct state *)arg;
     struct timespec tspec;
     struct timespec interval;
     int v;            /* Value returned by random number generator */
     int secs;         /* Seconds to sleep in this iteration */

     printf("s->seed = %u\n", s->seed);
     while (1){
         getTime(&tspec, s->buffer, TIMLEN);
         v = rand_r(&(s->seed));
         printf("v = %d\n", v);
         shrd = s->who;
         secs = TMIN + (v % (TMAX-TMIN+1));
         printf("Thread %d with shrd = %d sleeps %2d secs at time %s\n",
                s->who, shrd, secs, s->buffer);
         /* sleep for a time between TMIN and TMAX */
         interval.tv_sec = secs;
         interval.tv_nsec = 0;
         pthread_delay_np(&interval);
         getTime(&tspec, s->buffer, TIMLEN);
         printf("Thread %d with shrd = %d after sleep at time %s\n",
		s->who, shrd, s->buffer);}
     return NULL;      /* Not reached: the loop never ends */
   }

If you run this program you will notice:

A thread can terminate its own execution with the command:


   #include <pthread.h>
   void pthread_exit(void *status);
   
   It exits the current thread and returns status to the thread waiting
   in pthread_join, if any.
   This command does not return the thread's resources to the system.

We can wait for termination of a specific thread with the function pthread_join:

   #include <pthread.h>
   int pthread_join(pthread_t who, void **status);

   It suspends the calling thread until the thread who has terminated.
   Status will receive the value returned by the terminating thread
   when it exited with the pthread_exit command. It returns 0 iff
   successful.
   When a terminated thread is joined, its resources, including memory, 
   are reclaimed by the system.

We can modify the previous program so that the main thread waits for the termination of all the created threads by replacing in main the lines

      /* Wait a while then exit: all existing threads will die */
      maintime.tv_sec = TOTALRUN;
      maintime.tv_nsec =0;
      pthread_delay_np(&maintime);
with the lines
      /* Wait for all other threads to terminate */
      for (i=0; i < THREADSCOUNT; i++) {
          pthread_join(states[i].t, NULL);
          printf("Thread %d has terminated\n", i);
      }

You can mark a thread so that its storage and other resources are reclaimed automatically (of course, only after it has finished executing) with the function:

   #include <pthread.h>
   int pthread_detach(pthread_t thread);

This call does not terminate a thread that is executing; it only indicates that we want its storage reclaimed automatically when it terminates.
Other ways of reclaiming the resources of a thread are:

Threadsafe Functions

Since threads execute concurrently in the same address space and control can be transferred between them at any time, we have to be very cautious in using them and make sure that concurrent executions of functions do not cause problems. We say that a function is threadsafe when it can be executed without problems by concurrent threads. Usually this means that the function uses only local variables and read-only global variables. Functions that write to global storage can be made threadsafe if they use appropriate locks. Concurrent executions of threadsafe functions should appear atomic. Before calling a function from a thread, check whether it is threadsafe. For example, many functions in the standard C library are not threadsafe, though threadsafe versions may also be available [as we have seen for rand and rand_r].

Another thing to be aware of when using threads is that it is dangerous to use pointers from a thread to locations on the stack of another thread. For example, suppose thread A calls a function moo, declares a local variable x there, and passes the address of x as the parameter to a thread B that it creates. Then moo returns. Now B is accessing a location that has been deallocated by A, and perhaps reallocated with a different meaning. Moral: pass to a thread only dynamically allocated data or static data.

Locks for Pthreads

We can use mutual exclusion semaphores, or locks, or mutexes with pthreads. These locks should be global to the threads.

    #include <pthread.h>
    int pthread_mutex_init(
	pthread_mutex_t *mutex,    /* The mutex being created */
	const pthread_mutexattr_t *attr); /* usually the default, i.e. NULL */
    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_unlock(pthread_mutex_t *mutex);

There are three kinds of mutexes, depending on the value of the pthread_mutexattr_t attribute: MUTEX_FAST_NP (the default), to be used in the standard lock..unlock protocol; MUTEX_RECURSIVE_NP, which allows one thread to do things like "lock .. lock .. unlock .. unlock"; and MUTEX_NONRECURSIVE_NP, which is like the fast lock but with better debugging facilities. One normally uses the default value NULL for the attribute.

Condition variables are also available with pthreads. They make the creation of monitors very easy (as we will see). Of course, these monitors operate only within a single Unix process.

Here is a program with threads that use locks to share a resource.

   /* threadmutex.c  -- */

   #include	<sys/types.h>
   #include        <pthread.h>
   #include        <stdio.h>
   #include        <stdlib.h>
   #include        <time.h>
   #define THREADSCOUNT 3

   pthread_t ts[THREADSCOUNT];

   pthread_mutex_t mutex;

   struct { int x;
         int y;} foo;     /*This is a global data structure shared by threads*/

   void *moo(void *arg);

   int main(void)
   {
   int    i;
   int    *  who;

   /* Create a mutex */
   if (pthread_mutex_init(&mutex, NULL)) {
      perror("pthread_mutex_init");
      exit(1);}

   /* Create threads */
   for (i=0; i < THREADSCOUNT; i++) {
     if((who = (int *)malloc(sizeof(int))) == NULL) {
       /* I am using malloc as a way to make sure that each
        * thread uses different memory
        */
       perror("malloc");
       exit(1);}
     *who = i;
     if (pthread_create(&(ts[i]), NULL, moo, who) != 0) {
       perror("pthread_create");
       exit(1);
     }
   }

   /* Wait for created threads to die */
   for (i=0; i < THREADSCOUNT; i++) {
     pthread_join(ts[i], NULL);
     printf("Thread %d has terminated\n", i);
   }
   return 0;
  }

  void *moo(void *arg) {
  int *a = (int *)arg;   /* Identifier of this thread, allocated by main */
  struct timespec interval;
  int i;

  for (i=0; i < 16; i++) {
      if (pthread_mutex_lock(&mutex)) {
	perror("pthread_mutex_lock");
	exit(1);}
      /* Between lock and unlock this thread has foo to itself */
      printf("I am thread %d before sleep; x=%d, y=%d\n", *a, foo.x, foo.y);
      foo.x = foo.y = *a;
      interval.tv_sec = 2;
      interval.tv_nsec = 0;
      pthread_delay_np(&interval);
      printf("I am thread %d after sleep; x=%d, y=%d\n", *a, foo.x, foo.y);
      if (pthread_mutex_unlock(&mutex)) {
	perror("pthread_mutex_unlock");
	exit(1);}
      /* Here is a small delay to give the other threads a chance to run */
      interval.tv_sec = 0;
      interval.tv_nsec = 1000000;
      pthread_delay_np(&interval);
    }
  return NULL;
  }

ingargiola.cis.temple.edu