Tuesday 17 September 2013

Inter Thread Priority Based Transfer Queue (Inter Thread Communication C++)

One of the biggest headaches that a software engineer comes across when writing multi threaded code in C++ is the access and modification of data by multiple threads. These can make life complicated for any developer in the initial development stage, fixing bugs due to race conditions, and achieving performance goals (due to lock contention). I was faced with a problem where I had to transfer streams of data across threads at high rates without sacrificing performance. The main concern that I had was the performance loss due to lock contention, since multiple threads needed to send data to a single thread which processed data. The solution proved very effective which is why I wanted to share it.

Simply put the situation is a producer consumer problem but with multiple producers. Examining the scenario, it’s apparent that the producers have the potential to create objects at a higher rate than the consumer can consume them. This can result in the consumer getting blocked more often than the producers (producers acquiring the lock more often than the consumer) which reduces the processing speed in the consumer thread. 

The object transfers were done using two queues inside the transfer queue; an input queue for the insertion of data and an output queue for the consumption. When the input queue is filled with its first object the transfer queue sets a windows event; this event notifies that the consumer thread should start consuming data. When the consumer tries to consume its first message it finds an empty output queue. At this instance the input queue is checked for messages and if present (which it is) it's swapped with the output queue. At this point the producers are free to insert objects to the transfer queue (input queue) and the consumer is free to consume the data (from the output queue). When the output queue is empty the input queue is checked for messages and swapped if non empty; the same procedure is repeated until the transfer queue is empty. Note that the consumer is expected to consume (pop objects) until the transfer queue returns null since the event is triggered only when there is one message in the transfer queue. 

It is interesting to see where locks are needed, how lock contention is reduced and how the performance increase is gained. First off the notification for consumption is only sent once, when the transfer queue has only one message. Because of this the kernel involvement is reduced by a great deal and the notification overhead will be at a minimum but the biggest impact on the performance boost is due to the isolation of the input and the output queue and how the synchronization is done. A lock should be placed when the messages are inserted to the transfer queue, since multiple threads are expected to insert messages. The same lock should be placed when the input and output queues are swapped. Note that no locks are needed on consumption of objects; this is because the swapping takes place in the consumer thread and the producer threads does not have any involvement with the output queue. When analyzing the situation it should be clear that this mechanism favors the consumer rather than the producer. The effect of Lock contention is mainly affected to the producer which in this case is not a big problem. Performance reduction due to lock contention is minimal for the consumer since the locks are used only when the queue swapping takes place.

The operation of the transfer queue is simple and affective but it holds a big problem. The transfer queue operates in a first come first served basis and the consumer thread is expected to work in a tight loop (consumption of objects). This implies that no other notifications or data can be transferred to the consumer when the transfer queue is at work until all the objects in the transfer queue is consumed. The solution was to introduce priorities. 

The same mechanism was kept but multiple input and output queues were introduced. At the beginning of the pop call a sweep is made and the non empty queue with the highest priority was selected and set as the active output queue. This mechanism allowed the objects with the same priority to be received in the same order as they were sent and the objects that have higher priority to take precedence over the lower priority objects. It should be noted that because of the sweep there is some overhead compared with the no priority version of the queue. 

This is a simple example of how the Transfer queue can be used

#include <iostream>
#include "windows.h"
#include "PriorityTransferQueue.h"

using namespace std;
using namespace Utils;

PriorityTransferQueue<int> _queue(10);
HANDLE _notification_event_handle = NULL; 
   
DWORD ThreadProcWrapper(LPVOID lpvParam)
{
    while(1)
    {  
        int* i = NULL;
        DWORD result = WaitForSingleObject(_notification_event_handle, INFINITE);  

        while((i = _queue.Pop()) != NULL)
        {
            cout << "received " << *i << " count " << _queue.Size() << endl;
        }
    }

    return 0;
} 

DWORD PriorityThreadProc(LPVOID lpvParam)
{
    int count = 0;

    while(1)
    {
        _queue.Push(new int(count), 5);
        count++;
    };

    return 0;
}

int main()
{
    DWORD        _thread_id_notification;

    _notification_event_handle = CreateEvent(NULL, FALSE, FALSE, NULL);

    _queue.SetTransferNotificationEvent(_notification_event_handle);

    HANDLE _monitor_thread_handle = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProcWrapper, 
                   NULL, 0, &_thread_id_notification);  

    CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PriorityThreadProc, NULL, 0, &_thread_id_notification);  

    int count = 100000;

    while(1)
    {
        _queue.Push(new int(count), 0);
        count++;  

        if(count == 100100)
            break;
    };

    cin.ignore(1);
    return 0;
}
The source for the transfer queue can be downloaded from here