Wednesday, April 29, 2015

The relationship between Condition variable and Mutex

1. pthread API for Condition variable


A pthread condition variable is used to represent a specific condition. The pthread_cond_wait function waits for the condition, and pthread_cond_signal wakes up the waiting entity. The prototypes of the two functions are shown below.

    int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
    int pthread_cond_signal(pthread_cond_t *cond);

Watch the second parameter of pthread_cond_wait carefully: it requires a pointer to a mutex. Also keep in mind that pthread_cond_signal has no effect if no thread is currently waiting in pthread_cond_wait, because the signal is not remembered. So pthread_cond_signal can wake up a thread only after that thread has called pthread_cond_wait.
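To make the "lost signal" point concrete, here is a minimal sketch in C++ on top of pthread. The names g_ready, g_blocked, waiter and run_signal_before_wait are hypothetical, not part of any API. The signal is sent before the waiting thread even exists, so it has no effect; the program still works only because the condition is also stored in an ordinary variable that the waiter checks before calling pthread_cond_wait. Why the mutex is needed around that check is explained in section 2.

```cpp
#include <pthread.h>
#include <cassert>

// Hypothetical names: g_ready holds the condition, g_blocked counts how
// often the waiter actually had to block in pthread_cond_wait.
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_cond  = PTHREAD_COND_INITIALIZER;
static int g_ready   = 0;
static int g_blocked = 0;

static void * waiter(void *)
{
    pthread_mutex_lock(&g_mutex);
    while( !g_ready )                 // check the condition before waiting
    {
        g_blocked++;
        pthread_cond_wait(&g_cond, &g_mutex);
    }
    pthread_mutex_unlock(&g_mutex);
    return nullptr;
}

// Signals BEFORE the waiter exists. The signal itself has no effect,
// but g_ready remembers the event, so the waiter never blocks.
int run_signal_before_wait()
{
    pthread_mutex_lock(&g_mutex);
    g_ready = 1;
    pthread_cond_signal(&g_cond);     // nobody is waiting: no effect
    pthread_mutex_unlock(&g_mutex);

    pthread_t tid;
    int rc = pthread_create(&tid, nullptr, waiter, nullptr);
    assert(rc == 0);
    pthread_join(tid, nullptr);
    return g_blocked;                 // 0: pthread_cond_wait was never called
}
```

run_signal_before_wait() returns 0, i.e. the waiter never blocked. If the waiter called pthread_cond_wait unconditionally instead of checking g_ready first, it would wait forever.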

The condition itself usually involves some calculation or comparison, which changes with the application, so the pthread API does not deal with that part. Instead, pthread treats a condition variable abstractly: you just name it, e.g. Condition_A or Condition_B, and use it. The real meaning of a condition variable is given by the user's program, so the routine that actually checks the condition lives outside the pthread API.

The flow of the waiting side is shown in the diagram below. First, check the specific condition; if it is not met, wait through pthread_cond_wait.

The flow of the signalling side is shown in the diagram below. Some execution updates the condition, and if this update makes the specific condition true, it calls pthread_cond_signal.
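The two flows can be sketched as follows, assuming a hypothetical Condition_A that means "the counter g_count has reached TARGET". pthread only provides the wait/signal mechanism; the check itself is the ordinary function condition_a_met(), and the signalling side calls pthread_cond_signal only when its update makes the condition true. The mutex calls anticipate section 2, and the while loop re-checks the condition in case pthread_cond_wait returns without a signal. All names here are illustrative.

```cpp
#include <pthread.h>
#include <cassert>

// Hypothetical "Condition_A": the counter g_count has reached TARGET.
// pthread knows nothing about this predicate; user code evaluates it.
static const int TARGET = 3;
static pthread_mutex_t g_mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_cond_a = PTHREAD_COND_INITIALIZER;
static int g_count = 0;
static int g_seen  = -1;          // value the waiting side observed

// The real condition check, outside the pthread API (caller holds g_mutex).
static bool condition_a_met() { return g_count >= TARGET; }

// Waiting side: check the condition; wait only while it is not met.
static void * waiting_side(void *)
{
    pthread_mutex_lock(&g_mutex);
    while( !condition_a_met() )
        pthread_cond_wait(&g_cond_a, &g_mutex);
    g_seen = g_count;
    pthread_mutex_unlock(&g_mutex);
    return nullptr;
}

// Signalling side: update the condition; signal only when the update
// makes Condition_A true.
static void signalling_side()
{
    pthread_mutex_lock(&g_mutex);
    g_count++;
    if( condition_a_met() )
        pthread_cond_signal(&g_cond_a);
    pthread_mutex_unlock(&g_mutex);
}

int run_condition_a()
{
    pthread_t tid;
    int rc = pthread_create(&tid, nullptr, waiting_side, nullptr);
    assert(rc == 0);
    for( int i = 0; i < TARGET; i++ )
        signalling_side();
    pthread_join(tid, nullptr);
    return g_seen;
}
```

run_condition_a() returns TARGET (3) whether the waiting thread starts before or after the updates, because the waiter checks the condition itself instead of relying on having seen the signal.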


2. Operation flow analysis in Multi-threads


Now let's see why a mutex is necessary for a condition variable. In a multi-threaded environment all threads can run concurrently, so the condition check on the waiting side and the condition update on the signalling side can happen at the same time.
If the condition is updated while the waiting side is still checking it, the shared data can be corrupted and the program can behave incorrectly.

In the diagram below, Thread_B on the right side runs first. If Thread_A modifies the condition while Thread_B is checking it, Thread_B may not notice the change. And if pthread_cond_signal in Thread_A executes before Thread_B calls pthread_cond_wait, the signal is lost and Thread_B will never wake up.


This problem can be resolved by adding mutex lock/unlock to both Thread_A and Thread_B to prevent concurrent access. The diagram below shows how the mutex is used.


A thread that calls pthread_cond_wait is suspended, and the mutex must be unlocked before the thread is suspended; otherwise Thread_A could never lock it to update the condition. So pthread_cond_wait unlocks the mutex internally, atomically with suspending the thread. This is why pthread_cond_wait needs a pointer to the mutex.

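When the thread that called pthread_cond_wait is resumed by pthread_cond_signal, pthread_cond_wait locks the mutex again before it returns. So you have to remember the post processing (unlocking the mutex). Note also that pthread_cond_wait may return even though nobody signalled (a spurious wakeup), so the condition should be re-checked in a loop after it returns.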
The overall flow is shown in the diagram below.
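The overall flow can be checked with a small sketch, assuming an error-checking mutex (PTHREAD_MUTEX_ERRORCHECK) so that unlocking a mutex the thread does not own reports EPERM instead of being undefined. The signalling thread manages to lock the mutex while the waiting thread sits inside pthread_cond_wait, which shows the mutex was released; after waking, the waiting thread's own unlock succeeds, which shows the mutex was locked again. Names such as g_waiting and run_overall_flow are hypothetical.

```cpp
#include <pthread.h>
#include <sched.h>
#include <cerrno>
#include <cassert>

static pthread_mutex_t g_mutex;       // error-checking mutex, set up below
static pthread_cond_t  g_cond = PTHREAD_COND_INITIALIZER;
static int g_waiting = 0;             // set by the waiter right before waiting
static int g_ready   = 0;             // the condition
static int g_unlock_rc = -1;          // waiter's post-processing unlock result
static int g_second_unlock_rc = -1;   // unlocking a mutex it no longer owns

static void * waiting_thread(void *)
{
    pthread_mutex_lock(&g_mutex);                  // lock
    g_waiting = 1;
    while( !g_ready )                              // check the condition
        pthread_cond_wait(&g_cond, &g_mutex);      // unlock + sleep, re-lock on wake
    g_unlock_rc = pthread_mutex_unlock(&g_mutex);  // post processing: we own it
    g_second_unlock_rc = pthread_mutex_unlock(&g_mutex); // not owned any more
    return nullptr;
}

void run_overall_flow()
{
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(&g_mutex, &attr);
    pthread_mutexattr_destroy(&attr);

    pthread_t tid;
    int rc = pthread_create(&tid, nullptr, waiting_thread, nullptr);
    assert(rc == 0);

    for( ;; )                                      // signalling thread
    {
        pthread_mutex_lock(&g_mutex);
        if( g_waiting )
        {
            // We hold the mutex although the waiter locked it and has not
            // unlocked it itself: pthread_cond_wait released it.
            g_ready = 1;                           // update the condition
            pthread_cond_signal(&g_cond);
            pthread_mutex_unlock(&g_mutex);
            break;
        }
        pthread_mutex_unlock(&g_mutex);
        sched_yield();                             // waiter not waiting yet
    }
    pthread_join(tid, nullptr);
    pthread_mutex_destroy(&g_mutex);
}
```

After run_overall_flow(), g_unlock_rc is 0 and g_second_unlock_rc is EPERM: the waiting thread owned the mutex when pthread_cond_wait returned, and no longer owns it after its post-processing unlock.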


PS. I wrote this post based on my experience with pthread programming. If you have any suggestions, please leave a comment and I will check and update the post.

How to make message queue with Condition variable - Implementation

3. SisoQueue Class Definition


We will use C++ to implement the message queue, and we name the queue class "SisoQueue". SISO stands for "single input single output", meaning there is one input entity and one output entity. The member variables and methods of SisoQueue are shown below.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define CD_TRUE     (1)
#define CD_FALSE    (0)
#define CD_ERROR    (-1)
#define CD_SUCCESS  (0)

class SisoQueue
{
    int m_id;          // unique number for debug tracking
    int m_len;         // queue length (recommend 3)

    void **m_pp_data;  // array that holds the messages
    
    // The read index points to the slot that get() reads next and the
    // write index points to the slot that put() writes next.
    // When both indexes are equal, the queue is either empty or full,
    // so the m_is_empty flag distinguishes the two cases.
    int m_write_index; // index position for queue put
    int m_read_index;  // index position for queue get
    int m_is_empty;

    // condition variable to wait on in put() when the queue is full
    pthread_cond_t m_put_cond;
    // condition variable to wait on in get() when the queue is empty
    pthread_cond_t m_get_cond;
    pthread_mutex_t m_mutex; // mutex for access control between threads

public:
    // prepares the resource and initialize member variables
    SisoQueue(int id, int len);

    // releases the resource
    ~SisoQueue(); 

    //returns whether siso queue is full or not (CD_TRUE, CD_FALSE).
    int is_full();

    // sends the message into queue
    //   if queue is full, this will wait until queue is available
    int put(void * p_data);

    // receives the message from queue
    //   if queue is empty, this will wait until the message comes
    void * get();

};

4. SisoQueue method implementation


4.1 Constructor and Destructor

The constructor initializes the member variables and allocates an array, sized to the queue length, that holds the messages. The destructor frees the array and destroys the mutex and condition variables.

SisoQueue::SisoQueue(int id, int len)
{
    m_id = id;
    m_len = len;
    m_write_index = 0;
    m_read_index = 0;
    m_is_empty = CD_TRUE;
    m_pp_data = (void **) malloc(sizeof(void *) * len);

    pthread_cond_init(&m_put_cond, NULL);
    pthread_cond_init(&m_get_cond, NULL);
    pthread_mutex_init(&m_mutex, NULL);
}

SisoQueue::~SisoQueue()
{
    free(m_pp_data);

    pthread_cond_destroy(&m_put_cond);
    pthread_cond_destroy(&m_get_cond);
    pthread_mutex_destroy(&m_mutex);
}

4.2 put

This inserts a message into the queue. If the queue is full, put waits on m_put_cond until get makes room. If the queue was empty, put signals m_get_cond to wake up the consumer thread, which may be waiting on it.

int SisoQueue::put(void * p_data)
{
    pthread_mutex_lock(&m_mutex);

    // If the queue is full, wait until get() makes room.
    // pthread_cond_wait can return without a signal (spurious wakeup),
    // so the condition is re-checked in a loop.
    while( !m_is_empty && (m_write_index == m_read_index) )
    {
        pthread_cond_wait(&m_put_cond, &m_mutex);
    }

    int was_empty = m_is_empty;

    m_pp_data[m_write_index] = p_data;
    m_write_index = (m_write_index+1) % m_len; // update write_index
    m_is_empty = CD_FALSE;

    // If the queue was empty, the consumer may be waiting in get()
    if( was_empty )
    {
        pthread_cond_signal(&m_get_cond);
    }

    pthread_mutex_unlock(&m_mutex);
    return CD_SUCCESS;
}

4.3 get

This retrieves a message from the queue. If the queue is empty, get waits on m_get_cond. Otherwise it returns the message that the read index points to and signals m_put_cond to wake up the producer thread, which may be waiting on it. If the producer thread is not waiting on m_put_cond, the signal is simply ignored, which causes no problem.

void * SisoQueue::get()
{
    void * p_data;

    pthread_mutex_lock(&m_mutex);

    // If the queue is empty, wait until put() stores a message.
    // The loop re-checks the condition after a spurious wakeup.
    while( m_is_empty )
    {
        pthread_cond_wait(&m_get_cond, &m_mutex);
    }

    p_data = m_pp_data[m_read_index];
    m_pp_data[m_read_index] = NULL;
    m_read_index = (m_read_index+1) % m_len; // update read_index

    if( m_write_index == m_read_index ) // if queue is empty
        m_is_empty = CD_TRUE;

    // wake up the producer if it is waiting in put() on a full queue
    pthread_cond_signal(&m_put_cond);
    pthread_mutex_unlock(&m_mutex);
    return p_data;
}

4.4 is_full

This returns whether the SisoQueue is full. The return value is CD_TRUE or CD_FALSE.

int SisoQueue::is_full()
{
    int rv;
    
    pthread_mutex_lock(&m_mutex);
    rv = CD_FALSE;
    if( !m_is_empty && (m_write_index==m_read_index) ) // if queue is full
    {
        rv = CD_TRUE;
    }
    pthread_mutex_unlock(&m_mutex);
    return rv;


}
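A producer/consumer run shows how SisoQueue is meant to be used. So that the sketch compiles on its own, it carries a condensed copy of the class: m_id and the error printing are dropped, waits re-check their condition in a while loop, and put() signals on every call, which is harmless because a signal with no waiter is ignored. The producer and run_producer_consumer helpers are hypothetical. Messages are heap-allocated ints passed through the void * interface; the consumer casts them back and frees them.

```cpp
#include <pthread.h>
#include <cstdlib>
#include <cassert>

// Condensed copy of SisoQueue (same fields and index logic).
class SisoQueue
{
    int m_len, m_write_index, m_read_index, m_is_empty;
    void **m_pp_data;
    pthread_cond_t m_put_cond, m_get_cond;
    pthread_mutex_t m_mutex;
public:
    explicit SisoQueue(int len)
        : m_len(len), m_write_index(0), m_read_index(0), m_is_empty(1),
          m_pp_data((void **) malloc(sizeof(void *) * len))
    {
        pthread_cond_init(&m_put_cond, nullptr);
        pthread_cond_init(&m_get_cond, nullptr);
        pthread_mutex_init(&m_mutex, nullptr);
    }
    ~SisoQueue()
    {
        free(m_pp_data);
        pthread_cond_destroy(&m_put_cond);
        pthread_cond_destroy(&m_get_cond);
        pthread_mutex_destroy(&m_mutex);
    }
    void put(void *p_data)
    {
        pthread_mutex_lock(&m_mutex);
        while( !m_is_empty && m_write_index == m_read_index )   // full
            pthread_cond_wait(&m_put_cond, &m_mutex);
        m_pp_data[m_write_index] = p_data;
        m_write_index = (m_write_index + 1) % m_len;
        m_is_empty = 0;
        pthread_cond_signal(&m_get_cond);   // harmless if nobody waits
        pthread_mutex_unlock(&m_mutex);
    }
    void * get()
    {
        pthread_mutex_lock(&m_mutex);
        while( m_is_empty )
            pthread_cond_wait(&m_get_cond, &m_mutex);
        void *p_data = m_pp_data[m_read_index];
        m_read_index = (m_read_index + 1) % m_len;
        if( m_write_index == m_read_index )
            m_is_empty = 1;
        pthread_cond_signal(&m_put_cond);   // harmless if nobody waits
        pthread_mutex_unlock(&m_mutex);
        return p_data;
    }
};

// Hypothetical demo: the producer sends 1..MSG_COUNT as heap-allocated ints.
static const int MSG_COUNT = 100;

static void * producer(void *arg)
{
    SisoQueue *queue = static_cast<SisoQueue *>(arg);
    for( int i = 1; i <= MSG_COUNT; i++ )
        queue->put(new int(i));             // blocks while the queue is full
    return nullptr;
}

// Consumer side: checks FIFO order, sums the values, frees each message.
long run_producer_consumer(int queue_len)
{
    SisoQueue queue(queue_len);
    pthread_t tid;
    int rc = pthread_create(&tid, nullptr, producer, &queue);
    assert(rc == 0);

    long sum = 0;
    bool in_order = true;
    for( int i = 1; i <= MSG_COUNT; i++ )
    {
        int *p = static_cast<int *>(queue.get());   // blocks while empty
        if( *p != i ) in_order = false;
        sum += *p;
        delete p;
    }
    pthread_join(tid, nullptr);
    return in_order ? sum : -1;
}
```

With the recommended length of 3, run_producer_consumer(3) returns 5050 (the sum of 1 to 100) and the consumer sees the messages in FIFO order; the producer blocks in put() whenever all three slots are occupied.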

Tuesday, April 28, 2015

How to make message queue with Condition variable - Purpose & Design

1. Purpose


This post explains how to build a queue for passing messages between threads in Linux. We assume that pthread is available in your environment.

Some operating systems provide a message queue mechanism of their own, but the pthread library provides only basic primitives such as the mutex (a lock mechanism) and the condition variable (a signalling mechanism). This post shows how to build the queue from these primitives.

2. Design


We will design a queue to transfer messages between two threads. One is called the producer thread and the other the consumer thread. The producer thread generates messages and puts them into the queue, and the consumer thread takes messages from the queue and handles them. We assume that each thread's role is fixed and that messages are transferred in one direction only, as shown in the diagram below.


We call the operation in which the consumer thread takes a message from the queue "get". If the queue is empty, the "get" method waits until a message arrives. We call the operation in which the producer thread stores a message into the queue "put". If the queue is full, the "put" method waits until the queue has room. Waiting for a condition and signalling the waiting thread is exactly what a condition variable is for, so this post also shows how to use condition variables.

We need a mutex to prevent data corruption from concurrent access to the queue's internal data structure. Both the "put" and "get" methods update the queue's internal data, so we define a mutex to protect it. Both methods may also need to wait, so we use two condition variables: the "put" method uses put_cond and the "get" method uses get_cond.
Waiting on a condition variable also requires a mutex, so both condition variables share the mutex that already protects the queue's internal data.
(The relationship between condition variable and mutex is covered in detail in a later post.)

The message type is defined as "void *", and both the producer and consumer threads cast it to whatever type they use. Because "void *" is so general, the queue can be used for any purpose without source modification, just as malloc returns "void *" in C. We use an array as the container of the message queue: since the queue size is fixed at initialization, an array is more suitable than a linked list. The array is accessed through index variables and is filled and drained in circular fashion.

We define two indexes, one for each method: the write index is used by put and the read index by get. The write index is increased when a message is stored in the queue through put, and the read index is increased when a message is retrieved from the queue through get.


The diagram above shows three states of a queue of length 9. The left one is the initial state: both the write index and the read index point to 0. The middle one shows that the write index has advanced to 4 after the producer thread stored 4 messages in the queue. The right one shows that the consumer thread fetched 2 messages from the queue, so the read index has advanced to 2.
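The index arithmetic behind these three states can be replayed without threads. The sketch below is a hypothetical RingIndex struct that keeps only the index bookkeeping, so put and get return false instead of waiting. It uses the wrap-around rule index = (index + 1) % length and an is_empty flag to tell an empty queue from a full one when the two indexes are equal.

```cpp
#include <cassert>

// Hypothetical RingIndex: only the index bookkeeping of the queue,
// single-threaded, so put/get return false instead of waiting.
struct RingIndex
{
    int len;
    int write_index = 0;      // next slot put() writes
    int read_index  = 0;      // next slot get() reads
    bool is_empty   = true;   // tells "empty" from "full" when indexes match

    explicit RingIndex(int n) : len(n) {}

    bool is_full() const { return !is_empty && write_index == read_index; }

    bool put()
    {
        if( is_full() ) return false;
        write_index = (write_index + 1) % len;
        is_empty = false;
        return true;
    }

    bool get()
    {
        if( is_empty ) return false;
        read_index = (read_index + 1) % len;
        if( read_index == write_index ) is_empty = true;
        return true;
    }
};

// Replays the three states of the length-9 diagram, then wraps around.
void demo_diagram_states()
{
    RingIndex q(9);
    assert(q.write_index == 0 && q.read_index == 0 && q.is_empty); // left
    for( int i = 0; i < 4; i++ ) q.put();
    assert(q.write_index == 4 && q.read_index == 0);               // middle
    q.get();
    q.get();
    assert(q.write_index == 4 && q.read_index == 2);               // right

    // 2 messages remain; 7 more fill all 9 slots and wrap the write index
    for( int i = 0; i < 7; i++ ) q.put();
    assert(q.is_full() && q.write_index == 2 && q.read_index == 2);
    assert(!q.put());   // a real queue would wait here instead
}
```

In the last state the write index has wrapped around to 2, so write index == read index while is_empty is false, which is exactly the "full" case that the is_empty flag exists to detect.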

The length of the queue is configured when the queue is initialized. In general the right length depends on how the queue is used, but if the system is designed well, a length of 3 to 5 is usually enough.