Changeset 107:8c041cbf9eb1 in finroc_plugins_data_ports


Ignore:
Timestamp:
07.04.2017 02:28:56 (2 years ago)
Author:
Max Reichardt <mreichardt@…>
Branch:
17.03
Phase:
public
Rebase:
63396335313931643163343164663033333831393261643164623431643565383331653032323536
Message:

Adds support for type conversion operations. Adapts to changes in finroc_core and rrlibs. Removes superseded 'MinNetUpdateInterval' from data_ports (this is a transport-specific parameter now set per connector).

Files:
2 added
14 edited

Legend:

Unmodified
Added
Removed
  • api/tPortDataPointerImplementation.h

    r101 r107  
    404404  } 
    405405 
    406   void Deserialize(rrlib::serialization::tInputStream& stream) 
    407   { 
    408     if (stream.ReadBoolean()) 
    409     { 
    410       rrlib::rtti::tType type; 
    411       stream >> type; 
    412       if ((!Get()) || Get()->GetType() != type) 
    413       { 
    414         tPortDataPointerImplementation<rrlib::rtti::tGenericObject, false> buffer = tDeserializationScope::GetUnusedBuffer(type); 
    415         std::swap(*this, buffer); 
    416       } 
    417       Get()->Deserialize(stream); 
    418       rrlib::time::tTimestamp timestamp; 
    419       stream >> timestamp; 
    420       SetTimestamp(timestamp); 
    421     } 
    422     else 
    423     { 
    424       *this = tPortDataPointerImplementation(); 
    425     } 
    426   } 
    427  
    428406  inline rrlib::rtti::tGenericObject* Get() const 
    429407  { 
     
    454432  } 
    455433 
    456   void Serialize(rrlib::serialization::tOutputStream& stream) const 
    457   { 
    458     stream.WriteBoolean(Get()); 
    459     if (Get()) 
    460     { 
    461       stream << Get()->GetType(); 
    462       Get()->Serialize(stream); 
    463       stream << GetTimestamp(); 
    464     } 
    465   } 
    466  
    467434  inline void SetTimestamp(const rrlib::time::tTimestamp& timestamp) 
    468435  { 
  • common/tAbstractDataPort.cpp

    r106 r107  
    3939//---------------------------------------------------------------------- 
    4040#include "plugins/data_ports/type_traits.h" 
     41#include "plugins/data_ports/common/tConversionConnector.h" 
    4142 
    4243//---------------------------------------------------------------------- 
     
    7778  custom_changed_flag(tChangeStatus::CHANGED_INITIAL), 
    7879  strategy(-1), 
    79   min_net_update_time(create_info.min_net_update_interval), 
    80   port_listener(NULL) 
     80  port_listener(nullptr) 
    8181{ 
    8282} 
     
    9393{ 
    9494  core::tAbstractPortCreationInfo result = create_info; 
    95   assert(result.data_type != NULL); 
     95  assert(result.data_type); 
    9696  if (IsCheaplyCopiedType(result.data_type)) 
    9797  { 
     
    108108    if (ReversePushStrategy() && CountOutgoingConnections() == 1) 
    109109    { 
    110       FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", target.GetQualifiedName(), " to ", GetQualifiedName()); 
     110      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", target, " to ", (*this)); 
    111111      target.InitialPushTo(*this, true); 
    112112    } 
    113113  } 
     114} 
     115 
     116core::tConnector* tAbstractDataPort::CreateConnector(tAbstractPort& destination, const core::tConnectOptions& connect_options) 
     117{ 
     118  if (connect_options.conversion_operations.Size() == 0 && this->GetDataType() == destination.GetDataType()) 
     119  { 
     120    return tAbstractPort::CreateConnector(destination, connect_options); 
     121  } 
     122  return new tConversionConnector(*this, destination, connect_options); 
    114123} 
    115124 
     
    121130    if (push_wanter || port.GetStrategy() != strategy2) 
    122131    { 
    123       port.PropagateStrategy(push_wanter, NULL); 
    124     } 
    125   } 
    126 } 
    127  
    128 int16_t tAbstractDataPort::GetMinNetworkUpdateIntervalForSubscription() const 
    129 { 
    130   int16_t result = std::numeric_limits<short>::max(); 
    131   int16_t t = 0; 
    132  
    133   for (auto it = OutgoingConnectionsBegin(); it != OutgoingConnectionsEnd(); ++it) 
    134   { 
    135     tAbstractDataPort& port = static_cast<tAbstractDataPort&>(it->Destination()); 
    136     if (port.GetStrategy() > 0) 
    137     { 
    138       if ((t = port.min_net_update_time) >= 0 && t < result) 
    139       { 
    140         result = t; 
    141       } 
    142     } 
    143   } 
    144   for (auto it = IncomingConnectionsBegin(); it != IncomingConnectionsEnd(); ++it) 
    145   { 
    146     tAbstractDataPort& port = static_cast<tAbstractDataPort&>(it->Source()); 
    147     if (port.GetFlag(tFlag::PUSH_STRATEGY_REVERSE)) 
    148     { 
    149       if ((t = port.min_net_update_time) >= 0 && t < result) 
    150       { 
    151         result = t; 
    152       } 
    153     } 
    154   } 
    155  
    156   return result == std::numeric_limits<short>::max() ? -1 : result; 
     132      port.PropagateStrategy(push_wanter, nullptr); 
     133    } 
     134  } 
    157135} 
    158136 
     
    186164  if (partner_is_destination) 
    187165  { 
    188     (static_cast<tAbstractDataPort&>(partner)).PropagateStrategy(NULL, this); 
     166    (static_cast<tAbstractDataPort&>(partner)).PropagateStrategy(nullptr, this); 
    189167 
    190168    // check whether we need an initial reverse push 
     
    206184    } 
    207185 
    208     static_cast<tAbstractDataPort&>(partner).PropagateStrategy(NULL, NULL); 
    209     this->PropagateStrategy(NULL, NULL); 
     186    static_cast<tAbstractDataPort&>(partner).PropagateStrategy(nullptr, nullptr); 
     187    this->PropagateStrategy(nullptr, nullptr); 
    210188  } 
    211189  else 
     
    264242      if (IsReady() && push_wanter->IsReady() && (!GetFlag(tFlag::NO_INITIAL_PUSHING)) && (!push_wanter->GetFlag(tFlag::NO_INITIAL_PUSHING))) 
    265243      { 
    266         FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial push from ", GetQualifiedName(), " to ", push_wanter->GetQualifiedName()); 
     244        FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial push from ", (*this), " to ", (*push_wanter)); 
    267245        InitialPushTo(*push_wanter, false); 
    268246      } 
     
    309287  } 
    310288  SetFlag(tFlag::HIJACKED_PORT, hijacked); 
    311   PropagateStrategy(NULL, NULL); 
    312 } 
    313  
    314 void tAbstractDataPort::SetMinNetUpdateInterval(rrlib::time::tDuration& new_interval) 
    315 { 
    316   int64_t ms = std::chrono::duration_cast<std::chrono::milliseconds>(new_interval).count(); 
    317   int16_t interval = std::min<int64_t>(ms < 0 ? 0 : ms, std::numeric_limits<int16_t>::max()); // adjust value to valid range 
    318   SetMinNetUpdateIntervalRaw(interval); 
    319 } 
    320  
    321 void tAbstractDataPort::SetMinNetUpdateIntervalRaw(int16_t new_interval) 
    322 { 
    323   tLock lock(GetStructureMutex()); 
    324   if (min_net_update_time != new_interval) 
    325   { 
    326     min_net_update_time = new_interval; 
    327     PublishUpdatedInfo(core::tRuntimeListener::tEvent::CHANGE); 
    328   } 
     289  PropagateStrategy(nullptr, nullptr); 
    329290} 
    330291 
     
    337298  } 
    338299  SetFlag(tFlag::PUSH_STRATEGY, push); 
    339   PropagateStrategy(NULL, NULL); 
     300  PropagateStrategy(nullptr, nullptr); 
    340301} 
    341302 
     
    356317      if (port.IsReady()) 
    357318      { 
    358         FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", port.GetQualifiedName(), " to ", GetQualifiedName()); 
     319        FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", port, " to ", (*this)); 
    359320        port.InitialPushTo(*this, true); 
    360321        break; 
  • common/tAbstractDataPort.h

    r93 r107  
    6363//---------------------------------------------------------------------- 
    6464 
     65template <typename TPort, typename TPublishingData> 
     66class tPublishOperation; 
     67 
     68template <typename TPort, typename TPublishingData, typename TManager> 
     69class tPullOperation; 
     70 
    6571//---------------------------------------------------------------------- 
    6672// Class declaration 
     
    117123 
    118124  /*! 
    119    * \return Minimum Network Update Interval (only-port specific one; -1 if there's no specific setting for port) 
    120    */ 
    121   inline rrlib::time::tDuration GetMinNetUpdateInterval() const 
    122   { 
    123     return std::chrono::milliseconds(min_net_update_time); 
    124   } 
    125  
    126   /*! 
    127    * \return Minimum Network Update Interval (only-port specific one; -1 if there's no specific setting for port) 
    128    */ 
    129   inline int16_t GetMinNetUpdateIntervalRaw() const 
    130   { 
    131     return min_net_update_time; 
    132   } 
    133  
    134   /*! 
    135    * (Helper function for network functions) 
    136    * Look for minimal port-specific minimal network update interval 
    137    * at all connected ports. 
    138    * 
    139    * \return result - -1 if no port has specific setting 
    140    */ 
    141   int16_t GetMinNetworkUpdateIntervalForSubscription() const; 
    142  
    143   /*! 
    144125   * \param listener Listener to remove 
    145126   */ 
     
    201182    custom_changed_flag = new_value; 
    202183  } 
    203  
    204   /*! 
    205    * \param interval Minimum Network Update Interval 
    206    */ 
    207   void SetMinNetUpdateInterval(rrlib::time::tDuration& interval); 
    208   void SetMinNetUpdateIntervalRaw(int16_t interval); 
    209184 
    210185  /*! 
     
    247222 
    248223  virtual ~tAbstractDataPort(); 
     224 
     225  virtual core::tConnector* CreateConnector(tAbstractPort& destination, const core::tConnectOptions& connect_options) override; 
    249226 
    250227  /*! 
     
    318295private: 
    319296 
     297  template <typename TPort, typename TPublishingData> 
     298  friend class tPublishOperation; 
     299 
     300  template <typename TPort, typename TPublishingData, typename TManager> 
     301  friend class tPullOperation; 
     302 
     303 
    320304  /*! Has port changed since last reset? (see constants above) */ 
    321305  std::atomic<int8_t> changed; 
     
    336320  int16_t strategy; 
    337321 
    338   /*! Minimum network update interval. Value < 0 means default for this type */ 
    339   int16_t min_net_update_time; 
    340  
    341322  /*! Listener(s) of port value changes */ 
    342323  common::tPortListenerRaw* port_listener; 
  • common/tPublishOperation.h

    r106 r107  
    4545// Internal includes with "" 
    4646//---------------------------------------------------------------------- 
     47#include "plugins/data_ports/common/tConversionConnector.h" 
    4748 
    4849//---------------------------------------------------------------------- 
     
    131132      for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it) 
    132133      { 
    133         TPort& destination_port = static_cast<TPort&>(it->Destination()); 
    134         if (destination_port.template WantsPush<REVERSE, CHANGE_CONSTANT>()) 
    135         { 
    136           Receive<REVERSE, CHANGE_CONSTANT>(*this, destination_port, port); 
     134        if (static_cast<const common::tAbstractDataPort&>(it->Destination()).template WantsPush<REVERSE, CHANGE_CONSTANT>()) 
     135        { 
     136          if (it->Flags().Get(core::tConnectionFlag::CONVERSION)) 
     137          { 
     138            static_cast<const tConversionConnector&>(*it).Publish(this->GetObject(), CHANGE_CONSTANT); 
     139          } 
     140          else 
     141          { 
     142            TPort& destination_port = static_cast<TPort&>(it->Destination()); 
     143            Receive<REVERSE, CHANGE_CONSTANT>(*this, destination_port, port); 
     144          } 
    137145        } 
    138146      } 
     
    144152      for (auto it = port.IncomingConnectionsBegin(); it != port.IncomingConnectionsEnd(); ++it) 
    145153      { 
    146         TPort& destination_port = static_cast<TPort&>(it->Source()); 
    147         if (destination_port.template WantsPush<true, CHANGE_CONSTANT>()) 
    148         { 
     154        if (static_cast<const common::tAbstractDataPort&>(it->Source()).template WantsPush<true, CHANGE_CONSTANT>() && (!it->Flags().Get(core::tConnectionFlag::CONVERSION))) 
     155        { 
     156          TPort& destination_port = static_cast<TPort&>(it->Source()); 
    149157          Receive<true, CHANGE_CONSTANT>(*this, destination_port, port); 
    150158        } 
     
    176184      for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it) 
    177185      { 
    178         TPort& destination_port = static_cast<TPort&>(it->Destination()); 
    179         if (destination_port.template WantsPush<false, CHANGE_CONSTANT>()) 
    180         { 
    181           Receive<false, CHANGE_CONSTANT>(publishing_data, destination_port, port); 
     186        if (static_cast<const common::tAbstractDataPort&>(it->Destination()).template WantsPush<REVERSE, CHANGE_CONSTANT>()) 
     187        { 
     188          if (it->Flags().Get(core::tConnectionFlag::CONVERSION)) 
     189          { 
     190            static_cast<const tConversionConnector&>(*it).Publish(publishing_data.GetObject(), CHANGE_CONSTANT); 
     191          } 
     192          else 
     193          { 
     194            TPort& destination_port = static_cast<TPort&>(it->Destination()); 
     195            Receive<false, CHANGE_CONSTANT>(publishing_data, destination_port, port); 
     196          } 
    182197        } 
    183198      } 
     
    186201      for (auto it = port.IncomingConnectionsBegin(); it != port.IncomingConnectionsEnd(); ++it) 
    187202      { 
    188         TPort& destination_port = static_cast<TPort&>(it->Source()); 
    189         if (&destination_port != &origin && destination_port.template WantsPush<true, CHANGE_CONSTANT>()) 
    190         { 
     203        if (&it->Source() != &origin && static_cast<const common::tAbstractDataPort&>(it->Source()).template WantsPush<true, CHANGE_CONSTANT>() && (!it->Flags().Get(core::tConnectionFlag::CONVERSION))) 
     204        { 
     205          TPort& destination_port = static_cast<TPort&>(it->Source()); 
    191206          Receive<true, CHANGE_CONSTANT>(publishing_data, destination_port, port); 
    192207        } 
     
    209224  static void PrintWarning(TPort& port, const char* warning) 
    210225  { 
    211     FINROC_LOG_PRINT_STATIC(WARNING, "Port '", port.GetQualifiedName(), "' ", warning); 
     226    FINROC_LOG_PRINT_STATIC(WARNING, "Port '", port, "' ", warning); 
    212227  } 
    213228 
  • common/tPullOperation.h

    r106 r107  
    134134    for (auto it = port.IncomingConnectionsBegin(); it != port.IncomingConnectionsEnd(); ++it) 
    135135    { 
     136      if (it->Flags().Get(core::tConnectionFlag::CONVERSION)) 
     137      { 
     138        continue; 
     139      } 
     140 
    136141      ExecuteImplementation(static_cast<TPort&>(it->Source()), false); 
    137142      typename TPort::tTaggedBufferPointer::tStorage tagged_pointer_raw = this->published_buffer_tagged_pointer; 
  • make.xml

    r101 r107  
    1515      standard/* 
    1616      optimized/* 
    17     </sources> 
    18   </library> 
    1917 
    20   <library name="api"> 
    21     <sources> 
    2218      tGenericPort.h 
    2319      tInputPort.h 
  • optimized/tCheapCopyPort.h

    r101 r107  
    6565template<typename T> 
    6666class tPort; 
    67  
    68 namespace common 
    69 { 
    70 template <typename TPort, typename TPublishingData, typename TManager> 
    71 class tPullOperation; 
    72 } 
    7367 
    7468namespace optimized 
     
    481475    } 
    482476 
     477    const rrlib::rtti::tGenericObject& GetObject() 
     478    { 
     479      return published_buffer->GetObject(); 
     480    } 
     481 
    483482    /*! 
    484483     * Reinitializes publishing data with adjusted buffer 
     
    572571        published_buffer = NULL; 
    573572      } 
     573    } 
     574 
     575    const rrlib::rtti::tGenericObject& GetObject() 
     576    { 
     577      return published_buffer->GetObject(); 
    574578    } 
    575579 
  • optimized/tPullRequestHandlerRaw.h

    r46 r107  
    4040// External includes (system with <>, local with "") 
    4141//---------------------------------------------------------------------- 
     42#include "rrlib/util/tNoncopyable.h" 
    4243 
    4344//---------------------------------------------------------------------- 
     
    7677public: 
    7778 
     79  virtual ~tPullRequestHandlerRaw() = default; 
     80 
    7881  /*! 
    7982   * Called whenever a pull request is intercepted 
  • optimized/tSingleThreadedCheapCopyPortGeneric.h

    r99 r107  
    107107    tPublishingData(const tCurrentValueBuffer& value) : value(&value) {} 
    108108 
     109    const rrlib::rtti::tGenericObject& GetObject() 
     110    { 
     111      return *value->data; 
     112    } 
     113 
    109114    template <typename T> 
    110115    T Value() 
  • standard/tStandardPort.cpp

    r88 r107  
    302302} 
    303303 
    304 void tStandardPort::PrintStructure(int indent, std::stringstream& output) const 
    305 { 
    306   tFrameworkElement::PrintStructure(indent, output); 
    307   if (multi_type_buffer_pool) 
    308   { 
    309     multi_type_buffer_pool->PrintStructure(indent + 2, output); 
    310   } 
    311 } 
    312  
    313304tStandardPort::tLockingManagerPointer tStandardPort::PullValueRaw(bool ignore_pull_request_handler_on_this_port) 
    314305{ 
  • standard/tStandardPort.h

    r88 r107  
    6969class tPort; 
    7070 
    71 namespace common 
    72 { 
    73 template <typename TPort, typename TPublishingData, typename TManager> 
    74 class tPullOperation; 
    75 } 
    76  
    7771namespace standard 
    7872{ 
     
    369363    void CheckRecycle() {} 
    370364 
     365    const rrlib::rtti::tGenericObject& GetObject() 
     366    { 
     367      return published_buffer->GetObject(); 
     368    } 
     369 
    371370    void Init(tPortBufferManager* published) 
    372371    { 
     
    534533    } 
    535534  } 
    536  
    537   virtual void PrintStructure(int indent, std::stringstream& output) const override; 
    538535 
    539536  /*! 
  • tGenericPort.h

    r106 r107  
    328328    port.SetWrapped(&wrap); 
    329329    port.implementation = api::tGenericPortImplementation::GetImplementation( 
    330                             ((!use_backend_type_only) && wrap.GetWrapperDataType() != NULL) ? wrap.GetWrapperDataType() : wrap.GetDataType()); 
     330                            ((!use_backend_type_only) && wrap.GetWrapperDataType()) ? wrap.GetWrapperDataType() : wrap.GetDataType()); 
    331331    return port; 
    332332  } 
  • tPort.h

    r101 r107  
    290290  } 
    291291 
    292   /*! 
    293    * \param interval Minimum Network Update Interval 
    294    */ 
    295   inline void SetMinNetUpdateInterval(rrlib::time::tDuration new_interval) 
    296   { 
    297     GetWrapped()->SetMinNetUpdateInterval(new_interval); 
    298   } 
    299  
    300292//  /* 
    301293//   * Set default value 
  • tPortDataPointer.h

    r103 r107  
    231231 
    232232  template <typename U> 
    233   friend rrlib::serialization::tOutputStream& operator << (rrlib::serialization::tOutputStream& stream, const tPortDataPointer<U>& data); 
     233  friend typename std::enable_if < (!std::is_same<typename std::remove_const<U>::type, rrlib::rtti::tGenericObject>::value), rrlib::serialization::tOutputStream >::type& operator << (rrlib::serialization::tOutputStream& stream, const tPortDataPointer<U>& data); 
    234234  template <typename U> 
    235   friend rrlib::serialization::tInputStream& operator >> (rrlib::serialization::tInputStream& stream, tPortDataPointer<U>& data); 
     235  friend typename std::enable_if < (!std::is_same<typename std::remove_const<U>::type, rrlib::rtti::tGenericObject>::value), rrlib::serialization::tInputStream >::type& operator >> (rrlib::serialization::tInputStream& stream, tPortDataPointer<U>& data); 
    236236 
    237237  friend class data_compression::tPlugin; 
     
    243243 
    244244template <typename T> 
    245 rrlib::serialization::tOutputStream& operator << (rrlib::serialization::tOutputStream& stream, const tPortDataPointer<T>& data) 
     245typename std::enable_if < (!std::is_same<typename std::remove_const<T>::type, rrlib::rtti::tGenericObject>::value), rrlib::serialization::tOutputStream >::type& operator << (rrlib::serialization::tOutputStream& stream, const tPortDataPointer<T>& data) 
    246246{ 
    247247  data.implementation.Serialize(stream); 
     
    250250 
    251251template <typename T> 
    252 rrlib::serialization::tInputStream& operator >> (rrlib::serialization::tInputStream& stream, tPortDataPointer<T>& data) 
     252typename std::enable_if < (!std::is_same<typename std::remove_const<T>::type, rrlib::rtti::tGenericObject>::value), rrlib::serialization::tInputStream >::type& operator >> (rrlib::serialization::tInputStream& stream, tPortDataPointer<T>& data) 
    253253{ 
    254254  data.implementation.Deserialize(stream); 
Note: See TracChangeset for help on using the changeset viewer.