Changeset 187:da63d2c59a8a in finroc_plugins_tcp


Ignore:
Timestamp:
05.05.2020 00:10:53 (12 months ago)
Author:
Max Reichardt <mreichardt@…>
Branch:
17.03
Children:
188:054a5d87d050, 189:a6cc7cd79aef
Parents:
184:675dfcc4d494 (diff), 186:48a9973a8bde (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Phase:
public
Message:

Merge with 14.08

Files:
2 edited

Legend:

Unmodified
Added
Removed
  • internal/tPeerImplementation.h

    r178 r187  
    4242#include <boost/asio/ip/tcp.hpp> 
    4343#include <boost/asio/deadline_timer.hpp> 
     44#include <boost/asio/io_service.hpp> 
    4445#include <set> 
    4546 
  • internal/tPeerImplementation.h

    r186 r187  
    4040// External includes (system with <>, local with "") 
    4141//---------------------------------------------------------------------- 
    42 #include <map> 
    4342#include <boost/asio/ip/tcp.hpp> 
    4443#include <boost/asio/deadline_timer.hpp> 
    4544#include <boost/asio/io_service.hpp> 
     45#include <set> 
     46 
    4647#include "rrlib/thread/tThread.h" 
    47 #include "core/tFrameworkElement.h" 
    4848 
    4949//---------------------------------------------------------------------- 
    5050// Internal includes with "" 
    5151//---------------------------------------------------------------------- 
    52 #include "plugins/network_transport/structure_info/tFrameworkElementInfo.h" 
    53 #include "plugins/tcp/common/tRemoteTypes.h" 
    54 //#include "plugins/tcp/internal/tNetworkPortInfo.h" 
    5552#include "plugins/tcp/internal/tPeerInfo.h" 
    56 #include "plugins/tcp/internal/tPortBufferChangeEvent.h" 
    57 #include "plugins/tcp/internal/tSerializedStructureChange.h" 
    5853 
    5954//---------------------------------------------------------------------- 
     
    7065// Forward declarations / typedefs / enums 
    7166//---------------------------------------------------------------------- 
    72  
    73 class tPlugin; 
    7467class tServer; 
     68class tConnection; 
    7569 
    7670//---------------------------------------------------------------------- 
     
    8175 * Implementation of different variants of TCP peer. 
    8276 */ 
    83 class tPeerImplementation : public core::tRuntimeListener 
     77class tPeerImplementation : public tTCPPlugin 
    8478{ 
    8579 
    86   typedef rrlib::buffer_pools::tBufferPool < tPortBufferChangeEvent, rrlib::concurrent_containers::tConcurrency::MULTIPLE_READERS, 
    87           rrlib::buffer_pools::management::QueueBased, rrlib::buffer_pools::deleting::CollectGarbage, 
    88           rrlib::buffer_pools::recycling::UseOwnerStorageInBuffer > tPortBufferChangeEventPool; 
    89  
    9080//---------------------------------------------------------------------- 
    9181// Public methods and typedefs 
    9282//---------------------------------------------------------------------- 
    9383public: 
    94  
    95   typedef typename tPortBufferChangeEventPool::tPointer tChangeEventPointer; 
    9684 
    9785  /*! 
     
    9987   * \param Options for peer creation 
    10088   */ 
    101   tPeerImplementation(core::tFrameworkElement& framework_element, const tOptions& options); 
     89  tPeerImplementation(); 
    10290 
    10391  /*! Shuts peer down */ 
     
    11199  /*! Starts actively connecting to the specified network */ 
    112100  void Connect(); 
    113  
    114   /*! 
    115    * Connect local port to port in remote runtime environment using TCP network transport plugin. 
    116    * 
    117    * \param local_port Local port to connect 
    118    * \param remote_runtime_uuid UUID of remote runtime 
    119    * \param remote_port_handle Handle of remote port 
    120    * \param remote_port_link Link of port in remote runtime environment 
    121    * \param disconnect If 'false' the ports are connected - if 'true' the ports are disconnected 
    122    * \return Returns error message if connecting failed. On success an empty string is returned. 
    123    */ 
    124   std::string Connect(core::tAbstractPort& local_port, const std::string& remote_runtime_uuid, int remote_port_handle, const std::string remote_port_link, bool disconnect); 
    125101 
    126102  /*! 
     
    148124 
    149125  /*! 
    150    * Gets remote part with specified UUID. 
    151    * If no such part has been registered yet, creates a new one. 
     126   * Obtains remote runtime with specified UUID. 
     127   * If no such runtime has been registered yet, creates a new one. 
    152128   * 
    153    * \param uuid UUID of remote part 
     129   * \param uuid UUID of remote runtime 
    154130   * \param peer_type Peer type 
    155131   * \param peer_name Name of peer. Will be displayed in tooling and status messages. Does not need to be unique. Typically the program/process name. 
    156    * \param address IP address of remote part 
     132   * \param address IP address of remote runtime 
    157133   * \param never_forget Is this a remote peer to never forget? 
    158    * \return Pointer to remote part 
    159    */ 
    160   tRemotePart* GetRemotePart(const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget); 
     134   * \return Pointer to remote runtime 
     135   */ 
     136  network_transport::generic_protocol::tRemoteRuntime* GetOrCreateRemoteRuntime(std::shared_ptr<tConnection>& connection, const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget); 
     137 
     138  virtual network_transport::generic_protocol::tRemoteRuntime* GetRemoteRuntime(const network_transport::generic_protocol::tUriConnectorData& connector) override; 
    161139 
    162140  /*! 
     
    168146  } 
    169147 
     148  virtual void Init(tXmlNode* config_node) override; 
     149 
    170150  /*! 
    171151   * Reference to Boost asio IO service 
     
    174154  { 
    175155    return *io_service; 
    176   } 
    177  
    178   void PortChanged(data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject>& value, tNetworkPortInfo* info, data_ports::tChangeContext& change_context) 
    179   { 
    180     tChangeEventPointer event_buffer = port_buffer_change_event_buffers.GetUnusedBuffer(); 
    181     if (!event_buffer) 
    182     { 
    183       event_buffer = port_buffer_change_event_buffers.AddBuffer(std::unique_ptr<tPortBufferChangeEvent>(new tPortBufferChangeEvent())); 
    184     } 
    185     event_buffer->new_value = std::move(value); 
    186     event_buffer->network_port_info = info; 
    187     event_buffer->change_type = change_context.ChangeType(); 
    188     incoming_port_buffer_changes.Enqueue(event_buffer); 
    189156  } 
    190157 
     
    202169 
    203170  /*! 
     171   * Processes incoming structure changes from local runtime 
     172   */ 
     173  void ProcessLocalRuntimeStructureChanges() 
     174  { 
     175    tTCPPlugin::ProcessLocalRuntimeStructureChanges(); 
     176  } 
     177 
     178  /*! 
    204179   * Called in a regular interval to do things like establishing new connections 
    205180   */ 
    206181  void ProcessLowPriorityTasks(); 
    207  
    208   /*! 
    209    * Processes all enqueued runtime change events in TCP thread 
    210    * and distributes them to all connections that are interested. 
    211    */ 
    212   void ProcessRuntimeChangeEvents(); 
    213182 
    214183  /*! 
     
    219188 
    220189  /*! 
    221    * Serializes shared ports and returns that in memory buffer 
    222    * 
    223    * \param connection_type_encoder Type encoder object of connection to serialize shared ports for 
    224    */ 
    225   rrlib::serialization::tMemoryBuffer SerializeSharedPorts(common::tRemoteTypes& connection_type_encoder); 
    226  
    227  
    228   /*! 
    229    * \return True as soon as peer also serves clients interested in complete application structure 
    230    */ 
    231   bool ServesStructure() const 
    232   { 
    233     return serve_structure.load(); 
    234   } 
    235  
    236   /*! 
    237190   * Mark the peer list as changed. 
    238191   * This will cause the peer list to be sent to all connected peers. 
     
    248201  void StartServer(); 
    249202 
    250   void StartServingStructure() 
    251   { 
    252     serve_structure.store(true); 
     203  /*! 
     204   * \return Unused buffer for initialization of prototype streams 
     205   */ 
     206  rrlib::serialization::tStackMemoryBuffer<16>& UnusedInitializationBuffer() 
     207  { 
     208    return unused_initialization_buffer; 
    253209  } 
    254210 
     
    258214private: 
    259215 
     216  friend class tTCPPluginInstance; 
    260217  friend class tServer; 
    261218  friend class tTCPThread; 
     
    269226  friend struct tConnectorTask; 
    270227 
    271   /*! Framework element associated with server */ 
    272   core::tFrameworkElement& framework_element; 
    273  
    274   /*! Options that peer was created with */ 
    275   tOptions create_options; 
    276  
    277   /*! Vector containing all network addresses this peer should try to connect to */ 
     228  /*! Unused buffer for initialization of prototype streams (must not be destroyed before connections are) */ 
     229  rrlib::serialization::tStackMemoryBuffer<16> unused_initialization_buffer; 
     230 
     231  /*! Vector containing all network addresses this peer should currently try to connect to */ 
    278232  std::vector<std::string> connect_to; 
     233 
     234  /*! Vector containing all network addresses this peer already connected to (should not be added to connect_to again) */ 
     235  std::set<std::string> connected_to; 
    279236 
    280237  /*! Info on this peer */ 
     
    286243   * connection to remote part is established. 
    287244   */ 
    288   std::vector<std::unique_ptr<tPeerInfo>> other_peers; 
     245  std::vector<std::shared_ptr<tPeerInfo>> other_peers; 
    289246 
    290247  /*! Revision of peer information */ 
     
    306263  tServer* server; 
    307264 
    308   /*! Cached info on ports shared by this peer */ 
    309   std::map<core::tFrameworkElement::tHandle, rrlib::serialization::tFixedBuffer> shared_ports; 
    310  
    311   /*! Mutex for 'shared_ports' access */ 
    312   rrlib::thread::tMutex shared_ports_mutex; 
    313  
    314   /*! 
    315    * True as soon as peer also serves clients interested in complete application structure. 
    316    * This may be enabled later, as this can cause quite a lot of overhead if done during 
    317    * construction and startup of large (MCA2) applications. 
    318    */ 
    319   std::atomic<bool> serve_structure; 
    320  
    321   /*! 
    322    * Concurrent queue with incoming structure changes. 
    323    * Queue is filled when runtime changes occur (structure mutex is acquired by this thread). 
    324    * Queue is processed when TCP thread calls ProcessIncomingStructureChanges() 
    325    */ 
    326   rrlib::concurrent_containers::tQueue < std::unique_ptr<tSerializedStructureChange>, rrlib::concurrent_containers::tConcurrency::SINGLE_READER_AND_WRITER, 
    327         rrlib::concurrent_containers::tDequeueMode::ALL > incoming_structure_changes; 
    328  
    329265  /*! 
    330266   * Actively connect to specified network? 
     
    333269  bool actively_connect; 
    334270 
    335   /*! Ports to check subscriptions for */ 
    336   std::vector<tFrameworkElementHandle> pending_subscription_checks; 
    337  
    338   /*! Mutex for 'pending_subscription_checks' access */ 
    339   rrlib::thread::tMutex pending_subscription_checks_mutex; 
    340  
    341   /*! Copy for TCP thread */ 
    342   std::vector<tFrameworkElementHandle> pending_subscription_checks_copy; 
    343  
    344271  /*! True when event loop is running (ProcessEvents() is regularly called by TCP thread) */ 
    345272  bool event_loop_running; 
    346  
    347   /*! Concurrent queue with incoming port value changes */ 
    348   rrlib::concurrent_containers::tQueue < tChangeEventPointer, rrlib::concurrent_containers::tConcurrency::MULTIPLE_WRITERS, 
    349         rrlib::concurrent_containers::tDequeueMode::ALL > incoming_port_buffer_changes; 
    350  
    351   /*! Buffer pool with port value change event buffers */ 
    352   tPortBufferChangeEventPool port_buffer_change_event_buffers; 
    353  
    354   /*! List with deleted RPC ports - so that buffer pools created in connection for these ports can be deleted */ 
    355   std::vector<core::tFrameworkElement::tHandle> deleted_rpc_ports; 
    356  
    357   /*! Mutex for 'deleted_rpc_ports' access */ 
    358   rrlib::thread::tMutex deleted_rpc_ports_mutex; 
    359273 
    360274 
     
    368282  void AddPeerAddresses(tPeerInfo& existing_peer, const std::vector<boost::asio::ip::address>& addresses); 
    369283 
     284  virtual bool Create(core::tAbstractPort& owner_port, const rrlib::uri::tURI& uri, const rrlib::uri::tURIElements& uri_elements, const core::tUriConnectOptions& connect_options) override; 
     285 
    370286  /*! 
    371287   * Scans current peer list and adds missing addresses 
     
    374290  void InferMissingAddresses(); 
    375291 
    376   /*! Is provided element a shared port (to be announced to other peers)? */ 
    377   static bool IsSharedPort(core::tFrameworkElement& framework_element); 
    378  
    379   virtual void OnEdgeChange(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target) override; 
    380   virtual void OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element) override; 
    381  
    382   /*! Handles runtime changes from callbacks - and forwards info to 'incoming_structure_changes' queue */ 
    383   void ProcessRuntimeChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element, bool edge_change); 
     292  virtual void OnStartServingStructure() override; 
    384293 
    385294  /*! 
     
    393302  void StartThread(); 
    394303 
    395   /*! 
    396    * Called whenever a new edge is added and updates network connection info if necessary 
    397    */ 
    398   void UpdateNetworkConnectionInfo(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target, bool& target_port_changed); 
    399304}; 
    400305 
Note: See TracChangeset for help on using the changeset viewer.