Changeset 127:d3065c722bdb in finroc_plugins_blackboard


Ignore:
Timestamp:
28.11.2017 11:24:50 (4 years ago)
Author:
Max Reichardt <max.reichardt@…>
Branch:
14.08
Phase:
public
Message:

Adds strict single-buffered mode option to blackboards that never copies blackboard contents (and, thus, does not provide a read port). This is useful for blackboard contents that are too expensive to copy. Furthermore, fixes an issue that real read locks (necessary in this setup) did not wake up pending calls.

Files:
7 edited

Legend:

Unmodified
Added
Removed
  • definitions.h

    r102 r127  
    7979}; 
    8080 
     81/*! 
     82 * Blackboard buffer mode 
     83 */ 
     84enum class tBlackboardBufferMode 
     85{ 
     86  SINGLE_BUFFERED,                   //!< Blackboard is single-buffered: parallel locks except of parallel read locks block; all blackboard accesses access the same buffer in memory 
     87  MULTI_BUFFERED,                    //!< Blackboard is multi-buffered: blackboard is copied on write access; only parallel write locks block (as changes should be made sequentially so that none of them are lost) 
     88  MULTI_BUFFERED_ON_PARALLEL_ACCESS  //!< Blackboard is single-buffered initially: if parallel locks occur that block and would not block in multi-buffered mode - switches to multi-buffered mode 
     89}; 
     90 
    8191 
    8292//---------------------------------------------------------------------- 
  • internal/tBlackboardServer.h

    r121 r127  
    110110  typedef data_ports::tPortDataPointer<tChangeSet> tChangeSetPointer; 
    111111 
     112  class tReadLockedBufferPointer : public tConstBufferPointer 
     113  { 
     114    friend class tBlackboardServer; 
     115    typedef tConstBufferPointer tBase; 
     116 
     117    tBlackboardServer* process_pending_on_unlock; 
     118 
     119  public: 
     120    template <typename ... TArgs> 
     121    tReadLockedBufferPointer(TArgs && ... args) : 
     122      tBase(std::forward<TArgs>(args)...), 
     123      process_pending_on_unlock(nullptr) 
     124    { 
     125    } 
     126 
     127    tReadLockedBufferPointer(tReadLockedBufferPointer && other) : 
     128      tBase(std::forward<tBase>(other)), 
     129      process_pending_on_unlock(nullptr) 
     130    { 
     131      std::swap(process_pending_on_unlock, other.process_pending_on_unlock); 
     132    } 
     133 
     134    tReadLockedBufferPointer& operator=(tReadLockedBufferPointer && other) 
     135    { 
     136      tBase::operator=(std::forward<tBase>(other)); 
     137      std::swap(process_pending_on_unlock, other.process_pending_on_unlock); 
     138      return *this; 
     139    } 
     140 
     141    ~tReadLockedBufferPointer() 
     142    { 
     143      if (process_pending_on_unlock) 
     144      { 
     145        process_pending_on_unlock->HandleReadUnlock(*this); 
     146      } 
     147    } 
     148  }; 
     149 
     150 
    112151  /*! 
    113152   * \param name Name/Uid of blackboard 
    114153   * \param parent Parent of blackboard server 
    115    * \param multi_buffered Create blackboard server that is in multi_buffered mode initially? 
     154   * \param buffer_mode Buffer mode - whether to use multiple buffers to avoid blocking (at the cost of copying content) 
    116155   * \param elements Initial number of elements 
    117156   * \param shared Share blackboard with other runtime environments? 
    118157   */ 
    119   tBlackboardServer(const std::string& name, core::tFrameworkElement* parent = NULL, bool multi_buffered = false, size_t elements = 0, bool shared = true); 
     158  tBlackboardServer(const std::string& name, core::tFrameworkElement* parent = nullptr, tBlackboardBufferMode buffer_mode = tBlackboardBufferMode::MULTI_BUFFERED_ON_PARALLEL_ACCESS, size_t elements = 0, bool shared = true); 
    120159 
    121160  virtual ~tBlackboardServer() {} 
     
    179218   * \return Future on locked buffer 
    180219   */ 
    181   rpc_ports::tFuture<tConstBufferPointer> ReadLock(const rrlib::time::tDuration& timeout); 
     220  rpc_ports::tFuture<tReadLockedBufferPointer> ReadLock(const rrlib::time::tDuration& timeout); 
    182221 
    183222  /*! 
     
    195234private: 
    196235 
     236  friend class tReadLockedBuffer; 
     237 
    197238  /*! Wraps read and write lock requests for enqueueing */ 
    198239  class tLockRequest 
     
    207248 
    208249    /*! Promise for read lock, if read lock was requested */ 
    209     rpc_ports::tPromise<tConstBufferPointer> read_lock_promise; 
     250    rpc_ports::tPromise<tReadLockedBufferPointer> read_lock_promise; 
    210251 
    211252    /*! Timeout for lock request */ 
     
    224265    {} 
    225266 
    226     tLockRequest(rpc_ports::tPromise<tConstBufferPointer> && read_lock_promise, rrlib::time::tTimestamp timeout_time) : 
     267    tLockRequest(rpc_ports::tPromise<tReadLockedBufferPointer> && read_lock_promise, rrlib::time::tTimestamp timeout_time) : 
    227268      write_lock(false), 
    228269      write_lock_promise(), 
     
    268309  tUnlockFuture unlock_future; 
    269310 
    270   /*! True, if blackboard server is run in single-buffered mode */ 
    271   bool single_buffered; 
     311  /*! Buffer mode that blackboard server is currently using */ 
     312  tBlackboardBufferMode buffer_mode; 
    272313 
    273314 
     
    313354  void ConsiderPublishing() 
    314355  { 
    315     //if ((!single_buffered) || read_port.GetWrapped()->GetStrategy() > 0)  // TODO: Implementation needs to publish on strategy change, too (e.g. change log blackboard) 
     356    if (buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED) 
     357      //if ((!single_buffered) || read_port.GetWrapped()->GetStrategy() > 0)  // TODO: Implementation needs to publish on strategy change, too (e.g. change log blackboard) 
    316358    { 
    317359      assert(!current_buffer->IsUnused()); 
     
    346388  virtual void HandleException(rpc_ports::tFutureStatus exception_type) override; 
    347389 
     390  void HandleReadUnlock(tReadLockedBufferPointer& unlock); 
     391 
    348392  virtual void HandleResponse(tLockedBufferData<tBuffer> call_result) override; 
    349393 
  • internal/tBlackboardServer.hpp

    r121 r127  
    7171 
    7272template <typename T> 
    73 tBlackboardServer<T>::tBlackboardServer(const std::string& name, core::tFrameworkElement* parent, bool multi_buffered, size_t elements, bool shared) : 
     73tBlackboardServer<T>::tBlackboardServer(const std::string& name, core::tFrameworkElement* parent, tBlackboardBufferMode buffer_mode, size_t elements, bool shared) : 
    7474  tAbstractBlackboardServer(parent, name, GenerateConstructorFlags(shared)), 
    7575  read_port("read", this, core::tFrameworkElement::tFlag::FINSTRUCT_READ_ONLY | GenerateConstructorFlags(shared)), 
     
    8181  write_lock(tWriteLock::NONE), 
    8282  unlock_future(), 
    83   single_buffered(!multi_buffered) 
     83  buffer_mode(buffer_mode) 
    8484{ 
    8585  read_port.Init(); 
     
    8989    NewCurrentBuffer(true); 
    9090    rrlib::rtti::ResizeVector(current_buffer->GetObject().GetData<tBuffer>(), elements); 
    91     read_port.GetWrapped()->Publish(current_buffer); 
    92     current_buffer = read_port.GetWrapped()->GetCurrentValueRaw(); 
     91    if (buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED) 
     92    { 
     93      read_port.GetWrapped()->Publish(current_buffer); 
     94      current_buffer = read_port.GetWrapped()->GetCurrentValueRaw(); 
     95    } 
    9396  } 
    9497  assert(!current_buffer->IsUnused()); 
     
    193196 
    194197template <typename T> 
     198void tBlackboardServer<T>::HandleReadUnlock(tReadLockedBufferPointer& unlock) 
     199{ 
     200  if (!unlock) 
     201  { 
     202    HandleException(rpc_ports::tFutureStatus::READY); 
     203    return; 
     204  } 
     205 
     206  rrlib::thread::tLock lock(this->BlackboardMutex()); 
     207  unlock.Reset(); 
     208  if (current_buffer->Unique()) 
     209  { 
     210    this->ProcessPendingLockRequests(); 
     211  } 
     212} 
     213 
     214template <typename T> 
    195215void tBlackboardServer<T>::HandleResponse(tLockedBufferData<tBuffer> unlock_data) 
    196216{ 
     
    229249 
    230250  // Any pending lock requests? 
     251  unlock_data = tLockedBufferData<tBuffer>(); 
    231252  this->ProcessPendingLockRequests(); 
    232253} 
     
    235256void tBlackboardServer<T>::ProcessPendingLockRequests() 
    236257{ 
     258  assert(current_buffer->Unique() || buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED); 
    237259  while (pending_lock_requests.size() > 0) 
    238260  { 
     
    242264      if (lock_request.write_lock) 
    243265      { 
    244         WriteLockImplementation(lock_request.write_lock_promise, lock_request.remote_call); 
    245         pending_lock_requests.pop_front(); 
     266        if (current_buffer->Unique() || buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED) 
     267        { 
     268          WriteLockImplementation(lock_request.write_lock_promise, lock_request.remote_call); 
     269          pending_lock_requests.pop_front(); 
     270        } 
    246271        return; 
    247272      } 
     
    249274      { 
    250275        current_buffer->AddLocks(1); 
    251         tConstBufferPointer pointer_clone(data_ports::standard::tStandardPort::tLockingManagerPointer(current_buffer.get()), *read_port.GetWrapped()); 
     276        tReadLockedBufferPointer pointer_clone(data_ports::standard::tStandardPort::tLockingManagerPointer(current_buffer.get()), *read_port.GetWrapped()); 
     277        pointer_clone.process_pending_on_unlock = this; 
    252278        lock_request.read_lock_promise.SetValue(pointer_clone); 
    253279      } 
     
    258284 
    259285template <typename T> 
    260 rpc_ports::tFuture<typename tBlackboardServer<T>::tConstBufferPointer> tBlackboardServer<T>::ReadLock(const rrlib::time::tDuration& timeout) 
    261 { 
    262   rrlib::thread::tLock lock(this->BlackboardMutex()); 
    263   rpc_ports::tPromise<tConstBufferPointer> promise; 
    264   rpc_ports::tFuture<tConstBufferPointer> future = promise.GetFuture(); 
    265   if (write_lock != tWriteLock::EXCLUSIVE) 
     286rpc_ports::tFuture<typename tBlackboardServer<T>::tReadLockedBufferPointer> tBlackboardServer<T>::ReadLock(const rrlib::time::tDuration& timeout) 
     287{ 
     288  rrlib::thread::tLock lock(this->BlackboardMutex()); 
     289  rpc_ports::tPromise<tReadLockedBufferPointer> promise; 
     290  rpc_ports::tFuture<tReadLockedBufferPointer> future = promise.GetFuture(); 
     291  //if ((buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED && write_lock != tWriteLock::EXCLUSIVE) || (buffer_mode == tBlackboardBufferMode::SINGLE_BUFFERED && current_buffer->Unique())) 
     292  if (write_lock != tWriteLock::EXCLUSIVE && (buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED || pending_lock_requests.empty() /*|| (!pending_lock_requests[0].write_lock)*/)) 
    266293  { 
    267294    assert(!current_buffer->IsUnused()); 
    268295    current_buffer->AddLocks(1); 
    269     tConstBufferPointer pointer_clone(data_ports::standard::tStandardPort::tLockingManagerPointer(current_buffer.get()), *read_port.GetWrapped()); 
     296    tReadLockedBufferPointer pointer_clone(data_ports::standard::tStandardPort::tLockingManagerPointer(current_buffer.get()), *read_port.GetWrapped()); 
     297    pointer_clone.process_pending_on_unlock = this; 
    270298    promise.SetValue(pointer_clone); 
    271299  } 
    272300  else 
    273301  { 
    274     FINROC_LOG_PRINT(DEBUG, "Attempt to read-lock during exclusive write lock. Enabling multi-buffered mode to avoid blocking in such situations in the future."); 
    275     single_buffered = false; 
     302    if (buffer_mode == tBlackboardBufferMode::MULTI_BUFFERED_ON_PARALLEL_ACCESS) 
     303    { 
     304      FINROC_LOG_PRINT(DEBUG, "Attempt to read-lock during exclusive write lock. Enabling multi-buffered mode to avoid blocking in such situations in the future."); 
     305      buffer_mode = tBlackboardBufferMode::MULTI_BUFFERED; 
     306    } 
    276307    if (timeout > rrlib::time::tDuration::zero()) 
    277308    { 
     
    288319  rpc_ports::tPromise<tLockedBuffer<tBuffer>> promise; 
    289320  rpc_ports::tFuture<tLockedBuffer<tBuffer>> future = promise.GetFuture(); 
    290   if (write_lock == tWriteLock::NONE) 
     321  if ((buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED && write_lock == tWriteLock::NONE) || (buffer_mode == tBlackboardBufferMode::SINGLE_BUFFERED && current_buffer->Unique())) 
    291322  { 
    292323    WriteLockImplementation(promise, lock_parameters.IsRemoteCall()); 
     
    320351  else 
    321352  { 
     353    assert(buffer_mode != tBlackboardBufferMode::SINGLE_BUFFERED); 
    322354    write_lock = tWriteLock::ON_COPY; 
    323355    lock_id++; 
  • tBlackboard.h

    r121 r127  
    9696  typedef typename tServer::tBuffer tBuffer; 
    9797 
     98  struct tBlackboardBufferModeParameter 
     99  { 
     100    tBlackboardBufferModeParameter(tBlackboardBufferMode buffer_mode) : buffer_mode(buffer_mode) {} 
     101 
     102    // Constructor for legacy compatibility 
     103    tBlackboardBufferModeParameter(bool multi_buffered) : buffer_mode(multi_buffered ? tBlackboardBufferMode::MULTI_BUFFERED : tBlackboardBufferMode::MULTI_BUFFERED_ON_PARALLEL_ACCESS) {} 
     104 
     105    tBlackboardBufferMode buffer_mode; 
     106  }; 
     107 
     108 
    98109  /*! 
    99110   * Empty constructor for blackboards that are not initialized in 
     
    113124   * \param name Name of blackboard 
    114125   * \param parent Parent of blackboard 
    115    * \param multi_buffered Create multi-buffered blackboard? 
     126   * \param buffer_mode Buffer mode - whether to use multiple buffers to avoid blocking (at the cost of copying content) 
    116127   * \param elements Initial number of elements 
    117128   * \param create_client Create Blackboard client? 
    118    * \param create_read_port Which read ports to create for blackboard 
     129   * \param create_read_port Which read ports to create for blackboard (no read ports are created for single-buffere blackboards) 
    119130   * \param create_write_port_in If not NULL, creates write port in specified port group 
    120131   * \param create_write_port_in2 If not NULL, creates another write port in specified port group 
    121132   */ 
    122133  template <typename TParent> 
    123   tBlackboard(const std::string& name, TParent* parent, bool multi_buffered = false, int elements = 0, bool create_client = true, 
     134  tBlackboard(const std::string& name, TParent* parent, const tBlackboardBufferModeParameter& buffer_mode = tBlackboardBufferMode::MULTI_BUFFERED_ON_PARALLEL_ACCESS, int elements = 0, bool create_client = true, 
    124135              tReadPorts create_read_port = tReadPorts::EXTERNAL, core::tPortGroup* create_write_port_in = default_port_group, core::tPortGroup* create_write_port_in2 = NULL) : 
    125136    wrapped_server(NULL), 
     
    127138    read_port() 
    128139  { 
     140    create_read_port = buffer_mode.buffer_mode == tBlackboardBufferMode::SINGLE_BUFFERED ? tReadPorts::NONE : create_read_port; 
     141 
    129142    // Get/create Framework element to put blackboard stuff beneath 
    130143    core::tFrameworkElement* blackboard_parent = parent->GetChild("Blackboards"); 
     
    135148 
    136149    // Create blackboard server 
    137     wrapped_server = new internal::tBlackboardServer<T>(name, blackboard_parent, multi_buffered, elements, false); 
     150    wrapped_server = new internal::tBlackboardServer<T>(name, blackboard_parent, buffer_mode.buffer_mode, elements, false); 
    138151 
    139152    // Create blackboard client 
  • tBlackboardClient.h

    r121 r127  
    102102  typedef data_ports::tPortDataPointer<tBuffer> tBufferPointer; 
    103103  typedef data_ports::tPortDataPointer<const tBuffer> tConstBufferPointer; 
     104  typedef typename tServer::tReadLockedBufferPointer tReadLockedBufferPointer; 
    104105  typedef std::vector<tChange<T>> tChangeSet; 
    105106  typedef data_ports::tPortDataPointer<tChangeSet> tChangeSetPointer; 
     
    143144 
    144145  /*! 
    145    * Constructor for use tModule and tSenseControlModule. 
     146   * Constructor for use with tModule and tSenseControlModule. 
    146147   * Does NOT connect to global blackboards - but rather uses group's/module's input/output port groups. 
    147148   * 
     
    342343   * \exception rpc_ports::tRPCException is thrown if call fails 
    343344   */ 
    344   inline tConstBufferPointer Read(const rrlib::time::tDuration& timeout = std::chrono::seconds(2)) 
    345   { 
    346     if (read_port && read_port.GetFlag(core::tFrameworkElement::tFlag::PUSH_STRATEGY)) 
    347     { 
    348       return read_port.GetPointer(); 
    349     } 
    350     rpc_ports::tFuture<tConstBufferPointer> future = ReadLock(timeout); 
     345  inline tReadLockedBufferPointer Read(const rrlib::time::tDuration& timeout = std::chrono::seconds(2)) 
     346  { 
     347    rpc_ports::tFuture<tReadLockedBufferPointer> future = ReadLock(timeout); 
    351348    return future.Get(timeout); 
    352349  } 
     
    366363   * \exception rpc_ports::tRPCException is thrown if call fails 
    367364   */ 
    368   rpc_ports::tFuture<tConstBufferPointer> ReadLock(const rrlib::time::tDuration& timeout = std::chrono::seconds(10)); 
     365  rpc_ports::tFuture<tReadLockedBufferPointer> ReadLock(const rrlib::time::tDuration& timeout = std::chrono::seconds(10)); 
    369366 
    370367  /*! 
  • tBlackboardClient.hpp

    r113 r127  
    322322 
    323323template<typename T> 
    324 rpc_ports::tFuture<typename tBlackboardClient<T>::tConstBufferPointer> tBlackboardClient<T>::ReadLock(const rrlib::time::tDuration& timeout) 
     324rpc_ports::tFuture<typename tBlackboardClient<T>::tReadLockedBufferPointer> tBlackboardClient<T>::ReadLock(const rrlib::time::tDuration& timeout) 
    325325{ 
    326326  // Possibly obtain value from read_port 
    327327  if (read_port.GetWrapped() && read_port.GetFlag(core::tFrameworkElement::tFlag::PUSH_STRATEGY)) 
    328328  { 
    329     rpc_ports::tPromise<tConstBufferPointer> promise; 
    330     promise.SetValue(read_port.GetPointer()); 
    331     return promise.GetFuture(); 
     329    auto buffer_pointer = read_port.GetPointer(); 
     330    if (buffer_pointer->size())   // TODO: Heuristic: if blackboard does not have read port, blackboard has size zero; so if size is zero, lock via RPC lock which is always appropriate  (this heuristic can break, when read port was connected to multi-buffered blackboard - and is connected to single-buffered blackboard later); why is read port used at all? over the network this is significantly more efficient (push instead of pull) 
     331    { 
     332      rpc_ports::tPromise<tReadLockedBufferPointer> promise; 
     333      promise.SetValue(std::move(buffer_pointer)); 
     334      return promise.GetFuture(); 
     335    } 
    332336  } 
    333337 
  • tBlackboardReadAccess.h

    r126 r127  
    167167  friend class tBlackboardWriteAccess<T>; 
    168168 
    169   typedef typename tBlackboardClient<T>::tConstBufferPointer tConstBufferPointer; 
     169  typedef typename tBlackboardClient<T>::tReadLockedBufferPointer tReadLockedBufferPointer; 
    170170  typedef typename tBlackboardClient<T>::tBuffer tBuffer; 
    171171 
     
    174174 
    175175  /*! Future for locked buffer */ 
    176   rpc_ports::tFuture<tConstBufferPointer> locked_buffer_future; 
     176  rpc_ports::tFuture<tReadLockedBufferPointer> locked_buffer_future; 
    177177 
    178178  /*! Buffer from locked blackboard */ 
    179   tConstBufferPointer locked_buffer; 
     179  tReadLockedBufferPointer locked_buffer; 
    180180 
    181181  /*! Buffer from locked blackboard - as raw pointer (so that it can set from subclass also) */ 
Note: See TracChangeset for help on using the changeset viewer.