This is a two-process example demonstrating simple asynchronous message delivery between a client and server process.
Both asynch_client.c and asynch_server.c files share the same header, asynch_server.h, to find each other using the registered name, and a defined message type for the query:
#define RECV_NAME "ASYNCH_RECEIVER" #define GET_ACHID 2 // message type for async chid query: no data; reply is 32-bit chid
The following file (asynch_server.c) is the server code that demonstrates asynchronous messaging. It registers a name with name_attach() so that the client(s) can find it, and answers one query from a client to get its asynchronous channel ID.
This server uses a MsgReceive() as a blocking point, and gets a pulse from the kernel whenever there are messages available to receive on the asynchronous channel, allowing the server to handle both synchronous and asynchronous messages easily.
#include <stdlib.h> #include <stdio.h> #include <errno.h> #include <sys/siginfo.h> #include <sys/neutrino.h> #include <sys/dispatch.h> #include <sys/asyncmsg.h> #include <unistd.h> #include <sys/trace.h> #include "asynch_server.h" #define PROGNAME "asynch_server: " /* structure for receive buffer for MsgReceive * currently only query message or pulse. */ union recv_msgs { struct _pulse pulse; uint16_t type; } recv_buf; /* pulse code for asynch message notification from kernel * this is a user pulse code, chosen here -- it need not * be this value or any particular value */ #define PULSE_ASYNCH_EVENT (_PULSE_CODE_MINAVAIL + 5) int main(int argc, char *argv[]) { name_attach_t *att; int rcvid; struct _msg_info msg_info; struct sigevent sigev; int self_coid; int achid; struct _asyncmsg_get_header *agh, *agh1; /* register my name so client can find me */ att = name_attach(NULL,RECV_NAME, 0 ); if (NULL == att ) { perror(PROGNAME "name_attach()"); exit(EXIT_FAILURE); } /* create a connection to the synchronous channel created by the * name_attach() call. Will use this to specify where the pulses * flagging async data available should be delivered. */ self_coid = ConnectAttach( 0, 0, att->chid, _NTO_SIDE_CHANNEL, 0 ); if( -1 == self_coid ) { perror(PROGNAME "ConnectAttach"); exit( EXIT_FAILURE ); } /* and fill in the event structure to describe a priority 10 pulse * to be delivered to my synchronous channel */ SIGEV_PULSE_INIT( &sigev, self_coid, 10, PULSE_ASYNCH_EVENT, 0 ); /* create an asynchronous channel with automatic buffering * it will not block an asyncmsg_get() call if there are no messages available * it will receive up to 10 messages of up to 1024 bytes each at once * I will get a pulse notification when the queue of available messages goes * from empty to non-empty */ achid = asyncmsg_channel_create( _NTO_CHF_ASYNC_NONBLOCK, 0666, 1024, 10, &sigev, NULL ); if( -1 == achid ) { perror( "asyncmsg_channel_create"); exit( EXIT_FAILURE ); } while(1) { rcvid = MsgReceive( att->chid, &recv_buf, sizeof (recv_buf), &msg_info ); if( -1 == rcvid ) { perror(PROGNAME "MsgReceive failed"); continue; } if ( 0 == rcvid ) { /* we received a pulse */ printf("got a pulse\n"); switch( recv_buf.pulse.code ) { /* system disconnect pulse */ case _PULSE_CODE_DISCONNECT: ConnectDetach( recv_buf.pulse.scoid ); printf(PROGNAME "disconnect from a client %X\n", recv_buf.pulse.scoid); break; /* our pulse - we've got one or more messages */ case PULSE_ASYNCH_EVENT: /* get one or more messages from our channel */ agh = asyncmsg_get( achid ); if (NULL == agh ) { perror("went to get a message, but nothing was there"); } else { /* the async receive header is, actually, a linked list of headers * if multiple messages have been received at once, so we need to * walk the list, looking at each header and message in turn */ while( agh ) { printf("the message came from %d in %d parts\n", agh->info.pid, agh->parts); printf("the data is '%s'\n", (char *)(agh->iov->iov_base)); agh1 = agh; agh = agh1->next; /* free this message */ asyncmsg_free( agh1 ); } } break; default: printf(PROGNAME "unexpected pulse code: %d\n", recv_buf.pulse.code ); break; } continue; } /* not an error, not a pulse, therefor a message */ if ( recv_buf.type == _IO_CONNECT ) { /* _IO_CONNECT because someone did a name_open() to us, must EOK it.*/ MsgReply( rcvid, EOK, NULL, 0 ); continue; } if ( recv_buf.type > _IO_BASE && recv_buf.type <= _IO_MAX ) { /* unexpected system message,probably qconn, error it */ MsgError( rcvid, ENOSYS ); continue; } switch( recv_buf.type ) { /* here our client asked for our asynchronous channel id * reply with it */ case GET_ACHID: printf("got request for my achid\n"); MsgReply(rcvid, 0, &achid, sizeof(achid )); break; default: /* some other expect message */ printf(PROGNAME "expect message type: %d\n", recv_buf.type); MsgError(rcvid, ENOSYS ); break; } } }
The following file, asynch_client.c, first locates the server using name_locate(), then sends a query to get the server's asynchronous channel ID. Next, it creates an asynchronous connection, and sends a message every second, with a callback attached to print out that the message was sent.
#include <stdlib.h> #include <stdio.h> #include <errno.h> #include <sys/siginfo.h> #include <sys/neutrino.h> #include <sys/dispatch.h> #include <sys/asyncmsg.h> #include <unistd.h> #include <sys/trace.h> #include "asynch_server.h" #define PROGNAME "asynch_client: " /* callback is run after each message has been delivered to the server. It is * registered at connect time, and called by a thread created during the * connect processing. */ int callback(int err, void *cmsg, unsigned handle) { static int num_callback_run = 0; ++num_callback_run; printf("Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle); printf("run the %dth time by %d tid\n", num_callback_run, pthread_self()); return 0; } int main(int argc, char *argv[]) { int server_coid, a_coid; short msg; int a_chid; struct _asyncmsg_connection_attr aca; struct _server_info sinfo; int count = 1; /* look for server */ server_coid = name_open( RECV_NAME, 0 ); while( server_coid == -1 ) { sleep(1); server_coid = name_open( RECV_NAME, 0 ); } /* need server's pid, use this to get server info including pid */ ConnectServerInfo( 0, server_coid, &sinfo ); /* send message to server request the asynchronous channel id */ msg = GET_ACHID; if (MsgSend( server_coid, &msg, sizeof( msg ), &a_chid, sizeof(a_chid) )) { perror(PROGNAME "MsgSend"); exit( EXIT_FAILURE ); } /* setup array of send buffers */ memset(&aca, 0, sizeof(aca)); aca.buffer_size = 2048; aca.max_num_buffer = 5; /* don't locally accumulate more than 5 buffers */ aca.trigger_num_msg = 1; /* deliver each message as it is sent */ aca.call_back = callback; /* call this function after messages are sent */ /* create my asynchronous connection to the server */ a_coid = asyncmsg_connect_attach( 0, sinfo.pid, a_chid, 0, 0, &aca ); if( -1 == a_coid ) { perror( "async connect"); printf("server pid is %d\n", sinfo.pid ); exit( EXIT_FAILURE ); } while(1) { int ret, len; char buf[80]; len = sprintf(buf, "message #%d", count)+1; printf("sending %dth time\n", count ); /* deliver message to server, the count value will be passed into * my callback as the "handle" */ ret = asyncmsg_put( a_coid, buf, len, count, NULL ); if( -1 == ret ) { perror( "put"); exit( EXIT_FAILURE ); } count++; sleep(1); } }