Bogotobogo
contact@bogotobogo.com




C++ Tutorial
Multi-Threaded Programming III
- C++ Class Thread for Pthreads - 2012
Full List of C++ Tutorials








Multi-Threaded Programming III : Pthread


What are Pthreads?
  • POSIX Threads, or Pthreads, is a POSIX standard for threads. The standard, POSIX.1c, Threads extensions (IEEE Std 1003.1c-1995), defines an API for creating and manipulating threads.
  • Implementations of the API are available on many Unix-like POSIX systems such as FreeBSD, NetBSD, GNU/Linux, Mac OS X and Solaris, but Microsoft Windows implementations also exist. For example, the pthreads-w32 is available and supports a subset of the Pthread API for the Windows 32-bit platform.
  • The POSIX standard has continued to evolve and undergo revisions, including the Pthreads specification. The 2004 Edition of IEEE Std 1003.1 has since been followed by later revisions such as IEEE Std 1003.1-2008.
  • Pthreads are defined as a set of C language programming types and procedure calls, implemented with a pthread.h header/include file and a thread library - though this library may be part of another library, such as libc, in some implementations.





The Pthread API

The Pthreads API can be grouped into four categories:

  • Thread management:
    Routines that work directly on threads - creating, detaching, joining, etc. They also include functions to set/query thread attributes such as joinable, scheduling etc.

  • Mutexes:
    Routines that deal with synchronization through a "mutex", which is an abbreviation for "mutual exclusion". Mutex functions provide for creating, destroying, locking and unlocking mutexes. These are supplemented by mutex attribute functions that set or modify attributes associated with mutexes.

  • Condition variables:
    Routines that address communication between threads that share a mutex, based upon programmer-specified conditions. This group includes functions to create, destroy, wait and signal based upon specified variable values. Functions to set/query condition variable attributes are also included.

  • Synchronization:
    Routines that manage read/write locks and barriers.





Creating Threads

  • Our main() program is a single, default thread. All other threads must be explicitly created by the programmer.

  • pthread_create creates a new thread and makes it executable. This routine can be called any number of times from anywhere within our code.

  • int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg) arguments:
    • thread:
      An opaque, unique identifier for the new thread returned by the subroutine. This is a pointer to pthread_t. When a thread is created, an identifier is written to the memory location to which this variable points. This identifier enables us to refer to the thread.

    • attr:
      An opaque attribute object that may be used to set thread attributes. You can specify a thread attributes object, or NULL for the default values.

    • start_routine:
      The routine that the thread will execute once it is created.
      void *(*start_routine)(void *)
              
      We pass the address of a function that takes a pointer to void as its parameter and returns a pointer to void. Because of this, we can pass a single argument of any type and return a pointer to any type.
      While using fork() causes execution to continue in the same location with a different return code, using a new thread explicitly provides a pointer to a function where the new thread should start executing.

    • arg:
      A single argument that may be passed to start_routine. It must be passed by reference, as a pointer cast to void *. NULL may be used if no argument is to be passed.

  • The maximum number of threads that may be created by a process is implementation dependent.

  • Once created, threads are peers, and may create other threads. There is no implied hierarchy or dependency between threads.



Attributes of Threads

  • By default, a thread is created with certain attributes. Some of these attributes can be changed by the programmer via the thread attribute object.
  • pthread_attr_init() and pthread_attr_destroy() are used to initialize/destroy the thread attribute object.
  • Other routines are then used to query/set specific attributes in the thread attribute object.



Terminating Threads

  • There are several ways in which a Pthread may be terminated:
    • The thread returns from its starting routine (the main routine for the initial thread).
    • The thread makes a call to the pthread_exit subroutine.
    • The thread is canceled by another thread via the pthread_cancel routine
    • The entire process is terminated due to a call to either the exec or exit subroutines.
  • pthread_exit is used to explicitly exit a thread. Typically, the pthread_exit() routine is called after a thread has completed its work and is no longer required to exist. If main() finishes before the threads it has created, and exits with pthread_exit(), the other threads will continue to execute. Otherwise, they will be automatically terminated when main() finishes.
  • The programmer may optionally specify a termination status, which is stored as a void pointer for any thread that may join the calling thread.
  • Cleanup: the pthread_exit() routine does not close files; any files opened inside the thread will remain open after the thread is terminated.



Join

  • int pthread_join (pthread_t th, void **thread_return)
    The first parameter is the thread for which to wait, the identifier that pthread_create filled in for us. The second argument is a pointer to a pointer that itself points to the return value from the thread. This function returns zero for success and an error code on failure.
  • When a thread is created, one of its attributes defines whether it is joinable or detached. Only threads that are created as joinable can be joined. If a thread is created as detached, it can never be joined.
  • The final draft of the POSIX standard specifies that threads should be created as joinable.
  • To explicitly create a thread as joinable or detached, the attr argument in the pthread_create() routine is used. The typical 4 step process is:
    • Declare a pthread attribute variable of the pthread_attr_t data type.
    • Initialize the attribute variable with pthread_attr_init().
    • Set the attribute detached status with pthread_attr_setdetachstate()
    • When done, free library resources used by the attribute with pthread_attr_destroy()



Detaching

There are cases where we have to resynchronize our threads using pthread_join() before allowing the program to exit. We need to do this if we want to allow one thread to return data to the thread that created it. However, sometimes we neither need the second thread to return information to the main thread nor want the main thread to wait for it.

Suppose we create a second thread to spool a backup copy of a data file that is being edited while the main thread continues to service the user. When the backup has finished, the second thread can just terminate, and there is no need for it to join the main thread.

We can create threads that have this behavior. They are called detached threads, and we can create them by modifying the thread attributes or by calling pthread_detach().

  • The pthread_detach() routine can be used to explicitly detach a thread even though it was created as joinable.
  • There is no converse routine.



The Simplest Pthread Example

The example below is probably one of the simplest Pthread examples. The newly created thread shares a global variable with the original thread and modifies it.

// thread1.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

void *thread_fnc(void * arg);

char thread_msg[] ="Hello Thread!";

int main()
{
        int ret;
        pthread_t my_thread;
        void *ret_join;

        ret =  pthread_create(&my_thread, NULL, thread_fnc, (void*) thread_msg);
        if(ret != 0) {
                /* pthread functions return the error code; they do not set errno */
                fprintf(stderr, "pthread_create failed: %s\n", strerror(ret));
                exit(EXIT_FAILURE);
        }
        printf("Waiting for thread to finish...\n");
        ret = pthread_join(my_thread, &ret_join);
        if(ret != 0) {
                fprintf(stderr, "pthread_join failed: %s\n", strerror(ret));
                exit(EXIT_FAILURE);
        }
        printf("Thread joined, it returned %s\n", (char *) ret_join);
        printf("New thread message: %s\n",thread_msg);
        exit(EXIT_SUCCESS);
}

void *thread_fnc(void *arg)
{
        printf("This is thread_fnc(), arg is %s\n", (char*) arg);
        strcpy(thread_msg,"Bye!");
        pthread_exit("Bye");
}

And the make file:

thread1: thread1.o
        gcc -D_REENTRANT -o thread1 thread1.o  -lpthread
thread1.o: thread1.c
        gcc -c thread1.c
clean:
        rm -f *.o thread1

Output from the run:

$ ./thread1
Waiting for thread to finish...
This is thread_fnc(), arg is Hello Thread!
Thread joined, it returned Bye
New thread message: Bye!

We declare a prototype for the function that the thread calls when we create it:

void *thread_fnc(void * arg);

It takes a pointer to void as its argument and returns a pointer to void, which is required by pthread_create().

In main(), we call pthread_create() to start running our new thread:

ret =  pthread_create(&my_thread, NULL, thread_fnc, (void*) thread_msg);

We are passing the address of a pthread_t object that we can use to refer to the thread later. For the thread attribute, we pass NULL since we do not want to modify the default values.

If the call succeeds, two threads will be running. The original thread (main) continues and executes the code after pthread_create(), and a new thread starts executing in thread_fnc().

The original thread checks the return value to confirm that the new thread was created, and then calls pthread_join():

ret = pthread_join(my_thread, &ret_join);

We pass the identifier of the thread that we are waiting to join and a pointer to a result. This function will wait until the other thread terminates before it returns. Then, it prints the return value from the thread.

The new thread starts executing at the start of thread_fnc(), which prints its argument, updates the global variable, and returns a string to the main thread through pthread_exit().




Synchronization Pthread Example - Semaphores

In this example, we will use a binary semaphore, which takes the value 0 or 1. There is a more general type of semaphore, a counting semaphore, which takes a wider range of values. Semaphores are often used to protect a section of code so that only one thread can run it at any given time. For this kind of task, a binary semaphore is enough. However, if we want to permit a limited number of threads to execute a piece of code, we may need a counting semaphore.

There are 4 basic semaphore functions, but unlike most of the functions, which start with pthread_, semaphore functions start with sem_.

A semaphore is created with the sem_init function, which is declared in semaphore.h:

  • int sem_init(sem_t *sem, int pshared, unsigned int val);
    

    It initializes a semaphore object pointed by sem, sets its sharing option, and gives it an initial integer value. The pshared parameter controls the type of semaphore. If the value of pshared is 0, the semaphore is local to the current process. Otherwise, the semaphore may be shared between processes.

  • int sem_post(sem_t *sem);
    

    This function atomically increases the value of the semaphore by 1.

  • int sem_wait(sem_t *sem);
    

    This function atomically decreases the value of the semaphore by 1, but always waits until the semaphore has a nonzero count first. So, if we call sem_wait on a semaphore with a value of 2, the thread will continue executing but the semaphore will be decreased to 1. If we call it on a semaphore with a value of 0, the function will wait until some other thread has incremented the value so that it is no longer 0. If two threads are both waiting in sem_wait for the same semaphore to be nonzero and it is incremented once by a third thread, only one of the two waiting threads will get to decrement the semaphore and continue while the other will remain waiting.

  • int sem_destroy(sem_t *sem);
    

    This function takes a pointer to a semaphore and tidies up any resources that it may hold once we have finished with it. POSIX leaves the effect of destroying a semaphore on which some thread is still blocked undefined, so we should make sure that no thread is waiting on it first.

// sem.c

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>

void *thread_fnc(void * arg);

/* semaphores are declared global so they can be accessed
   in main() and in thread routine */
sem_t my_semaphore;
char my_string[100];  /* shared variable */

int main()
{
        int ret;
        int value;
        pthread_t my_thread;
        void *ret_join;

        /* initialize the semaphore to 0 so that the new thread
           blocks until main() posts */
        /* second param = 0 - semaphore is local to this process */
        ret = sem_init(&my_semaphore, 0, 0);
        if(ret != 0) {
                perror("semaphore init failed");
                exit(EXIT_FAILURE);
        }

        sem_getvalue(&my_semaphore, &value);
        printf("The initial value of the semaphore is %d\n", value);

        ret =  pthread_create(&my_thread, NULL, thread_fnc, NULL);
        if(ret != 0) {
                /* pthread functions return the error code; they do not set errno */
                fprintf(stderr, "pthread_create failed: %s\n", strerror(ret));
                exit(EXIT_FAILURE);
        }

        printf("Type in some characters. Enter 'quit' to finish\n");
        while(strncmp("quit", my_string, 4) != 0) {
                fgets(my_string, 100, stdin);
                sem_post(&my_semaphore);
                sem_getvalue(&my_semaphore, &value);
                printf("The value of the semaphore after sem_post() is %d\n", value);
        }
        printf("Waiting for thread to finish...\n");
        ret = pthread_join(my_thread, &ret_join);
        if(ret != 0) {
                fprintf(stderr, "pthread_join failed: %s\n", strerror(ret));
                exit(EXIT_FAILURE);
        }
        printf("Thread joined, it returned %s\n", (char *) ret_join);
        sem_destroy(&my_semaphore);
        exit(EXIT_SUCCESS);
}

void *thread_fnc(void *arg)
{
        int val;

        printf("This is thread_fnc(), waiting for nonzero count...\n");
        sem_getvalue(&my_semaphore, &val);
        printf("The value of the semaphore in thread_fnc() is %d\n", val);
        sem_wait(&my_semaphore);
        sem_getvalue(&my_semaphore, &val);
        printf("The value of the semaphore after sem_wait() in thread_fnc() is %d\n", val);
        while(strncmp("quit", my_string, 4) != 0) {
                /* strlen() returns size_t, so print it with %zu */
                printf("You typed in %zu characters\n", strlen(my_string)-1);
                sem_getvalue(&my_semaphore, &val);
                printf("The value of the semaphore before sem_wait() in thread_fnc() is %d\n", val);
                sem_wait(&my_semaphore);
                sem_getvalue(&my_semaphore, &val);
                printf("The value of the semaphore after sem_wait() in thread_fnc() is %d\n", val);
        }
        pthread_exit(NULL);
}

Output is:

$ ./sem
The initial value of the semaphore is 0
Type in some characters. Enter 'quit' to finish
This is thread_fnc(), waiting for nonzero count...
The value of the semaphore in thread_fnc() is 0
1234
The value of the semaphore after sem_post() is 1
The value of the semaphore after sem_wait() in thread_fnc() is 0
You typed in 4 characters
The value of the semaphore before sem_wait() in thread_fnc() is 0
98
The value of the semaphore after sem_post() is 1
The value of the semaphore after sem_wait() in thread_fnc() is 0
You typed in 2 characters
The value of the semaphore before sem_wait() in thread_fnc() is 0
quit
The value of the semaphore after sem_post() is 1
Waiting for thread to finish...
The value of the semaphore after sem_wait() in thread_fnc() is 0
Thread joined, it returned (null)

In main(), after creating a new thread, we read in text, put it into my_string, which is global, and then increment the semaphore with sem_post():

        while(strncmp("quit", my_string, 4) != 0) {
                fgets(my_string, 100, stdin);
                sem_post(&my_semaphore);
        }

In the new thread, we wait for the semaphore and then count the characters from the input:

        sem_wait(&my_semaphore);
        while(strncmp("quit", my_string, 4) != 0) {
                printf("You typed in %zu characters\n", strlen(my_string)-1);
                sem_wait(&my_semaphore);
        }

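While the semaphore count is zero, the second thread is blocked in sem_wait() and the main thread is waiting for keyboard input. When we have input, the main thread posts the semaphore, which releases the second thread to count the characters. Nothing in the code forces the main thread to wait for the count before it reads the next line; the example relies on keyboard input being much slower than the counting.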

Note that both threads share the same my_string array.




Synchronization Pthread Example - Mutexes

The other way of synchronizing access is with mutexes, which act by allowing us to lock an object so that only one thread can access it. To control access, we lock a mutex before entering the section of the code, and then unlock it when we have finished.

  • int pthread_mutex_init(pthread_mutex_t *m_mutex, const pthread_mutexattr_t *mutexattr);
  • int pthread_mutex_lock(pthread_mutex_t *m_mutex);
    
  • int pthread_mutex_unlock(pthread_mutex_t *m_mutex);
    
  • int pthread_mutex_destroy(pthread_mutex_t *m_mutex);
    

As with semaphores, all of the functions take a pointer to a previously declared object, in this case a pthread_mutex_t. The extra attribute parameter of pthread_mutex_init allows us to provide attributes for the mutex, which control its behavior.

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>

void *thread_fnc(void * arg);

pthread_mutex_t my_mutex;
char my_string[100];  /* shared variable */
int time_to_exit = 0;

int main()
{
        int ret;
        pthread_t my_thread;
        void *ret_join;

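        ret = pthread_mutex_init(&my_mutex, NULL);
        if(ret != 0) {
                /* pthread functions return the error code; they do not set errno */
                fprintf(stderr, "mutex init failed: %s\n", strerror(ret));
                exit(EXIT_FAILURE);
        }

        ret =  pthread_create(&my_thread, NULL, thread_fnc, NULL);
        if(ret != 0) {
                fprintf(stderr, "pthread_create failed: %s\n", strerror(ret));
                exit(EXIT_FAILURE);
        }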
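        pthread_mutex_lock(&my_mutex);
        printf("Type in some characters. Enter 'quit' to finish\n");
        while(!time_to_exit) {
                fgets(my_string, 100, stdin);
                pthread_mutex_unlock(&my_mutex);
                while(1) {
                        /* re-acquire the mutex before inspecting the shared buffer */
                        pthread_mutex_lock(&my_mutex);
                        if(my_string[0] != '\0') {
                                /* the other thread has not consumed the input yet */
                                pthread_mutex_unlock(&my_mutex);
                                sleep(1);
                        }
                        else
                                break;  /* leave the loop holding the mutex */
                }
        }
        pthread_mutex_unlock(&my_mutex);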

        printf("Waiting for thread to finish...\n");
        ret = pthread_join(my_thread, &ret_join);
        if(ret != 0) {
                fprintf(stderr, "pthread_join failed: %s\n", strerror(ret));
                exit(EXIT_FAILURE);
        }
        printf("Thread joined\n");
        pthread_mutex_destroy(&my_mutex);
        exit(EXIT_SUCCESS);
}

void *thread_fnc(void *arg)
{
        sleep(1);
        pthread_mutex_lock(&my_mutex);
        while(strncmp("quit", my_string, 4) != 0) {
                printf("You typed in %zu characters\n",strlen(my_string)-1);
                /* an empty string tells main() that counting is done */
                my_string[0]='\0';
                pthread_mutex_unlock(&my_mutex);
                sleep(1);
                pthread_mutex_lock(&my_mutex);
                /* no new input yet: release the mutex and check again later */
                while(my_string[0] == '\0') {
                        pthread_mutex_unlock(&my_mutex);
                        sleep(1);
                        pthread_mutex_lock(&my_mutex);
                }
        }
        time_to_exit = 1;
        my_string[0] = '\0';
        pthread_mutex_unlock(&my_mutex);
        pthread_exit(NULL);
}

Let's look at the thread function. The new thread tries to lock the mutex. If it's already locked, the call will block until it is released. Once we have access, we check to see if we are being requested to exit. If we are requested to exit, we simply set time_to_exit, zap the first character of my_string, and exit.

If we do not want to exit, we count the characters and then zap the first character to a null. We use the first character being null as a way of telling the main thread that we have finished the counting. We then unlock the mutex and wait for the main thread to run. We attempt to lock the mutex and, when we succeed, check if the main thread has given us any more work to do. If that's not the case, we unlock the mutex and wait some more. If we have work to do, we count the characters and loop through again.

Here is the output:

$ ./mutex
Type in some characters. Enter 'quit' to finish
12345
You typed in 5 characters
999
You typed in 3 characters
quit
Waiting for thread to finish...
Thread joined



C++ Classes for Pthreads

We'll make C++ classes Runnable and Thread for Pthreads. The interfaces are almost identical to the Win32 version of the previous chapter. The only difference is that the Thread class constructor has a parameter indicating whether or not the thread is to be created in a detached state. The default is undetached (joinable).

In this code, we added communication between the threads, using shared memory to demonstrate it. Threads in the same program can communicate by referencing global variables or calling methods on a shared object, whereas threads in different processes have to go through kernel objects by calling kernel routines.

#include <iostream>
#include <memory>    // auto_ptr
#include <cstdlib>   // exit
#include <cstring>   // strerror
#include <cassert>
#include <pthread.h>

using namespace std;

class Runnable {
public:
	virtual void* run() = 0;
	virtual ~Runnable() = 0;
};

// Pure virtual destructor: function body required
Runnable::~Runnable(){};

class Thread {
public:
	Thread(auto_ptr<Runnable> run, bool isDetached = false);
	Thread(bool isDetached = false);
	virtual ~Thread();
	void start();
	void* join();
private:
	// thread ID
	pthread_t PthreadThreadID;
	// true if thread created in detached state
	bool detached;
	pthread_attr_t threadAttribute;
	// runnable object will be deleted automatically
	auto_ptr<Runnable> runnable;
	Thread(const Thread&);
	const Thread& operator=(const Thread&);
	// called when run() completes
	void setCompleted();
	// stores return value from run()
	void* result;
	// default implementation; a non-void function must return a value
	virtual void* run() { return NULL; }
	static void* startThreadRunnable(void* pVoid);
	static void* startThread(void* pVoid);
	void printError(char * msg, int status, char* fileName, int lineNumber);
};

Thread::Thread(auto_ptr<Runnable> r, bool isDetached) : 
		runnable(r), detached(isDetached) {
	if(!runnable.get()){
		cout << "Thread::Thread(auto_ptr<Runnable> r, bool isDetached)"\
		"failed at " << " " << __FILE__ <<":" << __LINE__ << "-" <<
		" runnable is NULL" << endl;
		exit(-1);
	}
}

Thread::Thread(bool isDetached) : runnable(NULL), detached(isDetached) {}

void* Thread::startThreadRunnable(void* pVoid) {
	// thread start function when a Runnable is involved
	Thread* runnableThread = static_cast<Thread*>(pVoid);
	assert(runnableThread);
	runnableThread->result = runnableThread->runnable->run();
	runnableThread->setCompleted();
	return runnableThread->result;
}

void* Thread::startThread(void* pVoid) {
	// thread start function when no Runnable is involved
	Thread* aThread = static_cast<Thread*>(pVoid);
	assert(aThread);
	aThread->result = aThread->run();
	aThread->setCompleted();
	return aThread->result;
}

Thread::~Thread() {}

void Thread::start() {
	// initialize attribute object
	int status = pthread_attr_init(&threadAttribute);
	if(status) {
		printError("pthread_attr_init failed at", status,
			__FILE__, __LINE__);
		exit(status);
	}

	// set the scheduling scope attribute
	status = pthread_attr_setscope(&threadAttribute,
					PTHREAD_SCOPE_SYSTEM);
	if(status) {
		printError("pthread_attr_setscope failed at", status,
			__FILE__, __LINE__);
		exit(status);
	}

	if(!detached) {
		if(!runnable.get()) {
			status = pthread_create(&PthreadThreadID, &threadAttribute,
				Thread::startThread, (void*)this);	
			if(status) {
				printError("pthread_create failed at", status,
					__FILE__, __LINE__);
				exit(status);
			}
		}
		else {
			status = pthread_create(&PthreadThreadID, &threadAttribute,
				Thread::startThreadRunnable, (void*)this);	
			if(status) {
				printError("pthread_create failed at", status,
					__FILE__, __LINE__);
				exit(status);
			}
		}
	}
	else {
		// set the detachstate attribute to detached
		status = pthread_attr_setdetachstate(&threadAttribute,
						PTHREAD_CREATE_DETACHED);	
		if(status) {
			printError("pthread_attr_setdetachstate failed at", status,
			__FILE__, __LINE__);
			exit(status);
		}

		if(!runnable.get()) {
			status = pthread_create(&PthreadThreadID, &threadAttribute,
				Thread::startThread, (void*)this);	
			if(status) {
				printError("pthread_create failed at", status,
					__FILE__, __LINE__);
				exit(status);
			}
		}
		else {
			status = pthread_create(&PthreadThreadID, &threadAttribute,
				Thread::startThreadRunnable, (void*)this);	
			if(status) {
				printError("pthread_create failed at", status,
					__FILE__, __LINE__);
				exit(status);
			}
		}
	}
	status = pthread_attr_destroy(&threadAttribute);
	if(status) {
		printError("pthread_attr_destroy failed at", status,
			__FILE__, __LINE__);
		exit(status);
	}
}


void* Thread::join() {
	// A thread calling T.join() waits until thread T completes.
	int status = pthread_join(PthreadThreadID, NULL);
	// result was already saved by thread start function
	if(status) {
		printError("pthread_join failed at", status,
			__FILE__, __LINE__);
		exit(status);
	}
	return result;
}

void Thread::setCompleted() {
// completion handled by pthread_join()
}

void Thread::printError(char * msg, int status, char* fileName, int lineNumber) {
	cout << msg << " " << fileName << ":" << lineNumber <<
		"-" << strerror(status) << endl;
}


// shared variable
int s = 0;

class communicatingThread: public Thread {
public:
	communicatingThread(int ID) : myID(ID) {}
	virtual void* run();
private:
	int myID;
};

void* communicatingThread::run() {
	cout << "Thread " << myID << " is running!" << endl;
	// increment s one million times
	for (int i = 0; i < 1000000; i++) s+=1;
	return 0;
}

int main() {

	auto_ptr<communicatingThread> thread1(new communicatingThread(1));
	auto_ptr<communicatingThread> thread2(new communicatingThread(2));
	thread1->start();
	thread2->start();
	thread1->join();
	thread2->join();

	cout << "s = " << s << endl; 
	return 0;
}

  1. In main(), we created two communicatingThreads.
    auto_ptr<communicatingThread> thread1(new communicatingThread(1));
    auto_ptr<communicatingThread> thread2(new communicatingThread(2));
    
  2. Each communicatingThread increments the global shared variable s one million times.
    for (int i = 0; i < 1000000; i++) s+=1;
    
  3. The main thread uses join() to wait for both communicatingThreads to complete.
    thread1->join();
    thread2->join();
    

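The result of the run should be 2,000,000 on most runs. It is not guaranteed, though: s+=1 is not an atomic operation and the two threads update s without any synchronization, so increments can be lost and the printed value can be smaller.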

Note
We may get a message saying "undefined reference to pthread_join", when we compile/link with g++ code.cpp.
Then, try g++ code.cpp -lpthread.







