source: finroc_plugins_tcp/internal/tPeerImplementation.h @ 164:2864638434d9

14.08
Last change on this file since 164:2864638434d9 was 164:2864638434d9, checked in by Jochen Hirth <hirth@…>, 4 years ago

Added the possibility to connect to other parts from a running program.

File size: 14.1 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/tcp/internal/tPeerImplementation.h
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2013-01-04
27 *
28 * \brief   Contains tPeerImplementation
29 *
30 * \b tPeerImplementation
31 *
32 * Implementation of different variants of TCP peer.
33 *
34 */
35//----------------------------------------------------------------------
36#ifndef __plugins__tcp__internal__tPeerImplementation_h__
37#define __plugins__tcp__internal__tPeerImplementation_h__
38
39//----------------------------------------------------------------------
40// External includes (system with <>, local with "")
41//----------------------------------------------------------------------
42#include <map>
43#include <boost/asio/ip/tcp.hpp>
44#include <boost/asio/deadline_timer.hpp>
45#include "rrlib/thread/tThread.h"
46#include "core/tFrameworkElement.h"
47
48//----------------------------------------------------------------------
49// Internal includes with ""
50//----------------------------------------------------------------------
51#include "plugins/network_transport/structure_info/tFrameworkElementInfo.h"
52#include "plugins/tcp/common/tRemoteTypes.h"
53//#include "plugins/tcp/internal/tNetworkPortInfo.h"
54#include "plugins/tcp/internal/tPeerInfo.h"
55#include "plugins/tcp/internal/tPortBufferChangeEvent.h"
56#include "plugins/tcp/internal/tSerializedStructureChange.h"
57
58//----------------------------------------------------------------------
59// Namespace declaration
60//----------------------------------------------------------------------
61namespace finroc
62{
63namespace tcp
64{
65namespace internal
66{
67
68//----------------------------------------------------------------------
69// Forward declarations / typedefs / enums
70//----------------------------------------------------------------------
71
72class tPlugin;
73class tServer;
74
75//----------------------------------------------------------------------
76// Class declaration
77//----------------------------------------------------------------------
78//! TCP Peer implementation
79/*!
80 * Implementation of different variants of TCP peer.
81 */
82class tPeerImplementation : public core::tRuntimeListener
83{
84
85  typedef rrlib::buffer_pools::tBufferPool < tPortBufferChangeEvent, rrlib::concurrent_containers::tConcurrency::MULTIPLE_READERS,
86          rrlib::buffer_pools::management::QueueBased, rrlib::buffer_pools::deleting::CollectGarbage,
87          rrlib::buffer_pools::recycling::UseOwnerStorageInBuffer > tPortBufferChangeEventPool;
88
89//----------------------------------------------------------------------
90// Public methods and typedefs
91//----------------------------------------------------------------------
92public:
93
94  typedef typename tPortBufferChangeEventPool::tPointer tChangeEventPointer;
95
96  /*!
97   * \param framework_element Framework element associated with peer
98   * \param Options for peer creation
99   */
100  tPeerImplementation(core::tFrameworkElement& framework_element, const tOptions& options);
101
102  /*! Shuts peer down */
103  ~tPeerImplementation();
104
105  /*!
106   * Adds a address for this peer (if not already set)
107   */
108  void AddAddress(const boost::asio::ip::address& address);
109
110  /*! Starts actively connecting to the specified network */
111  void Connect();
112
113  /*!
114   * Connect local port to port in remote runtime environment using TCP network transport plugin.
115   *
116   * \param local_port Local port to connect
117   * \param remote_runtime_uuid UUID of remote runtime
118   * \param remote_port_handle Handle of remote port
119   * \param remote_port_link Link of port in remote runtime environment
120   * \param disconnect If 'false' the ports are connected - if 'true' the ports are disconnected
121   * \return Returns error message if connecting failed. On success an empty string is returned.
122   */
123  std::string Connect(core::tAbstractPort& local_port, const std::string& remote_runtime_uuid, int remote_port_handle, const std::string remote_port_link, bool disconnect);
124
125  /*!
126   * Connects to part with specified address
127   */
128  void ConnectTo(const std::string& address)
129  {
130    connect_to.push_back(address);
131  }
132
133  /*!
134   * Deserialize tPeerInfo from an input stream for peer exchange
135   * \param stream the input stream to deserialize from
136   * \param p the tPeerInfo to deserialize into
137   */
138  void DeserializePeerInfo(rrlib::serialization::tInputStream& stream, tPeerInfo& p);
139
140  /*!
141   * \return Peer info about this peer
142   */
143  const tPeerInfo& GetPeerInfo() const
144  {
145    return this_peer;
146  }
147
148  /*!
149   * Gets remote part with specified UUID.
150   * If no such part has been registered yet, creates a new one.
151   *
152   * \param uuid UUID of remote part
153   * \param peer_type Peer type
154   * \param peer_name Name of peer. Will be displayed in tooling and status messages. Does not need to be unique. Typically the program/process name.
155   * \param address IP address of remote part
156   * \param never_forget Is this a remote peer to never forget?
157   * \return Pointer to remote part
158   */
159  tRemotePart* GetRemotePart(const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget);
160
161  /*!
162   * \return Thread id of TCP thread. Null, if TCP thread does not exist
163   */
164  typename rrlib::thread::tThread::tThreadId GetTCPThreadId()
165  {
166    return thread ? thread->GetId() : 0;
167  }
168
169  /*!
170   * Reference to Boost asio IO service
171   */
172  boost::asio::io_service& IOService()
173  {
174    return *io_service;
175  }
176
177  void PortChanged(data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject>& value, tNetworkPortInfo* info, data_ports::tChangeContext& change_context)
178  {
179    tChangeEventPointer event_buffer = port_buffer_change_event_buffers.GetUnusedBuffer();
180    if (!event_buffer)
181    {
182      event_buffer = port_buffer_change_event_buffers.AddBuffer(std::unique_ptr<tPortBufferChangeEvent>(new tPortBufferChangeEvent()));
183    }
184    event_buffer->new_value = std::move(value);
185    event_buffer->network_port_info = info;
186    event_buffer->change_type = change_context.ChangeType();
187    incoming_port_buffer_changes.Enqueue(event_buffer);
188  }
189
190  /*!
191   * When there are active connections, called with high frequency (every 5 ms)
192   * to process incoming events (such as port data and structure changes)
193   */
194  void ProcessEvents();
195
196  /*!
197   * Processes tPeerInfo received e.g. from other peers
198   * \param peer_info the info to be processed
199   */
200  void ProcessIncomingPeerInfo(const tPeerInfo& peer_info);
201
202  /*!
203   * Called in a regular interval to do things like establishing new connections
204   */
205  void ProcessLowPriorityTasks();
206
207  /*!
208   * Processes all enqueued runtime change events in TCP thread
209   * and distributes them to all connections that are interested.
210   */
211  void ProcessRuntimeChangeEvents();
212
213  /*!
214   * Starts running event loop (the one that call ProcessEvents() every 5ms),
215   * unless it's already running
216   */
217  void RunEventLoop();
218
219  /*!
220   * Serializes shared ports and returns that in memory buffer
221   *
222   * \param connection_type_encoder Type encoder object of connection to serialize shared ports for
223   */
224  rrlib::serialization::tMemoryBuffer SerializeSharedPorts(common::tRemoteTypes& connection_type_encoder);
225
226
227  /*!
228   * \return True as soon as peer also serves clients interested in complete application structure
229   */
230  bool ServesStructure() const
231  {
232    return serve_structure.load();
233  }
234
235  /*!
236   * Mark the peer list as changed.
237   * This will cause the peer list to be sent to all connected peers.
238   */
239  void SetPeerListChanged()
240  {
241    peer_list_changed = true;
242  }
243
244  /*!
245   * Starts server: Peer thread is instantiated and socket is opened
246   */
247  void StartServer();
248
249  void StartServingStructure()
250  {
251    serve_structure.store(true);
252  }
253
254//----------------------------------------------------------------------
255// Private fields and methods
256//----------------------------------------------------------------------
257private:
258
259  friend class tServer;
260  friend class tTCPThread;
261  friend class tRemotePart;
262
263  template <bool REGULAR>
264  friend struct tProcessLowPriorityTasksCaller;
265  friend struct tProcessEventsCaller;
266
267  friend struct tAddressConnectorTask;
268  friend struct tConnectorTask;
269
270  /*! Framework element associated with server */
271  core::tFrameworkElement& framework_element;
272
273  /*! Options that peer was created with */
274  tOptions create_options;
275
276  /*! Vector containing all network addresses this peer should try to connect to */
277  std::vector<std::string> connect_to;
278
279  /*! Info on this peer */
280  tPeerInfo this_peer;
281
282  /*!
283   * List of network peers that can be connected to
284   * Each entry contains reference to tRemotePart instance as soon as a
285   * connection to remote part is established.
286   */
287  std::vector<std::unique_ptr<tPeerInfo>> other_peers;
288
289  /*! Revision of peer information */
290  //int32_t peer_list_revision;
291
292  /*! Set to true if peer list has changed and needs to be sent */
293  bool peer_list_changed;
294
295  /*! Primary TCP thread that does all the socket related work for this peer */
296  std::shared_ptr<rrlib::thread::tThread> thread;
297
298  /*! Boost asio IO service */
299  std::shared_ptr<boost::asio::io_service> io_service;
300
301  /*! Timers for calling ProcessLowPriorityTasks() and ProcessEvents() regularly */
302  boost::asio::deadline_timer low_priority_tasks_timer, event_processing_timer;
303
304  /*! TCP server - NULL if this is a client-only peer */
305  tServer* server;
306
307  /*! Cached info on ports shared by this peer */
308  std::map<core::tFrameworkElement::tHandle, rrlib::serialization::tFixedBuffer> shared_ports;
309
310  /*! Mutex for 'shared_ports' access */
311  rrlib::thread::tMutex shared_ports_mutex;
312
313  /*!
314   * True as soon as peer also serves clients interested in complete application structure.
315   * This may be enabled later, as this can cause quite a lot of overhead if done during
316   * construction and startup of large (MCA2) applications.
317   */
318  std::atomic<bool> serve_structure;
319
320  /*!
321   * Concurrent queue with incoming structure changes.
322   * Queue is filled when runtime changes occur (structure mutex is acquired by this thread).
323   * Queue is processed when TCP thread calls ProcessIncomingStructureChanges()
324   */
325  rrlib::concurrent_containers::tQueue < std::unique_ptr<tSerializedStructureChange>, rrlib::concurrent_containers::tConcurrency::SINGLE_READER_AND_WRITER,
326        rrlib::concurrent_containers::tDequeueMode::ALL > incoming_structure_changes;
327
328  /*!
329   * Actively connect to specified network?
330   * False initially; true after Connect() has been called
331   */
332  bool actively_connect;
333
334  /*! Ports to check subscriptions for */
335  std::vector<tFrameworkElementHandle> pending_subscription_checks;
336
337  /*! Mutex for 'pending_subscription_checks' access */
338  rrlib::thread::tMutex pending_subscription_checks_mutex;
339
340  /*! Copy for TCP thread */
341  std::vector<tFrameworkElementHandle> pending_subscription_checks_copy;
342
343  /*! True when event loop is running (ProcessEvents() is regularly called by TCP thread) */
344  bool event_loop_running;
345
346  /*! Concurrent queue with incoming port value changes */
347  rrlib::concurrent_containers::tQueue < tChangeEventPointer, rrlib::concurrent_containers::tConcurrency::MULTIPLE_WRITERS,
348        rrlib::concurrent_containers::tDequeueMode::ALL > incoming_port_buffer_changes;
349
350  /*! Buffer pool with port value change event buffers */
351  tPortBufferChangeEventPool port_buffer_change_event_buffers;
352
353  /*! List with deleted RPC ports - so that buffer pools created in connection for these ports can be deleted */
354  std::vector<core::tFrameworkElement::tHandle> deleted_rpc_ports;
355
356  /*! Mutex for 'deleted_rpc_ports' access */
357  rrlib::thread::tMutex deleted_rpc_ports_mutex;
358
359
360  /*!
361   * Adds all the provided addresses to the specified peer info
362   * (if they have not been added already)
363   *
364   * \param existing_peer Peer to add addresses to
365   * \param addresses Addresses to check and possibly add
366   */
367  void AddPeerAddresses(tPeerInfo& existing_peer, const std::vector<boost::asio::ip::address>& addresses);
368
369  /*!
370   * Scans current peer list and adds missing addresses
371   * (e.g. if two peers have the same host, they must both have the same IP addresses)
372   */
373  void InferMissingAddresses();
374
375  /*! Is provided element a shared port (to be announced to other peers)? */
376  static bool IsSharedPort(core::tFrameworkElement& framework_element);
377
378  virtual void OnEdgeChange(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target) override;
379  virtual void OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element) override;
380
381  /*! Handles runtime changes from callbacks - and forwards info to 'incoming_structure_changes' queue */
382  void ProcessRuntimeChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element, bool edge_change);
383
384  /*!
385   * Serialize tPeerInfo to an output stream for peer exchange
386   * \param stream the output stream to serialize to
387   * \param p the tPeerInfo to be serialized
388   */
389  void SerializePeerInfo(rrlib::serialization::tOutputStream& stream, const tPeerInfo& p);
390
391  /*! Starts TCP Thread */
392  void StartThread();
393
394  /*!
395   * Called whenever a new edge is added and updates network connection info if necessary
396   */
397  void UpdateNetworkConnectionInfo(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target, bool& target_port_changed);
398};
399
400//----------------------------------------------------------------------
401// End of namespace declaration
402//----------------------------------------------------------------------
403}
404}
405}
406
407
408#endif
Note: See TracBrowser for help on using the repository browser.