Example 6: Client and server processes

This is a two-process example demonstrating simple asynchronous message delivery between a client and server process.

Both asynch_client.c and asynch_server.c include the same header, asynch_server.h, which defines the registered name that the client uses to find the server, and the message type for the channel ID query:

// name the server registers with name_attach() and the client looks up with name_open()
#define RECV_NAME "ASYNCH_RECEIVER"
#define GET_ACHID 2 // message type for async chid query: no data; reply is 32-bit chid

The following file (asynch_server.c) is the server code that demonstrates asynchronous messaging. It registers a name with name_attach() so that the client(s) can find it, and answers one query from a client to get its asynchronous channel ID.

This server uses a MsgReceive() as a blocking point, and gets a pulse from the kernel whenever there are messages available to receive on the asynchronous channel, allowing the server to handle both synchronous and asynchronous messages easily.

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sys/siginfo.h>
#include <sys/neutrino.h>
#include <sys/dispatch.h>
#include <sys/asyncmsg.h>
#include <unistd.h>
#include <sys/trace.h>

#include "asynch_server.h"

/* prefix prepended to this program's diagnostic messages */
#define PROGNAME "asynch_server: "

/* Receive buffer handed to MsgReceive().
 * It holds either a pulse or a message; currently the only messages
 * examined are identified by their leading 16-bit type field. */
union recv_msgs
{
    struct _pulse pulse;  /* read when MsgReceive() returns 0 (a pulse arrived) */
    uint16_t type;        /* read when MsgReceive() returns a rcvid (a message arrived) */
} recv_buf;

/* Pulse code for the asynchronous-message notification from the kernel.
 * This is a user pulse code, chosen here -- it need not be this value
 * or any particular value.  It is stored in the sigevent given to
 * asyncmsg_channel_create() and matched in the pulse switch of main().
 */
#define PULSE_ASYNCH_EVENT (_PULSE_CODE_MINAVAIL + 5)

/*
 * Server entry point.
 *
 * Registers RECV_NAME with name_attach(), creates an asynchronous channel
 * whose notification event is a pulse delivered to the name_attach()
 * channel, and then loops forever on MsgReceive() handling:
 *   - pulses:   client disconnects and "async messages available" events
 *   - messages: _IO_CONNECT (sent by name_open()), other system messages,
 *               and the GET_ACHID query
 *
 * argc and argv are not used.  The receive loop never terminates; the
 * process exits with EXIT_FAILURE only if one of the setup calls fails.
 */
int main(int argc, char *argv[])
{
    name_attach_t *att;
    int rcvid;
    struct _msg_info msg_info;
    struct sigevent sigev;
    int self_coid;
    int achid;
    struct _asyncmsg_get_header *agh, *agh1;

    /* register my name so client can find me */
    att = name_attach( NULL, RECV_NAME, 0 );
    if( NULL == att )
    {
        perror(PROGNAME "name_attach()");
        exit( EXIT_FAILURE );
    }

    /* create a connection to the synchronous channel created by the
     * name_attach() call.  Will use this to specify where the pulses
     * flagging async data available should be delivered.
     */
    self_coid = ConnectAttach( 0, 0, att->chid, _NTO_SIDE_CHANNEL, 0 );
    if( -1 == self_coid )
    {
        perror(PROGNAME "ConnectAttach");
        exit( EXIT_FAILURE );
    }

    /* and fill in the event structure to describe a priority 10 pulse
     * to be delivered to my synchronous channel */
    SIGEV_PULSE_INIT( &sigev, self_coid, 10, PULSE_ASYNCH_EVENT, 0 );

    /* create an asynchronous channel with automatic buffering
     *   it will not block an asyncmsg_get() call if there are no messages available
     *   it will receive up to 10 messages of up to 1024 bytes each at once
     *   I will get a pulse notification when the queue of available messages goes
     *     from empty to non-empty
     */
    achid = asyncmsg_channel_create( _NTO_CHF_ASYNC_NONBLOCK, 0666, 1024, 10, &sigev, NULL );
    if( -1 == achid )
    {
        perror( "asyncmsg_channel_create");
        exit( EXIT_FAILURE );
    }

    while(1)
    {
        /* single blocking point: both pulses and synchronous messages arrive here */
        rcvid = MsgReceive( att->chid, &recv_buf, sizeof (recv_buf), &msg_info );

        if( -1 == rcvid )
        {
            perror(PROGNAME "MsgReceive failed");
            continue;
        }

        if( 0 == rcvid )
        {
            /* we received a pulse */
            printf("got a pulse\n");
            switch( recv_buf.pulse.code )
            {
                /* system disconnect pulse */
                case _PULSE_CODE_DISCONNECT:
                    ConnectDetach( recv_buf.pulse.scoid );
                    printf(PROGNAME "disconnect from a client %X\n", recv_buf.pulse.scoid);
                    break;

                /* our pulse - we've got one or more messages */
                case PULSE_ASYNCH_EVENT:
                    /* get one or more messages from our channel */
                    agh = asyncmsg_get( achid );
                    if( NULL == agh )
                    {
                        perror("went to get a message, but nothing was there");
                    }
                    else
                    {
                        /* the async receive header is, actually, a linked list of headers
                         * if multiple messages have been received at once, so we need to
                         * walk the list, looking at each header and message in turn */
                        while( agh )
                        {
                            printf("the message came from %d in %d parts\n", agh->info.pid, agh->parts);
                            /* bound the print by the received length so a sender
                             * that omits the terminating NUL cannot make us read
                             * past the end of the buffer */
                            printf("the data is '%.*s'\n",
                                   (int)agh->iov->iov_len, (char *)(agh->iov->iov_base));
                            /* remember the next header before freeing this one */
                            agh1 = agh;
                            agh = agh1->next;
                            /* free this message */
                            asyncmsg_free( agh1 );
                        }
                    }
                    break;

                default:
                    printf(PROGNAME "unexpected pulse code: %d\n", recv_buf.pulse.code );
                    break;
            }
            continue;
        }

        /* not an error, not a pulse, therefore a message */

        if( recv_buf.type == _IO_CONNECT )
        {
            /* _IO_CONNECT because someone did a name_open() to us, must EOK it.*/
            MsgReply( rcvid, EOK, NULL, 0 );
            continue;
        }

        if( recv_buf.type > _IO_BASE && recv_buf.type <= _IO_MAX )
        {
            /* unexpected system message, probably qconn, error it */
            MsgError( rcvid, ENOSYS );
            continue;
        }

        switch( recv_buf.type )
        {
            /* here our client asked for our asynchronous channel id
             * reply with it */
            case GET_ACHID:
                printf("got request for my achid\n");
                MsgReply( rcvid, 0, &achid, sizeof(achid) );
                break;

            default:
                /* some other unexpected message: unblock the sender with an error */
                printf(PROGNAME "unexpected message type: %d\n", recv_buf.type);
                MsgError( rcvid, ENOSYS );
                break;
        }
    }
}

The following file, asynch_client.c, first locates the server using name_open(), then sends a query to get the server's asynchronous channel ID. Next, it creates an asynchronous connection, and sends a message every second, with a callback attached to print out that the message was sent.

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/siginfo.h>
#include <sys/neutrino.h>
#include <sys/dispatch.h>
#include <sys/asyncmsg.h>
#include <sys/trace.h>

#include "asynch_server.h"

/* prefix prepended to this program's error messages */
#define PROGNAME "asynch_client: "

/* callback is run after each message has been delivered to the server.  It is
 * registered at connect time, and called by a thread created during the
 * connect processing.
 *
 *   err    - value passed in by the caller of the callback; printed as-is
 *   cmsg   - pointer passed in by the caller of the callback; printed as-is
 *   handle - the handle value that was given to asyncmsg_put() for this
 *            message (main() passes its message counter)
 *
 * Prints the arguments, a running count of invocations and the calling
 * thread's id.  Always returns 0.
 */
int callback(int err, void *cmsg, unsigned handle)
{
    /* counts invocations across calls; kept static so it survives between them */
    static int num_callback_run = 0;

    ++num_callback_run;
    /* handle is unsigned, so print it with %u rather than %d */
    printf("Callback: err = %d, msg = %p, handle = %u\n",
           err, cmsg, handle);
    /* cast makes the argument type match the %d conversion explicitly */
    printf("run the %dth time by %d tid\n", num_callback_run, (int)pthread_self());
    return 0;
}

int main(int argc, char *argv[]) {
    int server_coid, a_coid;
    short msg;
    int a_chid;
    struct _asyncmsg_connection_attr aca;
    struct _server_info sinfo;
    int count = 1;

    /* look for server */
    server_coid = name_open( RECV_NAME, 0 );
    while( server_coid == -1 )
    {
      sleep(1);
      server_coid = name_open( RECV_NAME, 0 );
    }

        /* need server's pid, use this to get server info including pid */
        ConnectServerInfo( 0, server_coid, &sinfo );

        /* send message to server request the asynchronous channel id */
        msg = GET_ACHID;
    if (MsgSend( server_coid, &msg, sizeof( msg ), &a_chid, sizeof(a_chid) ))
    {
        perror(PROGNAME "MsgSend");
        exit( EXIT_FAILURE );
    }
    
    /* setup array of send buffers */
    memset(&aca, 0, sizeof(aca));
    aca.buffer_size = 2048;
    aca.max_num_buffer = 5;  /* don't locally accumulate more than 5 buffers */
    aca.trigger_num_msg = 1; /* deliver each message as it is sent */
    aca.call_back = callback; /* call this function after messages are sent */

    /* create my asynchronous connection to the server */
    a_coid = asyncmsg_connect_attach( 0, sinfo.pid, a_chid, 0, 0, &aca );
    if( -1 == a_coid )
    {
        perror( "async connect");
        printf("server pid is %d\n", sinfo.pid );
        exit( EXIT_FAILURE );
    }

    while(1)
    {   int ret, len;
        char buf[80];

        len = sprintf(buf, "message #%d", count)+1;
        printf("sending %dth time\n", count );

        /* deliver message to server, the count value will be passed into
         * my callback as the "handle"
         */
        ret = asyncmsg_put( a_coid, buf, len, count, NULL );
        if( -1 == ret )
        {
                perror( "put");
                exit( EXIT_FAILURE );
        }
        count++;
        sleep(1);
    }
}