wiki:Tutorials/Advanced/Data Ports

Beginner/Data Port Basics explains usage and the most common features of data ports.

This article presents additional relevant features of data ports.

Input Port Queues

Usually, input ports only provide access to the latest value they received. As connected modules may be executed by different threads and with different cycle times, some incoming values may never be processed. In some situations, however, it is advantageous to process all incoming values - and not to miss any. This is, for instance, relevant if incoming values are to be integrated - or to avoid aliasing effects. There are two possibilities to achieve this:

  • Input ports can have queues for incoming buffers.
  • A port listener (see below) can be registered. It will be notified on every incoming value.

There are two types of FIFO queues. They differ in how values are dequeued:

  • Dequeue one value after the other - using the Dequeue() methods in tInputPort
  • Dequeue all elements at once - using the DequeueAll() and DequeueBuffers() methods in tInputPort

The latter is more efficient if the queue typically contains many elements.

Note that the queue API is not yet mature and will likely be optimized in future Finroc versions.

Port Creation

To create an input port with a queue, a tQueueSettings object needs to be passed to its construtor. For example, adding this to a module's initializer list would add a queue to the port input_port:

  input_port(tQueueSettings(false, 100))

The tQueueSettings constructor parameters are documented in the code:

  /*!
   * \param dequeue_all_queue How can elements be dequeued from the port?
   *                          (one element after the other or all elements at once?)
   *                          If this parameter is false, only Dequeue() may be called.
   *                          If this parameter is true, only DequeueAll() and DequeueAllBuffers() may be called.
   *                          (Dequeueing all elements at once is more efficient)
   * \param maximum_queue_length Maximum number of elements in queue.
   *                             A value of -1 indicates that the queue has (virtually) no size limit.
   *                             This is somewhat dangerous: If elements in a queue of unlimited size are
   *                             not fetched, this causes continuous memory allocation for new buffers.
   */

Dequeueing Buffers

Values from queues can be dequeued using the following patterns (we have a std::string port in all code snippets):

Dequeue one value after the other:

tPortDataPointer<const std::string> dequeued_buffer;
while (dequeued_buffer = input_port.Dequeue())
{
  FINROC_LOG_PRINT(USER, "Dequeued ", *dequeued_buffer, " (timestamp: ",
      rrlib::time::ToIsoString(dequeued_buffer.GetTimestamp()), ")");
}

Dequeue all values at once:

tPortBuffers<tPortDataPointer<const std::string>> dequeued_buffers = input_port.DequeueAllBuffers();
while (!dequeued_buffers.Empty())
{
  tPortDataPointer<const std::string> dequeued_buffer = dequeued_buffers.PopFront();
  FINROC_LOG_PRINT(USER, "Dequeued ", *dequeued_buffer, " (timestamp: ",
      rrlib::time::ToIsoString(dequeued_buffer.GetTimestamp()), ")");
}

Dequeueing Buffers - Optional simplified API for cheaply copied types

As with other port operations, there is a simplified API for cheaply copied types (see Beginner/Data Port Basics). We have a double port in the following code snippets.

Dequeue one value after the other:

double dequeued_value;
rrlib::time::tTimestamp optional_timestamp;
while (input_port.Dequeue(dequeued_value, optional_timestamp))
{
  FINROC_LOG_PRINT(USER, "Dequeued ", dequeued_value, " (timestamp: ", 
      rrlib::time::ToIsoString(optional_timestamp), ")");
}

Dequeue all values at once:

tPortBuffers<double> dequeued_buffers = input_port.DequeueAll();
while (!dequeued_buffers.Empty())
{
  FINROC_LOG_PRINT(USER, "Dequeued ", dequeued_buffers.PopFront());
  // Timestamp currently cannot be obtained - use standard API if required
}

Port Listeners

It is possible to add port listeners to input ports. These listeners have a public callback method OnPortChange that will be called whenever a new buffer is received by this port. This can have several advantages:

  • The component reacts immediately to incoming values
  • All incoming values are processed

There are a few things to note:

  • Once added, port listeners cannot be removed. This simplifies the (optimized) implementation a lot - and we did not see relevant use cases for this.
  • OnPortChange is currently always executed by the thread that publishes the incoming buffer - during this publishing operation. Therefore, it should be computationally inexpensive and return quickly.
  • Due to the non-blocking implementation, OnPortChange may be executed by multiple threads at the same time. Access to any variables needs to be safe in this respect.

The port listener can be any class (possibly the module itself). It simply needs a public method OnPortChange - e.g.:

void OnPortChange(const T& value, data_ports::tChangeContext& change_context)
{
  FINROC_LOG_PRINT(USER, "OnPortChange: ", value);
}

change_context contains information on the port this event comes from (the listener may listen to multiple ports), the timestamp, and the change type.

An object of this class can be registered as port listener - e.g.:

input_port.AddPortListener(listener_object);

This variant of port listener is the most common. There are, however, two further variants.

A port listener might not be interested in the new value - only in that a change occured. In this case the OnPortChange may look like this.

void OnPortChange(data_ports::tChangeContext& change_context)
{
  FINROC_LOG_PRINT(USER, "OnPortChange (Simple)");
}
input_port.AddPortListenerSimple(listener_object);

Maybe the buffer received is to be published in another port - or to be stored in a local variable (use std::move for this!) or queue. In this case a tPortDataPointer<T> can be received:

void OnPortChange(data_ports::tPortDataPointer<const T>& value, data_ports::tChangeContext& change_context)
{
  FINROC_LOG_PRINT(USER, "OnPortChange (tPortDataPointer): ", *value);
}
input_port.AddPortListenerForPointer(listener_object);

A running example for port listeners can be found in data_ports/test/test_collection.cpp.

Dynamic Port Construction

Creating ports by adding them as a public member variable to a component is not always sufficiently flexible. Basically, ports can be created anywhere in the code.

There are two situations to distinguish:

  • Creating ports for component interfaces - using the convience port classes defined in the component base class (e.g. tInput and tOutput in the case of tModule). This is the typical use case for component and application developers.
  • Using the standard port classes from finroc_plugins_data_ports. These are relevant for developers of plugins - or to use Finroc ports in some non-Finroc application or framework. If data types are not known at compile time - or come in a variadic template list, the classes are also relevant for component and application developers.

Using the convenience port classes for dynamic component interfaces

The number of input or output ports of a component may depend on some parameter. For example, a component interface may look like this:

//----------------------------------------------------------------------
// Ports (These are the only variables that may be declared public)
//----------------------------------------------------------------------
public:

  tStaticParameter<unsigned int> number_of_output_signals;

  std::vector<tOutput<double>> output_signals;

Name and parent of these ports cannot be determined automatically. Thus, they need to be provided to the constructor.

In the example above, the ports would typically be created in the OnStaticParameterChange method of the component:

while (this->output_signals.size() < this->number_of_output_signals.Get())
{
  // using emplace_back would be shorter - but possibly less clear for illustration
  this->output_signals.push_back(
      tOutput<double>("Output Signal " + std::to_string(output_signals.size()), this));
}

Ports are not deleted automatically when the convenient port (wrapper) classes are. Instead, ManagedDelete needs to be called in order to actually delete a port.

As number_of_output_signals may also be reduced in the example above, OnStaticParameterChange typically also contains a section to delete obsolete ports:

while (this->output_signals.size() > this->number_of_output_signals.Get())
{
  this->output_signals.rbegin()->ManagedDelete();
  this->output_signals.pop_back();
}

Creating ports anywhere

In order to create ports anywhere in the program, the port classes from finroc_plugins_data_ports (tOutputPort, tInputPort, tProxyPort) may be instantiated directly.

They more or less work in the same way as the port classes defined in the component base classes.

This would create an output port in some function:

data_ports::tOutputPort<double> output_port("Output port", parent_element_pointer);

Creating ports of type only known at runtime

There are two ways to achieve this:

  1. using CreatePort from tPortGroup
  2. using tGenericPort

Both are not particularly convenient - as the flags (input, output or proxy port?) need to be set manually:

  • Output ports: EMITS_DATA | OUTPUT_PORT
  • Input ports: ACCEPTS_DATA | PUSH_STRATEGY
  • Proxy output ports: EMITS_DATA | ACCEPTS_DATA | OUTPUT_PORT
  • Proxy input ports: EMITS_DATA | ACCEPTS_DATA

Let type be an instance of rrlib::rtti::tType - which should be the data type of the port. To create a sensor output port of this type at runtime:

(1)

GetSensorOutputs().CreatePort("Output Port Name", type,
    tFlag::EMITS_DATA | tFlag::OUTPUT_PORT);

(2)

data_ports::tGenericPort port("Output Port Name", type, &GetSensorOutputs(),
    tFlag::EMITS_DATA | tFlag::OUTPUT_PORT);

Creating ports from variadic template type lists

To create ports for each type provided in a variadic template type list, the class tPortPack can be used.

Let TSignalTypes be a variadic template type list. The following code would, for instance, create sensor output ports for all the types.

Component header:

  typedef rrlib::util::tTypeList<TSignalTypes...> tSignalTypes;
  ...
//----------------------------------------------------------------------
// Ports (These are the only variables that may be declared public)
//----------------------------------------------------------------------
public:

  data_ports::tPortPack<tSensorOutput, tSignalTypes> output_signals;

Component constructor initializer list:

  output_signals(this, "Output Signal ")

Replicating a set of interface ports in another interface

Sometimes, it is handy to replicate a set of ports of one interface in another (group) interface - for example, when all ports from a module in a group should be available from outside the group.

This can be conveniently achieved with the ConnectByName methods of tPortGroup.

This statement would create equivalent proxy ports for all the module's sensor output ports in the groups' Sensor Output interface - and connect them:

  // in group constructor
  inner_module->GetSensorOutputs().ConnectByName(this->GetSensorOutputs(), true);

The ConnectByName methods provide various parameters - e.g. for prefixing or only selecting a set of ports.

Note on Thread-safety

All methods on ports are thread-safe and may be called by multiple threads concurrently.

Last modified 4 years ago Last modified on 17.12.2014 14:07:15