Handling private messages and pulses

A resource manager may need to receive and handle pulses, perhaps because an interrupt handler has returned a pulse or some other thread or process has sent a pulse.

The main issue with pulses is that they have to be received as a message. This means that a thread has to explicitly perform a MsgReceive() in order to get the pulse. But unless this pulse is sent to a different channel than the one that the resource manager is using for its main messaging interface, it will be received by the library. Therefore, we need to see how a resource manager can associate a pulse code with a handler routine and communicate that information to the library.

You can use the pulse_attach() function to associate a pulse code with a handler function. When the dispatch layer receives a pulse, it will look up the pulse code and see which associated handler to call to handle the pulse message.

In this example, we create the same resource manager, but this time we also attach a pulse, which is then used as a timer event:

#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define THREAD_POOL_PARAM_T     dispatch_context_t
#include <sys/iofunc.h>
#include <sys/dispatch.h>

/* Connect and I/O function tables; initialized by iofunc_func_init() in
 * main() and handed to resmgr_attach(). */
static resmgr_connect_funcs_t   connect_func;
static resmgr_io_funcs_t        io_func;
/* Attribute structure for the registered name; initialized by
 * iofunc_attr_init() in main() and handed to resmgr_attach(). */
static iofunc_attr_t            attr;

/*
 * Pulse handler registered with pulse_attach() in main().
 *
 * ctp     dispatch context; the received pulse is read from ctp->msg->pulse
 * code    unused here
 * flags   unused here
 * handle  unused here (main() attaches this handler with a NULL handle)
 *
 * Prints the integer value carried by the pulse and returns 0.
 */
int
timer_tick( message_context_t *ctp, int code, unsigned flags,
            void *handle)
{
    /* Do some useful work whenever the timer expires. */
    printf("received timer event, value %d\n",
           ctp->msg->pulse.value.sival_int);

    return 0;
}

/*
 * Resource manager that also handles a periodic timer pulse.
 *
 * Creates a dispatch handle and a thread pool driven by the dispatch_*()
 * functions, registers /dev/sample with the default iofunc handlers,
 * attaches timer_tick() to a dynamically allocated pulse code, and arms a
 * timer that delivers that pulse once per second.
 *
 * Returns EXIT_FAILURE if any setup step fails; otherwise control is handed
 * to the thread pool.
 */
int
main(int argc, char **argv) {
    thread_pool_attr_t    pool_attr;
    struct sigevent       event;
    struct _itimer        itime;
    dispatch_t            *dpp;
    thread_pool_t         *tpp;
    int                   timer_id;
    int                   id;

    dpp = dispatch_create();
    if(dpp == NULL) {
        fprintf(stderr, "%s: Unable to allocate dispatch handle.\n", argv[0]);
        return EXIT_FAILURE;
    }

    memset(&pool_attr, 0, sizeof pool_attr);
    pool_attr.handle = dpp;
    /*  We are doing resmgr and pulse-type attaches.
     *
     *  If you're going to use custom messages or pulses with
     *  the message_attach() or pulse_attach() functions,
     *  then you MUST use the dispatch functions
     *  dispatch_block(), dispatch_handler(), and so on.
     */
    pool_attr.context_alloc = dispatch_context_alloc;
    pool_attr.block_func = dispatch_block;
    pool_attr.unblock_func = dispatch_unblock;
    pool_attr.handler_func = dispatch_handler;
    pool_attr.context_free = dispatch_context_free;
    pool_attr.lo_water = 2;
    pool_attr.hi_water = 4;
    pool_attr.increment = 1;
    pool_attr.maximum = 50;

    tpp = thread_pool_create(&pool_attr, POOL_FLAG_EXIT_SELF);
    if(tpp == NULL) {
        fprintf(stderr, "%s: Unable to initialize thread pool.\n",argv[0]);
        return EXIT_FAILURE;
    }

    iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_func, _RESMGR_IO_NFUNCS,
                     &io_func);
    iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);

    id = resmgr_attach(dpp, NULL, "/dev/sample", _FTYPE_ANY, 0, &connect_func, &io_func,
                       &attr);
    if(id == -1) {
        fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
        return EXIT_FAILURE;
    }

    /* Start from a zeroed event so no field is left indeterminate */
    memset(&event, 0, sizeof event);

    /* Initialize an event structure, and attach a pulse to it */
    event.sigev_code = pulse_attach(dpp, MSG_FLAG_ALLOC_PULSE, 0, &timer_tick, NULL);
    if(event.sigev_code == -1) {
        fprintf(stderr, "Unable to attach timer pulse.\n");
        return EXIT_FAILURE;
    }

    /* Connect to our channel */
    event.sigev_coid = message_connect(dpp, MSG_FLAG_SIDE_CHANNEL);
    if(event.sigev_coid == -1) {
        fprintf(stderr, "Unable to attach to channel.\n");
        return EXIT_FAILURE;
    }

    event.sigev_notify = SIGEV_PULSE;
    event.sigev_priority = -1;
    /* We could create several timers and use different sigev values for each */
    event.sigev_value.sival_int = 0;

    timer_id = TimerCreate(CLOCK_MONOTONIC, &event);
    if(timer_id == -1) {
        fprintf(stderr, "Unable to create timer.\n");
        return EXIT_FAILURE;
    }

    /* And now set up our timer to fire every second */
    itime.nsec = 1000000000;
    itime.interval_nsec = 1000000000;
    if(TimerSettime(timer_id, 0, &itime, NULL) == -1) {
        fprintf(stderr, "Unable to start timer.\n");
        return EXIT_FAILURE;
    }

    /* Never returns */
    thread_pool_start(tpp);
    return EXIT_SUCCESS;
}

We can either define our own pulse code (e.g., #define OurPulseCode 57) and pass it to pulse_attach(), or we can specify MSG_FLAG_ALLOC_PULSE in the flags argument to ask the function to dynamically generate a pulse code.

You can also use the path that a resource manager registers to get a connection ID (coid) that you can use with MsgSend() to send messages to your resource manager. This means that you don't have to use read() and write() to interact with a resource manager.

This example consists of client and server programs. Let's begin with the server:

/*
 * ResMgr and Message Server Process
 */

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>

/* Connect and I/O function tables; set to the default handlers by
 * iofunc_func_init() in main() and handed to resmgr_attach(). */
resmgr_connect_funcs_t  ConnectFuncs;
resmgr_io_funcs_t       IoFuncs;
/* Attribute structure for the "serv" entry; initialized by
 * iofunc_attr_init() in main() and handed to resmgr_attach(). */
iofunc_attr_t           IoFuncAttr;


/*
 * Layout of a private message as the server interprets it: a 16-bit
 * message type followed by a fixed-size text payload.
 */
typedef struct {
    uint16_t msg_no;          /* message type */
    char     msg_data[255];   /* text payload, printed by message_callback() */
} server_msg_t;


/*
 * Handler for the private message types registered with message_attach().
 *
 * ctp     dispatch context; ctp->msg holds the received message and
 *         ctp->rcvid identifies the client to reply to
 * type    message type that was received
 * flags   unused here
 * handle  unused here (main() attaches this handler with a NULL handle)
 *
 * Prints the message type and text, then replies to the client with a
 * string giving the type's offset from _IO_MAX.  Always returns 0.
 */
int message_callback( message_context_t *ctp, int type, unsigned flags, 
                      void *handle )
{
    server_msg_t *msg;
    int num;
    char msg_reply[255];

    /* Cast a pointer to the message data */
    msg = (server_msg_t *)ctp->msg;

    /* Print some useful information about the message.  The text comes
     * from the client and may lack a terminating NUL, so never print past
     * the end of the msg_data field. */
    printf( "\n\nServer Got Message:\n" );
    printf( "  type: %d\n" , type );
    printf( "  data: %.*s\n\n", (int)sizeof( msg->msg_data ), msg->msg_data );

    /* Build the reply message */
    num = type - _IO_MAX;
    snprintf( msg_reply, sizeof( msg_reply ),
              "Server got message code: _IO_MAX + %d", num );
   
    /* Send a reply to the waiting (blocked) client */ 
    if( MsgReply( ctp->rcvid, EOK, msg_reply, strlen( msg_reply ) + 1 ) == -1 )
    {
        fprintf( stderr, "MsgReply() failed: %s\n", strerror( errno ) );
    }

    return 0;
}

/*
 * Message server: registers the name "serv" with the default resmgr
 * handlers, attaches message_callback() for message types _IO_MAX + 1
 * through _IO_MAX + 2, and then receives and dispatches messages in a
 * single-threaded loop.
 *
 * Returns EXIT_FAILURE if setup or dispatch_block() fails; otherwise the
 * loop runs until the process is terminated.
 */
int main( int argc, char **argv )
{
    dispatch_t           *dpp;
    dispatch_context_t   *ctp, *ctp_ret;
    int                  resmgr_id, message_id;

    /* Create the dispatch interface */
    dpp = dispatch_create();
    if( dpp == NULL )
    {
        fprintf( stderr, "dispatch_create() failed: %s\n", 
                 strerror( errno ) );
        return EXIT_FAILURE;
    }

    /* Set up the default I/O functions to handle open/read/write/... */
    iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,
                      _RESMGR_IO_NFUNCS, &IoFuncs );

    /* Set up the attribute for the entry in the filesystem */
    iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );

    resmgr_id = resmgr_attach( dpp, NULL, "serv", _FTYPE_ANY, 
                               0, &ConnectFuncs, &IoFuncs, &IoFuncAttr );
    if( resmgr_id == -1 )
    {
        fprintf( stderr, "resmgr_attach() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }

    /* Attach a callback (handler) for two message types */
    message_id = message_attach( dpp, NULL, _IO_MAX + 1,
                                 _IO_MAX + 2, message_callback, NULL );
    if( message_id == -1 )
    {
        fprintf( stderr, "message_attach() failed: %s\n", strerror( errno ) );
        return EXIT_FAILURE;
    }

    /* Set up a context for the dispatch layer to use */
    ctp = dispatch_context_alloc( dpp );
    if( ctp == NULL )
    {
        fprintf( stderr, "dispatch_context_alloc() failed: %s\n", 
                 strerror( errno ) );
        return EXIT_FAILURE;
    }


    /* Get and process messages */
    while( 1 )
    {
        ctp_ret = dispatch_block( ctp );
        if( ctp_ret == NULL )
        {
            fprintf( stderr, "dispatch_block() failed: %s\n", 
                     strerror( errno ) );
            return EXIT_FAILURE;
        }

        /* Always continue with the context that dispatch_block() handed
         * back, both for handling this message and for the next block. */
        ctp = ctp_ret;
        dispatch_handler( ctp );
    }

    /* Not reached */
    return EXIT_SUCCESS;
}

When the server calls resmgr_attach(), it registers the filesystem entry serv. Since the server doesn't specify an absolute path, the entry appears in the directory where the server was run. This gives us a filesystem entry that can be opened and closed, but generally behaves the same as /dev/null.

We call message_attach() to tell the dispatch layer that we'll be handling our own messages in addition to the standard I/O and connection messages handled by the resmgr layer. All incoming messages must have an unsigned 16-bit integer at the start indicating the message type. Note that the range 0x0 to _IO_MAX is reserved for the OS. We set up our message_callback() routine to handle messages of type _IO_MAX + 1 and _IO_MAX + 2. You can specify a pointer to arbitrary data to pass to the callback, but we don't need that, so we set it to NULL.

When a message of the appropriate type is received, our message_callback() routine is invoked. We get the message type passed in via the type parameter. The actual message data can be found in ctp->msg. When the message comes in, the server prints the message type and the string that was sent from the client. It then computes the offset of the message type from _IO_MAX, formats that offset into a reply string, and sends the reply back to the client via ctp->rcvid using MsgReply().

The client is much simpler:

/* 
 * Message Client Process 
 */ 

#include <stdio.h> 
#include <unistd.h> 
#include <stdlib.h> 
#include <fcntl.h> 
#include <errno.h> 
#include <string.h> 
#include <sys/neutrino.h> 
#include <sys/iofunc.h> 
#include <sys/dispatch.h> 

/*
 * Layout of the private message the client sends: a 16-bit message
 * type followed by a fixed-size text payload.
 */
typedef struct {
    uint16_t msg_no;          /* message type, set to _IO_MAX + offset in main() */
    char     msg_data[255];   /* text payload filled in by main() */
} client_msg_t;

/*
 * Message client: opens "serv", sends one private message of type
 * _IO_MAX + <offset> with MsgSend(), and prints the server's reply.
 *
 * Command-line options:
 *   -n offset   offset from _IO_MAX to use as the message type (default 1)
 *
 * Returns EXIT_SUCCESS once the reply has been printed, or EXIT_FAILURE if
 * the open() or the MsgSend() fails.
 */
int main( int argc, char **argv ) 
{ 
    int fd; 
    int c; 
    client_msg_t msg; 
    long ret; 
    int num; 
    char msg_reply[255]; 
    
    num = 1; 
    
    /* Process any command-line arguments */ 
    while( ( c = getopt( argc, argv, "n:" ) ) != -1 ) 
    { 
        if( c == 'n' ) 
        { 
            num = strtol( optarg, 0, 0 ); 
        } 
    } 
    /* Open a connection to the server (fd == coid) */ 
    fd = open( "serv", O_RDWR ); 
    if( fd == -1 ) 
    { 
        fprintf( stderr, "Unable to open server connection: %s\n", 
            strerror( errno ) ); 
        return EXIT_FAILURE; 
    } 
    
    /* Clear the memory for the msg and the reply */ 
    memset( &msg, 0, sizeof( msg ) ); 
    memset( &msg_reply, 0, sizeof( msg_reply ) ); 
    
    /* Set up the message data to send to the server */ 
    msg.msg_no = _IO_MAX + num; 
    snprintf( msg.msg_data, sizeof( msg.msg_data ), 
              "client %d requesting reply.", getpid() ); 
    
    printf( "client: msg_no: _IO_MAX + %d\n", num ); 
    fflush( stdout ); 
    
    /* Send the data to the server and get a reply.  Accept one byte less
     * than the buffer holds so the zeroed last byte always terminates the
     * reply string, whatever the server sends back. */ 
    ret = MsgSend( fd, &msg, sizeof( msg ), msg_reply, 
                   sizeof( msg_reply ) - 1 ); 
    if( ret == -1 ) 
    { 
        fprintf( stderr, "Unable to MsgSend() to server: %s\n", strerror( errno ) ); 
        close( fd ); 
        return EXIT_FAILURE; 
    } 
    
    /* Print out the reply data */ 
    printf( "client: server replied: %s\n", msg_reply ); 
    
    close( fd ); 
    
    return EXIT_SUCCESS; 
} 
Note: Remember that since the server registers a relative pathname, the client must be run from the same directory as the server.

The client uses the open() function to get a coid (the server's default resmgr setup takes care of all of this on the server side), and performs a MsgSend() to the server based on this coid, and then waits for the reply. When the reply comes back, the client prints the reply data.

You can give the client the command-line option -n offset to specify the offset from _IO_MAX to use for the message type. If you give anything other than 1 or 2 as the offset, the MsgSend() fails, since the server hasn't set up handlers for those messages.