February 23, 2013

High speed generic queue class


This article presents a generic class implementing a high speed queue. It has been specially designed for communication use but can be used for anything else.

Reading this article, you'll learn:
  • How to design a generic class
  • How to use nested data type
  • How to implement an enumerator
  • How to make a class thread safe
  • How to use variant record
  • How to use methods in a record
The code itself is quite short! Thanks to the efficient Delphi language.


High Performance

About high speed: Initially, I had an issue with performance in a communication system where a process was producing messages to be handled and another process was processing the messages. The two processes are totally asynchronous. I use the term "process" not to refer to different programs, but to different units of a single program.

The bottle neck was coming from memory allocation. Record where allocated by the producer process and freed by the processing processes. This resulted in a large amount of memory being allocated freed.

I designed this class to avoid as much as possible memory allocation. The design makes use of a several doubly linked lists of "buffers". The buffers are actually the data type specified by the generic type. Usually it is a record with a structure specific to the messages exchanged between the two process. If the application requires several types of buffers, then you should use a variant record to hold all types. We will see an example later.

High speed comes also from the fact that you don't copy data except when really forced to. To avoid copying data, we use pointers. Data stay where it is, we access it thru his address. The compiler takes care of the details for us.


Doubly linked list as a queue

A queue is a data structure in which you can append items (aka "node") at one end and remove items at the other end. This scheme is frequently named "First in - First out" or FIFO for short. You have another variant which append and remove items at the same end. That one is named LIFO which stands for "Last in - First out". In the application I described above, I need a FIFO queue.

FIFO queues can be implemented in a lot of different ways. Here I selected a doubly linked list because in my context this is the fastest way doing it.

A doubly linked list is a very classic data structure. Basically a doubly linked list is a variable number of items (or nodes) of any data type. One item is linked to the next using a pointer and linked to the previous using another pointer. There are two more separate pointers: one point to the first item and one point to the last item. To append items, we use the pointer to the last item and to remove items, we use the pointer to the first item.

You can find a lot of articles about it everywhere on the internet. So I will only describe what make my implementation specific.


High performance revisited

Now that we have a doubly linked list in hand, we will use it to achieve high performance. We will use 3 linked lists to avoid memory allocation:
  • Linked list of active items
  • Linked list of inactive items
  • Linked list of acquired items
Active items are those referring to the items added to the queue and not removed yet from the queue. They are actively waiting to be processed.

Inactive items are those having already be processed. Instead of freeing the processed items, we put it in the inactive queue where they are waiting for reuse. When a new active item has to be created, before allocating one new item, the inactive linked list is checked and if not empty, an item is removed from the list and reused, that is moved to the end of the active list. If the inactive list is empty, then a new item is allocated. This has a tremendous impact on the performance: adding a large number of items take 30 mS the first time when they need to be allocated. Later, when they can be reused, it only takes 17 mS!

Finally, the acquired items linked list is used when there are several processes consuming the same queue. For example when we have a multithread application and several threads are fetching messages from the same active queue. The third list is used to store one item extracted from the active list while it is processed so that another thread won't take the same item. After it has been processed, the "acquired" item is moved to the inactive list using a method I named Release.

Generic class use

A generic class is a class which has a variable part not known when designing the class. In our case, the variable part is a record. I named the generic class TCommQueue.

To use TCommQueue, you have to declare a record type and a variable. Let's assume the record type is:

  TCommFrame = packed record
    Name : array [0..50] of Char;
    Size : Integer;
  end;

This is a totally arbitrary record made simple for this article. In a real application, this record will be much more complex with all the members required for your application.

The variable representing the queue is declared as follow:

    FQueue : TCommQueue<TCommFrame>;

It is probably a member variable of a form in your application or in a component, data module or whatever you need. It doesn't matters in fact. TCommQueue inherit from TObject so before use, you must create it and after use you must free it. In most applications, you will do that in the constructor and destructor of your form, data module or component. A more complete source code is as follow:


type
    TForm1 = class(TForm)   
    protected
        FQueue : TCommQueue<TCommFrame>;
    public
        constructor Create(OAwner: TComponent); override;
        destructor Destroy; override
    end;

implementation

constructor TForm1.Create(AOwner : TComponent);
begin
    inherited;
    FQueue := TCommQueue<TCommFrame>.Create;
end;

destructor TForm1.Destroy;
begin
    FreeAndNil(FQueue);
    inherited;
end;

You see that using a generic type is very easy: you use the generic type name (TCommQueue here) and append your own data type in angle brackets ( here).

The compiler will compile the generic class by replacing the parameter type with the one you supplied.


Generic class declared

Now it's time to see the declaration of TCommQueue generic class. Remember we are using doubly linked list which are defined here as nested data type.

Nested data types have been introduced with Delphi XE3. See online help at http://docwiki.embarcadero.com/RADStudio/XE3/en/Nested_Type_Declarations

Nested data types are not related to generics, but are very handy in that case because nested data types within a generic class may make use on the parameter data type.

type
    // Forward declarations
    TCommQueue<T : record>     = class;
    TCommQueueEnum<T : record> = class;

    TInitCallBack = procedure (Sender : TObject;
                               Item   : Pointer) of object;

    TCommQueue<T : record> = class(TObject)
    // This nested type actually defines a doubly linked list of <T>
    type
        TItemPtr = ^T;          // Pointer to <T>
        PLLNode  = ^TLLNode;    // Pointer to a linked list node
        TLLNode  = record       // Doubly linked list cell
            Item : T;           // Item /must/ be the first node member
            Next : PLLNode;     // Pointer to the next node
            Prev : PLLNode;     // Pointer to the previous node
        end;
        TLinkedList = record    // Doubly linked list
            First : PLLNode;    // Pointer to the first node in the list
            Last  : PLLNode;    // Pointer to the last node in the list
            Count : Integer;    // Number of nodes in the list
            procedure Add(Item : PLLNode);
            function  ExtractLast : PLLNode;
            procedure Extract(Item : PLLNode);
            procedure FreeAllItems; inline;
        end;
    protected
        FActiveList   : TLinkedList;      // List of active items
        FInactiveList : TLinkedList;      // List of inactive items
        FAcquiredList : TLinkedList;      // List of acquired items
        FCurrent      : TItemPtr;         // The current item
        FCritSection  : TCriticalSection; // For multithread support
        // Enum is called back from the enumerator class
        // Give an item pointer, it returns the next item pointer or nil
        function  Enum(Item : Pointer) : Pointer;
        // GetItem is called back from the enumerator class
        // Get an item from the list. The item is COPYED while all other
        // functions make use of pointers.
        function  GetItem(ItemPt : Pointer) : T;
    public
        constructor Create;
        destructor Destroy; override;
        // Required method to supprt for..in contruct (not thread safe)
        function  GetEnumerator : TCommQueueEnum;
        // Add a new item, reusing an old one of any and return a pointer to it
        function  Add : TItemPtr;
        function  Add(InitCallBack : TInitCallBack) : TItemPtr; overload;
        // Remove an item and return pointer to the next
        // Current is not affected unless it is the one removed
        function  Remove(Item : TItemPtr) : TItemPtr; overload;
        // Remove current item and return a pointer to the next
        // The current becomes the next
        function  Remove : TItemPtr; overload;
        // Return pointer to first item. Update current.
        function  First : TItemPtr;
        // Return pointer to last item. Update current.
        function  Last : TItemPtr;
        // Return pointer to next item. Update current.
        function  Next : TItemPtr; overload;
        // Return a pointer to the item after a given one. Update current.
        function  Next(Item : TItemPtr) : TItemPtr; overload;
        // Return pointer to previous item. Update current.
        function  Previous : TItemPtr; overload;
        // Return a pointer to the item before a given one. Update current.
        function  Previous(Item : TItemPtr) : TItemPtr; overload;
        // Acquire the first item, if any and remove it from the list
        // it must be later be released by calling ReleaseItem
        // Acquire Item doesn't affect current unless it is the one acquired
        function  AcquireItem : TItemPtr;
        // Release an item previously acuired. The item is moved to the
        // inactive list for later reuse
        procedure ReleaseItem(Item : TItemPtr);
        // Free all items in the inactive List
        procedure FreeAllInactiveItems;

        property Current       : TItemPtr      read FCurrent;
        property Count         : Integer       read FActiveList.Count;
        property CountInactive : Integer       read FInactiveList.Count;
        property CountAcquired : Integer       read FAcquiredList.Count;
    end;

    // Class to support for..in construct with the main TCommQueue<T> class
    // Warning: for..in is not really thread safe
    TCommQueueEnum<T : Record> = class
    protected
        FQueue : TCommQueue<T>;
        FIndex : Pointer;
    public
        constructor Create(AQueue : TCommQueue<T>);
        function GetCurrent: T;
        function MoveNext: Boolean;
        property Current: T read GetCurrent;
    end;


The generic class TCommQueue takes one parameter which is the data type you want to use for the items. Instead of I could have used. The difference is that the later allow any data type to be used for T while specifying "record" impose the constraint of have a non nullable data type, mostly a record but also a scalar type such as integer, char and the likes. You cannot use string, array, class nor interface. I imposed such restriction because memory is allocated/freed using new/dispose.

The doubly linked list is declared as a nested data type at line 19. It is a record named TLinkedList. It contains two pointers to each end of the linked list, and a count of items in the list. I added a count because counting elements in a linked list is expensive: you have to iterate thru entire list to count how many node it has. Maintaining a counter is a useful optimization which doesn't cost much.

The nodes or items of the linked list are made of a record named TLLNode defined. As you can see at line 15, it makes use of the parameter type to build the record. Remember, in a doubly linked list, a node is made of the actual data you want in the list plus two pointers to the next and previous node in the list.

Lines 23 to 26 are method declarations for the record. Their implementation handles adding (append) or extracting (remove) nodes. This is only a matter of manipulation the pointers, no data is actually allocated, moved nor freed. This is really fast!

At line 29, 30 and 31 are declared the 3 doubly linked lists we talked before. Since TLinkedList are records, there is no need to allocate/free memory.

Line 31 declares FCurrent a pointer to an item. It is used in conjunction with the methods First, Next, Previous and Last that I implemented to iterate thru the list. Please note that those are NOT thread safe.

For the fun, I implemented an enumerator for the queue. Please refer to my article "Writing an iterator for a container" for details: http://francois-piette.blogspot.be/2012/12/writing-iterator-for-container.html
The enumerator is NOT thread safe and copies the data which is bad for performance.

The real interesting stuff is in methods Add, AcquireItem and ReleaseItem. They are all fully thread safe.

Add will add an item to the queue (Active linked list), reusing an inactive one or allocating a new item. Add returns a pointer to the item. There are two overloaded versions of method add. One without argument and one with a method pointer argument.

The version with the method pointer argument (line 48) is required to be fully thread safe. Allocating a new item in the queue may require initialization of the item. But initialization must take place before the item is visible in the queue otherwise another thread could remove it before it is initialized. The method pointer passed as argument is used as a callback (a kind of event if you like) which will be called within add method just after the new item is allocated or extracted from inactive list and just before it is actually appended to the active list.

A last note about multithreading: we handle linked list pointers in the implementation. To be thread safe, the handling must make sure only one thread at a time can update any of the linked lists. This is why I used a critical section declared at line 33.


Implementation

The implementation is rather boring. It is actually very basic Delphi programming. I give the source below to be complete. Drop a message if you need some more explanations.
Complete source code is available at http://www.overbyte.be/eng/blog_source_code.html


Follow me on Twitter
Follow me on LinkedIn
Follow me on Google+
Visit my website: http://www.overbyte.be

The article is available at:
     http://francois-piette.blogspot.com/2013/02/high-speed-generic-queue-class.html

1 comment:

Chris Nillissen said...

It would be good to see this class expanded to also include some methods to serialise the queue to disk and back again.