Changeset 135:fd179b075769 in finroc_plugins_data_ports


Ignore:
Timestamp:
30.09.2018 15:31:15 (10 months ago)
Author:
Max Reichardt <mreichardt@…>
Branch:
17.03
Parents:
132:29734e7e64af (diff), 134:fae78f2f7787 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Phase:
public
Message:

Merge with 14.08

Files:
16 edited

Legend:

Unmodified
Added
Removed
  • api/tGenericPortImplementation.cpp

    r123 r135  
    190190 
    191191 
     192void tGenericPortImplementation::DequeueAllBuffers(core::tAbstractPort& port, std::vector<tPortDataPointer<const rrlib::rtti::tGenericObject>>& result_buffer) 
     193{ 
     194  result_buffer.clear(); 
     195  if (IsCheaplyCopiedType(port.GetDataType())) 
     196  { 
     197    auto buffers = static_cast<tCheapCopyPort&>(port).DequeueAllRaw(); 
     198    while (!buffers.Empty()) 
     199    { 
     200      auto buffer = buffers.PopFront(); 
     201      tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false> pointer(buffer->locked_buffer.release(), false); 
     202      result_buffer.emplace_back(std::move(pointer)); 
     203    } 
     204  } 
     205  else 
     206  { 
     207    auto buffers = static_cast<standard::tStandardPort&>(port).DequeueAllRaw(); 
     208    while (!buffers.Empty()) 
     209    { 
     210      auto buffer = buffers.PopFront(); 
     211      tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false> pointer(buffer->locked_buffer.release(), false); 
     212      result_buffer.emplace_back(std::move(pointer)); 
     213    } 
     214  } 
     215} 
     216 
    192217void tGenericPortImplementation::SetPullRequestHandler(core::tAbstractPort& port, tPullRequestHandler<rrlib::rtti::tGenericObject>* pull_request_handler) 
    193218{ 
  • api/tGenericPortImplementation.cpp

    r134 r135  
    7070// Implementation 
    7171//---------------------------------------------------------------------- 
    72 namespace internal 
    73 { 
    74  
    75 template <typename T> 
    76 struct tBoundsSetter 
    77 { 
    78   static void SetBounds(core::tAbstractPort& port, const rrlib::rtti::tGenericObject& min, const rrlib::rtti::tGenericObject& max) 
    79   { 
    80     typedef api::tBoundedPort<T, api::tPortImplementationTypeTrait<T>::type> tBoundedPort; 
    81     if (typeid(port) != typeid(tBoundedPort)) 
    82     { 
    83       FINROC_LOG_PRINT(ERROR, "Cannot set bounds for port ", rrlib::rtti::tDataType<T>().GetName(), ". It is not a bounded port."); 
    84       return; 
    85     } 
    86     static_cast<tBoundedPort&>(port).SetBounds(tBounds<T>(min.GetData<T>(), max.GetData<T>())); 
    87   } 
    88 }; 
    89  
    90 struct tNoBoundsSetter 
    91 { 
    92   static void SetBounds(core::tAbstractPort& port, const rrlib::rtti::tGenericObject& min, const rrlib::rtti::tGenericObject& max) 
    93   { 
    94     FINROC_LOG_PRINT(ERROR, "Cannot set bounds for type ", port.GetDataType().GetName()); 
    95   } 
    96 }; 
    97  
    98  
    99 template <typename T> 
    100 class tGenericPortImplementationTyped : public tGenericPortImplementation 
     72namespace 
     73{ 
     74 
     75class tGenericPortImplementationCheapCopy : public tGenericPortImplementation 
    10176{ 
    10277public: 
    10378 
    104   /*! Should methods dealing with bounds be available? */ 
    105   enum { cBOUNDABLE = IsBoundable<T>::value }; 
    106  
    107   /*! Class that contains actual implementation of most functionality */ 
    108   typedef api::tPortImplementation<T, api::tPortImplementationTypeTrait<T>::type> tImplementation; 
    109  
    110   typedef typename tImplementation::tPortBase tPortBase; 
     79  typedef tCheapCopyPort tPortBase; 
    11180 
    11281  virtual core::tAbstractPort* CreatePort(const common::tAbstractDataPortCreationInfo& creation_info) override 
    11382  { 
    114     tPort<T> port(creation_info); 
    115     return port.GetWrapped(); 
     83    return new tPortBase(creation_info); 
    11684  } 
    11785 
    11886  virtual void Get(core::tAbstractPort& port, rrlib::rtti::tGenericObject& result, rrlib::time::tTimestamp& timestamp) override 
    11987  { 
    120     tImplementation::CopyCurrentPortValue(static_cast<tPortBase&>(port), result.GetData<T>(), timestamp); 
     88    static_cast<tPortBase&>(port).CopyCurrentValueToGenericObject(result, timestamp); 
    12189  } 
    12290 
     
    143111  virtual void Publish(core::tAbstractPort& port, const rrlib::rtti::tGenericObject& data, const rrlib::time::tTimestamp& timestamp) override 
    144112  { 
    145     tImplementation::CopyAndPublish(static_cast<tPortBase&>(port), data.GetData<T>(), timestamp); 
    146   } 
    147  
    148   virtual void SetBounds(core::tAbstractPort& port, const rrlib::rtti::tGenericObject& min, const rrlib::rtti::tGenericObject& max) override 
    149   { 
    150     std::conditional<cBOUNDABLE, tBoundsSetter<T>, tNoBoundsSetter>::type::SetBounds(port, min, max); 
    151   } 
    152 }; 
    153  
    154  
    155 class tGenericPortImplementationCheapCopy : public tGenericPortImplementation 
    156 { 
    157 public: 
    158  
    159   typedef tCheapCopyPort tPortBase; 
    160  
    161   virtual core::tAbstractPort* CreatePort(const common::tAbstractDataPortCreationInfo& creation_info) override 
    162   { 
    163     return new tPortBase(creation_info); 
    164   } 
    165  
    166   virtual void Get(core::tAbstractPort& port, rrlib::rtti::tGenericObject& result, rrlib::time::tTimestamp& timestamp) override 
    167   { 
    168     static_cast<tPortBase&>(port).CopyCurrentValueToGenericObject(result, timestamp); 
    169   } 
    170  
    171   virtual tPortDataPointer<const rrlib::rtti::tGenericObject> GetPointer(core::tAbstractPort& abstract_port, tStrategy strategy) override 
    172   { 
    173     tPortBase& port = static_cast<tPortBase&>(abstract_port); 
    174     if ((strategy == tStrategy::DEFAULT && port.PushStrategy()) || strategy == tStrategy::NEVER_PULL || definitions::cSINGLE_THREADED) 
    175     { 
    176       tPortDataPointer<rrlib::rtti::tGenericObject> buffer = this->GetUnusedBuffer(port); 
    177       rrlib::time::tTimestamp timestamp; 
    178       port.CopyCurrentValueToGenericObject(*buffer.Get(), timestamp, strategy); 
    179       buffer.SetTimestamp(timestamp); 
    180       return std::move(buffer); 
    181     } 
    182 #ifndef RRLIB_SINGLE_THREADED 
    183     else 
    184     { 
    185       auto buffer_pointer = port.GetPullRaw(strategy == tStrategy::PULL_IGNORING_HANDLER_ON_THIS_PORT); 
    186       return tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false>(buffer_pointer.release(), false); 
    187     } 
    188 #endif 
    189   } 
    190  
    191   virtual void Publish(core::tAbstractPort& port, const rrlib::rtti::tGenericObject& data, const rrlib::time::tTimestamp& timestamp) override 
    192   { 
    193113#ifndef RRLIB_SINGLE_THREADED 
    194114    assert(data.GetType() == port.GetDataType()); 
     
    196116    if (thread_local_pools) 
    197117    { 
    198       typename optimized::tThreadLocalBufferPools::tBufferPointer buffer = thread_local_pools->GetUnusedBuffer(static_cast<tPortBase&>(port).GetCheaplyCopyableTypeIndex()); 
     118      typename optimized::tThreadLocalBufferPools::tBufferPointer buffer = thread_local_pools->GetUnusedBuffer(static_cast<tPortBase&>(port).GetCheaplyCopiedTypeBufferPoolIndex(), port.GetDataType()); 
    199119      buffer->SetTimestamp(timestamp); 
    200120      buffer->GetObject().DeepCopyFrom(data); 
    201121      common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataThreadLocalBuffer> publish_operation(buffer.release(), true); 
    202       publish_operation.Execute<false, tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port)); 
     122      publish_operation.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port)); 
    203123    } 
    204124    else 
    205125    { 
    206       typename optimized::tCheapCopyPort::tUnusedManagerPointer buffer(optimized::tGlobalBufferPools::Instance().GetUnusedBuffer(static_cast<tPortBase&>(port).GetCheaplyCopyableTypeIndex()).release()); 
     126      typename optimized::tCheapCopyPort::tUnusedManagerPointer buffer(optimized::tGlobalBufferPools::Instance().GetUnusedBuffer(static_cast<tPortBase&>(port).GetCheaplyCopiedTypeBufferPoolIndex(), port.GetDataType()).release()); 
    207127      buffer->SetTimestamp(timestamp); 
    208128      buffer->GetObject().DeepCopyFrom(data); 
    209129      common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataGlobalBuffer> publish_operation(buffer); 
    210       publish_operation.Execute<false, tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port)); 
     130      publish_operation.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port)); 
    211131    } 
    212132#else 
     
    260180}; 
    261181 
    262 template <typename T> 
    263 static void CheckCreateImplementationForType(rrlib::rtti::tType type) 
    264 { 
    265   if (type.GetRttiName() == typeid(T).name()) 
    266   { 
    267     type.AddAnnotation<tGenericPortImplementation>(new internal::tGenericPortImplementationTyped<T>()); 
    268   } 
    269 } 
    270  
    271 } // namespace internal 
    272  
    273 void tGenericPortImplementation::CreateImplementations() 
    274 { 
    275   static rrlib::thread::tMutex mutex; 
    276   static int16_t initialized_types = 0; 
    277   rrlib::thread::tLock lock(mutex); 
    278  
    279   for (; initialized_types < rrlib::rtti::tType::GetTypeCount(); initialized_types++) 
    280   { 
    281     rrlib::rtti::tType type = rrlib::rtti::tType::GetType(initialized_types); 
    282     if (IsDataFlowType(type)) 
    283     { 
    284       // typed implementations for certain types 
    285       internal::CheckCreateImplementationForType<int8_t>(type); 
    286       internal::CheckCreateImplementationForType<int16_t>(type); 
    287       internal::CheckCreateImplementationForType<int>(type); 
    288       internal::CheckCreateImplementationForType<long int>(type); 
    289       internal::CheckCreateImplementationForType<long long int>(type); 
    290       internal::CheckCreateImplementationForType<uint8_t>(type); 
    291       internal::CheckCreateImplementationForType<uint16_t>(type); 
    292       internal::CheckCreateImplementationForType<unsigned int>(type); 
    293       internal::CheckCreateImplementationForType<unsigned long int>(type); 
    294       internal::CheckCreateImplementationForType<unsigned long long int>(type); 
    295       internal::CheckCreateImplementationForType<double>(type); 
    296       internal::CheckCreateImplementationForType<float>(type); 
    297       internal::CheckCreateImplementationForType<char>(type);  // is neither int8_t nor uint8_t 
    298       internal::CheckCreateImplementationForType<numeric::tNumber>(type); 
    299  
    300       if (!type.GetAnnotation<tGenericPortImplementation>()) 
    301       { 
    302         assert((type.GetTypeTraits() & rrlib::rtti::trait_flags::cIS_INTEGRAL) == 0 || type.GetRttiName() == typeid(bool).name()); 
    303         if (IsCheaplyCopiedType(type)) 
    304         { 
    305           type.AddAnnotation<tGenericPortImplementation>(new internal::tGenericPortImplementationCheapCopy()); 
    306         } 
    307         else 
    308         { 
    309           type.AddAnnotation<tGenericPortImplementation>(new internal::tGenericPortImplementationStandard()); 
    310         } 
    311       } 
    312     } 
    313   } 
    314 } 
     182 
     183tGenericPortImplementationCheapCopy cINSTANCE_CHEAP_COPY; 
     184tGenericPortImplementationStandard cINSTANCE_STANDARD; 
     185 
     186} // anonymous namespace 
     187 
     188tGenericPortImplementation* tGenericPortImplementation::cIMPLEMENTATION_STANDARD = &cINSTANCE_STANDARD; 
     189tGenericPortImplementation* tGenericPortImplementation::cIMPLEMENTATION_CHEAP_COPY = &cINSTANCE_CHEAP_COPY; 
     190 
    315191 
    316192void tGenericPortImplementation::DequeueAllBuffers(core::tAbstractPort& port, std::vector<tPortDataPointer<const rrlib::rtti::tGenericObject>>& result_buffer) 
  • api/tGenericPortImplementation.h

    r123 r135  
    9191 
    9292  /*! 
     93   * Dequeue all elements currently in input queue 
     94   * 
     95   * \param result_buffer Vector to fill with dequeued buffers (any contents in vector when calling this method are discarded) 
     96   */ 
     97  void DequeueAllBuffers(core::tAbstractPort& port, std::vector<tPortDataPointer<const rrlib::rtti::tGenericObject>>& result_buffer); 
     98 
     99  /*! 
    93100   * Gets Port's current value 
    94101   * 
  • api/tGenericPortImplementation.h

    r134 r135  
    4040// External includes (system with <>, local with "") 
    4141//---------------------------------------------------------------------- 
    42 #include "rrlib/rtti/tTypeAnnotation.h" 
    4342 
    4443//---------------------------------------------------------------------- 
     
    7473 * Implementations for tGenericPort. 
    7574 */ 
    76 class tGenericPortImplementation : public rrlib::rtti::tTypeAnnotation 
     75class tGenericPortImplementation 
    7776{ 
    7877//---------------------------------------------------------------------- 
     
    101100   * Gets Port's current value 
    102101   * 
    103    * (Note that numbers and "cheap copy" types also have a method: T GetValue();  (defined in tPortParent<T>)) 
    104    * 
     102   * \param port Wrapped port to operate on 
    105103   * \param result Buffer to (deep) copy port's current value to 
    106    * (Using this get()-variant is more efficient when using CC types, but can be extremely costly with large data types) 
     104   * \param timestamp Object to store timestamp in 
     105   * 
     106   * (Using this get()-variant is more efficient when using cheaply-copied types, but can be costly with large data types) 
    107107   */ 
    108108  virtual void Get(core::tAbstractPort& port, rrlib::rtti::tGenericObject& result, rrlib::time::tTimestamp& timestamp) = 0; 
     
    111111   * Gets Port's current value buffer 
    112112   * 
     113   * \param port Wrapped port to operate on 
    113114   * \param strategy Strategy to use for get operation 
    114115   * \return Buffer with port's current value with read lock. 
     
    117118 
    118119  /*! 
     120   * \param port Wrapped port to operate on 
    119121   * \return Port's default value (NULL if none has been set) 
    120122   */ 
     
    130132  static tGenericPortImplementation* GetImplementation(const rrlib::rtti::tType& type) 
    131133  { 
    132     tGenericPortImplementation* annotation = type.GetAnnotation<tGenericPortImplementation>(); 
    133     if (!annotation) 
    134     { 
    135       CreateImplementations(); 
    136       annotation = type.GetAnnotation<tGenericPortImplementation>(); 
    137     } 
    138     return annotation; 
    139   } 
    140  
    141   /*! 
     134    return IsCheaplyCopiedType(type) ? cIMPLEMENTATION_CHEAP_COPY : cIMPLEMENTATION_STANDARD; 
     135  } 
     136 
     137  /*! 
     138   * \param port Wrapped port to operate on 
    142139   * \return Unused buffer. 
     140   * 
    143141   * Buffers to be published using this port should be acquired using this function. 
    144142   * The buffer might contain old data, so it should be cleared prior to using. 
    145    * 
    146    * Note: The returned buffer is always of the port's actual buffer type (e.g. no int, double etc. - but tNumber) 
    147143   */ 
    148144  inline tPortDataPointer<rrlib::rtti::tGenericObject> GetUnusedBuffer(core::tAbstractPort& port) 
     
    153149      if (optimized::tThreadLocalBufferPools::Get()) 
    154150      { 
    155         return tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false>(optimized::tThreadLocalBufferPools::Get()->GetUnusedBuffer(cc_port.GetCheaplyCopyableTypeIndex()).release(), true); 
     151        return tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false>(optimized::tThreadLocalBufferPools::Get()->GetUnusedBuffer(cc_port.GetCheaplyCopiedTypeBufferPoolIndex(), port.GetDataType()).release(), true); 
    156152      } 
    157153      else 
    158154      { 
    159         return tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false>(optimized::tGlobalBufferPools::Instance().GetUnusedBuffer(cc_port.GetCheaplyCopyableTypeIndex()).release(), true); 
     155        return tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false>(optimized::tGlobalBufferPools::Instance().GetUnusedBuffer(cc_port.GetCheaplyCopiedTypeBufferPoolIndex(), port.GetDataType()).release(), true); 
    160156      } 
    161157    } 
     
    170166   * Should only be called on output ports. 
    171167   * 
     168   * \param port Wrapped port to operate on 
    172169   * \param data Data to publish. It will be deep-copied. 
    173    * This publish()-variant is efficient when using CC types, but can be extremely costly with large data types) 
     170   * \param timestamp Timestamp to attach to data 
     171   * 
     172   * This publish()-variant is efficient when using cheaply-copied types, but can be costly with large data types) 
    174173   */ 
    175174  virtual void Publish(core::tAbstractPort& port, const rrlib::rtti::tGenericObject& data, const rrlib::time::tTimestamp& timestamp) = 0; 
     175 
     176  /*! 
     177   * Publish Data Buffer. This data will be forwarded to any connected ports. 
     178   * Should only be called on output ports. 
     179   * 
     180   * \param port Wrapped port to operate on 
     181   * \param data_buffer Data to publish. 
     182   * 
     183   * This publish()-variant is efficient with all data types. 
     184   */ 
     185  inline void Publish(core::tAbstractPort& port, tPortDataPointer<rrlib::rtti::tGenericObject>& data_buffer) 
     186  { 
     187    if (IsCheaplyCopiedType(port.GetDataType())) 
     188    { 
     189      tCheapCopyPort& cc_port = static_cast<tCheapCopyPort&>(port); 
     190#ifndef RRLIB_SINGLE_THREADED 
     191      if (optimized::tThreadLocalBufferPools::Get()) 
     192      { 
     193        common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataThreadLocalBuffer> publish_operation(static_cast<optimized::tThreadLocalBufferManager*>(data_buffer.implementation.Release()), true); 
     194        publish_operation.Execute<tChangeStatus::CHANGED, false, false>(cc_port); 
     195      } 
     196      else 
     197      { 
     198        optimized::tCheapCopyPort::tUnusedManagerPointer pointer(static_cast<optimized::tCheaplyCopiedBufferManager*>(data_buffer.implementation.Release())); 
     199        common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataGlobalBuffer> publish_operation(pointer); 
     200        publish_operation.Execute<tChangeStatus::CHANGED, false, false>(cc_port); 
     201      } 
     202#else 
     203      cc_port.Publish(*data_buffer, data_buffer.GetTimestamp()); 
     204#endif 
     205    } 
     206    else 
     207    { 
     208      standard::tStandardPort::tUnusedManagerPointer buffer(static_cast<standard::tPortBufferManager*>(data_buffer.implementation.Release())); 
     209      assert(buffer->IsUnused()); 
     210      static_cast<standard::tStandardPort&>(port).Publish(buffer); 
     211    } 
     212  } 
    176213 
    177214  /*! 
     
    179216   * (This is not thread-safe and must only be done in "pause mode") 
    180217   * 
    181    * \param b New Bounds 
     218   * \param port Wrapped port to operate on 
     219   * \param min Minimum value 
     220   * \param max Maximum value 
    182221   */ 
    183222  virtual void SetBounds(core::tAbstractPort& port, const rrlib::rtti::tGenericObject& min, const rrlib::rtti::tGenericObject& max) = 0; 
    184223 
    185224  /*! 
     225   * \param port Wrapped port to operate on 
    186226   * \param pull_request_handler Object that handles any incoming pull requests - null if there is none (typical case) 
    187227   */ 
     
    190230private: 
    191231 
    192   /*! Creates implementation for data types that weren't annotated yet */ 
    193   static void CreateImplementations(); 
     232  /*! The two available implementations */ 
     233  static tGenericPortImplementation* cIMPLEMENTATION_STANDARD, *cIMPLEMENTATION_CHEAP_COPY; 
    194234 
    195235}; 
  • common/tAbstractDataPort.h

    r121 r135  
    193193  { 
    194194    SetFlag(tFlag::DEFAULT_ON_DISCONNECT, value); 
     195  } 
     196 
     197  /*! 
     198   * Adjusts maximum queue length 
     199   * 
     200   * \param max_queue_length Maximum number of elements in queue (a value of -1 indicates that the queue has (virtually) no size limit). 
     201   */ 
     202  void SetMaxQueueLength(int max_queue_length) 
     203  { 
     204    SetMaxQueueLengthImplementation(max_queue_length); 
    195205  } 
    196206 
     
    344354 
    345355  /*! 
     356   * Adjusts maximum queue length 
     357   * 
     358   * \param max_queue_length Maximum number of elements in queue (a value of -1 indicates that the queue has (virtually) no size limit). 
     359   */ 
     360  virtual void SetMaxQueueLengthImplementation(int max_queue_length) = 0; 
     361 
     362  /*! 
    346363   * Push initial value to destination port 
    347364   * (checks etc. have been done by tAbstractDataPort class) 
  • common/tAbstractDataPort.h

    r133 r135  
    6363//---------------------------------------------------------------------- 
    6464 
     65template <typename TPort, typename TPublishingData> 
     66class tPublishOperation; 
     67 
     68template <typename TPort, typename TPublishingData, typename TManager> 
     69class tPullOperation; 
     70 
    6571//---------------------------------------------------------------------- 
    6672// Class declaration 
     
    8187 
    8288  /*! 
     89   * Adds type to group of numeric types that can always be converted implicitly when connecting ports of such types. 
     90   * This temporary mechanism is introduced to retain backward-compatibility with Finroc 14.08 - where all these types used tNumber type in port backend and could be connected. 
     91   * It is planned to remove this mechanism in future Finroc releases - and a respective warning is displayed for connections that need to be changed. 
     92   * 
     93   * \param type Data type to add 
     94   */ 
     95  static void AddNumericTypeToLegacyImplicitConversion(const rrlib::rtti::tType& type) 
     96  { 
     97    tAbstractPort::AddNumericTypeToLegacyImplicitConversion(type); 
     98  } 
     99 
     100  /*! 
     101   * Set current value to default value 
     102   */ 
     103  virtual void ApplyDefaultValue() = 0; 
     104 
     105  /*! 
    83106   * Forwards current data to specified port (publishes the data via this port) 
    84107   * 
     
    110133    return GetMaxQueueLengthImplementation(); 
    111134  } 
    112  
    113   /*! 
    114    * \return Minimum Network Update Interval (only-port specific one; -1 if there's no specific setting for port) 
    115    */ 
    116   inline rrlib::time::tDuration GetMinNetUpdateInterval() const 
    117   { 
    118     return std::chrono::milliseconds(min_net_update_time); 
    119   } 
    120  
    121   /*! 
    122    * \return Minimum Network Update Interval (only-port specific one; -1 if there's no specific setting for port) 
    123    */ 
    124   inline int16_t GetMinNetUpdateIntervalRaw() const 
    125   { 
    126     return min_net_update_time; 
    127   } 
    128  
    129   /*! 
    130    * (Helper function for network functions) 
    131    * Look for minimal port-specific minimal network update interval 
    132    * at all connected ports. 
    133    * 
    134    * \return result - -1 if no port has specific setting 
    135    */ 
    136   int16_t GetMinNetworkUpdateIntervalForSubscription() const; 
    137135 
    138136  /*! 
     
    182180 
    183181  /*! 
    184    * \return Is data to this port pushed or pulled (in reverse direction)? 
    185    */ 
    186   inline bool ReversePushStrategy() const 
    187   { 
    188     return GetFlag(tFlag::PUSH_STRATEGY_REVERSE); 
    189   } 
    190  
    191   /*! 
    192182   * \param new_value New value for custom changed flag (for use by custom API - not used/accessed by core port classes.) 
    193183   */ 
     
    198188 
    199189  /*! 
     190   * \param value Whether DEFAULT_ON_DISCONNECT flag should be set 
     191   */ 
     192  void SetDefaultOnDisconnect(bool value) 
     193  { 
     194    SetFlag(tFlag::DEFAULT_ON_DISCONNECT, value); 
     195  } 
     196 
     197  /*! 
    200198   * Adjusts maximum queue length 
    201199   * 
     
    208206 
    209207  /*! 
    210    * \param interval Minimum Network Update Interval 
    211    */ 
    212   void SetMinNetUpdateInterval(rrlib::time::tDuration& interval); 
    213   void SetMinNetUpdateIntervalRaw(int16_t interval); 
    214  
    215   /*! 
    216208   * \param listener New ports Listener 
    217209   * 
     
    224216 
    225217  /*! 
     218   * Set whether port is in hijacked mode. 
     219   * In this mode, application will no longer publish data to/via this port. 
     220   * Only 'browser' publishes will. E.g. data_playback plugin uses these. 
     221   * 
     222   * \param Whether port should be in hijacked state 
     223   */ 
     224  void SetHijacked(bool hijacked); 
     225 
     226  /*! 
    226227   * Set whether data should be pushed or pulled 
    227228   * 
     
    230231  void SetPushStrategy(bool push); 
    231232 
    232   /*! 
    233    * Set whether data should be pushed or pulled in reverse direction 
    234    * 
    235    * \param push Push data? 
    236    */ 
    237   void SetReversePushStrategy(bool push); 
    238  
    239233//---------------------------------------------------------------------- 
    240234// Protected methods 
     
    243237 
    244238  virtual ~tAbstractDataPort(); 
     239 
     240  virtual core::tConnector* CreateConnector(tAbstractPort& destination, const core::tConnectOptions& connect_options) override; 
    245241 
    246242  /*! 
     
    276272   * Does this port "want" to receive a value via push strategy? 
    277273   * 
    278    * \param cReverse direction? (typically we push forward) 
    279    * \param change_constant If this is about an initial push, this should be CHANGED_INITIAL - otherwise CHANGED 
     274   * \tparam cCHANGE_CONSTANT change_constant If this is about an initial push, this should be CHANGED_INITIAL - otherwise CHANGED 
    280275   * \return Answer 
    281276   * 
     
    283278   * (Standard implementation for this) 
    284279   */ 
    285   template <bool cREVERSE, tChangeStatus cCHANGE_CONSTANT> 
     280  template <tChangeStatus cCHANGE_CONSTANT> 
    286281  inline bool WantsPush() const 
    287282  { 
    288     // The compiler should optimize branches away 
    289     if (cREVERSE) 
    290     { 
    291       if (cCHANGE_CONSTANT == tChangeStatus::CHANGED_INITIAL) 
    292       { 
    293         return GetFlag(tFlag::PUSH_STRATEGY_REVERSE) && CountOutgoingConnections() <= 1; 
    294       } 
    295       else 
    296       { 
    297         return GetFlag(tFlag::PUSH_STRATEGY_REVERSE); 
    298       } 
    299     } 
    300     else if (cCHANGE_CONSTANT == tChangeStatus::CHANGED_INITIAL) 
     283    if (cCHANGE_CONSTANT == tChangeStatus::CHANGED_INITIAL) 
    301284    { 
    302285      // We don't want initial pushes to ports with multiple inputs 
     
    314297private: 
    315298 
     299  template <typename TPort, typename TPublishingData> 
     300  friend class tPublishOperation; 
     301 
     302  template <typename TPort, typename TPublishingData, typename TManager> 
     303  friend class tPullOperation; 
     304 
     305 
    316306  /*! Has port changed since last reset? (see constants above) */ 
    317307  std::atomic<int8_t> changed; 
     
    332322  int16_t strategy; 
    333323 
    334   /*! Minimum network update interval. Value < 0 means default for this type */ 
    335   int16_t min_net_update_time; 
    336  
    337324  /*! Listener(s) of port value changes */ 
    338325  common::tPortListenerRaw* port_listener; 
     
    347334  static core::tAbstractPortCreationInfo AdjustPortCreationInfo(const tAbstractDataPortCreationInfo& create_info); 
    348335 
    349   virtual void ConnectionAdded(tAbstractPort& partner, bool partner_is_destination) override; 
    350  
    351   virtual void ConnectionRemoved(tAbstractPort& partner, bool partner_is_destination) override; 
    352  
    353   /*! 
    354    * Should be called in situations where there might need to be an initial push 
    355    * (e.g. connecting or strategy change) 
    356    * 
    357    * \param target Potential Target port 
    358    */ 
    359   void ConsiderInitialReversePush(tAbstractDataPort& target); 
    360  
    361336  /*! 
    362337   * Forward current strategy to source ports (helper for above - and possibly variations of above) 
     
    386361 
    387362  /*! 
    388    * Push initial value to the specified port 
    389    * (checks etc. have been done by AbstractDataPort class) 
    390    * 
    391    * \param target Port to push data to 
    392    * \param reverse Is this a reverse push? 
    393    */ 
    394   virtual void InitialPushTo(tAbstractPort& target, bool reverse) = 0; 
     363   * Push initial value to destination port 
     364   * (checks etc. have been done by tAbstractDataPort class) 
     365   * 
     366   * \param connector Connector connecting both ports 
     367   */ 
     368  virtual void InitialPushTo(core::tConnector& connector) = 0; 
     369 
     370  virtual void OnConnect(tAbstractPort& partner, bool partner_is_destination) override; 
     371 
     372  virtual void OnDisconnect(tAbstractPort& partner, bool partner_is_destination) override; 
     373 
     374  virtual void OnNetworkConnectionLoss() override; 
    395375}; 
    396376 
  • optimized/tCheapCopyPort.cpp

    r131 r135  
    519519} 
    520520 
    521 //void tCheapCopyPort::SetMaxQueueLengthImpl(int length) 
    522 //{ 
    523 //  assert((GetFlag(tFlag::HAS_QUEUE) && queue != NULL)); 
    524 //  assert((!IsOutputPort())); 
    525 //  assert((length >= 1)); 
    526 //  queue->SetMaxLength(length); 
    527 //} 
     521void tCheapCopyPort::SetMaxQueueLengthImplementation(int length) 
     522{ 
     523  if (!input_queue) 
     524  { 
     525    FINROC_LOG_PRINT(WARNING, "Cannot set queue length for port without queue"); 
     526  } 
     527  else 
     528  { 
     529    input_queue->SetMaxQueueLength(length); 
     530  } 
     531} 
    528532 
    529533void tCheapCopyPort::SetPullRequestHandler(tPullRequestHandlerRaw* pull_request_handler_) 
  • optimized/tCheapCopyPort.cpp

    r133 r135  
    7878static rrlib::rtti::tGenericObject* CreateDefaultValue(const common::tAbstractDataPortCreationInfo& creation_info) 
    7979{ 
    80   if (creation_info.DefaultValueSet() || creation_info.flags.Get(core::tFrameworkElement::tFlag::DEFAULT_ON_DISCONNECT)) 
    81   { 
    82     rrlib::rtti::tGenericObject* result = creation_info.data_type.CreateInstanceGeneric(); 
     80  if (creation_info.DefaultValueSet() || creation_info.flags.Get(core::tFrameworkElement::tFlag::DEFAULT_ON_DISCONNECT) || creation_info.flags.Get(core::tFrameworkElement::tFlag::VOLATILE) || creation_info.flags.Get(core::tFrameworkElement::tFlag::NETWORK_ELEMENT)) 
     81  { 
     82    rrlib::rtti::tGenericObject* result = creation_info.data_type.CreateGenericObject(); 
    8383    if (creation_info.DefaultValueSet()) 
    8484    { 
     
    9595tCheapCopyPort::tCheapCopyPort(common::tAbstractDataPortCreationInfo creation_info) : 
    9696  common::tAbstractDataPort(creation_info), 
    97   cheaply_copyable_type_index(RegisterPort(creation_info.data_type)), 
     97  cheaply_copied_type_buffer_pool_index(RegisterPort(creation_info.data_type)), 
    9898  default_value(internal::CreateDefaultValue(creation_info)), 
    9999  current_value(0), 
    100100  standard_assign(!GetFlag(tFlag::NON_STANDARD_ASSIGN) && (!GetFlag(tFlag::HAS_QUEUE))), 
    101101  input_queue(), 
    102   pull_request_handler(NULL), 
    103   unit(creation_info.unit) 
     102  pull_request_handler(NULL) 
    104103{ 
    105104  if ((!IsDataFlowType(GetDataType())) || (!IsCheaplyCopiedType(GetDataType()))) 
     
    110109 
    111110  // Initialize value 
    112   tCheaplyCopiedBufferManager* initial = tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release(); 
     111  tCheaplyCopiedBufferManager* initial = tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release(); 
    113112  assert(initial->GetObject().GetType() == GetDataType()); 
    114113  initial->InitReferenceCounter(1); 
     
    123122  else 
    124123  { 
    125     std::unique_ptr<rrlib::rtti::tGenericObject> object_with_default_value(GetDataType().CreateInstanceGeneric()); 
     124    std::unique_ptr<rrlib::rtti::tGenericObject> object_with_default_value(GetDataType().CreateGenericObject()); 
    126125    initial->GetObject().DeepCopyFrom(*object_with_default_value); 
    127126  } 
     
    142141tCheapCopyPort::~tCheapCopyPort() 
    143142{ 
     143  UnregisterPort(GetDataType()); 
    144144  tTaggedBufferPointer cur_pointer = current_value.exchange(0); 
    145145  tPortBufferUnlocker unlocker; 
     
    154154    return; 
    155155  } 
    156   // Publish(*default_value); TODO 
     156 
     157  tUnusedManagerPointer buffer(tGlobalBufferPools::Instance().GetUnusedBuffer(GetCheaplyCopiedTypeBufferPoolIndex(), GetDataType()).release()); 
     158  buffer->GetObject().DeepCopyFrom(*default_value); 
     159  buffer->SetTimestamp(rrlib::time::cNO_TIME); 
     160  BrowserPublishRaw(buffer, true); 
    157161} 
    158162 
     
    167171      if (change_constant == tChangeStatus::CHANGED_INITIAL) 
    168172      { 
    169         data.Execute<false, tChangeStatus::CHANGED_INITIAL, true, true>(*this); 
     173        data.Execute<tChangeStatus::CHANGED_INITIAL, true, true>(*this); 
    170174      } 
    171175      else 
    172176      { 
    173         data.Execute<false, tChangeStatus::CHANGED, true, true>(*this); 
     177        data.Execute<tChangeStatus::CHANGED, true, true>(*this); 
    174178      } 
    175179    } 
     
    178182      if (change_constant == tChangeStatus::CHANGED_INITIAL) 
    179183      { 
    180         data.Execute<false, tChangeStatus::CHANGED_INITIAL, true, false>(*this); 
     184        data.Execute<tChangeStatus::CHANGED_INITIAL, true, false>(*this); 
    181185      } 
    182186      else 
    183187      { 
    184         data.Execute<false, tChangeStatus::CHANGED, true, false>(*this); 
     188        data.Execute<tChangeStatus::CHANGED, true, false>(*this); 
    185189      } 
    186190    } 
     
    193197      if (change_constant == tChangeStatus::CHANGED_INITIAL) 
    194198      { 
    195         data.Execute<false, tChangeStatus::CHANGED_INITIAL, true, true>(*this); 
     199        data.Execute<tChangeStatus::CHANGED_INITIAL, true, true>(*this); 
    196200      } 
    197201      else 
    198202      { 
    199         data.Execute<false, tChangeStatus::CHANGED, true, true>(*this); 
     203        data.Execute<tChangeStatus::CHANGED, true, true>(*this); 
    200204      } 
    201205    } 
     
    204208      if (change_constant == tChangeStatus::CHANGED_INITIAL) 
    205209      { 
    206         data.Execute<false, tChangeStatus::CHANGED_INITIAL, true, false>(*this); 
     210        data.Execute<tChangeStatus::CHANGED_INITIAL, true, false>(*this); 
    207211      } 
    208212      else 
    209213      { 
    210         data.Execute<false, tChangeStatus::CHANGED, true, false>(*this); 
     214        data.Execute<tChangeStatus::CHANGED, true, false>(*this); 
    211215      } 
    212216    } 
     
    217221void tCheapCopyPort::CallPullRequestHandler(tPublishingDataGlobalBuffer& publishing_data) 
    218222{ 
    219   tUnusedManagerPointer result = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release()); 
     223  tUnusedManagerPointer result = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release()); 
    220224  if (pull_request_handler->RawPullRequest(*this, *result)) 
    221225  { 
     
    226230void tCheapCopyPort::CallPullRequestHandler(tPublishingDataThreadLocalBuffer& publishing_data) 
    227231{ 
    228   tUnusedManagerPointer result = tUnusedManagerPointer(tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index).release()); 
     232  tUnusedManagerPointer result = tUnusedManagerPointer(tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release()); 
    229233  if (pull_request_handler->RawPullRequest(*this, *result)) 
    230234  { 
     
    260264  { 
    261265    tLockingManagerPointer dc = PullValueRaw(strategy == tStrategy::PULL_IGNORING_HANDLER_ON_THIS_PORT); 
    262     buffer.DeepCopyFrom(dc->GetObject(), NULL); 
     266    buffer.DeepCopyFrom(dc->GetObject()); 
    263267    timestamp = dc->GetTimestamp(); 
    264268  } 
     
    275279    { 
    276280      common::tPublishOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer> data(static_cast<tThreadLocalBufferManager*>(current_buffer.GetPointer()), false); 
    277       data.Execute<false, tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 
     281      data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 
    278282      return; 
    279283    } 
     
    281285    // there obviously will not arrive any buffer from current thread in the meantime 
    282286 
    283     auto unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index); 
     287    auto unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()); 
    284288    for (; ;) 
    285289    { 
     
    291295      { 
    292296        common::tPublishOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer> data(static_cast<tThreadLocalBufferManager*>(current_buffer.GetPointer()), true); 
    293         data.Execute<false, tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 
     297        data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 
    294298        return; 
    295299      } 
     
    299303  else 
    300304  { 
    301     tUnusedManagerPointer unused_manager = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release()); 
     305    tUnusedManagerPointer unused_manager = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release()); 
    302306    CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL); 
    303307    common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(unused_manager); 
    304     data.Execute<false, tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 
     308    data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 
    305309  } 
    306310} 
     
    376380// 
    377381 
    378 void tCheapCopyPort::InitialPushTo(tAbstractPort& target, bool reverse) 
     382void tCheapCopyPort::InitialPushTo(core::tConnector& connector) 
    379383{ 
    380384  // this is a one-time event => use global buffer 
    381   tUnusedManagerPointer unused_manager(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release()); 
     385  tUnusedManagerPointer unused_manager(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release()); 
    382386  CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL); 
    383   common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(unused_manager); 
    384   tCheapCopyPort& target_port = static_cast<tCheapCopyPort&>(target); 
    385   if (reverse) 
    386   { 
    387     common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer>::Receive<true, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 
    388   } 
    389   else 
    390   { 
    391     common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer>::Receive<false, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 
     387  if (typeid(connector) == typeid(common::tConversionConnector)) 
     388  { 
     389    static_cast<common::tConversionConnector&>(connector).Publish(unused_manager->GetObject(), unused_manager->GetTimestamp(), tChangeStatus::CHANGED_INITIAL); 
     390  } 
     391  else 
     392  { 
     393    common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(unused_manager); 
     394    tCheapCopyPort& target_port = static_cast<tCheapCopyPort&>(connector.Destination()); 
     395    common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer>::Receive<tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 
    392396  } 
    393397} 
     
    400404    if (current_buffer->GetThreadLocalOrigin()) 
    401405    { 
    402       tUnusedManagerPointer unused_manager = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release()); 
     406      tUnusedManagerPointer unused_manager = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release()); 
    403407      CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL); 
    404408      publishing_data.Init(unused_manager); 
     
    429433  // there obviously will not arrive any buffer from current thread in the meantime 
    430434 
    431   auto unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index); 
     435  auto unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()); 
    432436  for (; ;) 
    433437  { 
     
    471475} 
    472476 
    473 void tCheapCopyPort::NotifyDisconnect() 
    474 { 
    475   if (GetFlag(tFlag::DEFAULT_ON_DISCONNECT)) 
    476   { 
    477     ApplyDefaultValue(); 
    478   } 
    479 } 
    480  
    481477tCheapCopyPort::tLockingManagerPointer tCheapCopyPort::PullValueRaw(bool ignore_pull_request_handler_on_this_port) 
    482478{ 
     
    510506  if (!default_value) 
    511507  { 
    512     default_value.reset(new_default.GetType().CreateInstanceGeneric()); 
     508    default_value.reset(new_default.GetType().CreateGenericObject()); 
    513509  } 
    514510  else if (default_value->GetType() != new_default.GetType()) 
  • optimized/tCheapCopyPort.h

    r131 r135  
    953953  tLockingManagerPointer PullValueRaw(bool ignore_pull_request_handler_on_this_port = false); 
    954954 
    955 //  virtual void SetMaxQueueLengthImpl(int length); 
     955  virtual void SetMaxQueueLengthImplementation(int length) override; 
    956956 
    957957  /*! 
  • optimized/tCheapCopyPort.h

    r133 r135  
    6565template<typename T> 
    6666class tPort; 
    67  
    68 namespace common 
    69 { 
    70 template <typename TPort, typename TPublishingData, typename TManager> 
    71 class tPullOperation; 
    72 } 
    7367 
    7468namespace optimized 
     
    172166  virtual ~tCheapCopyPort(); 
    173167 
    174   /*! 
    175    * Set current value to default value 
    176    */ 
    177   void ApplyDefaultValue(); 
     168  virtual void ApplyDefaultValue() override; 
    178169 
    179170  /*! 
     
    311302   * \return Returns data type's 'cheaply copyable type index' 
    312303   */ 
    313   inline uint32_t GetCheaplyCopyableTypeIndex() const 
    314   { 
    315     return cheaply_copyable_type_index; 
     304  inline uint32_t GetCheaplyCopiedTypeBufferPoolIndex() const 
     305  { 
     306    return cheaply_copied_type_buffer_pool_index; 
    316307  } 
    317308 
     
    377368    return PullValueRaw(ignore_pull_request_handler_on_this_port); 
    378369  } 
    379  
    380   /*! 
    381    * \return Unit of port 
    382    */ 
    383   inline tUnit GetUnit() 
    384   { 
    385     return unit; 
    386   } 
    387  
    388   virtual void NotifyDisconnect(); // TODO: why is this virtual? 
    389370 
    390371//  /*! 
     
    494475    } 
    495476 
     477    const rrlib::rtti::tGenericObject& GetObject() 
     478    { 
     479      return published_buffer->GetObject(); 
     480    } 
     481    const rrlib::time::tTimestamp& GetTimestamp() 
     482    { 
     483      return published_buffer->GetTimestamp(); 
     484    } 
     485 
    496486    /*! 
    497487     * Reinitializes publishing data with adjusted buffer 
     
    587577    } 
    588578 
     579    const rrlib::rtti::tGenericObject& GetObject() 
     580    { 
     581      return published_buffer->GetObject(); 
     582    } 
     583    const rrlib::time::tTimestamp& GetTimestamp() 
     584    { 
     585      return published_buffer->GetTimestamp(); 
     586    } 
     587 
    589588    /*! 
    590589     * Reinitializes publishing data with adjusted buffer 
     
    625624  tUnusedManagerPointer GetUnusedBuffer(tPublishingDataGlobalBuffer& publishing_data) 
    626625  { 
    627     return tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release()); 
     626    return tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release()); 
    628627  } 
    629628  tUnusedManagerPointer GetUnusedBuffer(tPublishingDataThreadLocalBuffer& publishing_data) 
    630629  { 
    631     return tUnusedManagerPointer(tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index).release()); 
     630    return tUnusedManagerPointer(tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copied_type_buffer_pool_index, GetDataType()).release()); 
    632631  } 
    633632 
     
    660659  friend class common::tPullOperation; 
    661660 
    662   /*! 'cheaply copyable type index' of type used in this port */ 
    663   uint32_t cheaply_copyable_type_index; 
     661  /*! Index of buffer pool to use */ 
     662  uint32_t cheaply_copied_type_buffer_pool_index; 
    664663 
    665664  /*! default value - invariant: must never be null if used (must always be copied, too) */ 
     
    704703  /*! Object that handles pull requests - null if there is none (typical case) */ 
    705704  tPullRequestHandlerRaw* pull_request_handler; 
    706  
    707   /*! Unit of port (currently only used for numeric ports) */ 
    708   tUnit unit; 
    709705 
    710706 
     
    804800//    } 
    805801// 
    806 //    tThreadLocalBufferManager* unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index); 
     802//    tThreadLocalBufferManager* unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copied_type_buffer_pool_index); 
    807803//    for (; ;) 
    808804//    { 
     
    831827//  } 
    832828 
    833   virtual void InitialPushTo(tAbstractPort& target, bool reverse) override; 
     829  virtual void InitialPushTo(core::tConnector& connector) override; 
    834830 
    835831  /*! 
     
    974970    } 
    975971  } 
    976 #else 
    977  
    978 public: 
    979   inline tUnit GetUnit() 
    980   { 
    981     return tUnit::cNO_UNIT; 
    982   } 
    983  
    984972#endif // RRLIB_SINGLE_THREADED 
    985973}; 
  • standard/tStandardPort.cpp

    r131 r135  
    309309} 
    310310 
    311 //void tStandardPort::SetMaxQueueLengthImplementation(int length) 
    312 //{ 
    313 //  assert((GetFlag(tFlag::HAS_QUEUE) && queue != NULL)); 
    314 //  assert((!IsOutputPort())); 
    315 //  assert((length >= 1)); 
    316 //  queue->SetMaxLength(length); 
    317 //} 
     311void tStandardPort::SetMaxQueueLengthImplementation(int length) 
     312{ 
     313  if (!input_queue) 
     314  { 
     315    FINROC_LOG_PRINT(WARNING, "Cannot set queue length for port without queue"); 
     316  } 
     317  else 
     318  { 
     319    input_queue->SetMaxQueueLength(length); 
     320  } 
     321} 
    318322 
    319323void tStandardPort::SetPullRequestHandler(tPullRequestHandlerRaw* pull_request_handler_) 
  • standard/tStandardPort.cpp

    r133 r135  
    112112  current_value(0), 
    113113  standard_assign(!GetFlag(tFlag::NON_STANDARD_ASSIGN) && (!GetFlag(tFlag::HAS_QUEUE))), 
     114  compression_active_status(-2), 
     115  data_compressor_mutex("tStandardPort data compressor"), 
    114116  input_queue(), 
    115117  pull_request_handler(NULL) 
     
    166168    return; 
    167169  } 
    168   Publish(default_value); 
     170  default_value->AddLocks(1); 
     171  tLockingManagerPointer pointer(default_value.get()); 
     172  Publish(pointer); 
    169173} 
    170174 
     
    175179    if (change_constant == tChangeStatus::CHANGED_INITIAL) 
    176180    { 
    177       PublishImplementation<false, tChangeStatus::CHANGED_INITIAL, true, true>(data); 
     181      PublishImplementation<tChangeStatus::CHANGED_INITIAL, true, true>(data); 
    178182    } 
    179183    else 
    180184    { 
    181       PublishImplementation<false, tChangeStatus::CHANGED, true, true>(data); 
     185      PublishImplementation<tChangeStatus::CHANGED, true, true>(data); 
    182186    } 
    183187  } 
     
    186190    if (change_constant == tChangeStatus::CHANGED_INITIAL) 
    187191    { 
    188       PublishImplementation<false, tChangeStatus::CHANGED_INITIAL, true, false>(data); 
     192      PublishImplementation<tChangeStatus::CHANGED_INITIAL, true, false>(data); 
    189193    } 
    190194    else 
    191195    { 
    192       PublishImplementation<false, tChangeStatus::CHANGED, true, false>(data); 
     196      PublishImplementation<tChangeStatus::CHANGED, true, false>(data); 
    193197    } 
    194198  } 
     
    214218tPortBufferManager* tStandardPort::CreateDefaultValue(const common::tAbstractDataPortCreationInfo& creation_info, tBufferPool& buffer_pool) 
    215219{ 
    216   if (creation_info.DefaultValueSet() || creation_info.flags.Get(tFlag::DEFAULT_ON_DISCONNECT)) 
     220  if (creation_info.DefaultValueSet() || creation_info.flags.Get(tFlag::DEFAULT_ON_DISCONNECT) || creation_info.flags.Get(tFlag::VOLATILE) || creation_info.flags.Get(tFlag::NETWORK_ELEMENT)) 
    217221  { 
    218222    tPortBufferManager* pdm = buffer_pool.GetUnusedBuffer(creation_info.data_type).release(); 
     
    263267} 
    264268 
    265 void tStandardPort::InitialPushTo(tAbstractPort& target, bool reverse) 
     269void tStandardPort::InitialPushTo(core::tConnector& connector) 
    266270{ 
    267271  tLockingManagerPointer manager = GetCurrentValueRaw(tStrategy::NEVER_PULL); 
    268272  assert(IsReady()); 
    269273 
    270   common::tPublishOperation<tStandardPort, tPublishingData> data(manager, 1000); 
    271   tStandardPort& target_port = static_cast<tStandardPort&>(target); 
    272   if (reverse) 
    273   { 
    274     common::tPublishOperation<tStandardPort, tPublishingData>::Receive<true, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 
     274  if (typeid(connector) == typeid(common::tConversionConnector)) 
     275  { 
     276    static_cast<common::tConversionConnector&>(connector).Publish(manager->GetObject(), manager->GetTimestamp(), tChangeStatus::CHANGED_INITIAL); 
    275277  } 
    276278  else 
    277279  { 
    278     common::tPublishOperation<tStandardPort, tPublishingData>::Receive<false, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 
     280    common::tPublishOperation<tStandardPort, tPublishingData> data(manager, 1000); 
     281    tStandardPort& target_port = static_cast<tStandardPort&>(connector.Destination()); 
     282    common::tPublishOperation<tStandardPort, tPublishingData>::Receive<tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 
    279283  } 
    280284} 
     
    298302} 
    299303 
    300 void tStandardPort::NotifyDisconnect() 
    301 { 
    302   if (GetFlag(tFlag::DEFAULT_ON_DISCONNECT)) 
    303   { 
    304     ApplyDefaultValue(); 
    305   } 
    306 } 
    307  
    308 void tStandardPort::PrintStructure(int indent, std::stringstream& output) const 
    309 { 
    310   tFrameworkElement::PrintStructure(indent, output); 
    311   if (multi_type_buffer_pool) 
    312   { 
    313     multi_type_buffer_pool->PrintStructure(indent + 2, output); 
    314   } 
    315 } 
    316  
    317304tStandardPort::tLockingManagerPointer tStandardPort::PullValueRaw(bool ignore_pull_request_handler_on_this_port) 
    318305{ 
  • standard/tStandardPort.h

    r131 r135  
    588588  tLockingManagerPointer PullValueRaw(bool ignore_pull_request_handler_on_this_port = false); 
    589589 
    590   //virtual void SetMaxQueueLengthImplementation(int length); 
     590  virtual void SetMaxQueueLengthImplementation(int length) override; 
    591591 
    592592  /*! 
  • standard/tStandardPort.h

    r133 r135  
    6969class tPort; 
    7070 
    71 namespace common 
    72 { 
    73 template <typename TPort, typename TPublishingData, typename TManager> 
    74 class tPullOperation; 
    75 } 
    76  
    7771namespace standard 
    7872{ 
     
    165159   * Set current value to default value 
    166160   */ 
    167   void ApplyDefaultValue(); 
     161  virtual void ApplyDefaultValue() override; 
    168162 
    169163  /*! 
     
    253247  virtual tUnusedManagerPointer GetUnusedBufferRaw(const rrlib::rtti::tType& dt); 
    254248 
    255   virtual void NotifyDisconnect(); // TODO: why is this virtual? 
    256  
    257249  /*! 
    258250   * Publish Data Buffer. This data will be forwarded to any connected ports. 
     
    264256  inline void Publish(tUnusedManagerPointer& data) 
    265257  { 
    266     PublishImplementation<false, tChangeStatus::CHANGED, false, false>(data); 
     258    PublishImplementation<tChangeStatus::CHANGED, false, false>(data); 
    267259  } 
    268260  inline void Publish(tLockingManagerPointer& data) 
    269261  { 
    270     PublishImplementation<false, tChangeStatus::CHANGED, false, false>(data); 
     262    PublishImplementation<tChangeStatus::CHANGED, false, false>(data); 
    271263  } 
    272264 
     
    371363    void CheckRecycle() {} 
    372364 
     365    const rrlib::rtti::tGenericObject& GetObject() 
     366    { 
     367      return published_buffer->GetObject(); 
     368    } 
     369    const rrlib::time::tTimestamp& GetTimestamp() 
     370    { 
     371      return published_buffer->GetTimestamp(); 
     372    } 
     373 
    373374    void Init(tPortBufferManager* published) 
    374375    { 
     
    436437  const bool standard_assign; 
    437438 
     439  friend class data_compression::tPlugin; 
     440 
     441  /*! accessed by finroc_plugins_data_compression only: Is compression enabled for this port? [31 bytes rule revision when this was checked, 1 byte enabled?] */ 
     442  std::atomic<uint32_t> compression_active_status; 
     443 
     444  /*! accessed by finroc_plugins_data_compression only: Mutex for data compressors attached to this port */ 
     445  rrlib::thread::tOrderedMutex data_compressor_mutex; 
     446 
    438447  /*! Queue for ports with incoming value queue */ 
    439448  std::unique_ptr<common::tPortQueue<tLockingManagerPointer>> input_queue; 
     
    485494  virtual int GetMaxQueueLengthImplementation() const override; 
    486495 
    487   // quite similar to publish 
    488   virtual void InitialPushTo(tAbstractPort& target, bool reverse) override; 
     496  virtual void InitialPushTo(core::tConnector& connector) override; 
    489497 
    490498  /*! 
     
    529537  } 
    530538 
    531   virtual void PrintStructure(int indent, std::stringstream& output) const override; 
    532  
    533539  /*! 
    534540   * Publish data 
    535541   * 
    536542   * \param data Data to publish 
    537    * \param reverse Value received in reverse direction? 
    538543   * \param changed_constant changedConstant to use 
    539544   */ 
    540   inline void Publish(tUnusedManagerPointer& data, bool reverse, tChangeStatus changed_constant) 
    541   { 
    542     if (!reverse) 
    543     { 
    544       if (changed_constant == tChangeStatus::CHANGED) 
    545       { 
    546         PublishImplementation<false, tChangeStatus::CHANGED, false, false>(data); 
    547       } 
    548       else 
    549       { 
    550         PublishImplementation<false, tChangeStatus::CHANGED_INITIAL, false, false>(data); 
    551       } 
     545  inline void Publish(tUnusedManagerPointer& data, tChangeStatus changed_constant) 
     546  { 
     547    if (changed_constant == tChangeStatus::CHANGED) 
     548    { 
     549      PublishImplementation<tChangeStatus::CHANGED, false, false>(data); 
    552550    } 
    553551    else 
    554552    { 
    555       if (changed_constant == tChangeStatus::CHANGED) 
    556       { 
    557         PublishImplementation<true, tChangeStatus::CHANGED, false, false>(data); 
    558       } 
    559       else 
    560       { 
    561         PublishImplementation<true, tChangeStatus::CHANGED_INITIAL, false, false>(data); 
    562       } 
     553      PublishImplementation<tChangeStatus::CHANGED_INITIAL, false, false>(data); 
    563554    } 
    564555  } 
     
    572563   * 
    573564   * \param data Data buffer 
    574    * \tparam REVERSE Publish in reverse direction? (typical is forward) 
    575565   * \tparam CHANGE_CONSTANT changedConstant to use 
    576566   * \tparam BROWSER_PUBLISH Inform this port's listeners on change and also publish in reverse direction? (only set from BrowserPublish()) 
    577567   */ 
    578   template <bool REVERSE, tChangeStatus CHANGE_CONSTANT, bool BROWSER_PUBLISH, bool NOTIFY_LISTENER_ON_THIS_PORT, typename TDeleter> 
     568  template <tChangeStatus CHANGE_CONSTANT, bool BROWSER_PUBLISH, bool NOTIFY_LISTENER_ON_THIS_PORT, typename TDeleter> 
    579569  inline void PublishImplementation(std::unique_ptr<tPortBufferManager, TDeleter>& data) 
    580570  { 
     
    586576 
    587577    common::tPublishOperation<tStandardPort, tPublishingData> publish_operation(data, 1000); 
    588     publish_operation.Execute<REVERSE, CHANGE_CONSTANT, BROWSER_PUBLISH, NOTIFY_LISTENER_ON_THIS_PORT>(*this); 
     578    publish_operation.Execute<CHANGE_CONSTANT, BROWSER_PUBLISH, NOTIFY_LISTENER_ON_THIS_PORT>(*this); 
    589579  } 
    590580 
  • tGenericPort.h

    r119 r135  
    201201 
    202202  /*! 
     203   * Dequeue all elements currently in input queue 
     204   * 
     205   * \param result_buffer Vector to fill with dequeued buffers (any contents in vector when calling this method are discarded) 
     206   */ 
     207  void DequeueAllBuffers(std::vector<tPortDataPointer<const rrlib::rtti::tGenericObject>>& result_buffer) 
     208  { 
     209    implementation->DequeueAllBuffers(*GetWrapped(), result_buffer); 
     210  } 
     211 
     212  /*! 
    203213   * Gets port's current value 
    204214   * 
  • tGenericPort.h

    r134 r135  
    4848//---------------------------------------------------------------------- 
    4949#include "plugins/data_ports/api/tGenericPortImplementation.h" 
     50#include "plugins/data_ports/tEvent.h" 
    5051 
    5152//---------------------------------------------------------------------- 
     
    102103   * A tQueueSettings argument creates an input queue with the specified settings. 
    103104   * tBounds<T> are port's bounds. 
    104    * tUnit argument is port's unit. 
    105105   * tPortCreationBase argument is copied. This is only allowed as first argument. 
    106106   */ 
     
    124124   * 
    125125   * (It's preferred to add listeners before port is initialized) 
    126    * (Note: Buffer in 'value' always has data type of port backend (e.g. tNumber instead of double) 
    127126   */ 
    128127  template <typename TListener> 
     
    136135   * 
    137136   * (It's preferred to add listeners before port is initialized) 
    138    * (Note: Buffer in 'value' always has data type of port backend (e.g. tNumber instead of double) 
    139137   */ 
    140138  template <typename TListener> 
     
    151149  template <typename TListener> 
    152150  void AddPortListenerSimple(TListener& listener); 
     151 
     152  /*! 
     153   * Apply default value to port 
     154   * (not particularly efficient) 
     155   * 
     156   * apply_type_default_if_no_port_default_defined If no default value was set in constructor, use default of data type? (otherwise prints a warning) 
     157   */ 
     158  void ApplyDefault(bool apply_type_default_if_no_port_default_defined) 
     159  { 
     160    if (GetDefaultValue() || (!apply_type_default_if_no_port_default_defined)) 
     161    { 
     162      static_cast<common::tAbstractDataPort&>(*GetWrapped()).ApplyDefaultValue(); 
     163    } 
     164    else 
     165    { 
     166      tPortDataPointer<rrlib::rtti::tGenericObject> buffer = GetUnusedBuffer(); 
     167      std::unique_ptr<rrlib::rtti::tGenericObject> default_buffer(buffer->GetType().CreateGenericObject()); 
     168      buffer->DeepCopyFrom(*default_buffer); 
     169      BrowserPublish(buffer); 
     170    } 
     171  } 
    153172 
    154173  /*! 
     
    213232   * \return Buffer with port's current value with read lock. 
    214233   * 
    215    * Note: Buffer always has data type of port backend (e.g. tNumber instead of double) 
    216234   * If this is not desired, use pass-by-value-Get Operation above. 
    217235   */ 
     
    230248 
    231249  /*! 
    232    * Note: Buffer always has data type of port backend (e.g. tNumber instead of double) 
    233250   * If this is not desired, use pass-by-value-Publish Operation below. 
    234251   * 
     
    271288 
    272289  /*! 
     290   * Publish Data Buffer. This data will be forwarded to any connected ports. 
     291   * Should only be called on output ports. 
     292   * 
     293   * \param buffer Buffer with data to publish - typically acquired with GetUnuserBuffer() 
     294   */ 
     295  inline void Publish(tPortDataPointer<rrlib::rtti::tGenericObject>& buffer) 
     296  { 
     297    implementation->Publish(*GetWrapped(), buffer); 
     298  } 
     299 
     300  /*! 
    273301   * Set new bounds 
    274302   * (This is not thread-safe and must only be done in "pause mode") 
    275303   * 
    276    * \param b New Bounds 
     304   * \param min Minimum value 
     305   * \param max Maximum value 
    277306   */ 
    278307  inline void SetBounds(const rrlib::rtti::tGenericObject& min, const rrlib::rtti::tGenericObject& max) 
     
    294323   * 
    295324   * \param wrap Type-less tAbstractPort to wrap as tGenericPort 
    296    * \param use_backend_type_only Use only the internal data type that used in the port backend? (e.g. tNumber instead of double; relevant e.g. for Get() and Publish() methods) 
    297    */ 
    298   static tGenericPort Wrap(core::tAbstractPort& wrap, bool use_backend_type_only = false) 
     325   */ 
     326  static tGenericPort Wrap(core::tAbstractPort& wrap) 
    299327  { 
    300328    if (!IsDataFlowType(wrap.GetDataType())) 
     
    304332    tGenericPort port; 
    305333    port.SetWrapped(&wrap); 
    306     port.implementation = api::tGenericPortImplementation::GetImplementation( 
    307                             ((!use_backend_type_only) && wrap.GetWrapperDataType() != NULL) ? wrap.GetWrapperDataType() : wrap.GetDataType()); 
     334    port.implementation = api::tGenericPortImplementation::GetImplementation(wrap.GetDataType()); 
    308335    return port; 
    309336  } 
     337 
     338  static tGenericPort Wrap(core::tAbstractPort& wrap, bool unused) __attribute__((deprecated)) 
     339  { 
     340    return Wrap(wrap); 
     341  } 
     342 
    310343 
    311344//---------------------------------------------------------------------- 
Note: See TracChangeset for help on using the changeset viewer.