Handling private messages and pulses
A thread or process may send a pulse, and a resource manager must know how to receive and handle it.
The main issue with pulses is that they have to be received as a message. This means that a thread has to explicitly perform a MsgReceive() in order to get the pulse. But unless this pulse is sent to a different channel than the one that the resource manager is using for its main messaging interface, it will be received by the library. Therefore, we need to see how a resource manager can associate a pulse code with a handler routine and communicate that information to the library.
You can use the pulse_attach() function to associate a pulse code with a handler function. When the dispatch layer receives a pulse, it will look up the pulse code and see which associated handler to call to handle the pulse message.
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#define THREAD_POOL_PARAM_T dispatch_context_t
#include <sys/iofunc.h>
#include <sys/dispatch.h>
static resmgr_connect_funcs_t connect_func;
static resmgr_io_funcs_t io_func;
static iofunc_attr_t attr;
int timer_tick( message_context_t *ctp, int code, unsigned flags, void *handle)
{
union sigval value = ctp->msg->pulse.value;
/* Do some useful work whenever the timer expires. */
printf("received timer event, value %d\n", value.sival_int);
return 0;
}
int main(int argc, char **argv)
{
thread_pool_attr_t pool_attr;
struct sigevent event;
struct _itimer itime;
dispatch_t *dpp;
thread_pool_t *tpp;
int timer_id;
int id;
dpp = dispatch_create();
if(dpp == NULL) {
fprintf(stderr, "%s: Unable to allocate dispatch handle.\n", argv[0]);
return EXIT_FAILURE;
}
memset(&pool_attr, 0, sizeof pool_attr);
pool_attr.handle = dpp;
/* We are doing resmgr and pulse-type attaches.
*
* If you're going to use custom messages or pulses with
* the message_attach() or pulse_attach() functions, then you MUST use
* the dispatch functions dispatch_block(), dispatch_handler(), and so on.
*/
pool_attr.context_alloc = dispatch_context_alloc;
pool_attr.block_func = dispatch_block;
pool_attr.unblock_func = dispatch_unblock;
pool_attr.handler_func = dispatch_handler;
pool_attr.context_free = dispatch_context_free;
pool_attr.lo_water = 2;
pool_attr.hi_water = 4;
pool_attr.increment = 1;
pool_attr.maximum = 50;
tpp = thread_pool_create(&pool_attr, POOL_FLAG_EXIT_SELF);
if(tpp == NULL) {
fprintf(stderr, "%s: Unable to initialize thread pool.\n",argv[0]);
return EXIT_FAILURE;
}
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_func,
_RESMGR_IO_NFUNCS, &io_func);
iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
id = resmgr_attach(dpp, NULL, "/dev/sample", _FTYPE_ANY, 0,
&connect_func, &io_func, &attr);
if(id == -1) {
fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
return EXIT_FAILURE;
}
/* Initialize an event structure, and attach a pulse to it */
event.sigev_code = pulse_attach(dpp, MSG_FLAG_ALLOC_PULSE, 0,
&timer_tick, NULL);
if(event.sigev_code == -1) {
fprintf(stderr, "Unable to attach timer pulse.\n");
return EXIT_FAILURE;
}
/* Connect to our channel */
event.sigev_coid = message_connect(dpp, MSG_FLAG_SIDE_CHANNEL);
if(event.sigev_coid == -1) {
fprintf(stderr, "Unable to attach to channel.\n");
return EXIT_FAILURE;
}
event.sigev_notify = SIGEV_PULSE;
event.sigev_priority = -1;
/* We could create several timers and use different sigev values for each */
event.sigev_value.sival_int = 0;
timer_id = TimerCreate(CLOCK_MONOTONIC, &event);
if(timer_id == -1) {;
fprintf(stderr, "Unable to attach channel and connection.\n");
return EXIT_FAILURE;
}
/* And now set up our timer to fire every second */
itime.nsec = 1000000000;
itime.interval_nsec = 1000000000;
TimerSettime(timer_id, 0, &itime, NULL);
/* Never returns */
thread_pool_start(tpp);
return EXIT_SUCCESS;
}
We can either define our own pulse code (e.g., #define OurPulseCode 57
) and pass it to
pulse_attach(),
or we can specify MSG_FLAG_ALLOC_PULSE in the flags argument
to ask the function to dynamically generate a pulse code.
You can also use the path that a resource manager registers to get a connection ID (coid) that you can use with MsgSend() to send messages to your resource manager. This means that you don't have to use read() and write() to interact with a resource manager.
/*
* ResMgr and Message Server Process
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
resmgr_connect_funcs_t ConnectFuncs;
resmgr_io_funcs_t IoFuncs;
iofunc_attr_t IoFuncAttr;
typedef struct
{
uint16_t msg_no;
char msg_data[255];
} server_msg_t;
int message_callback( message_context_t *ctp, int type,
unsigned flags, void *handle )
{
server_msg_t *msg;
int num;
char msg_reply[255];
/* Cast a pointer to the message data */
msg = (server_msg_t *)ctp->msg;
/* Print some useful information about the message */
printf( "\n\nServer Got Message:\n" );
printf( " type: %d\n" , type );
printf( " data: %s\n\n", msg->msg_data );
/* Build the reply message */
num = type - _IO_MAX;
snprintf( msg_reply, 254, "Server got message code: _IO_MAX + %d", num );
/* Send a reply to the waiting (blocked) client */
MsgReply( ctp->rcvid, EOK, msg_reply, strlen( msg_reply ) + 1 );
return 0;
}
int main( int argc, char **argv )
{
dispatch_t *dpp;
dispatch_context_t *ctp, *ctp_ret;
int resmgr_id, message_id;
/* Create the dispatch interface */
dpp = dispatch_create();
if( dpp == NULL )
{
fprintf( stderr, "dispatch_create() failed: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Set up the default I/O functions to handle open/read/write/... */
iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,
_RESMGR_IO_NFUNCS, &IoFuncs );
/* Set up the attribute for the entry in the filesystem */
iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );
resmgr_id = resmgr_attach( dpp, NULL, "serv", _FTYPE_ANY,
0, &ConnectFuncs, &IoFuncs, &IoFuncAttr );
if( resmgr_id == -1 )
{
fprintf( stderr, "resmgr_attach() failed: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Attach a callback (handler) for two message types */
message_id = message_attach( dpp, NULL, _IO_MAX + 1,
_IO_MAX + 2, message_callback, NULL );
if( message_id == -1 )
{
fprintf( stderr, "message_attach() failed: %s\n", strerror( errno ) );
return EXIT_FAILURE;
}
/* Set up a context for the dispatch layer to use */
ctp = dispatch_context_alloc( dpp );
if( ctp == NULL )
{
fprintf( stderr, "dispatch_context_alloc() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
/* Get and process messages */
while( 1 )
{
ctp_ret = dispatch_block( ctp );
if( ctp_ret )
{
dispatch_handler( ctp );
}
else
{
fprintf( stderr, "dispatch_block() failed: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
}
return EXIT_SUCCESS;
}
When the server calls resmgr_attach(), it registers the filesystem entry serv. Since the server doesn't specify an absolute path, the entry appears in the directory where the server was run. This gives us a filesystem entry that can be opened and closed, but generally behaves the same as /dev/null.
We call
message_attach()
to tell the dispatch layer that we'll be handling our own messages in addition to
the standard I/O and connection messages handled by the resmgr layer.
All incoming messages must have an unsigned 16-bit integer at the start indicating the message type.
Note that the range 0x0
to _IO_MAX is reserved for the OS.
We set up our message_callback() routine
to handle messages of type _IO_MAX + 1 and _IO_MAX + 2.
You can specify a pointer to arbitrary data to pass to
the callback, but we don't need that, so we set it to NULL.
When a message of the appropriate type is received, our message_callback() routine is invoked. We get the message type passed in via the type parameter. The actual message data can be found in ctp->msg. When the message comes in, the server prints the message type and the string that was sent from the client. It then prints the offset from _IO_MAX of the message type, and then formats a reply string and sends the reply back to the client via ctp->rcvid using MsgReply().
/*
* Message Client Process
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <sys/neutrino.h>
#include <sys/iofunc.h>
#include <sys/dispatch.h>
typedef struct
{
uint16_t msg_no;
char msg_data[255];
} client_msg_t;
int main( int argc, char **argv )
{
int fd;
int c;
client_msg_t msg;
long ret;
int num;
char msg_reply[255];
num = 1;
/* Process any command-line arguments */
while( ( c = getopt( argc, argv, "n:" ) ) != -1 )
{
if( c == 'n' )
{
num = strtol( optarg, 0, 0 );
}
}
/* Open a connection to the server (fd == coid) */
fd = open( "serv", O_RDWR );
if( fd == -1 )
{
fprintf( stderr, "Unable to open server connection: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
/* Clear the memory for the msg and the reply */
memset( &msg, 0, sizeof( msg ) );
memset( &msg_reply, 0, sizeof( msg_reply ) );
/* Set up the message data to send to the server */
msg.msg_no = _IO_MAX + num;
snprintf( msg.msg_data, 254, "client %d requesting reply.", getpid() );
printf( "client: msg_no: _IO_MAX + %d\n", num );
fflush( stdout );
/* Send the data to the server and get a reply */
ret = MsgSend( fd, &msg, sizeof( msg ), msg_reply, 255 );
if( ret == -1 )
{
fprintf( stderr, "Unable to MsgSend() to server: %s\n",
strerror( errno ) );
return EXIT_FAILURE;
}
/* Print out the reply data */
printf( "client: server replied: %s\n", msg_reply );
close( fd );
return EXIT_SUCCESS;
}
The client uses the open() function to get a coid (the server's default resmgr setup takes care of all of this on the server side), and performs a MsgSend() to the server based on this coid, and then waits for the reply. When the reply comes back, the client prints the reply data.
You can give the client the command-line option -n offset to specify the offset from _IO_MAX to use for the message type. If you give anything other than 1 or 2 as the offset, the MsgSend() fails, since the server hasn't set up handlers for those messages.