Changeset 187:da63d2c59a8a in finroc_plugins_tcp
- Timestamp:
- 05.05.2020 00:10:53 (3 years ago)
- Branch:
- 17.03
- Children:
- 188:054a5d87d050, 189:a6cc7cd79aef
- Parents:
- 184:675dfcc4d494 (diff), 186:48a9973a8bde (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - Phase:
- public
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
internal/tPeerImplementation.h
r178 r187 42 42 #include <boost/asio/ip/tcp.hpp> 43 43 #include <boost/asio/deadline_timer.hpp> 44 #include <boost/asio/io_service.hpp> 44 45 #include <set> 45 46 -
internal/tPeerImplementation.h
r186 r187 40 40 // External includes (system with <>, local with "") 41 41 //---------------------------------------------------------------------- 42 #include <map>43 42 #include <boost/asio/ip/tcp.hpp> 44 43 #include <boost/asio/deadline_timer.hpp> 45 44 #include <boost/asio/io_service.hpp> 45 #include <set> 46 46 47 #include "rrlib/thread/tThread.h" 47 #include "core/tFrameworkElement.h"48 48 49 49 //---------------------------------------------------------------------- 50 50 // Internal includes with "" 51 51 //---------------------------------------------------------------------- 52 #include "plugins/network_transport/structure_info/tFrameworkElementInfo.h"53 #include "plugins/tcp/common/tRemoteTypes.h"54 //#include "plugins/tcp/internal/tNetworkPortInfo.h"55 52 #include "plugins/tcp/internal/tPeerInfo.h" 56 #include "plugins/tcp/internal/tPortBufferChangeEvent.h"57 #include "plugins/tcp/internal/tSerializedStructureChange.h"58 53 59 54 //---------------------------------------------------------------------- … … 70 65 // Forward declarations / typedefs / enums 71 66 //---------------------------------------------------------------------- 72 73 class tPlugin;74 67 class tServer; 68 class tConnection; 75 69 76 70 //---------------------------------------------------------------------- … … 81 75 * Implementation of different variants of TCP peer. 82 76 */ 83 class tPeerImplementation : public core::tRuntimeListener77 class tPeerImplementation : public tTCPPlugin 84 78 { 85 79 86 typedef rrlib::buffer_pools::tBufferPool < tPortBufferChangeEvent, rrlib::concurrent_containers::tConcurrency::MULTIPLE_READERS,87 rrlib::buffer_pools::management::QueueBased, rrlib::buffer_pools::deleting::CollectGarbage,88 rrlib::buffer_pools::recycling::UseOwnerStorageInBuffer > tPortBufferChangeEventPool;89 90 80 //---------------------------------------------------------------------- 91 81 // Public methods and typedefs 92 82 //---------------------------------------------------------------------- 93 83 public: 94 95 typedef typename tPortBufferChangeEventPool::tPointer tChangeEventPointer;96 84 97 85 /*! … … 99 87 * \param Options for peer creation 100 88 */ 101 tPeerImplementation( core::tFrameworkElement& framework_element, const tOptions& options);89 tPeerImplementation(); 102 90 103 91 /*! Shuts peer down */ … … 111 99 /*! Starts actively connecting to the specified network */ 112 100 void Connect(); 113 114 /*!115 * Connect local port to port in remote runtime environment using TCP network transport plugin.116 *117 * \param local_port Local port to connect118 * \param remote_runtime_uuid UUID of remote runtime119 * \param remote_port_handle Handle of remote port120 * \param remote_port_link Link of port in remote runtime environment121 * \param disconnect If 'false' the ports are connected - if 'true' the ports are disconnected122 * \return Returns error message if connecting failed. On success an empty string is returned.123 */124 std::string Connect(core::tAbstractPort& local_port, const std::string& remote_runtime_uuid, int remote_port_handle, const std::string remote_port_link, bool disconnect);125 101 126 102 /*! … … 148 124 149 125 /*! 150 * Gets remote partwith specified UUID.151 * If no such parthas been registered yet, creates a new one.126 * Obtains remote runtime with specified UUID. 127 * If no such runtime has been registered yet, creates a new one. 152 128 * 153 * \param uuid UUID of remote part129 * \param uuid UUID of remote runtime 154 130 * \param peer_type Peer type 155 131 * \param peer_name Name of peer. Will be displayed in tooling and status messages. Does not need to be unique. Typically the program/process name. 156 * \param address IP address of remote part132 * \param address IP address of remote runtime 157 133 * \param never_forget Is this a remote peer to never forget? 158 * \return Pointer to remote part 159 */ 160 tRemotePart* GetRemotePart(const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget); 134 * \return Pointer to remote runtime 135 */ 136 network_transport::generic_protocol::tRemoteRuntime* GetOrCreateRemoteRuntime(std::shared_ptr<tConnection>& connection, const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget); 137 138 virtual network_transport::generic_protocol::tRemoteRuntime* GetRemoteRuntime(const network_transport::generic_protocol::tUriConnectorData& connector) override; 161 139 162 140 /*! … … 168 146 } 169 147 148 virtual void Init(tXmlNode* config_node) override; 149 170 150 /*! 171 151 * Reference to Boost asio IO service … … 174 154 { 175 155 return *io_service; 176 }177 178 void PortChanged(data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject>& value, tNetworkPortInfo* info, data_ports::tChangeContext& change_context)179 {180 tChangeEventPointer event_buffer = port_buffer_change_event_buffers.GetUnusedBuffer();181 if (!event_buffer)182 {183 event_buffer = port_buffer_change_event_buffers.AddBuffer(std::unique_ptr<tPortBufferChangeEvent>(new tPortBufferChangeEvent()));184 }185 event_buffer->new_value = std::move(value);186 event_buffer->network_port_info = info;187 event_buffer->change_type = change_context.ChangeType();188 incoming_port_buffer_changes.Enqueue(event_buffer);189 156 } 190 157 … … 202 169 203 170 /*! 171 * Processes incoming structure changes from local runtime 172 */ 173 void ProcessLocalRuntimeStructureChanges() 174 { 175 tTCPPlugin::ProcessLocalRuntimeStructureChanges(); 176 } 177 178 /*! 204 179 * Called in a regular interval to do things like establishing new connections 205 180 */ 206 181 void ProcessLowPriorityTasks(); 207 208 /*!209 * Processes all enqueued runtime change events in TCP thread210 * and distributes them to all connections that are interested.211 */212 void ProcessRuntimeChangeEvents();213 182 214 183 /*! … … 219 188 220 189 /*! 221 * Serializes shared ports and returns that in memory buffer222 *223 * \param connection_type_encoder Type encoder object of connection to serialize shared ports for224 */225 rrlib::serialization::tMemoryBuffer SerializeSharedPorts(common::tRemoteTypes& connection_type_encoder);226 227 228 /*!229 * \return True as soon as peer also serves clients interested in complete application structure230 */231 bool ServesStructure() const232 {233 return serve_structure.load();234 }235 236 /*!237 190 * Mark the peer list as changed. 238 191 * This will cause the peer list to be sent to all connected peers. … … 248 201 void StartServer(); 249 202 250 void StartServingStructure() 251 { 252 serve_structure.store(true); 203 /*! 204 * \return Unused buffer for initialization of prototype streams 205 */ 206 rrlib::serialization::tStackMemoryBuffer<16>& UnusedInitializationBuffer() 207 { 208 return unused_initialization_buffer; 253 209 } 254 210 … … 258 214 private: 259 215 216 friend class tTCPPluginInstance; 260 217 friend class tServer; 261 218 friend class tTCPThread; … … 269 226 friend struct tConnectorTask; 270 227 271 /*! Framework element associated with server */ 272 core::tFrameworkElement& framework_element; 273 274 /*! Options that peer was created with */ 275 tOptions create_options; 276 277 /*! Vector containing all network addresses this peer should try to connect to */ 228 /*! Unused buffer for initialization of prototype streams (must not be destroyed before connections are) */ 229 rrlib::serialization::tStackMemoryBuffer<16> unused_initialization_buffer; 230 231 /*! Vector containing all network addresses this peer should currently try to connect to */ 278 232 std::vector<std::string> connect_to; 233 234 /*! Vector containing all network addresses this peer already connected to (should not be added to connect_to again) */ 235 std::set<std::string> connected_to; 279 236 280 237 /*! Info on this peer */ … … 286 243 * connection to remote part is established. 287 244 */ 288 std::vector<std:: unique_ptr<tPeerInfo>> other_peers;245 std::vector<std::shared_ptr<tPeerInfo>> other_peers; 289 246 290 247 /*! Revision of peer information */ … … 306 263 tServer* server; 307 264 308 /*! Cached info on ports shared by this peer */309 std::map<core::tFrameworkElement::tHandle, rrlib::serialization::tFixedBuffer> shared_ports;310 311 /*! Mutex for 'shared_ports' access */312 rrlib::thread::tMutex shared_ports_mutex;313 314 /*!315 * True as soon as peer also serves clients interested in complete application structure.316 * This may be enabled later, as this can cause quite a lot of overhead if done during317 * construction and startup of large (MCA2) applications.318 */319 std::atomic<bool> serve_structure;320 321 /*!322 * Concurrent queue with incoming structure changes.323 * Queue is filled when runtime changes occur (structure mutex is acquired by this thread).324 * Queue is processed when TCP thread calls ProcessIncomingStructureChanges()325 */326 rrlib::concurrent_containers::tQueue < std::unique_ptr<tSerializedStructureChange>, rrlib::concurrent_containers::tConcurrency::SINGLE_READER_AND_WRITER,327 rrlib::concurrent_containers::tDequeueMode::ALL > incoming_structure_changes;328 329 265 /*! 330 266 * Actively connect to specified network? … … 333 269 bool actively_connect; 334 270 335 /*! Ports to check subscriptions for */336 std::vector<tFrameworkElementHandle> pending_subscription_checks;337 338 /*! Mutex for 'pending_subscription_checks' access */339 rrlib::thread::tMutex pending_subscription_checks_mutex;340 341 /*! Copy for TCP thread */342 std::vector<tFrameworkElementHandle> pending_subscription_checks_copy;343 344 271 /*! True when event loop is running (ProcessEvents() is regularly called by TCP thread) */ 345 272 bool event_loop_running; 346 347 /*! Concurrent queue with incoming port value changes */348 rrlib::concurrent_containers::tQueue < tChangeEventPointer, rrlib::concurrent_containers::tConcurrency::MULTIPLE_WRITERS,349 rrlib::concurrent_containers::tDequeueMode::ALL > incoming_port_buffer_changes;350 351 /*! Buffer pool with port value change event buffers */352 tPortBufferChangeEventPool port_buffer_change_event_buffers;353 354 /*! List with deleted RPC ports - so that buffer pools created in connection for these ports can be deleted */355 std::vector<core::tFrameworkElement::tHandle> deleted_rpc_ports;356 357 /*! Mutex for 'deleted_rpc_ports' access */358 rrlib::thread::tMutex deleted_rpc_ports_mutex;359 273 360 274 … … 368 282 void AddPeerAddresses(tPeerInfo& existing_peer, const std::vector<boost::asio::ip::address>& addresses); 369 283 284 virtual bool Create(core::tAbstractPort& owner_port, const rrlib::uri::tURI& uri, const rrlib::uri::tURIElements& uri_elements, const core::tUriConnectOptions& connect_options) override; 285 370 286 /*! 371 287 * Scans current peer list and adds missing addresses … … 374 290 void InferMissingAddresses(); 375 291 376 /*! Is provided element a shared port (to be announced to other peers)? */ 377 static bool IsSharedPort(core::tFrameworkElement& framework_element); 378 379 virtual void OnEdgeChange(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target) override; 380 virtual void OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element) override; 381 382 /*! Handles runtime changes from callbacks - and forwards info to 'incoming_structure_changes' queue */ 383 void ProcessRuntimeChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element, bool edge_change); 292 virtual void OnStartServingStructure() override; 384 293 385 294 /*! … … 393 302 void StartThread(); 394 303 395 /*!396 * Called whenever a new edge is added and updates network connection info if necessary397 */398 void UpdateNetworkConnectionInfo(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target, bool& target_port_changed);399 304 }; 400 305
Note: See TracChangeset
for help on using the changeset viewer.