Changeset 80:6aa87abcfc35 in finroc_plugins_data_ports


Ignore:
Timestamp:
06.01.2015 16:01:04 (5 years ago)
Author:
Max Reichardt <mreichardt@…>
Branch:
default
Phase:
public
Message:

Adapted to changes in finroc_core. Made ApplyDefault() port functions virtual and moved handling of network connection loss (applying default value when flag is set) to tAbstractDataPort base class. Added unit test for this functionality. Fixed ApplyDefault functions.

Files:
8 edited

Legend:

Unmodified
Added
Removed
  • common/tAbstractDataPort.cpp

    r71 r80  
    102102} 
    103103 
    104 void tAbstractDataPort::ConnectionAdded(tAbstractPort& partner, bool partner_is_destination) 
     104void tAbstractDataPort::ConsiderInitialReversePush(tAbstractDataPort& target) 
     105{ 
     106  if (IsReady() && target.IsReady()) 
     107  { 
     108    if (ReversePushStrategy() && CountOutgoingConnections() == 1) 
     109    { 
     110      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", target.GetQualifiedName(), " to ", GetQualifiedName()); 
     111      target.InitialPushTo(*this, true); 
     112    } 
     113  } 
     114} 
     115 
     116void tAbstractDataPort::ForwardStrategy(int16_t strategy2, tAbstractDataPort* push_wanter) 
     117{ 
     118  for (auto it = IncomingConnectionsBegin(); it != IncomingConnectionsEnd(); ++it) 
     119  { 
     120    tAbstractDataPort& port = static_cast<tAbstractDataPort&>(*it); 
     121    if (push_wanter || port.GetStrategy() != strategy2) 
     122    { 
     123      port.PropagateStrategy(push_wanter, NULL); 
     124    } 
     125  } 
     126} 
     127 
     128int16_t tAbstractDataPort::GetMinNetworkUpdateIntervalForSubscription() const 
     129{ 
     130  int16_t result = std::numeric_limits<short>::max(); 
     131  int16_t t = 0; 
     132 
     133  for (auto it = OutgoingConnectionsBegin(); it != OutgoingConnectionsEnd(); ++it) 
     134  { 
     135    tAbstractDataPort& port = static_cast<tAbstractDataPort&>(*it); 
     136    if (port.GetStrategy() > 0) 
     137    { 
     138      if ((t = port.min_net_update_time) >= 0 && t < result) 
     139      { 
     140        result = t; 
     141      } 
     142    } 
     143  } 
     144  for (auto it = IncomingConnectionsBegin(); it != IncomingConnectionsEnd(); ++it) 
     145  { 
     146    tAbstractDataPort& port = static_cast<tAbstractDataPort&>(*it); 
     147    if (port.GetFlag(tFlag::PUSH_STRATEGY_REVERSE)) 
     148    { 
     149      if ((t = port.min_net_update_time) >= 0 && t < result) 
     150      { 
     151        result = t; 
     152      } 
     153    } 
     154  } 
     155 
     156  return result == std::numeric_limits<short>::max() ? -1 : result; 
     157} 
     158 
     159int16_t tAbstractDataPort::GetStrategyRequirement() const 
     160{ 
     161  if (GetFlag(tFlag::PUSH_STRATEGY)) 
     162  { 
     163    if (GetFlag(tFlag::USES_QUEUE)) 
     164    { 
     165      int qlen = GetMaxQueueLength(); 
     166      return static_cast<int16_t>((qlen > 0 ? std::min<int>(qlen, std::numeric_limits<int16_t>::max()) : std::numeric_limits<int16_t>::max())); 
     167    } 
     168    else 
     169    { 
     170      return 1; 
     171    } 
     172  } 
     173  else 
     174  { 
     175    return static_cast<int16_t>((IsInputPort() || IsConnected() ? 0 : -1)); 
     176  } 
     177} 
     178 
     179void tAbstractDataPort::OnConnect(tAbstractPort& partner, bool partner_is_destination) 
    105180{ 
    106181  if (!dynamic_cast<tAbstractDataPort*>(&partner)) 
     
    118193} 
    119194 
    120 void tAbstractDataPort::ConnectionRemoved(tAbstractPort& partner, bool partner_is_destination) 
     195void tAbstractDataPort::OnDisconnect(tAbstractPort& partner, bool partner_is_destination) 
    121196{ 
    122197  if (partner_is_destination) 
     
    134209    this->PropagateStrategy(NULL, NULL); 
    135210  } 
    136 } 
    137  
    138 void tAbstractDataPort::ConsiderInitialReversePush(tAbstractDataPort& target) 
    139 { 
    140   if (IsReady() && target.IsReady()) 
    141   { 
    142     if (ReversePushStrategy() && CountOutgoingConnections() == 1) 
    143     { 
    144       FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", target.GetQualifiedName(), " to ", GetQualifiedName()); 
    145       target.InitialPushTo(*this, true); 
    146     } 
    147   } 
    148 } 
    149  
    150 void tAbstractDataPort::ForwardStrategy(int16_t strategy2, tAbstractDataPort* push_wanter) 
    151 { 
    152   for (auto it = IncomingConnectionsBegin(); it != IncomingConnectionsEnd(); ++it) 
    153   { 
    154     tAbstractDataPort& port = static_cast<tAbstractDataPort&>(*it); 
    155     if (push_wanter || port.GetStrategy() != strategy2) 
    156     { 
    157       port.PropagateStrategy(push_wanter, NULL); 
    158     } 
    159   } 
    160 } 
    161  
    162 int16_t tAbstractDataPort::GetMinNetworkUpdateIntervalForSubscription() const 
    163 { 
    164   int16_t result = std::numeric_limits<short>::max(); 
    165   int16_t t = 0; 
    166  
    167   for (auto it = OutgoingConnectionsBegin(); it != OutgoingConnectionsEnd(); ++it) 
    168   { 
    169     tAbstractDataPort& port = static_cast<tAbstractDataPort&>(*it); 
    170     if (port.GetStrategy() > 0) 
    171     { 
    172       if ((t = port.min_net_update_time) >= 0 && t < result) 
    173       { 
    174         result = t; 
    175       } 
    176     } 
    177   } 
    178   for (auto it = IncomingConnectionsBegin(); it != IncomingConnectionsEnd(); ++it) 
    179   { 
    180     tAbstractDataPort& port = static_cast<tAbstractDataPort&>(*it); 
    181     if (port.GetFlag(tFlag::PUSH_STRATEGY_REVERSE)) 
    182     { 
    183       if ((t = port.min_net_update_time) >= 0 && t < result) 
    184       { 
    185         result = t; 
    186       } 
    187     } 
    188   } 
    189  
    190   return result == std::numeric_limits<short>::max() ? -1 : result; 
    191 } 
    192  
    193 int16_t tAbstractDataPort::GetStrategyRequirement() const 
    194 { 
    195   if (GetFlag(tFlag::PUSH_STRATEGY)) 
    196   { 
    197     if (GetFlag(tFlag::USES_QUEUE)) 
    198     { 
    199       int qlen = GetMaxQueueLength(); 
    200       return static_cast<int16_t>((qlen > 0 ? std::min<int>(qlen, std::numeric_limits<int16_t>::max()) : std::numeric_limits<int16_t>::max())); 
    201     } 
    202     else 
    203     { 
    204       return 1; 
    205     } 
    206   } 
    207211  else 
    208212  { 
    209     return static_cast<int16_t>((IsInputPort() || IsConnected() ? 0 : -1)); 
     213    this->OnNetworkConnectionLoss(); 
     214  } 
     215} 
     216 
     217void tAbstractDataPort::OnNetworkConnectionLoss() 
     218{ 
     219  if (GetFlag(tFlag::DEFAULT_ON_DISCONNECT)) 
     220  { 
     221    ApplyDefaultValue(); 
    210222  } 
    211223} 
  • common/tAbstractDataPort.h

    r59 r80  
    7979 
    8080  tAbstractDataPort(const tAbstractDataPortCreationInfo& create_info); 
     81 
     82  /*! 
     83   * Set current value to default value 
     84   */ 
     85  virtual void ApplyDefaultValue() = 0; 
    8186 
    8287  /*! 
     
    337342  static core::tAbstractPortCreationInfo AdjustPortCreationInfo(const tAbstractDataPortCreationInfo& create_info); 
    338343 
    339   virtual void ConnectionAdded(tAbstractPort& partner, bool partner_is_destination) override; 
    340  
    341   virtual void ConnectionRemoved(tAbstractPort& partner, bool partner_is_destination) override; 
    342  
    343344  /*! 
    344345   * Should be called in situations where there might need to be an initial push 
     
    376377   */ 
    377378  virtual void InitialPushTo(tAbstractPort& target, bool reverse) = 0; 
     379 
     380  virtual void OnConnect(tAbstractPort& partner, bool partner_is_destination) override; 
     381 
     382  virtual void OnDisconnect(tAbstractPort& partner, bool partner_is_destination) override; 
     383 
     384  virtual void OnNetworkConnectionLoss() override; 
    378385}; 
    379386 
  • optimized/tCheapCopyPort.cpp

    r69 r80  
    154154    return; 
    155155  } 
    156   // Publish(*default_value); TODO 
     156 
     157  tUnusedManagerPointer buffer(tGlobalBufferPools::Instance().GetUnusedBuffer(GetCheaplyCopyableTypeIndex()).release()); 
     158  buffer->GetObject().DeepCopyFrom(*default_value); 
     159  buffer->SetTimestamp(rrlib::time::cNO_TIME); 
     160  BrowserPublishRaw(buffer, true); 
    157161} 
    158162 
     
    471475} 
    472476 
    473 void tCheapCopyPort::NotifyDisconnect() 
    474 { 
    475   if (GetFlag(tFlag::DEFAULT_ON_DISCONNECT)) 
    476   { 
    477     ApplyDefaultValue(); 
    478   } 
    479 } 
    480  
    481477tCheapCopyPort::tLockingManagerPointer tCheapCopyPort::PullValueRaw(bool ignore_pull_request_handler_on_this_port) 
    482478{ 
  • optimized/tCheapCopyPort.h

    r69 r80  
    172172  virtual ~tCheapCopyPort(); 
    173173 
    174   /*! 
    175    * Set current value to default value 
    176    */ 
    177   void ApplyDefaultValue(); 
     174  virtual void ApplyDefaultValue() override; 
    178175 
    179176  /*! 
     
    386383  } 
    387384 
    388   virtual void NotifyDisconnect(); // TODO: why is this virtual? 
    389  
    390385//  /*! 
    391386//   * Publish data 
  • optimized/tSingleThreadedCheapCopyPortGeneric.h

    r69 r80  
    125125  virtual ~tSingleThreadedCheapCopyPortGeneric(); 
    126126 
    127   /*! 
    128    * Set current value to default value 
    129    */ 
    130   void ApplyDefaultValue(); 
     127  virtual void ApplyDefaultValue() override; 
    131128 
    132129  /*! 
  • standard/tStandardPort.cpp

    r69 r80  
    166166    return; 
    167167  } 
    168   Publish(default_value); 
     168  default_value->AddLocks(1); 
     169  tLockingManagerPointer pointer(default_value.get()); 
     170  Publish(pointer); 
    169171} 
    170172 
     
    295297    publishing_data.AddLock(); 
    296298    input_queue->Enqueue(tLockingManagerPointer(publishing_data.published_buffer)); 
    297   } 
    298 } 
    299  
    300 void tStandardPort::NotifyDisconnect() 
    301 { 
    302   if (GetFlag(tFlag::DEFAULT_ON_DISCONNECT)) 
    303   { 
    304     ApplyDefaultValue(); 
    305299  } 
    306300} 
  • standard/tStandardPort.h

    r59 r80  
    165165   * Set current value to default value 
    166166   */ 
    167   void ApplyDefaultValue(); 
     167  virtual void ApplyDefaultValue() override; 
    168168 
    169169  /*! 
     
    252252 
    253253  virtual tUnusedManagerPointer GetUnusedBufferRaw(const rrlib::rtti::tType& dt); 
    254  
    255   virtual void NotifyDisconnect(); // TODO: why is this virtual? 
    256254 
    257255  /*! 
  • tests/test_collection.cpp

    r68 r80  
    238238} 
    239239 
     240template <typename T> 
     241void TestNetworkConnectionLoss(const T& default_value, const T& publish_value) 
     242{ 
     243  core::tFrameworkElement* parent = new core::tFrameworkElement(&core::tRuntimeEnvironment::GetInstance(), "TestNetworkConnectionLoss"); 
     244 
     245  tOutputPort<T> output_port("Output Port", parent); 
     246  tInputPort<T> input_port_no_explicit_default("Input Port No Explicit Default", parent, core::tFrameworkElement::tFlag::DEFAULT_ON_DISCONNECT); 
     247  tInputPort<T> input_port_explicit_default("Input Port Explicit Default", parent, core::tFrameworkElement::tFlag::DEFAULT_ON_DISCONNECT, default_value); 
     248  tInputPort<T> input_port_deferred_default("Input Port Deferred Default", parent, core::tFrameworkElement::tFlag::DEFAULT_ON_DISCONNECT); 
     249  input_port_deferred_default.SetDefault(default_value); 
     250  output_port.ConnectTo(input_port_no_explicit_default); 
     251  output_port.ConnectTo(input_port_explicit_default); 
     252  output_port.ConnectTo(input_port_deferred_default); 
     253  parent->Init(); 
     254 
     255  output_port.Publish(publish_value); 
     256  RRLIB_UNIT_TESTS_EQUALITY(publish_value, *input_port_no_explicit_default.GetPointer()); 
     257  RRLIB_UNIT_TESTS_EQUALITY(publish_value, *input_port_explicit_default.GetPointer()); 
     258  RRLIB_UNIT_TESTS_EQUALITY(publish_value, *input_port_deferred_default.GetPointer()); 
     259  input_port_no_explicit_default.GetWrapped()->NotifyOfNetworkConnectionLoss(); 
     260  input_port_explicit_default.GetWrapped()->NotifyOfNetworkConnectionLoss(); 
     261  input_port_deferred_default.GetWrapped()->NotifyOfNetworkConnectionLoss(); 
     262  RRLIB_UNIT_TESTS_EQUALITY(T(), *input_port_no_explicit_default.GetPointer()); 
     263  RRLIB_UNIT_TESTS_EQUALITY(default_value, *input_port_explicit_default.GetPointer()); 
     264  RRLIB_UNIT_TESTS_EQUALITY(default_value, *input_port_deferred_default.GetPointer()); 
     265  output_port.Publish(publish_value); 
     266  RRLIB_UNIT_TESTS_EQUALITY(publish_value, *input_port_no_explicit_default.GetPointer()); 
     267  RRLIB_UNIT_TESTS_EQUALITY(publish_value, *input_port_explicit_default.GetPointer()); 
     268  RRLIB_UNIT_TESTS_EQUALITY(publish_value, *input_port_deferred_default.GetPointer()); 
     269  output_port.DisconnectAll(); 
     270  RRLIB_UNIT_TESTS_EQUALITY(T(), *input_port_no_explicit_default.GetPointer()); 
     271  RRLIB_UNIT_TESTS_EQUALITY(default_value, *input_port_explicit_default.GetPointer()); 
     272  RRLIB_UNIT_TESTS_EQUALITY(default_value, *input_port_deferred_default.GetPointer()); 
     273 
     274  parent->ManagedDelete(); 
     275} 
     276 
    240277class DataPortsTestCollection : public rrlib::util::tUnitTestSuite 
    241278{ 
     
    251288    TestPortListeners<int>(1); 
    252289    TestPortListeners<std::string>("test"); 
     290    TestNetworkConnectionLoss<int>(4, 7); 
     291    TestNetworkConnectionLoss<std::string>("default_value", "published_value"); 
    253292 
    254293    tThreadLocalBufferManagement local_buffers; 
     
    256295    TestPortQueues<int>(1, 2, 3); 
    257296    TestPortListeners<int>(1); 
     297    TestNetworkConnectionLoss<int>(4, 7); 
    258298  } 
    259299}; 
Note: See TracChangeset for help on using the changeset viewer.