Tuesday 17 September 2013

Inter Thread Priority Based Transfer Queue (Inter Thread Communication C++)

One of the biggest headaches a software engineer faces when writing multithreaded C++ code is the access and modification of shared data by multiple threads. It complicates every stage of a project: the initial development, fixing bugs caused by race conditions, and meeting performance goals in the face of lock contention. I was faced with a problem where I had to transfer streams of data across threads at high rates without sacrificing performance. My main concern was the performance lost to lock contention, since multiple threads needed to send data to a single thread that processed it. The solution proved very effective, which is why I wanted to share it.

Simply put, the situation is a producer-consumer problem with multiple producers. Examining the scenario, it's apparent that the producers can create objects at a higher rate than the consumer can consume them. With a single shared lock, the consumer ends up blocked more often than the producers (the producers acquire the lock more often than the consumer), which reduces the processing speed of the consumer thread.

The objects are transferred using two queues inside the transfer queue: an input queue for inserting data and an output queue for consuming it. When the first object is inserted into an empty transfer queue, the transfer queue sets a Windows event, which signals the consumer thread to start consuming data. When the consumer tries to consume its first message, it finds an empty output queue. At that point the input queue is checked for messages and, if it has any (which it does), it is swapped with the output queue. From then on the producers are free to insert objects into the transfer queue (the input queue) while the consumer is free to consume data (from the output queue). Whenever the output queue becomes empty, the input queue is checked and swapped in if it is non-empty; this repeats until the transfer queue is empty. Note that the consumer is expected to keep consuming (popping objects) until the transfer queue returns null, because the event is triggered only when the transfer queue goes from empty to holding one message.
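The two-queue scheme can be sketched in portable C++17 as follows. This is not the author's downloadable code: std::mutex stands in for whatever lock the original uses, a hypothetical notify callback stands in for setting the Windows event, and Pop() returns std::nullopt where the original returns null. The class name TransferQueue is also an assumption.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical two-queue transfer queue: many producers, exactly one consumer.
template <typename T>
class TransferQueue {
public:
    // 'notify' stands in for SetEvent on the Windows event (an assumption).
    // It is called only when an item arrives while no unconsumed items exist.
    explicit TransferQueue(std::function<void()> notify) : notify_(std::move(notify)) {}

    // Producer side: any thread may call this. The lock guards the input queue.
    void Push(T value) {
        bool first;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            input_.push_back(std::move(value));
            first = (size_.fetch_add(1) == 0);  // empty -> one item
        }
        if (first) notify_();
    }

    // Consumer side: call only from the single consumer thread.
    // Returns std::nullopt once both queues are empty.
    std::optional<T> Pop() {
        if (output_.empty()) {
            // The only place the consumer locks: swap the whole input queue
            // into the empty output queue in constant time.
            std::lock_guard<std::mutex> lock(mutex_);
            std::swap(input_, output_);
        }
        if (output_.empty()) return std::nullopt;
        T value = std::move(output_.front());  // no lock: producers never touch output_
        output_.pop_front();
        size_.fetch_sub(1);
        return value;
    }

private:
    std::mutex mutex_;                  // guards input_ only
    std::deque<T> input_;               // written by producers
    std::deque<T> output_;              // read only by the consumer
    std::atomic<std::size_t> size_{0};  // items pushed but not yet popped
    std::function<void()> notify_;
};
```

A consumer pairs this with its wait: block on the notification, then call Pop() until it returns std::nullopt. An item pushed while the consumer is still draining does not trigger a new notification, but the drain loop picks it up anyway.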

It is interesting to see where locks are needed, how lock contention is reduced and where the performance gain comes from. First, the consumption notification is sent only once, when the transfer queue holds exactly one message. This greatly reduces kernel involvement and keeps the notification overhead to a minimum. The biggest contributor to the performance boost, however, is the isolation of the input queue from the output queue and the way synchronization is done. A lock is needed when messages are inserted into the transfer queue, since multiple threads are expected to insert messages. The same lock must be held when the input and output queues are swapped. No lock is needed to consume objects, because the swap takes place in the consumer thread and the producer threads never touch the output queue. Analyzing the situation, it should be clear that this mechanism favors the consumer over the producers. Lock contention mainly affects the producers, which in this case is not a big problem, while the consumer takes the lock only when the queues are swapped.
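Because the notification fires only when the queue goes from empty to non-empty, a burst of messages costs a single signal, and the consumer reduces to "wait, then drain until Pop returns null". The usage example relies on a Windows auto-reset event (CreateEvent with bManualReset set to FALSE) for this. On other platforms a comparable event can be sketched with std::condition_variable; the class AutoResetEvent and its methods are hypothetical names, not part of the original code.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical portable stand-in for a Windows auto-reset event.
// Set() leaves the event signaled until one Wait() consumes the signal;
// several Set() calls before a Wait() collapse into one pending signal.
class AutoResetEvent {
public:
    void Set() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            signaled_ = true;
        }
        cv_.notify_one();
    }

    // Blocks until signaled, then resets the event.
    void Wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return signaled_; });
        signaled_ = false;
    }

    // Returns false if the timeout expires before the event is signaled.
    template <typename Rep, typename Period>
    bool WaitFor(const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return signaled_; })) return false;
        signaled_ = false;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool signaled_ = false;
};
```

A consumer thread would call Wait() and then loop on Pop() until it returns empty, the same shape as ThreadProcWrapper in the usage example.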

The operation of the transfer queue is simple and effective, but it has one big problem. The transfer queue operates on a first-come, first-served basis, and the consumer thread is expected to work in a tight loop consuming objects. This means that no other notification or data can reach the consumer ahead of the objects already in the transfer queue; it has to wait until all of them are consumed. The solution was to introduce priorities.

The same mechanism was kept, but multiple input and output queues were introduced, one pair per priority level. At the beginning of each pop call a sweep is made, and the non-empty queue with the highest priority is selected as the active output queue. This mechanism allows objects with the same priority to be received in the order they were sent, while objects with a higher priority take precedence over lower-priority ones. Note that the sweep adds some overhead compared with the non-priority version of the queue.
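A sketch of the priority variant, extending the same two-queue idea with one input/output queue pair per level. Assumptions not confirmed by the post: a larger level number means a higher priority, the sweep takes the lock for each level whose output queue is empty (in this sketch, that is where the extra overhead comes from), and the names PriorityTransferQueueSketch and notify are hypothetical.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical priority transfer queue: many producers, one consumer.
// Assumption: a larger level number means a higher priority.
template <typename T>
class PriorityTransferQueueSketch {
public:
    PriorityTransferQueueSketch(std::size_t levels, std::function<void()> notify)
        : input_(levels), output_(levels), notify_(std::move(notify)) {}

    // Producer side: 'level' must be less than the number of levels.
    void Push(T value, std::size_t level) {
        bool first;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            input_[level].push_back(std::move(value));
            first = (size_.fetch_add(1) == 0);  // empty -> one item
        }
        if (first) notify_();
    }

    // Consumer side (single thread): sweep from the highest level down and
    // return the front of the first non-empty output queue.
    std::optional<T> Pop() {
        for (std::size_t level = output_.size(); level-- > 0;) {
            std::deque<T>& out = output_[level];
            if (out.empty()) {
                // Refill a level only after its output queue is drained,
                // so items of equal priority keep their FIFO order.
                std::lock_guard<std::mutex> lock(mutex_);
                std::swap(input_[level], out);
            }
            if (!out.empty()) {
                T value = std::move(out.front());
                out.pop_front();
                size_.fetch_sub(1);
                return value;
            }
        }
        return std::nullopt;
    }

private:
    std::mutex mutex_;                   // guards every input queue
    std::vector<std::deque<T>> input_;   // one per level, written by producers
    std::vector<std::deque<T>> output_;  // one per level, consumer only
    std::atomic<std::size_t> size_{0};   // items pushed but not yet popped
    std::function<void()> notify_;
};
```

Because a level's output queue is refilled only after it has been drained, objects of equal priority come out in the order they were pushed, while a newly arrived higher-priority object is picked up on the very next Pop.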

This is a simple example of how the transfer queue can be used:

#include <iostream>
#include <windows.h>
#include "PriorityTransferQueue.h"

using namespace std;
using namespace Utils;

// Transfer queue with 10 priority levels; the consumer is signalled through an auto-reset event.
PriorityTransferQueue<int> _queue(10);
HANDLE _notification_event_handle = NULL;

// Consumer: wait for the event, then drain the queue until Pop() returns NULL.
DWORD WINAPI ThreadProcWrapper(LPVOID lpvParam)
{
    while(1)
    {
        WaitForSingleObject(_notification_event_handle, INFINITE);

        int* i = NULL;
        while((i = _queue.Pop()) != NULL)
        {
            cout << "received " << *i << " count " << _queue.Size() << endl;
            delete i; // the producers allocate with new, so the consumer frees
        }
    }

    return 0;
}

// Producer that keeps pushing messages at priority 5.
DWORD WINAPI PriorityThreadProc(LPVOID lpvParam)
{
    int count = 0;

    while(1)
    {
        _queue.Push(new int(count), 5);
        count++;
    }

    return 0;
}

int main()
{
    DWORD _thread_id_notification;

    // Auto-reset event (bManualReset = FALSE), initially non-signalled.
    _notification_event_handle = CreateEvent(NULL, FALSE, FALSE, NULL);

    _queue.SetTransferNotificationEvent(_notification_event_handle);

    // WINAPI thread procedures match LPTHREAD_START_ROUTINE, so no cast is needed.
    HANDLE _monitor_thread_handle = CreateThread(NULL, 0, ThreadProcWrapper,
                   NULL, 0, &_thread_id_notification);

    CreateThread(NULL, 0, PriorityThreadProc, NULL, 0, &_thread_id_notification);

    // The main thread pushes 100 messages at priority 0.
    for(int count = 100000; count < 100100; count++)
    {
        _queue.Push(new int(count), 0);
    }

    cin.ignore(1);
    return 0;
}
The source for the transfer queue can be downloaded from here

Wednesday 17 July 2013

Thread Safe Object Pool In C++

During the development phase of a project I was faced with a task where I had to create and destroy objects frequently. As most of you know, creating and destroying objects can be very expensive. At the frequency I had to deal with, it was obvious that plain creation and deletion would not let the application meet its performance requirements. To make matters worse, some of the objects had a heavy constructor that performed expensive operations. So I had to devise a mechanism to reduce the overhead, which meant creating either an object pool or a memory pool. I chose an object pool: a memory pool only removes the cost of allocating memory, while the heavy constructor would still run every time an object was created.

For those who are not familiar with the object pool pattern, an object pool is a creational pattern that lets an application reuse objects it has already created. Instead of the conventional create-and-delete cycle, an object pool keeps released objects in a pool and hands them back to the application (after restoring them to their initial state) when it needs one. This pattern minimizes the overhead of creating and destroying objects and also makes object retrieval take constant time.
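For readers new to the pattern, a minimal single-threaded pool looks like this. It illustrates the pattern only and is not the ObjectPool from the download; SimplePool, Acquire, Release and the Reset() hook that restores an object's initial state are hypothetical names.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical single-threaded object pool. Released objects go on a free
// list and are handed out again instead of being deleted and re-created.
// Assumption: T provides a Reset() member that restores its initial state.
template <typename T>
class SimplePool {
public:
    SimplePool() = default;
    SimplePool(const SimplePool&) = delete;             // the pool owns its free objects
    SimplePool& operator=(const SimplePool&) = delete;
    ~SimplePool() { for (T* obj : free_) delete obj; }

    T* Acquire() {
        if (free_.empty()) {
            ++constructed_;
            return new T();      // pay for construction only when the pool is empty
        }
        T* obj = free_.back();   // constant-time reuse
        free_.pop_back();
        return obj;
    }

    void Release(T* obj) {
        obj->Reset();            // restore the initial state before reuse
        free_.push_back(obj);
    }

    std::size_t constructed() const { return constructed_; }

private:
    std::vector<T*> free_;
    std::size_t constructed_ = 0;
};
```

Calling Acquire after Release returns the same object with its state reset, so the constructor runs only when the pool has nothing to hand out.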

Now back to the problem at hand. The application was multithreaded and needed to transfer objects between threads, so an object could be destroyed in a different thread from the one that created it. This raised new problems. Since objects were handled in multiple threads, one option was a single shared pool, but that meant taking a lock on every retrieval. As the number of threads increases, the locks degrade performance rapidly, making all the additional trouble of the object pool in vain. The second option was to keep a small pool of objects for each thread, but that route also had a serious pitfall. If one thread only produced objects and another thread only consumed them, the first thread would always find its pool empty and would have to create objects the conventional way (using new or malloc), while the second thread's pool would keep growing with every object it destroyed. This is also unacceptable, since the pool would be ineffective in exactly this kind of situation.

As a solution, thread-specific pools were kept along with a global pool. When a local pool exceeds a limit, a chunk of objects is transferred to the global pool, and any thread with an empty local pool can get objects from the global pool (first transferring a chunk of objects to its local pool and then handing out an object from that chunk). This decision led to a couple of advantages: in the common case no lock is needed to retrieve an object, since it is taken from the local pool, and the local limits can be tuned to get maximum performance with minimum memory consumption.
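The local-plus-global arrangement can be sketched with C++11 thread_local storage and a mutex-protected list of chunks. This is an illustration under assumptions rather than the author's implementation: it uses std::vector free lists instead of intrusive linked lists, always allocates with new, hands a thread's leftover objects to the global pool when the thread exits, and the names TwoLevelPool, kLocalLimit and kChunkSize are hypothetical.

```cpp
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical two-level pool. Fast path: a thread-local free list, no lock.
// When a local list grows past kLocalLimit, kChunkSize objects move to the
// global list under a mutex; a thread whose local list is empty pulls one
// whole chunk back before falling back to new.
template <typename T, std::size_t kLocalLimit = 64, std::size_t kChunkSize = 32>
class TwoLevelPool {
    static_assert(kChunkSize > 0 && kChunkSize <= kLocalLimit, "chunk must fit in the local list");

public:
    static T* Acquire() {
        std::vector<T*>& local = Local();
        if (local.empty()) {
            std::lock_guard<std::mutex> lock(global_mutex_);
            if (!global_.empty()) {
                local = std::move(global_.back());  // take one whole chunk
                global_.pop_back();
            }
        }
        if (local.empty()) return new T();  // both pools empty: allocate
        T* obj = local.back();
        local.pop_back();
        return obj;
    }

    static void Release(T* obj) {
        std::vector<T*>& local = Local();
        local.push_back(obj);  // lock-free fast path
        if (local.size() > kLocalLimit) {
            // Move the newest kChunkSize objects to the global pool.
            std::vector<T*> chunk(local.end() - static_cast<std::ptrdiff_t>(kChunkSize), local.end());
            local.resize(local.size() - kChunkSize);
            std::lock_guard<std::mutex> lock(global_mutex_);
            global_.push_back(std::move(chunk));
        }
    }

    static std::size_t GlobalChunks() {
        std::lock_guard<std::mutex> lock(global_mutex_);
        return global_.size();
    }

private:
    struct LocalList {
        std::vector<T*> items;
        ~LocalList() {  // on thread exit, hand leftovers to the global pool
            if (items.empty()) return;
            std::lock_guard<std::mutex> lock(global_mutex_);
            global_.push_back(std::move(items));
        }
    };

    static std::vector<T*>& Local() {
        thread_local LocalList list;  // one list per thread and per pool type
        return list.items;
    }

    static inline std::mutex global_mutex_;
    static inline std::vector<std::vector<T*>> global_;
};
```

In the producer/consumer case described earlier, the consuming thread spills chunks into the global pool as it releases objects, and the producing thread picks those chunks up instead of calling new. Only the chunk transfers take the lock.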

The implementation was straightforward. For the local pool, a static, thread-specific pointer in the class template keeps track of the local pool, which is simply a linked list. It is static so that multiple pools of the same type in the same thread do not create multiple linked lists, and it is combined with the thread-specific storage specifier so that it is not a single global variable shared by all threads. Together, the two guarantee that all pools of the same type in a given thread use the same linked list. The global pool is a static linked list. The pool also accepts an allocator and a deallocator, which opens up different ways of allocating memory for the objects (VirtualAlloc, LocalAlloc, new, malloc, shared memory, etc.). A default allocator and deallocator based on new and delete are provided, so there is no need to write them every time the pool is used.
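The effect of making the free list both static and thread-specific can be seen in a small sketch. PerThreadPool is hypothetical: it uses C++17 static inline thread_local members and a std::vector instead of a linked list, and it calls new and delete directly where the real pool would go through its allocator and deallocator.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical pool whose free list is a static thread_local member, so
//  - every PerThreadPool<T> instance on the same thread shares one list, and
//  - each thread gets its own list, which needs no lock.
template <typename T>
class PerThreadPool {
public:
    T* Create() {
        std::vector<T*>& items = free_list_.items;
        if (items.empty()) return new T();
        T* obj = items.back();
        items.pop_back();
        return obj;
    }

    void Destroy(T* obj) { free_list_.items.push_back(obj); }

    static std::size_t LocalSize() { return free_list_.items.size(); }

private:
    struct FreeList {
        std::vector<T*> items;
        ~FreeList() { for (T* p : items) delete p; }  // free leftovers at thread exit
    };

    static inline thread_local FreeList free_list_;
};
```

Two pool instances created on the same thread therefore recycle each other's objects, while a pool used on another thread starts with an empty list of its own.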

Variables were required to link the objects in the pools together (next and previous). This left me with a choice between keeping wrappers for the objects or requiring the objects themselves to carry the additional tracking variables. I chose the latter, since the former would have required pooling the wrapper objects as well as keeping track of them. That additional overhead seemed overkill, since most of the time it is just a matter of defining a couple of variables in the pooled class or struct. So the pooled object is required to have the following members:

_next - a pointer to the pooled type
_previous - a pointer to the pooled type
_in_pool - bool (tracks whether the object is already pooled, to avoid pooling it twice by mistake)
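These three members are enough to build an intrusive doubly linked free list with constant-time insertion and removal and a guard against releasing the same object twice. The sketch is hypothetical (IntrusiveFreeList and its methods are not from the post), and it assumes a new object starts with _in_pool set to false. The post does not say what _previous is used for; unlinking an arbitrary object in constant time, shown in Unlink, is one plausible reason.

```cpp
#include <cstddef>

// Hypothetical intrusive free list built on the required members.
// T must expose: T* _next; T* _previous; bool _in_pool;
template <typename T>
class IntrusiveFreeList {
public:
    // Returns false and does nothing if obj is already pooled (double release).
    bool Push(T* obj) {
        if (obj->_in_pool) return false;
        obj->_in_pool = true;
        obj->_previous = nullptr;
        obj->_next = head_;
        if (head_) head_->_previous = obj;
        head_ = obj;
        ++size_;
        return true;
    }

    // Removes and returns the most recently pushed object, or nullptr if empty.
    T* Pop() {
        T* obj = head_;
        if (!obj) return nullptr;
        head_ = obj->_next;
        if (head_) head_->_previous = nullptr;
        obj->_next = obj->_previous = nullptr;
        obj->_in_pool = false;
        ++removed_;
        --size_;
        return obj;
    }

    // Constant-time removal of an object that is in this list (uses _previous).
    void Unlink(T* obj) {
        if (!obj->_in_pool) return;
        if (obj->_previous) obj->_previous->_next = obj->_next;
        else head_ = obj->_next;
        if (obj->_next) obj->_next->_previous = obj->_previous;
        obj->_next = obj->_previous = nullptr;
        obj->_in_pool = false;
        --size_;
    }

    std::size_t size() const { return size_; }

private:
    T* head_ = nullptr;
    std::size_t size_ = 0;
    std::size_t removed_ = 0;  // number of objects handed out by Pop
};
```

Because the links live inside the pooled objects, pushing and popping never allocate, which is why the post prefers these members over separate wrapper objects.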

Even though the description of the internals was lengthy (hopefully not boring), using the pool is straightforward. The following sample shows how to create an object pool with a custom allocator and deallocator, and how to create and destroy objects.


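#include <iostream>
#include <new>
#include "ObjectPool.h"

using namespace std;

// The pooled type must provide the _next, _previous and _in_pool members.
struct tmp
{
    tmp* _next;
    tmp* _previous;
    bool _in_pool;
    int i;
};

// Releases an object whose memory was obtained from CustomAllocator.
struct CustomDeAllocator
{
    void operator () (void* obj, size_t size) const
    {
       tmp* t = static_cast<tmp*>(obj);
       delete t;
    }
};

// Returns raw memory of the requested size for one object.
struct CustomAllocator
{
    void* operator () (size_t size) const
    {
       return ::operator new(size);
    }
};

int main()
{
    Utils::ObjectPool<tmp, CustomAllocator, CustomDeAllocator> pool;

    tmp* t1 = pool.Create();  // reused from the pool, or allocated if the pool is empty
    pool.Destroy(t1);         // returned to the pool for reuse

    return 0;
}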

The source code for the object pool can be downloaded from the link below.

Object Pool Code