Subscribing to interim data

Updated: April 19, 2023

Subscribers are applications that use interim data that publishers generate.

Subscribing to receive interim data requires the publisher and subscriber to stay roughly synchronized with each other. For example, if a subscriber doesn't release buffers in a timely manner, the publisher can block while it waits for a buffer to become available for publishing.
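To see why a slow subscriber stalls a publisher, consider the following minimal simulation. It doesn't use the Sensor API; buffer_pool_t, POOL_SIZE, and all of the function names are hypothetical and exist only for illustration. The pool holds a fixed number of buffers, the publisher takes one per frame, and the subscriber returns one every release_every frames. Each failed attempt to take a buffer marks a point where a real publisher would block.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical pool size; a real system configures its own buffer count. */
#define POOL_SIZE 4

typedef struct {
    int free_count;   /* buffers currently available to the publisher */
} buffer_pool_t;

static void pool_init(buffer_pool_t *pool)
{
    pool->free_count = POOL_SIZE;
}

/* Publisher side: returns false where a real publisher would block. */
static bool pool_try_acquire(buffer_pool_t *pool)
{
    if (pool->free_count == 0) {
        return false;
    }
    pool->free_count--;
    return true;
}

/* Subscriber side: hand one buffer back, if any are outstanding. */
static void pool_release(buffer_pool_t *pool)
{
    if (pool->free_count < POOL_SIZE) {
        pool->free_count++;
    }
}

/*
 * Count how many of `frames` publish attempts find the pool empty when the
 * subscriber releases one buffer every `release_every` frames
 * (0 means the subscriber never releases a buffer).
 */
static int count_stalls(int frames, int release_every)
{
    buffer_pool_t pool;
    int stalls = 0;

    assert(frames >= 0 && release_every >= 0);
    pool_init(&pool);
    for (int i = 1; i <= frames; i++) {
        if (!pool_try_acquire(&pool)) {
            stalls++;
        }
        if (release_every > 0 && (i % release_every) == 0) {
            pool_release(&pool);
        }
    }
    return stalls;
}
```

With these assumptions, a subscriber that releases a buffer after every frame never stalls the publisher, while one that releases a buffer only every third frame starts causing stalls as soon as the pool drains.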

In general, when you're subscribing to interim data, do the following:

  1. Open a connection to the interim data unit of interest. See Opening and closing a connection to an interim data unit.
  2. Access and process, or record, the interim data that you receive. See Accessing sensor data and status and Recording data from a sensor.
  3. Close the connection to the interim data unit. See Opening and closing a connection to an interim data unit.

Subscribing to receive interim data is very similar to accessing sensor data from a logical sensor unit (i.e., from a sensor device that's connected to your system). The main difference is that you call sensor_interim_data_open() instead of sensor_open() when you request a handle to the unit. Note also that the unit you're requesting a handle to is an interim data unit (i.e., sensor_interim_data_unit_t) that you configure in the interim data configuration file, not the sensor configuration file. The interim data configuration file lists the interim data units that are used when publishers have interim data to publish; the sensor configuration file lists the sensor units that correspond to sensor devices connected to your system (e.g., a USB camera).

Once you have a handle to the interim data unit of interest from calling sensor_interim_data_open(), you can use this handle to register a callback function or a sigevent against a sensor event that lets you know there's interim data available for processing. The same Sensor API functions are used to register callback functions and events regardless of whether the sensor events are from a sensor or an interim data unit. For information on how to register for sensor events, see Accessing sensor data and status.

A subscriber example

Let's look at an example where you register a callback function for the Sensor library to invoke when interim data is available for the interim data unit of interest. First, you need to define a callback function that the Sensor library invokes when your sensor event of interest occurs. This function's signature is defined by the function prototype sensor_data_callback_t. For example:

...
#define DEFAULT_CONSUME_LATENCY_US  50
static uint32_t consumeLatency = DEFAULT_CONSUME_LATENCY_US;
#define DEFAULT_BUFFERS_CNT         100
static int bufCnt = DEFAULT_BUFFERS_CNT;
static int receivedBuffers = 0;
...
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  condvar = PTHREAD_COND_INITIALIZER;
...
/* Example user data struct for user extension */
typedef struct {
    uint16_t        top;
    uint16_t        left;
    uint16_t        width;
    uint16_t        height;
} userDataBox_t;
...
static bool publishedDataCallback(sensor_handle_t handle,
                                 sensor_buffer_t* buf,
                                 void* arg)
{
    userDataBox_t           *box;

    pthread_mutex_lock(&mutex);
    receivedBuffers++;
    printf("Received buffer %d: time %" PRIx64 " format %d size %" PRIu64 " ptr %p\n",
            receivedBuffers, buf->timestamp, buf->format, buf->size, buf->data);
    if (buf->format == SENSOR_FORMAT_USER_DATA) {
        box = (userDataBox_t*) buf->info.user_data.reserved;
        printf("user data: data_size %d data_id %d box %x %x %x %x data %d %d %d %d\n",
               buf->info.user_data.data_size, buf->info.user_data.data_id,
               box->top, box->left, box->width, box->height,
               buf->data[0], buf->data[1],
               buf->data[2], buf->data[3]);
    }

    usleep(consumeLatency);
    if (receivedBuffers == bufCnt) {
        // send signal to stop streaming
        pthread_cond_signal (&condvar);
    }
    pthread_mutex_unlock (&mutex);
    return true;
}
...
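The callback above casts the reserved bytes of the user data directly to userDataBox_t. A more defensive variation is to check an identifier and the available size first, and then copy the bytes into a local struct. The following is only a sketch: decode_box() and EXAMPLE_BOX_DATA_ID are hypothetical names, and the identifier value and the size of the raw region are assumptions that your publisher and subscriber would have to agree on.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Same layout as the userDataBox_t used in the subscriber example. */
typedef struct {
    uint16_t        top;
    uint16_t        left;
    uint16_t        width;
    uint16_t        height;
} userDataBox_t;

/* Hypothetical identifier agreed on by the publisher and the subscriber. */
#define EXAMPLE_BOX_DATA_ID  1u

/*
 * Hypothetical helper: copy a box out of a raw byte region, but only if the
 * identifier matches and the region is large enough to hold a box.
 * Returns true and fills *out on success; leaves *out untouched otherwise.
 */
static bool decode_box(uint32_t data_id,
                       const uint8_t *raw,
                       size_t raw_size,
                       userDataBox_t *out)
{
    if (raw == NULL || out == NULL) {
        return false;
    }
    if (data_id != EXAMPLE_BOX_DATA_ID) {
        return false;   /* not the payload this subscriber understands */
    }
    if (raw_size < sizeof(*out)) {
        return false;   /* region too small to contain a box */
    }
    /* memcpy() avoids alignment assumptions about the raw region. */
    memcpy(out, raw, sizeof(*out));
    return true;
}
```

In a callback, you might pass the data identifier and the reserved bytes from the received buffer to a helper like this one and skip the buffer when it returns false.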
      

In this application example, you would call sensor_interim_data_open() to request a handle to the interim data unit that you want to receive interim data from. For example:

...
int err;
sensor_interim_data_unit_t unit = INTERIM_DATA_UNIT_1;
sensor_handle_t handle = NULL;
...
err = sensor_interim_data_open(unit, &handle);
...
      

With the handle to the interim data unit that's publishing the interim data, you'll register your callback function for a sensor event. In the subscriber's case, usually the sensor event of interest is the SENSOR_EVENT_STREAM_DATA event. This event occurs when there is data available from the associated sensor unit. Call the sensor_register_data_callback() function to register your callback. For example:

sensor_error_t err;
sensor_handle_t handle = NULL;
...
err = sensor_register_data_callback(handle,
                                    SENSOR_EVENT_STREAM_DATA,
                                    SENSOR_EVENTMODE_READONLY,
                                    publishedDataCallback,
                                    NULL);
...
       

Now, when interim data is available from INTERIM_DATA_UNIT_1, the Sensor library invokes the callback function publishedDataCallback() to access the interim data. Ensure that the publisher and subscriber are in agreement on the format of the interim data.
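The example callback signals a condition variable once it has counted bufCnt buffers, so some other thread has to wait on that condition variable before it stops streaming. The following sketch shows one way the waiting side could look. It uses only POSIX threads; on_buffer_received(), wait_for_all_buffers(), fake_delivery_thread(), and run_wait_demo() are hypothetical names, and the delivery thread merely stands in for the Sensor library invoking your callback.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  condvar = PTHREAD_COND_INITIALIZER;
static int bufCnt = 100;
static int receivedBuffers = 0;

/* Stand-in for the counting and signalling done in publishedDataCallback(). */
static void on_buffer_received(void)
{
    pthread_mutex_lock(&mutex);
    receivedBuffers++;
    if (receivedBuffers == bufCnt) {
        pthread_cond_signal(&condvar);
    }
    pthread_mutex_unlock(&mutex);
}

/*
 * Block the caller until bufCnt buffers have been counted. The loop
 * re-checks the counter to guard against spurious wakeups.
 */
static void wait_for_all_buffers(void)
{
    pthread_mutex_lock(&mutex);
    while (receivedBuffers < bufCnt) {
        pthread_cond_wait(&condvar, &mutex);
    }
    pthread_mutex_unlock(&mutex);
}

/* Simulates the library delivering bufCnt buffers from another thread. */
static void *fake_delivery_thread(void *arg)
{
    (void)arg;
    for (int i = 0; i < bufCnt; i++) {
        on_buffer_received();
    }
    return NULL;
}

/* Runs the simulation; returns the number of buffers counted, or -1. */
static int run_wait_demo(void)
{
    pthread_t tid;

    if (pthread_create(&tid, NULL, fake_delivery_thread, NULL) != 0) {
        return -1;
    }
    wait_for_all_buffers();
    pthread_join(tid, NULL);
    return receivedBuffers;
}
```

In a real subscriber, the main thread would call something like wait_for_all_buffers() after registering the callback, and would move on to the cleanup steps once the wait returns.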

When the subscriber is finished receiving interim data from the interim data unit, it disables the callback function and closes the connection to the interim data unit. For example:

sensor_error_t err;
sensor_handle_t handle = NULL;
...
err = sensor_disable_data_callback(handle, SENSOR_EVENT_STREAM_DATA, publishedDataCallback);
...
err = sensor_interim_data_close(handle);
...