Acquiring data

Now that the shared memory ring buffer is set up, we can enter a polling loop and acquire data from the various devices.

This involves the following steps:

- Calculate the maximum transfer size, and allocate a transfer buffer (once, outside the loop).
- Adjust the head index (and, if the ring buffer is full, the tail index).
- Take a timestamp, acquire the data from each card into shared memory, and take a second timestamp.
- Sign the shared memory region if we haven't done so yet.
- Delay until the next sample time.

Notice that we write the signature only if we haven't done so yet, and only after a sample set is in place. This ensures that there is at least one valid sample before we declare the shared memory area okay to use.

Here are the pieces of the DAQ thread that do the above steps (error messages in code not shown):

void *
daq_thread (void *not_used)
{
  ...
  // calculate the *maximum* transfer size
  ai = di = 0;
  for (i = 0; i < nadios; i++) {
    if (adios [i].ndi > di) {
      di = adios [i].ndi;
    }
    if (adios [i].nai > ai) {
      ai = adios [i].nai;
    }
  }

  // size of the largest single transfer
  xfersize = ai * 2 + di / 8;

  // allocate a buffer which we never free
  c = malloc (sizeof (*c) + xfersize);
  if (c == NULL) {
    // trash the process; no real use in continuing
    // without this thread
    exit (EXIT_FAILURE);
  }
...

We have the adios array in memory, and that tells us the number of analog and digital input points on each card. We use this in our calculation to come up with a transfer size (xfersize) that represents the maximum transfer size. The transfer size calculated may be much bigger than actually required, because we've combined the largest analog input count and the largest digital input count found across all of the cards, rather than trying to figure out the biggest transfer size required by any single card. In smaller systems, this won't be a problem, because we're talking about only a few hundred bytes. In a large system, you may wish to revisit this piece of code and calculate a more appropriate transfer size. The xfersize variable is recalculated for each card inside the acquisition loop, so the value computed here matters only for the allocation.
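Since each devctl() transfer in the loop below moves only one card's worth of data, the buffer really needs to be only as large as the biggest single card's transfer. A per-card calculation might look something like this (a sketch only; sz is a hypothetical local variable):

  // a sketch of the per-card alternative for large systems:
  // size the buffer to the largest single card's transfer
  xfersize = 0;
  for (i = 0; i < nadios; i++) {
    int sz = adios [i].nai * 2 + adios [i].ndi / 8;
    if (sz > xfersize) {
      xfersize = sz;
    }
  }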

Within the loop, we perform the head and tail manipulations on local copies. That's because we don't want to move the head and tail indexes in shared memory until the data set is in place. The tail is the one exception: once the ring buffer initially fills up, it stays full (it never actually "drains"), so we do need to advance the tail index in the shared memory region before overwriting the oldest sample set, and we do so immediately.

...

  // now loop forever, acquiring samples
  while (1) {
    // do one sample

    // adjust the head in a local working copy
    // while we futz with the data
    head = daq -> head + 1;
    if (head >= optS) {
      head = 0;
    }

    // adjust the tail in an atomic manner
    // so that it's always valid
    if (daq -> tail == head) {
      tail = daq -> tail + 1;
      if (tail >= optS) {
        tail = 0;
      }
      daq -> tail = tail;
    }
...

Notice how the daq -> tail member is adjusted only after we calculate the correct new value of tail in a local copy. This is done in order to present an atomic update of the tail index to any reader. Otherwise, shared memory would hold a potentially out-of-range value of tail after we incremented it and before we wrapped it back around to zero.
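To see the hazard, consider the naive version. Between the increment and the wrap, a reader could observe a tail value equal to optS, which is out of range:

    // the wrong way: between these two statements, readers
    // can see an out-of-range tail (equal to optS)
    daq -> tail = daq -> tail + 1;
    if (daq -> tail >= optS) {
      daq -> tail = 0;
    }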

Note: There's another window of failure here. Theoretically, the client of the shared memory interface and ADIOS should share a mutex (or semaphore) to control access to the shared memory. That's because, if the client requires the full number of sample sets (i.e., the 1000 samples, or whatever it's been changed to via -S on the command line), ADIOS could be in the middle of writing the new data sample set over top of the oldest one.

I thought about this, and decided not to incur the additional complexity of a synchronization object, and instead informed the end-users of ADIOS that they should make their number of samples bigger than the number of samples they actually require from the shared memory region. While this may appear to be somewhat tacky, in reality it's not that bad owing to the speed at which things happen. In normal operation, the customer needs something like a few hundred samples at most, and these samples are updated at a rate of ten per second. So by extending the number of samples to be much bigger than a few hundred, it would take a significant amount of time (tens of seconds) before the oldest data set reached this about-to-be-overwritten state.
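For reference, if you did decide to pay for the synchronization object, a process-shared mutex living in the shared memory region would do the job. This is only a sketch; ADIOS doesn't actually define a lock member, and shmem here is a hypothetical pointer to the shared memory control structure:

#include <pthread.h>

  // one-time setup, done by the creator of the region
  pthread_mutexattr_t attr;

  pthread_mutexattr_init (&attr);
  pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED);
  pthread_mutex_init (&shmem -> lock, &attr);

  // ADIOS would then hold the lock while writing a sample set,
  // and clients would hold it while reading:
  pthread_mutex_lock (&shmem -> lock);
  // ... write (or read) the sample sets ...
  pthread_mutex_unlock (&shmem -> lock);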

...

    // get the data
    ptr = (void *) (database + head * daq -> element_size);
    dhdr = ptr;
    ClockTime (CLOCK_REALTIME, NULL, &dhdr -> t0ns);
    ptr = dhdr + 1;

...

    /*
     * Here we get the data; I've moved this code into the next
     * section so we can see just the clock manipulation here.
     */

...

    ClockTime (CLOCK_REALTIME, NULL, &dhdr -> t1ns);

    // finally, set the daq -> head to our "working" head now 
    // that the data is stable
    daq -> head = head;
...

The code above illustrates the outer time snapshots and the update of the head index. Between the two snapshots, we acquire the data (see the code below). The point of taking the two ClockTime() snapshots is performance measurement, statistics, and data freshness.

The ClockTime() function gives us the number of nanoseconds since the beginning of time (well, QNX Neutrino's concept of the "beginning of time," anyway). The difference between the t0ns and t1ns members is the amount of time it took to acquire the samples, and t0ns also tells us when the sample acquisition started. This data is stored with each sample set. The performance-measurement aspect should be obvious: we just determine how long it takes to acquire the samples. The statistics and freshness aspects come from the customer's requirement: they need to know exactly when each sample was taken, so that they can plug these numbers into their proprietary process-control formula.
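For example, a client inspecting a sample set can recover both the start time and the acquisition duration directly from the stored header (a sketch; dhdr points at the header shown above, and the t0ns and t1ns members are in nanoseconds):

  // a sketch of a client-side freshness/performance check
  uint64_t acquisition_ns = dhdr -> t1ns - dhdr -> t0ns;

  printf ("sample started at %llu ns, acquisition took %.3f ms\n",
          (unsigned long long) dhdr -> t0ns,
          acquisition_ns / 1000000.0);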

...

    // code removed from section above:
    for (i = 0; i < nadios; i++) {
      c -> i.nais = adios [i].nai;
      c -> i.ndis = adios [i].ndi;
      xfersize = c -> i.nais * 2 + c -> i.ndis / 8;
      sts = devctl (adios [i].fd, DCMD_GET_ADIS,
                    c, xfersize, NULL);
      if (sts != EOK) {
        // code prints an error here...
        exit (EXIT_FAILURE);
      }
      // just memcpy the data from there to shmem
      memcpy (ptr, c -> o.buf, xfersize);
      ptr = (void *) ((char *) ptr + xfersize);
    }
...

Above is the code that went between the two time snapshots. As you can see, we run through our adios array, calculating the appropriate transfer size for each card's transfer (while still using the larger transfer buffer we allocated above). The transfer is accomplished by a devctl() to each driver with the DCMD_GET_ADIS command. This command returns a packed analog input and digital input array, which we simply memcpy() into shared memory at the correct place. (We do pointer math with ptr to walk it along the data set.)
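The devctl() message itself is just a union of an input (command) part and an output (data) part, which is why the code refers to c -> i and c -> o. The real definition (and the field types) come from the ADIOS header; reconstructed from the code above, it might look something like this, so treat the details as assumptions:

  // reconstructed sketch; the actual types live in the ADIOS header
  typedef union {
    struct {
      uint16_t nais;    // number of analog inputs to read
      uint16_t ndis;    // number of digital inputs to read
    } i;
    struct {
      uint8_t buf [1];  // packed data: nais 16-bit analog values,
                        // then ndis digital inputs, 8 per byte
    } o;
  } adis_msg_t;

This layout also explains the malloc() above: we allocate sizeof (*c) plus xfersize, so the packed data has room to land past the union header.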

...

    /*
     *  See if we need to sign the data area.  We do this only 
     *  after at least one sample has been put into shmem.
     */

    if (!memory_signed) {
      memcpy (sig -> signature, ADIOS_SIGNATURE,
              sizeof (sig -> signature));
      memory_signed = 1;
    }

    // wait for the next sample time to occur
    delay (optp);
  }
}

Finally, we sign the shared memory region if we haven't already (to ensure that there's at least one sample before the signature is valid), and we delay until the next sample time.

You may have noticed that we aren't going to be acquiring samples at exactly 10 Hz (or whatever rate the user specifies on the command line). That's because the amount of time spent in accumulating the samples adds to the total delay time. This was not a concern for the customer, and code with a fixed delay (and with slippage) is much easier to implement than code that runs at a fixed period.
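For example, if the user asked for 10 Hz (a 100 ms delay) and each pass through the acquisition code takes 5 ms (an illustrative figure), then each loop iteration actually takes about 105 ms, and the effective sample rate slips to roughly 9.5 Hz.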

If we did want the code to run at a fixed period, then there are a couple of ways to do that:

- Create a periodic timer that hits a semaphore, and have the thread do a sem_wait() on it.
- Create a periodic timer that delivers a pulse, and have the thread do a MsgReceive() to wait for it.

Both approaches are almost identical as far as timing is concerned. The semaphore approach may also suffer from "lag" if the thread that's hitting the semaphore gets preempted. Both approaches may be unable to keep up on a large system, or if a higher sample rate were used. Because the event that triggers the next sample (i.e., the one that the sem_wait() or the MsgReceive() waits for) is generated asynchronously with respect to the data acquisition, if the data acquisition takes longer than the period, things will start to "back up."

This too can be solved by draining all pending events (semaphore counts or pulses) before continuing to the next sample set. If you count the number of events you've drained, you get a good idea of how far behind you're lagging, and that count can be output to an operator or a log for diagnostics.
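Here's a minimal sketch of the pulse-based approach, including the drain-and-count step. The period is assumed to be in milliseconds, and names like periodic_daq_loop are hypothetical, not part of ADIOS:

#include <stdint.h>
#include <sys/neutrino.h>
#include <sys/siginfo.h>
#include <time.h>

#define SAMPLE_PULSE_CODE (_PULSE_CODE_MINAVAIL)

static void
periodic_daq_loop (int period_ms)
{
  int               chid, coid, backlog;
  timer_t           timer_id;
  struct sigevent   ev;
  struct itimerspec it;
  struct _pulse     pulse;
  uint64_t          zero;

  // channel and connection for the timer's pulses
  chid = ChannelCreate (0);
  coid = ConnectAttach (0, 0, chid, _NTO_SIDE_CHANNEL, 0);

  // periodic timer that delivers a pulse every period_ms
  SIGEV_PULSE_INIT (&ev, coid, SIGEV_PULSE_PRIO_INHERIT,
                    SAMPLE_PULSE_CODE, 0);
  timer_create (CLOCK_REALTIME, &ev, &timer_id);
  it.it_value.tv_sec = it.it_interval.tv_sec = period_ms / 1000;
  it.it_value.tv_nsec = it.it_interval.tv_nsec
    = (period_ms % 1000) * 1000000L;
  timer_settime (timer_id, 0, &it, NULL);

  while (1) {
    // block until the next timer pulse arrives
    MsgReceivePulse (chid, &pulse, sizeof (pulse), NULL);

    // drain any pulses that queued up while a previous pass
    // ran long; the count tells us how far we're lagging
    backlog = 0;
    for (;;) {
      zero = 0;
      TimerTimeout (CLOCK_REALTIME, _NTO_TIMEOUT_RECEIVE,
                    NULL, &zero, NULL);
      if (MsgReceivePulse (chid, &pulse,
                           sizeof (pulse), NULL) == -1) {
        break;      // nothing else queued
      }
      backlog++;
    }
    if (backlog > 0) {
      // report the lag to an operator or a log here...
    }

    // ... acquire one sample set, as in the main loop above ...
  }
}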