source: finroc_plugins_tcp/internal/tPeerImplementation.cpp @ 165:36e703e3df51

14.08
Last change on this file since 165:36e703e3df51 was 165:36e703e3df51, checked in by Max Reichardt <max.reichardt@…>, 3 years ago

Makes TCP plugin more robust with respect to invalid network addresses to connect to (typically provided by user)

File size: 31.5 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/tcp/internal/tPeerImplementation.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2013-01-04
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/tcp/internal/tPeerImplementation.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "core/tRuntimeEnvironment.h"
36#include "plugins/network_transport/tNetworkConnections.h"
37
38//----------------------------------------------------------------------
39// Internal includes with ""
40//----------------------------------------------------------------------
41#include "plugins/tcp/internal/tNetworkPortInfo.h"
42#include "plugins/tcp/internal/tRemotePart.h"
43#include "plugins/tcp/internal/tServer.h"
44#include "plugins/tcp/internal/util.h"
45
46//----------------------------------------------------------------------
47// Debugging
48//----------------------------------------------------------------------
49#include <cassert>
50
51//----------------------------------------------------------------------
52// Namespace usage
53//----------------------------------------------------------------------
54
55//----------------------------------------------------------------------
56// Namespace declaration
57//----------------------------------------------------------------------
58namespace finroc
59{
60namespace tcp
61{
62namespace internal
63{
64
65//----------------------------------------------------------------------
66// Forward declarations / typedefs / enums
67//----------------------------------------------------------------------
68
69//----------------------------------------------------------------------
70// Const values
71//----------------------------------------------------------------------
72
73//----------------------------------------------------------------------
74// Implementation
75//----------------------------------------------------------------------
76
77const int cLOW_PRIORITY_TASK_CALL_INTERVAL = 500; // milliseconds
78const int cPROCESS_EVENTS_CALL_INTERVAL = 5; // milliseconds
79
80template <bool REGULAR>
81struct tProcessLowPriorityTasksCaller
82{
83  tPeerImplementation* implementation;
84
85  tProcessLowPriorityTasksCaller(tPeerImplementation& implementation) : implementation(&implementation) {}
86
87  void operator()(const boost::system::error_code& error)
88  {
89    if (!error)
90    {
91      implementation->ProcessLowPriorityTasks();
92      if (REGULAR)
93      {
94        implementation->low_priority_tasks_timer.expires_from_now(boost::posix_time::milliseconds(cLOW_PRIORITY_TASK_CALL_INTERVAL));
95        implementation->low_priority_tasks_timer.async_wait(*this);
96      }
97    }
98  }
99};
100
101struct tProcessEventsCaller
102{
103  tPeerImplementation* implementation;
104
105  tProcessEventsCaller(tPeerImplementation& implementation) : implementation(&implementation) {}
106
107  void operator()(const boost::system::error_code& error)
108  {
109    if (!error)
110    {
111      implementation->ProcessEvents();
112      implementation->event_processing_timer.expires_from_now(boost::posix_time::milliseconds(cPROCESS_EVENTS_CALL_INTERVAL));
113      implementation->event_processing_timer.async_wait(*this);
114    }
115  }
116};
117
118struct tAddressConnectorTask
119{
120  tPeerImplementation* implementation;
121  std::string connect_to;
122  std::vector<boost::asio::ip::tcp::endpoint> endpoints;
123  size_t current_endpoint_index;
124  std::shared_ptr<boost::asio::ip::tcp::socket> socket;
125
126  tAddressConnectorTask(tPeerImplementation& implementation, const std::string &connect_to) :
127    implementation(&implementation),
128    connect_to(connect_to),
129    endpoints(),
130    current_endpoint_index(0),
131    socket(new boost::asio::ip::tcp::socket(implementation.IOService()))
132  {
133    try
134    {
135      endpoints = ParseAndResolveNetworkAddress(connect_to);
136      Connect("no endpoints resolved");
137    }
138    catch (const std::invalid_argument &e)
139    {
140      FINROC_LOG_PRINT(ERROR, "Could not connect to invalid address '", connect_to, "'. Reason: ", e.what());
141    }
142    catch (const std::exception &e)
143    {
144      FINROC_LOG_PRINT(DEBUG, "Could not connect to ", connect_to, ". Reason: ", e.what());
145      this->implementation->connect_to.push_back(connect_to);
146    }
147  }
148
149  void Connect(const char* fail_reason)
150  {
151    if (current_endpoint_index < endpoints.size())
152    {
153      socket.reset(new boost::asio::ip::tcp::socket(implementation->IOService()));
154      socket->async_connect(endpoints[current_endpoint_index], *this);
155    }
156    else
157    {
158      // put address back
159      FINROC_LOG_PRINT(DEBUG, "Could not connect to ", connect_to, ". Reason: ", fail_reason);
160      implementation->connect_to.push_back(connect_to);
161    }
162  }
163
164  void operator()(const boost::system::error_code& error)
165  {
166    if (error)
167    {
168      current_endpoint_index++;
169      Connect(error.message().c_str());
170    }
171    else
172    {
173      FINROC_LOG_PRINT(DEBUG, "Connected to ", connect_to, " (", endpoints[current_endpoint_index].address().to_string(), ")");
174      tConnection::InitConnection(*implementation, socket, 0x7, NULL, true); // TODO: possibly use multiple connections
175    }
176  }
177};
178
179struct tConnectorTask
180{
181  tPeerImplementation* implementation;
182  tPeerInfo* peer_info;
183  size_t address_index;
184  std::shared_ptr<boost::asio::ip::tcp::socket> socket;
185  std::shared_ptr<tPeerInfo::tActiveConnect> active_connect_indicator;
186
187  tConnectorTask(tPeerImplementation& implementation, tPeerInfo& peer_info) :
188    implementation(&implementation),
189    peer_info(&peer_info),
190    address_index(0),
191    socket(),
192    active_connect_indicator(new tPeerInfo::tActiveConnect(peer_info))
193  {
194    if (peer_info.addresses.size() == 0)
195    {
196      implementation.InferMissingAddresses();
197    }
198    if (peer_info.addresses.size() == 0)
199    {
200      FINROC_LOG_PRINT(WARNING, "Peer info with no addresses. This is likely a programming error.");
201    }
202
203    Connect();
204  }
205
206  void Connect()
207  {
208    if (address_index < peer_info->addresses.size())
209    {
210      boost::asio::ip::address address = peer_info->addresses[address_index];
211      socket.reset(new boost::asio::ip::tcp::socket(implementation->IOService()));
212      socket->async_connect(boost::asio::ip::tcp::endpoint(address, peer_info->uuid.port), *this);
213    }
214  }
215
216  void operator()(const boost::system::error_code& error)
217  {
218    if (error)
219    {
220      // try next address
221      address_index++;
222      Connect();
223    }
224    else
225    {
226      tConnection::InitConnection(*implementation, socket, 0x7, active_connect_indicator); // TODO: possibly use multiple connections
227    }
228  }
229};
230
231/*! Server thread that does all the work for this server (and possibly peer) */
232class tTCPThread : public rrlib::thread::tThread
233{
234public:
235
236  tTCPThread(tPeerImplementation& implementation) :
237    tThread("TCP Thread"),
238    implementation(implementation)
239  {
240  }
241
242private:
243  virtual void Run() override
244  {
245    try
246    {
247      implementation.low_priority_tasks_timer.async_wait(tProcessLowPriorityTasksCaller<true>(implementation));
248      if (implementation.server)
249      {
250        implementation.server->Run();
251      }
252      else
253      {
254        implementation.io_service->run();
255      }
256    }
257    catch (const std::exception& ex)
258    {
259      FINROC_LOG_PRINT(WARNING, "Thread exited with exception ", ex);
260    }
261  }
262
263  virtual void StopThread() override
264  {
265    implementation.io_service->stop();
266  }
267
268  tPeerImplementation& implementation;
269};
270
271
272tPeerImplementation::tPeerImplementation(core::tFrameworkElement& framework_element, const tOptions& options) :
273  framework_element(framework_element),
274  create_options(options),
275  connect_to(),
276  this_peer(options.peer_type),
277  other_peers(),
278  //peer_list_revision(0),
279  peer_list_changed(false),
280  thread(),
281  io_service(new boost::asio::io_service()),
282  low_priority_tasks_timer(*io_service, boost::posix_time::milliseconds(cLOW_PRIORITY_TASK_CALL_INTERVAL)),
283  event_processing_timer(*io_service, boost::posix_time::milliseconds(5)),
284  server(NULL),
285  shared_ports(),
286  shared_ports_mutex(),
287  serve_structure(false),
288  incoming_structure_changes(),
289  actively_connect(false),
290  pending_subscription_checks(),
291  pending_subscription_checks_mutex(),
292  pending_subscription_checks_copy(),
293  event_loop_running(false),
294  incoming_port_buffer_changes(),
295  port_buffer_change_event_buffers(),
296  deleted_rpc_ports(),
297  deleted_rpc_ports_mutex()
298{
299  // initialize and adjust TCP settings
300  tSettings::GetInstance().critical_ping_threshold.Set(options.critical_ping_threshold);
301  tSettings::GetInstance().max_not_acknowledged_packets_bulk.Set(options.max_not_acknowledged_packets_bulk);
302  tSettings::GetInstance().max_not_acknowledged_packets_express.Set(options.max_not_acknowledged_packets_express);
303  tSettings::GetInstance().min_update_interval_bulk.Set(options.min_update_interval_bulk);
304  tSettings::GetInstance().min_update_interval_express.Set(options.min_update_interval_express);
305
306  this_peer.name = options.peer_name;
307
308  // Retrieve host name
309  char buffer[258];
310  if (gethostname(buffer, 257))
311  {
312    this_peer.uuid.host_name = "No host name@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count());
313    FINROC_LOG_PRINT(ERROR, "Error retrieving host name.");
314  }
315  else if (std::string(buffer) == "localhost")
316  {
317    this_peer.uuid.host_name = "localhost@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count());
318    FINROC_LOG_PRINT(ERROR, "The hostname of this system is 'localhost' (according to the hostname() function). When using the finroc_tcp plugin, this is not allowed (a unique identifier for this Finroc runtime environment is derived from the hostname). Ideally, the hostname is the name under which the system can be found in the network using DNS lookup. Otherwise, please set it to a unique name in the network. For now, the current time is appended for the uuid: '", this_peer.uuid.host_name, "'.");
319  }
320  else
321  {
322    this_peer.uuid.host_name = buffer;
323  }
324
325  // Create server
326  if (options.peer_type != tPeerType::CLIENT_ONLY)
327  {
328    server = new tServer(*this);
329  }
330}
331
332tPeerImplementation::~tPeerImplementation()
333{
334  io_service->stop();
335  if (thread)
336  {
337    thread->StopThread();
338    thread->Join();
339  }
340
341  core::tRuntimeEnvironment::GetInstance().RemoveListener(*this);
342}
343
344void tPeerImplementation::AddAddress(const boost::asio::ip::address& address)
345{
346  for (auto it = this_peer.addresses.begin(); it != this_peer.addresses.end(); ++it)
347  {
348    if (*it == address)
349    {
350      return;
351    }
352  }
353
354  this_peer.addresses.push_back(address);
355// peer_list_revision++;
356  peer_list_changed = true;
357}
358
359void tPeerImplementation::AddPeerAddresses(tPeerInfo& existing_peer, const std::vector<boost::asio::ip::address>& addresses)
360{
361  for (auto & address : addresses)
362  {
363    bool found = false;
364    for (auto & existing_address : existing_peer.addresses)
365    {
366      if (existing_address == address)
367      {
368        found = true;
369        break;
370      }
371    }
372    if (!found)
373    {
374      existing_peer.addresses.push_back(address);
375      SetPeerListChanged();
376    }
377  }
378}
379
380void tPeerImplementation::Connect()
381{
382  actively_connect = true;
383
384  for (const std::string & address : create_options.connect_to)
385  {
386    connect_to.push_back(address);
387  }
388
389  low_priority_tasks_timer.async_wait(tProcessLowPriorityTasksCaller<false>(*this)); // immediately trigger connecting
390}
391
392std::string tPeerImplementation::Connect(core::tAbstractPort& local_port, const std::string& remote_runtime_uuid,
393    int remote_port_handle, const std::string remote_port_link, bool disconnect)
394{
395  for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
396  {
397    if ((*it)->remote_part && (*it)->uuid.ToString() == remote_runtime_uuid)
398    {
399      tRemotePart& part = *((*it)->remote_part);
400      auto remote_port = part.remote_port_map.find(remote_port_handle);
401      if (remote_port != part.remote_port_map.end())
402      {
403        if (!disconnect)
404        {
405          local_port.ConnectTo(remote_port->second->GetQualifiedLink(), core::tAbstractPort::tConnectDirection::AUTO, true);
406          if (!local_port.IsConnectedTo(*remote_port->second))
407          {
408            return "Could not connect ports (see console output for reasons)";
409          }
410          return "";
411        }
412        else
413        {
414          local_port.DisconnectFrom(remote_port->second->GetQualifiedLink());
415          if (local_port.IsConnectedTo(*remote_port->second))
416          {
417            return "Could not disconnect ports (see console output for reasons)";
418          }
419          return "";
420        }
421      }
422      else
423      {
424        return "No remote port with handle " + std::to_string(remote_port_handle) + "found";
425      }
426    }
427  }
428  return "No remote runtime with UUID " + remote_runtime_uuid + " found";
429}
430
431
432void tPeerImplementation::DeserializePeerInfo(rrlib::serialization::tInputStream& stream, tPeerInfo& peer)
433{
434  stream >> peer.uuid;
435  stream >> peer.peer_type;
436  stream >> peer.name;
437  int size = stream.ReadInt();
438  peer.addresses.clear();
439  for (int i = 0; i < size; ++i)
440  {
441    boost::asio::ip::address address;
442    stream >> address;
443    peer.addresses.push_back(address);
444  }
445}
446
447tRemotePart* tPeerImplementation::GetRemotePart(const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget)
448{
449  if (uuid == this->this_peer.uuid)
450  {
451    FINROC_LOG_PRINT(ERROR, "Remote part has the same UUID as this one: " + uuid.ToString());
452    throw std::runtime_error("Remote part has the same UUID as this one: " + uuid.ToString());
453  }
454
455  for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
456  {
457    if ((*it)->uuid == uuid)
458    {
459      if (!(*it)->remote_part)
460      {
461        (*it)->remote_part = new tRemotePart(**it, framework_element, *this);
462        (*it)->remote_part->Init();
463      }
464      (*it)->AddAddress(address);
465#if 0
466      if ((*it)->AddAddress(address))
467      {
468        peer_list_revision++;
469      }
470#endif
471      (*it)->name = peer_name;
472      (*it)->never_forget |= never_forget;
473      return (*it)->remote_part;
474    }
475  }
476
477  other_peers.emplace_back(new tPeerInfo(peer_type));
478  tPeerInfo& info = **(other_peers.end() - 1);
479  info.addresses.push_back(address);
480  info.uuid = uuid;
481  info.name = peer_name;
482  info.never_forget = never_forget;
483  info.remote_part = new tRemotePart(info, framework_element, *this);
484  info.remote_part->Init();
485  //peer_list_revision++;
486  return info.remote_part;
487}
488
489void tPeerImplementation::InferMissingAddresses()
490{
491  for (auto & info : other_peers)
492  {
493    if (info->addresses.size() == 0)
494    {
495      for (auto & other_info : other_peers)
496      {
497        if (other_info != info && (other_info->uuid.host_name == info->uuid.host_name))
498        {
499          AddPeerAddresses(*info, other_info->addresses);
500        }
501      }
502    }
503  }
504}
505
506bool tPeerImplementation::IsSharedPort(core::tFrameworkElement& framework_element)
507{
508  return framework_element.IsPort() && framework_element.GetFlag(core::tFrameworkElement::tFlag::SHARED) &&
509         (!framework_element.GetFlag(core::tFrameworkElement::tFlag::NETWORK_ELEMENT));
510}
511
512void tPeerImplementation::OnEdgeChange(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target)
513{
514  // Maintain network connection info for finstruct
515  bool target_port_changed = false;
516  UpdateNetworkConnectionInfo(change_type, source, target, target_port_changed);
517
518  // Forward change to clients
519  if (source.IsReady())
520  {
521    ProcessRuntimeChange(core::tRuntimeListener::tEvent::CHANGE, source, true);
522  }
523  if (target.IsReady() && target_port_changed)
524  {
525    ProcessRuntimeChange(core::tRuntimeListener::tEvent::CHANGE, target, true);
526  }
527
528  // Check subscriptions?
529  if (source.IsReady())
530  {
531    tNetworkPortInfo* network_port_info = source.GetAnnotation<tNetworkPortInfo>();
532    if (network_port_info)
533    {
534      network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex);
535    }
536  }
537  if (target.IsReady())
538  {
539    tNetworkPortInfo* network_port_info = target.GetAnnotation<tNetworkPortInfo>();
540    if (network_port_info)
541    {
542      network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex);
543    }
544  }
545}
546
547void tPeerImplementation::OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element)
548{
549  // Maintain network connection info for finstruct
550  if (change_type == core::tRuntimeListener::ADD && element.IsPort())
551  {
552    core::tAbstractPort& port = static_cast<core::tAbstractPort&>(element);
553    for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it)
554    {
555      bool target_port_changed = false;
556      UpdateNetworkConnectionInfo(change_type, port, *it, target_port_changed);
557    }
558  }
559
560  ProcessRuntimeChange(change_type, element, false);
561
562  // Check subscriptions?
563  if (change_type == core::tRuntimeListener::tEvent::CHANGE)
564  {
565    tNetworkPortInfo* network_port_info = element.GetAnnotation<tNetworkPortInfo>();
566    if (network_port_info)
567    {
568      network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex);
569    }
570  }
571  if (change_type == core::tRuntimeListener::tEvent::ADD && element.IsPort())
572  {
573    // Check for any connected remote destination ports:
574    // Network input port may already be initialized, while local connected output port is initialized now.
575    // => Trigger subscription check in this case
576    core::tAbstractPort& port = static_cast<core::tAbstractPort&>(element);
577    for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it)
578    {
579      tNetworkPortInfo* network_port_info = it->GetAnnotation<tNetworkPortInfo>();
580      if (network_port_info)
581      {
582        network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex);
583      }
584    }
585  }
586
587
588  // RPC port deletion?
589  if (change_type == core::tRuntimeListener::tEvent::REMOVE && element.IsPort() &&
590      rpc_ports::IsRPCType(static_cast<core::tAbstractPort&>(element).GetDataType()))
591  {
592    rrlib::thread::tLock lock(deleted_rpc_ports_mutex);
593    deleted_rpc_ports.push_back(element.GetHandle());
594  }
595}
596
597void tPeerImplementation::ProcessIncomingPeerInfo(const tPeerInfo& peer_info)
598{
599  tPeerInfo* existing_peer = NULL;
600  if (peer_info.uuid == this_peer.uuid)
601  {
602    existing_peer = &this_peer;
603  }
604
605  for (auto & it : other_peers)
606  {
607    if (peer_info.uuid == it->uuid)
608    {
609      existing_peer = &(*it);
610    }
611  }
612
613  if (existing_peer)
614  {
615    if (existing_peer->peer_type != peer_info.peer_type)
616    {
617      FINROC_LOG_PRINT(WARNING, "Peer type of existing peer has changed, will not update it.");
618    }
619    AddPeerAddresses(*existing_peer, peer_info.addresses);
620  }
621  else
622  {
623    other_peers.emplace_back(new tPeerInfo(peer_info.peer_type));
624    tPeerInfo& info = **(other_peers.end() - 1);
625    info.addresses = peer_info.addresses;
626    info.uuid = peer_info.uuid;
627    info.name = peer_info.name;
628  }
629
630}
631
632void tPeerImplementation::ProcessEvents()
633{
634  rrlib::time::tTimestamp time_now = rrlib::time::Now();
635  //FINROC_LOG_PRINT(DEBUG, "Called");
636
637  // Process incoming structure changes
638  ProcessRuntimeChangeEvents();
639
640  // Process pending subscription checks
641  {
642    rrlib::thread::tLock lock(pending_subscription_checks_mutex);
643    pending_subscription_checks_copy = pending_subscription_checks;
644    pending_subscription_checks.clear();
645  }
646  for (auto it = pending_subscription_checks_copy.begin(); it != pending_subscription_checks_copy.end(); ++it)
647  {
648    core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(*it);
649    if (port && port->IsReady())
650    {
651      tNetworkPortInfo* network_port_info = port->GetAnnotation<tNetworkPortInfo>();
652      if (network_port_info)
653      {
654        network_port_info->DoSubscriptionCheck();
655      }
656    }
657  }
658
659  // Process incoming port buffer changes
660  rrlib::concurrent_containers::tQueueFragment<tChangeEventPointer> port_changes = incoming_port_buffer_changes.DequeueAll();
661  while (!port_changes.Empty())
662  {
663    tChangeEventPointer change_event = port_changes.PopFront();
664    change_event->network_port_info->ProcessIncomingBuffer(change_event);
665  }
666
667  // Process active connections
668  for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
669  {
670    if ((*it)->remote_part)
671    {
672      (*it)->remote_part->SendPendingMessages(time_now);
673    }
674  }
675}
676
677void tPeerImplementation::ProcessLowPriorityTasks()
678{
679  FINROC_LOG_PRINT(DEBUG_VERBOSE_2, "Alive ", rrlib::time::Now().time_since_epoch().count());
680
681  // delete buffer pools created for RPC ports
682  std::vector<core::tFrameworkElement::tHandle> deleted_ports;
683  {
684    rrlib::thread::tLock lock(deleted_rpc_ports_mutex);
685    if (!deleted_rpc_ports.empty())
686    {
687      std::swap(deleted_ports, deleted_rpc_ports); // Move deleted ports to local variable and unlock
688    }
689  }
690  if (!deleted_ports.empty())
691  {
692    for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
693    {
694      tPeerInfo& peer = **it;
695      if (peer.remote_part)
696      {
697        peer.remote_part->RpcPortsDeleted(deleted_ports);
698      }
699    }
700  }
701
702  // connect to other peers
703  if (actively_connect)
704  {
705    std::vector<std::string> connect_to_copy = std::move(connect_to);
706    for (auto & address : connect_to_copy)
707    {
708      FINROC_LOG_PRINT(DEBUG, "Connecting to ", address);
709      tAddressConnectorTask connector_task(*this, address);
710    }
711
712    for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
713    {
714      tPeerInfo& peer = **it;
715      if ((!peer.connected) && (!peer.connecting) && (peer.peer_type != tPeerType::CLIENT_ONLY) &&
716          (create_options.auto_connect_to_all_peers || peer.never_forget))
717      {
718        tConnectorTask connector_task(*this, peer);
719      }
720    }
721  }
722
723  // send peer list if needed
724  if (peer_list_changed)
725  {
726    for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
727    {
728      tPeerInfo& peer = **it;
729      tRemotePart *remote_part = peer.remote_part;
730      if (remote_part && peer.connected)
731      {
732        std::shared_ptr<tConnection> management_connection = remote_part->GetManagementConnection();
733        if (management_connection && management_connection->IsReady())
734        {
735
736          tPeerInfoMessage::Serialize(false, management_connection->CurrentWriteStream());
737
738          SerializePeerInfo(management_connection->CurrentWriteStream(), this_peer);
739
740          for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
741          {
742            SerializePeerInfo(management_connection->CurrentWriteStream(), **it);
743          }
744
745          management_connection->CurrentWriteStream().WriteBoolean(false);
746          tPeerInfoMessage::FinishMessage(management_connection->CurrentWriteStream());
747
748        }
749      }
750    }
751
752    peer_list_changed = false;
753  }
754}
755
756void tPeerImplementation::ProcessRuntimeChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element, bool edge_change)
757{
758  bool shared_port = IsSharedPort(element);
759  bool serve_structure_copy = serve_structure.load();
760
761  bool relevant_for_shared_port_client = shared_port && (!edge_change);
762  bool relevant_for_structure_client = !element.GetFlag(core::tFrameworkElement::tFlag::NETWORK_ELEMENT) && (!edge_change);
763
764  if ((relevant_for_shared_port_client || serve_structure_copy) && (change_type != core::tRuntimeListener::tEvent::PRE_INIT))
765  {
766    std::unique_ptr<tSerializedStructureChange> change(new tSerializedStructureChange(change_type, element, serve_structure_copy,
767        relevant_for_shared_port_client ? network_transport::tStructureExchange::SHARED_PORTS :
768        (relevant_for_structure_client ? network_transport::tStructureExchange::COMPLETE_STRUCTURE : network_transport::tStructureExchange::FINSTRUCT)));
769
770    if (relevant_for_shared_port_client)
771    {
772      rrlib::thread::tLock lock(shared_ports_mutex, false);
773      if (change_type == core::tRuntimeListener::tEvent::ADD || change_type == core::tRuntimeListener::tEvent::CHANGE)
774      {
775        rrlib::serialization::tStackMemoryBuffer<2048> buffer;
776        rrlib::serialization::tOutputStream stream(buffer);
777        std::string string_buffer;
778        network_transport::tFrameworkElementInfo::Serialize(stream, element, network_transport::tStructureExchange::SHARED_PORTS, string_buffer);
779        stream.Flush();
780        lock.Lock();
781        shared_ports[element.GetHandle()] = CopyToNewFixedBuffer(buffer);
782      }
783      else if (change_type == core::tRuntimeListener::tEvent::REMOVE)
784      {
785        lock.Lock();
786        shared_ports.erase(element.GetHandle());
787      }
788      incoming_structure_changes.Enqueue(std::move(change)); // do this with lock - to avoid inconsistencies
789    }
790    else
791    {
792      //FINROC_LOG_PRINT(DEBUG, "Enqueuing ", change.get(), " ", element.GetQualifiedName());
793      incoming_structure_changes.Enqueue(std::move(change));
794    }
795  }
796}
797
798void tPeerImplementation::ProcessRuntimeChangeEvents()
799{
800  rrlib::concurrent_containers::tQueueFragment<std::unique_ptr<tSerializedStructureChange>> incoming_structure_changes_fragment = incoming_structure_changes.DequeueAll();
801  while (!incoming_structure_changes_fragment.Empty())
802  {
803    std::unique_ptr<tSerializedStructureChange> incoming_structure_change = incoming_structure_changes_fragment.PopFront();
804    //FINROC_LOG_PRINT(DEBUG, "Dequeuing ", incoming_structure_change.get());
805    for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
806    {
807      if ((*it)->remote_part)
808      {
809        if (static_cast<size_t>((*it)->remote_part->GetDesiredStructureInfo()) >= static_cast<size_t>(incoming_structure_change->MinimumRelevantLevel()))
810        {
811          (*it)->remote_part->SendStructureChange(*incoming_structure_change);
812        }
813      }
814    }
815  }
816}
817
818void tPeerImplementation::RunEventLoop()
819{
820  if (!event_loop_running)
821  {
822    event_loop_running = true;
823    event_processing_timer.expires_from_now(boost::posix_time::milliseconds(cPROCESS_EVENTS_CALL_INTERVAL));
824    event_processing_timer.async_wait(tProcessEventsCaller(*this));
825  }
826}
827
828//FIXME: remove in next Finroc version
829bool IsLoopbackAddress(const boost::asio::ip::address& address)
830{
831  return address.is_v4() ? (address.to_v4() == boost::asio::ip::address_v4::loopback()) : (address.to_v6() == boost::asio::ip::address_v6::loopback());
832}
833
834void tPeerImplementation::SerializePeerInfo(rrlib::serialization::tOutputStream& stream, const tPeerInfo& peer)
835{
836  if ((&peer == &this_peer || peer.connected) && peer.peer_type != tPeerType::CLIENT_ONLY)
837  {
838    stream << true;
839    stream << peer.uuid;
840    stream << peer.peer_type;
841    stream << peer.name;
842
843    // count non-loopback addresses
844    int address_count = 0;
845    for (auto & it : peer.addresses)
846    {
847      // if (!it.is_loopback()) FIXME: replace in next Finroc version
848      if (!IsLoopbackAddress(it))
849      {
850        address_count++;
851      }
852    }
853    stream.WriteInt(address_count);
854
855    // serialize non-loopback addresses
856    for (auto & it : peer.addresses)
857    {
858      // if (!it.is_loopback()) FIXME: replace in next Finroc version
859      if (!IsLoopbackAddress(it))
860      {
861        stream << it;
862      }
863    }
864  }
865}
866
867rrlib::serialization::tMemoryBuffer tPeerImplementation::SerializeSharedPorts(common::tRemoteTypes& connection_type_encoder)
868{
869  rrlib::thread::tLock lock(shared_ports_mutex);
870  ProcessRuntimeChangeEvents();  // to make sure we don't get any shared port events twice
871  rrlib::serialization::tMemoryBuffer buffer(shared_ports.size() * 200);
872  rrlib::serialization::tOutputStream stream(buffer, connection_type_encoder);
873  stream.WriteInt(0); // Placeholder for size
874  stream << rrlib::rtti::tDataType<std::string>(); // write a data type for initialization
875  for (auto it = shared_ports.begin(); it != shared_ports.end(); ++it)
876  {
877    stream << it->first;
878    stream.Write(it->second);
879  }
880  stream.WriteInt(0); // size of next packet
881  stream.Close();
882  buffer.GetBuffer().PutInt(0, buffer.GetSize() - 8);
883  return buffer;
884}
885
886void tPeerImplementation::StartServer()
887{
888  core::tRuntimeEnvironment::GetInstance().AddListener(*this);
889
890  // Collect existing shared ports and store serialized information about them
891  rrlib::serialization::tStackMemoryBuffer<2048> buffer;
892  rrlib::serialization::tOutputStream stream(buffer);
893  std::string string_buffer;
894  enum { cPORT_BUFFER_SIZE = 2048 };
895  core::tAbstractPort* port_buffer[cPORT_BUFFER_SIZE];
896  typename core::tFrameworkElement::tHandle start_handle = 0;
897  while (true)
898  {
899    size_t port_count = core::tRuntimeEnvironment::GetInstance().GetAllPorts(port_buffer, cPORT_BUFFER_SIZE, start_handle);
900    for (size_t i = 0; i < port_count; i++)
901    {
902      core::tAbstractPort& port = *port_buffer[i];
903      if (IsSharedPort(port))
904      {
905        stream.Reset();
906        network_transport::tFrameworkElementInfo::Serialize(stream, port, network_transport::tStructureExchange::SHARED_PORTS, string_buffer);
907        stream.Flush();
908        shared_ports.insert(std::pair<core::tFrameworkElement::tHandle, rrlib::serialization::tFixedBuffer>(port.GetHandle(), CopyToNewFixedBuffer(buffer)));
909      }
910    }
911    if (port_count < cPORT_BUFFER_SIZE)
912    {
913      break;
914    }
915    start_handle = port_buffer[cPORT_BUFFER_SIZE - 1]->GetHandle() + 1;
916  };
917
918  // Start TCP Thread
919  StartThread();
920}
921
922void tPeerImplementation::StartThread()
923{
924  assert(!thread);
925  thread = (new tTCPThread(*this))->GetSharedPtr();
926  thread->SetAutoDelete();
927  thread->Start();
928}
929
930void tPeerImplementation::UpdateNetworkConnectionInfo(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target, bool& target_port_changed)
931{
932  tNetworkPortInfo* target_port_info = target.GetAnnotation<tNetworkPortInfo>();
933  bool destination_is_source = false;
934  core::tAbstractPort* connection_annotated = &source;
935  if ((!target_port_info) || target_port_info->IsServerPort())
936  {
937    target_port_info = source.GetAnnotation<tNetworkPortInfo>();
938    destination_is_source = true;
939    connection_annotated = &target;
940  }
941
942  if (target_port_info && (!target_port_info->IsServerPort()))
943  {
944    target_port_changed = destination_is_source;
945    network_transport::tNetworkConnections* connections_annotation = connection_annotated->GetAnnotation<network_transport::tNetworkConnections>();
946    if (change_type == core::tRuntimeListener::tEvent::ADD)
947    {
948      if (!connections_annotation)
949      {
950        connections_annotation = &connection_annotated->EmplaceAnnotation<network_transport::tNetworkConnections>();
951      }
952      connections_annotation->Add(network_transport::tNetworkConnection(target_port_info->GetRemotePart().peer_info.uuid.ToString(), target_port_info->GetRemoteHandle(), destination_is_source));
953    }
954    else if (change_type == core::tRuntimeListener::tEvent::REMOVE && connections_annotation)
955    {
956      connections_annotation->Remove(network_transport::tNetworkConnection(target_port_info->GetRemotePart().peer_info.uuid.ToString(), target_port_info->GetRemoteHandle(), destination_is_source));
957    }
958  }
959}
960
961//----------------------------------------------------------------------
962// End of namespace declaration
963//----------------------------------------------------------------------
964}
965}
966}
Note: See TracBrowser for help on using the repository browser.