PThreads

  • PThreads == POSIX Threads
  • POSIX == Portable Operating System Interface

Birrell's Mechanism:

  • Thread
  • Fork(proc, arg)
    • thread creation
  • join(thread)

Pthreads

  • Thread: pthread_t aThread; // Type of thread
  • Fork(proc, args): Thread creation
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                   void * (*start_routine)(void *), void * arg);
  • join (tread)
int pthread_join(pthread_t thread, void **status);

Pthread Attributes

int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);
pthread_attr_{set/get}{attribute} // set the value, read value
  • specified in pthread_create
  • defines features of the new thread

    • stack size; scheduling policy; priority; inheritance; joinable; system/process scope etc
  • has default behavior with NULL in pthread_create

detaching pthread

  • default: joinable threads
    • the parent should not terminate until it's child complete its tasks
    • and have been joined by children via the explicit join operation
      • if parents exits early ==> children will become zombie! (no one adopt them!)
  • detached threads:
    • for this thread, when parent exit, children can continue their execution
    • make the parent and children equivalent to each other.
int pthread_detach();
pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DECATHED);

An example of Pthread creation with detachingpthread

/* PThread Creation */ 

#include <stdio.h>
#include <pthread.h>

void *foo (void *arg) {        /* thread main */
    printf("Foobar!\n");
    pthread_exit(NULL);
}

int main (void) {

    int i;
    pthread_t tid;

    pthread_attr_t attr;
    pthread_attr_init(&attr); // Required!!!
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // make it detached
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
    pthread_create(&tid, &attr, foo, NULL);

    return 0;
}

Pthread creation example

/* PThread Creation Quiz 1 */ 

#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 4

void *hello (void *arg) { /* thread main */
    printf("Hello Thread\n");
    return 0;
}

int main (void) {
    int i;
    pthread_t tid[NUM_THREADS];

    for (i = 0; i < NUM_THREADS; i++) { /* create/fork threads */
        pthread_create(&tid[i], NULL, hello, NULL);
    }

    for (i = 0; i < NUM_THREADS; i++) { /* wait/join threads */
        pthread_join(tid[i], NULL);
    }
    return 0;
}

The output would be :
Hello Thread!
Hello Thread!
Hello Thread!
Hello Thread!

Thread Creation example 2

/* A Slightly Less Simple PThreads Example */

#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 4

void *threadFunc(void *pArg) { /* thread main */
    int *p = (int*)pArg;
    int myNum = *p;
    // here, both *p and myNum are private variables
    printf("Thread number %d\n", myNum);
    return 0;
}

int main(void) {
    int i;
    pthread_t tid[NUM_THREADS];

    for(i = 0; i < NUM_THREADS; i++) { /* create/fork threads */
        pthread_create(&tid[i], NULL, threadFunc, &i);
    }

    for(i = 0; i < NUM_THREADS; i++) { /* wait/join threads */
        pthread_join(tid[i], NULL);
    }
    return 0;
}

Quiz

All three answers are possible!!! (the weird third one!)Why?

  • "i" is defined in main ==> it's globally visible variable!
  • when it changes in one thread ==> all other threads see new value!!!
  • This is called data race or race condition ==> a thread tries to read a value, while another thread modifies it!

To fix the race condition problem

  • put it into an array! (505 HW1 use the same trick!)
  • by Creating this array, it's like as if we created local storage/private storage for the arguments of every single one of the threads that we create.
/* PThread Creation Quiz 3 */ 

#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 4

void *threadFunc(void *pArg) { /* thread main */
    int myNum = *((int*)pArg);
    printf("Thread number %d\n", myNum);
    return 0;
}

int main(void) {

    int i;
    int tNum[NUM_THREADS];
    pthread_t tid[NUM_THREADS];

    for(i = 0; i < NUM_THREADS; i++) { /* create/fork threads */
        tNum[i] = i;
        pthread_create(&tid[i], NULL, threadFunc, &tNum[i]);
    }

    for(i = 0; i < NUM_THREADS; i++) { /* wait/join threads */
        pthread_join(tid[i], NULL);
    }

    return 0;
}

Pthread Mutexes

  • to solve mutual exclusion problems among concurrent threads.
    • only one thread at a time can perform modifications or otherwise access that shared variable.

Birrell's Mechanism:

  • Mutex
//Pthreads
pthread_mutex_t aMutex; // mutex type
  • Lock(mutex){}
//explicit lock
int pthread_mutex_lock(pthread_mutex_t *mutex);
//explicit ublock
int pthread_mutex_unlock(pthread_mutex_t *mutex);

Other Mutex operation

  • first mutex must be explicitly initialize
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
//mutex attrubutes == specifies mutex behavior when a mutex is shared among proceses

int pthread_mutex_trylock(pthread_mutex_t *mutex);
// check the mutex, if it is in use, return immediately and notify the calling thread
// that the mutex is not available.

int pthread_mutex_destroy(pthread_mutex_t *mutex);

// and many others

Mutex safety tips

  • shared data should always be accessed through a single mutex!
  • mutex scope must be visible to all!!!!!
  • globally order locks
    • for all threads, lock mutexes in order. (make sure deadlock don't happen)
  • always unlock a mutex and always unlock the correct one!

Pthread Condition Variables

Birrell's mechanisms:

  • condition
  • wait
  • signal
  • Broadcast

Pthread:

  • contidion
pthread_cond_t aCond: // type of cond variable
  • wait
    • a thread that's entering the wait operation, a thread that must wait, will automatically release the mutex and place itself on the wait queue that's associate with the condition variable.
    • when the thread is waken up, it will automatically re-acquire the mutex before actually exiting the wait operation.
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
  • signal
int pthread_cond_signal(pthread_cond_t *cond);
  • Broadcast
int pthread_cond_broadcast(pthread_cond_t, *cond);

Other condition variable operations

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond); // has to explicitly freed

Condition variable safety tips

  • do not forget to notify waiting threads
    • predicate change => signal/ broadcast correct condition variable
  • when in doubt weather you should use signal or broadcast, use broad cast until you figure out what the desired behaviour is.

    • but will loss performance
  • you do not need a mutex to signal/broadcsat

Producer and Consumer example in Pthreads

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define BUF_SIZE 3        /* Size of shared buffer */

int buffer[BUF_SIZE];      /* shared buffer */
int add = 0;              /* place to add next element */
int rem = 0;              /* place to remove next element */
int num = 0;              /* number elements in buffer */

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;      /* mutex lock for buffer */
pthread_cond_t c_cons = PTHREAD_COND_INITIALIZER; /* consumer waits on this cond var */
pthread_cond_t c_prod = PTHREAD_COND_INITIALIZER; /* producer waits on this cond var */

void *producer (void *param);
void *consumer (void *param);

int main(int argc, char *argv[]) {

    pthread_t tid1, tid2;  /* thread identifiers */
    int i;

    /* create the threads; may be any number, in general */
    if(pthread_create(&tid1, NULL, producer, NULL) != 0) {
        fprintf(stderr, "Unable to create producer thread\n");
        exit(1);
    }

    if(pthread_create(&tid2, NULL, consumer, NULL) != 0) {
        fprintf(stderr, "Unable to create consumer thread\n");
        exit(1);
    }

    /* wait for created thread to exit */
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("Parent quiting\n");

    return 0;
}

/* Produce value(s) */
void *producer(void *param) {

    int i;
    for (i=1; i<=20; i++) {

        /* Insert into buffer */
        pthread_mutex_lock (&m);    
            if (num > BUF_SIZE) {
                exit(1);  /* overflow */
            }

            while (num == BUF_SIZE) {  /* block if buffer is full */
                pthread_cond_wait (&c_prod, &m);
            }

            /* if executing here, buffer not full so add element */
            buffer[add] = i; // buffer is not full, so add elem
            add = (add+1) % BUF_SIZE;
            num++;
        pthread_mutex_unlock (&m);

        pthread_cond_signal (&c_cons);
        printf ("producer: inserted %d\n", i);
        fflush (stdout);
    }

    printf("producer quiting\n");
    fflush(stdout);
    return 0;
}

/* Consume value(s); Note the consumer never terminates */
void *consumer(void *param) {

    int i;

    while(1) {

        pthread_mutex_lock (&m);
            if (num < 0) {
                exit(1);
            } /* underflow */

            while (num == 0) {  /* block if buffer empty */
                pthread_cond_wait (&c_cons, &m);
            }

            /* if executing here, buffer not empty so remove element */
            i = buffer[rem];
            rem = (rem+1) % BUF_SIZE; //wrap around condition
            num--;
        pthread_mutex_unlock (&m);

        pthread_cond_signal (&c_prod);
        printf ("Consume value %d\n", i);  fflush(stdout);

    }
    return 0;
}

results matching ""

    No results matching ""