Multithreaded Resource Managers

This chapter includes:

Multithreaded resource manager example

Let's look at our multithreaded resource manager example in more detail:

#include <errno.h>
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>

/*
 *  define THREAD_POOL_PARAM_T such that we can avoid a compiler
 *  warning when we use the dispatch_*() functions below
 */
#define THREAD_POOL_PARAM_T dispatch_context_t

#include <sys/iofunc.h>
#include <sys/dispatch.h>

static resmgr_connect_funcs_t    connect_funcs;
static resmgr_io_funcs_t         io_funcs;
static iofunc_attr_t             attr;

main(int argc, char **argv)
{
    /* declare variables we'll be using */
    thread_pool_attr_t   pool_attr;
    resmgr_attr_t        resmgr_attr;
    dispatch_t           *dpp;
    thread_pool_t        *tpp;
    dispatch_context_t   *ctp;
    int                  id;

    /* initialize dispatch interface */
    if((dpp = dispatch_create()) == NULL) {
        fprintf(stderr,
                "%s: Unable to allocate dispatch handle.\n",
                argv[0]);
        return EXIT_FAILURE;
    }

    /* initialize resource manager attributes */
    memset(&resmgr_attr, 0, sizeof resmgr_attr);
    resmgr_attr.nparts_max = 1;
    resmgr_attr.msg_max_size = 2048;

    /* initialize functions for handling messages */
    iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 
                     _RESMGR_IO_NFUNCS, &io_funcs);

    /* initialize attribute structure used by the device */
    iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);

    /* attach our device name */
    id = resmgr_attach(
            dpp,            /* dispatch handle        */
            &resmgr_attr,   /* resource manager attrs */
            "/dev/sample",  /* device name            */
            _FTYPE_ANY,     /* open type              */
            0,              /* flags                  */
            &connect_funcs, /* connect routines       */
            &io_funcs,      /* I/O routines           */
            &attr);         /* handle                 */
    if(id == -1) {
        fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
        return EXIT_FAILURE;
    }

    /* initialize thread pool attributes */
    memset(&pool_attr, 0, sizeof pool_attr);
    pool_attr.handle = dpp;
    pool_attr.context_alloc = dispatch_context_alloc;
    pool_attr.block_func = dispatch_block;
    pool_attr.unblock_func = dispatch_unblock;
    pool_attr.handler_func = dispatch_handler;
    pool_attr.context_free = dispatch_context_free;
    pool_attr.lo_water = 2;
    pool_attr.hi_water = 4;
    pool_attr.increment = 1;
    pool_attr.maximum = 50;

    /* allocate a thread pool handle */
    if((tpp = thread_pool_create(&pool_attr, 
                                 POOL_FLAG_EXIT_SELF)) == NULL) {
        fprintf(stderr, "%s: Unable to initialize thread pool.\n",
                argv[0]);
        return EXIT_FAILURE;
    }

    /* start the threads, will not return */
    thread_pool_start(tpp);
}

The thread pool attribute (pool_attr) controls various aspects of the thread pool, such as which functions get called when a new thread is started or dies, the total number of worker threads, the minimum number, and so on.

Thread pool attributes

Here's the _thread_pool_attr structure:

typedef struct _thread_pool_attr {
  THREAD_POOL_HANDLE_T  *handle;
  THREAD_POOL_PARAM_T   *(*block_func)(THREAD_POOL_PARAM_T *ctp);
  void                  (*unblock_func)(THREAD_POOL_PARAM_T *ctp);
  int                   (*handler_func)(THREAD_POOL_PARAM_T *ctp);
  THREAD_POOL_PARAM_T   *(*context_alloc)(
                            THREAD_POOL_HANDLE_T *handle);
  void                  (*context_free)(THREAD_POOL_PARAM_T *ctp);
  pthread_attr_t        *attr;
  unsigned short        lo_water;
  unsigned short        increment;
  unsigned short        hi_water;
  unsigned short        maximum;
  unsigned              reserved[8];
} thread_pool_attr_t;

The functions that you fill into the above structure can be taken from the dispatch layer (dispatch_block(), ...), the resmgr layer (resmgr_block(), ...) or they can be of your own making. If you're not using the resmgr layer functions, then you'll have to define THREAD_POOL_PARAM_T to some sort of context structure for the library to pass between the various functions. By default, it's defined as a resmgr_context_t but since this sample is using the dispatch layer, we needed it to be adispatch_context_t. We defined it prior to doing the includes above since the header files refer to it. THREAD_POOL_PARAM_T

Part of the above structure contains information telling the resource manager library how you want it to handle multiple threads (if at all). During development, you should design your resource manager with multiple threads in mind. But during testing, you'll most likely have only one thread running (to simplify debugging). Later, after you've ensured that the base functionality of your resource manager is stable, you may wish to “turn on” multiple threads and revisit the debug cycle.

The following members control the number of threads that are running:

lo_water
Minimum number of blocked threads.
increment
Number of thread to create at a time to achieve lo_water.
hi_water
Maximum number of blocked threads.
maximum
Total number of threads created at any time.

The important parameters specify the maximum thread count and the increment. The value for maximum should ensure that there's always a thread in a RECEIVE-blocked state. If you're at the number of maximum threads, then your clients will block until a free thread is ready to receive data. The value you specify for increment will cut down on the number of times your driver needs to create threads. It's probably wise to err on the side of creating more threads and leaving them around rather than have them being created/destroyed all the time.

You determine the number of threads you want to be RECEIVE-blocked on the MsgReceive() at any time by filling in the lo_water parameter.

If you ever have fewer than lo_water threads RECEIVE-blocked, the increment parameter specifies how many threads should be created at once, so that at least lo_water number of threads are once again RECEIVE-blocked.

Once the threads are done their processing, they will return to the block function. The hi_water variable specifies an upper limit to the number of threads that are RECEIVE-blocked. Once this limit is reached, the threads will destroy themselves to ensure that no more than hi_water number of threads are RECEIVE-blocked.

To prevent the number of threads from increasing without bounds, the maximum parameter limits the absolute maximum number of threads that will ever run simultaneously.

When threads are created by the resource manager library, they'll have a stack size as specified by the thread_stack_size parameter. If you want to specify stack size or priority, fill in pool_attr.attr with a proper pthread_attr_t pointer.

The thread_pool_attr_t structure contains pointers to several functions:

block_func()
Called by the worker thread when it needs to block waiting for some message.
handler_func()
Called by the thread when it has unblocked because it received a message. This function processes the message.
context_alloc()
Called when a new thread is created. Returns a context that this thread uses to do its work.
context_free()
Free the context when the worker thread exits.
unblock_func()
Called by the library to shutdown the thread pool or change the number of running threads.

Thread pool functions

The library provides the following thread pool functions:

thread_pool_create()
Initializes the pool context. Returns a thread pool handle (tpp) that's used to start the thread pool.
thread_pool_start()
Start the thread pool. This function may or may not return, depending on the flags passed to thread_pool_create().
thread_pool_destroy()
Destroy a thread pool.
thread_pool_control()
Control the number of threads.

Note:

In the example provided in the multithreaded resource managers section, thread_pool_start(tpp) never returns because we set the POOL_FLAG_EXIT_SELF bit. Also, the POOL_FLAG_USE_SELF flag itself never returns, but the current thread becomes part of the thread pool.


If no flags are passed (i.e. 0 instead of any flags), the function returns after the thread pool is created.