source: finroc_plugins_tcp/internal/tPeerImplementation.cpp @ 155:587c109f7f6a

17.03
Last change on this file since 155:587c109f7f6a was 155:587c109f7f6a, checked in by Max Reichardt <mreichardt@…>, 2 years ago

Moves TCP-independent generic message protocol to finroc_plugins_network_transport and makes TCP plugin based on this.

File size: 18.7 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/tcp/internal/tPeerImplementation.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2013-01-04
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/tcp/internal/tPeerImplementation.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "core/tRuntimeEnvironment.h"
36
37//----------------------------------------------------------------------
38// Internal includes with ""
39//----------------------------------------------------------------------
40#include "plugins/tcp/internal/tConnection.h"
41#include "plugins/tcp/internal/tServer.h"
42#include "plugins/tcp/internal/util.h"
43
44//----------------------------------------------------------------------
45// Debugging
46//----------------------------------------------------------------------
47#include <cassert>
48
49//----------------------------------------------------------------------
50// Namespace usage
51//----------------------------------------------------------------------
52
53//----------------------------------------------------------------------
54// Namespace declaration
55//----------------------------------------------------------------------
56namespace finroc
57{
58namespace tcp
59{
60namespace internal
61{
62
63//----------------------------------------------------------------------
64// Forward declarations / typedefs / enums
65//----------------------------------------------------------------------
66typedef network_transport::runtime_info::tStructureExchange tStructureExchange;
67typedef network_transport::generic_protocol::tRemoteRuntime tRemoteRuntime;
68
69//----------------------------------------------------------------------
70// Const values
71//----------------------------------------------------------------------
72
73//----------------------------------------------------------------------
74// Implementation
75//----------------------------------------------------------------------
76
77namespace
78{
79
80class tRemoteRuntimeRemover : public core::tAnnotation
81{
82public:
83  tRemoteRuntimeRemover(std::shared_ptr<tPeerInfo>& peer_pointer) :
84    peer_pointer(peer_pointer)
85  {}
86
87  virtual void OnManagedDelete() override
88  {
89    if (peer_pointer->remote_runtime)
90    {
91      FINROC_LOG_PRINT(DEBUG, "Disconnected from ", peer_pointer->remote_runtime->GetName());
92      peer_pointer->remote_runtime = nullptr;
93    }
94  }
95
96  std::shared_ptr<tPeerInfo>& peer_pointer;
97};
98
99boost::posix_time::milliseconds ToBoostPosixTime(const rrlib::time::tDuration& d)
100{
101  return boost::posix_time::milliseconds(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
102}
103
104}
105
106template <bool REGULAR>
107struct tProcessLowPriorityTasksCaller
108{
109  tPeerImplementation* implementation;
110
111  tProcessLowPriorityTasksCaller(tPeerImplementation& implementation) : implementation(&implementation) {}
112
113  void operator()(const boost::system::error_code& error)
114  {
115    if (!error)
116    {
117      implementation->ProcessLowPriorityTasks();
118      if (REGULAR)
119      {
120        implementation->low_priority_tasks_timer.expires_from_now(ToBoostPosixTime(implementation->par_process_low_priority_tasks_call_interval.Get()));
121        implementation->low_priority_tasks_timer.async_wait(*this);
122      }
123    }
124  }
125};
126
127struct tProcessEventsCaller
128{
129  tPeerImplementation* implementation;
130
131  tProcessEventsCaller(tPeerImplementation& implementation) : implementation(&implementation) {}
132
133  void operator()(const boost::system::error_code& error)
134  {
135    if (!error)
136    {
137      implementation->ProcessEvents();
138      implementation->event_processing_timer.expires_from_now(ToBoostPosixTime(implementation->par_process_events_call_interval.Get()));
139      implementation->event_processing_timer.async_wait(*this);
140    }
141  }
142};
143
144struct tAddressConnectorTask
145{
146  tPeerImplementation* implementation;
147  std::string connect_to;
148  std::vector<boost::asio::ip::tcp::endpoint> endpoints;
149  size_t current_endpoint_index;
150  std::shared_ptr<boost::asio::ip::tcp::socket> socket;
151
152  tAddressConnectorTask(tPeerImplementation& implementation, const std::string &connect_to) :
153    implementation(&implementation),
154    connect_to(connect_to),
155    endpoints(),
156    current_endpoint_index(0),
157    socket(new boost::asio::ip::tcp::socket(implementation.IOService()))
158  {
159    try
160    {
161      endpoints = ParseAndResolveNetworkAddress(connect_to);
162      Connect("no endpoints resolved");
163    }
164    catch (const std::exception &e)
165    {
166      FINROC_LOG_PRINT(DEBUG, "Could not connect to ", connect_to, ". Reason: ", e.what());
167      this->implementation->connect_to.push_back(connect_to);
168    }
169  }
170
171  void Connect(const char* fail_reason)
172  {
173    if (current_endpoint_index < endpoints.size())
174    {
175      socket.reset(new boost::asio::ip::tcp::socket(implementation->IOService()));
176      socket->async_connect(endpoints[current_endpoint_index], *this);
177    }
178    else
179    {
180      // put address back
181      FINROC_LOG_PRINT(DEBUG, "Could not connect to ", connect_to, ". Reason: ", fail_reason);
182      implementation->connect_to.push_back(connect_to);
183    }
184  }
185
186  void operator()(const boost::system::error_code& error)
187  {
188    if (error)
189    {
190      current_endpoint_index++;
191      Connect(error.message().c_str());
192    }
193    else
194    {
195      tConnection::TryToEstablishConnection(*implementation, socket, 0x7, NULL, true); // TODO: possibly use multiple connections
196    }
197  }
198};
199
200struct tConnectorTask
201{
202  tPeerImplementation* implementation;
203  tPeerInfo* peer_info;
204  size_t address_index;
205  std::shared_ptr<boost::asio::ip::tcp::socket> socket;
206  std::shared_ptr<tPeerInfo::tActiveConnect> active_connect_indicator;
207
208  tConnectorTask(tPeerImplementation& implementation, tPeerInfo& peer_info) :
209    implementation(&implementation),
210    peer_info(&peer_info),
211    address_index(0),
212    socket(),
213    active_connect_indicator(new tPeerInfo::tActiveConnect(peer_info))
214  {
215    if (peer_info.addresses.size() == 0)
216    {
217      implementation.InferMissingAddresses();
218    }
219    if (peer_info.addresses.size() == 0)
220    {
221      FINROC_LOG_PRINT(WARNING, "Peer info with no addresses. This is likely a programming error.");
222    }
223
224    Connect();
225  }
226
227  void Connect()
228  {
229    if (address_index < peer_info->addresses.size())
230    {
231      boost::asio::ip::address address = peer_info->addresses[address_index];
232      socket.reset(new boost::asio::ip::tcp::socket(implementation->IOService()));
233      socket->async_connect(boost::asio::ip::tcp::endpoint(address, peer_info->uuid.port), *this);
234    }
235  }
236
237  void operator()(const boost::system::error_code& error)
238  {
239    if (error)
240    {
241      // try next address
242      address_index++;
243      Connect();
244    }
245    else
246    {
247      tConnection::TryToEstablishConnection(*implementation, socket, 0x7, active_connect_indicator); // TODO: possibly use multiple connections
248    }
249  }
250};
251
252/*! Server thread that does all the work for this server (and possibly peer) */
253class tTCPThread : public rrlib::thread::tThread
254{
255public:
256
257  tTCPThread(tPeerImplementation& implementation) :
258    tThread("TCP Thread"),
259    implementation(implementation)
260  {
261  }
262
263private:
264  virtual void Run() override
265  {
266    try
267    {
268      implementation.low_priority_tasks_timer.async_wait(tProcessLowPriorityTasksCaller<true>(implementation));
269      if (implementation.server)
270      {
271        implementation.server->Run();
272      }
273      else
274      {
275        implementation.io_service->run();
276      }
277    }
278    catch (const std::exception& ex)
279    {
280      FINROC_LOG_PRINT(WARNING, "TCP thread exited with exception ", ex);
281    }
282  }
283
284  virtual void StopThread() override
285  {
286    implementation.io_service->stop();
287  }
288
289  tPeerImplementation& implementation;
290};
291
292//FIXME: remove in next Finroc version
293bool IsLoopbackAddress(const boost::asio::ip::address& address)
294{
295  return address.is_v4() ? (address.to_v4() == boost::asio::ip::address_v4::loopback()) : (address.to_v6() == boost::asio::ip::address_v6::loopback());
296}
297
298
299tPeerImplementation::tPeerImplementation() :
300  this_peer(tPeerType::UNSPECIFIED),
301  other_peers(),
302  //peer_list_revision(0),
303  peer_list_changed(false),
304  thread(),
305  io_service(new boost::asio::io_service()),
306  low_priority_tasks_timer(*io_service, boost::posix_time::milliseconds(500)),  // this is only the initial wait
307  event_processing_timer(*io_service, boost::posix_time::milliseconds(5)),
308  server(nullptr),
309  actively_connect(false),
310  event_loop_running(false)
311{
312}
313
314tPeerImplementation::~tPeerImplementation()
315{
316  io_service->stop();
317  if (thread)
318  {
319    thread->StopThread();
320    thread->Join();
321  }
322}
323
324void tPeerImplementation::AddAddress(const boost::asio::ip::address& address)
325{
326  for (auto it = this_peer.addresses.begin(); it != this_peer.addresses.end(); ++it)
327  {
328    if (*it == address)
329    {
330      return;
331    }
332  }
333
334  this_peer.addresses.push_back(address);
335// peer_list_revision++;
336  peer_list_changed = true;
337}
338
339void tPeerImplementation::AddPeerAddresses(tPeerInfo& existing_peer, const std::vector<boost::asio::ip::address>& addresses)
340{
341  for (auto & address : addresses)
342  {
343    bool found = false;
344    for (auto & existing_address : existing_peer.addresses)
345    {
346      if (existing_address == address)
347      {
348        found = true;
349        break;
350      }
351    }
352    if (!found)
353    {
354      existing_peer.addresses.push_back(address);
355      SetPeerListChanged();
356    }
357  }
358}
359
360void tPeerImplementation::Connect()
361{
362  actively_connect = true;
363  low_priority_tasks_timer.async_wait(tProcessLowPriorityTasksCaller<false>(*this)); // immediately trigger connecting
364}
365
366void tPeerImplementation::DeserializePeerInfo(rrlib::serialization::tInputStream& stream, tPeerInfo& peer)
367{
368  stream >> peer.uuid;
369  stream >> peer.peer_type;
370  stream >> peer.name;
371  int size = stream.ReadInt();
372  peer.addresses.clear();
373  for (int i = 0; i < size; ++i)
374  {
375    boost::asio::ip::address address;
376    stream >> address;
377    peer.addresses.push_back(address);
378  }
379}
380
381network_transport::generic_protocol::tRemoteRuntime* tPeerImplementation::GetRemoteRuntime(std::shared_ptr<tConnection>& connection, const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget)
382{
383  if (uuid == this->this_peer.uuid)
384  {
385    FINROC_LOG_PRINT(ERROR, "Remote part has the same UUID as this one: " + uuid.ToString());
386    throw std::runtime_error("Remote part has the same UUID as this one: " + uuid.ToString());
387  }
388
389  for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
390  {
391    if ((*it)->uuid == uuid)
392    {
393      if (!(*it)->remote_runtime)
394      {
395        (*it)->remote_runtime = new tRemoteRuntime(*this, connection, *GetPluginRootFrameworkElement(), uuid.ToString());
396        (*it)->remote_runtime->EmplaceAnnotation<tRemoteRuntimeRemover>(*it);
397        (*it)->remote_runtime->Init();
398      }
399      (*it)->AddAddress(address);
400#if 0
401      if ((*it)->AddAddress(address))
402      {
403        peer_list_revision++;
404      }
405#endif
406      (*it)->name = peer_name;
407      (*it)->never_forget |= never_forget;
408      return (*it)->remote_runtime;
409    }
410  }
411
412  other_peers.emplace_back(new tPeerInfo(peer_type));
413  tPeerInfo& info = *other_peers.back();
414  info.addresses.push_back(address);
415  info.uuid = uuid;
416  info.name = peer_name;
417  info.never_forget = never_forget;
418  info.remote_runtime = new tRemoteRuntime(*this, connection, *GetPluginRootFrameworkElement(), uuid.ToString());
419  info.remote_runtime->EmplaceAnnotation<tRemoteRuntimeRemover>(other_peers.back());
420  info.remote_runtime->Init();
421  //peer_list_revision++;
422  return info.remote_runtime;
423}
424
425void tPeerImplementation::InferMissingAddresses()
426{
427  for (auto & info : other_peers)
428  {
429    if (info->addresses.size() == 0)
430    {
431      for (auto & other_info : other_peers)
432      {
433        if (other_info != info && (other_info->uuid.host_name == info->uuid.host_name))
434        {
435          AddPeerAddresses(*info, other_info->addresses);
436        }
437      }
438    }
439  }
440}
441
442void tPeerImplementation::Init(rrlib::xml::tNode* config_node)
443{
444  tNetworkTransportPlugin::Init(config_node);
445  this_peer.peer_type = par_peer_type.Get();
446  this_peer.name = network_transport::generic_protocol::tLocalRuntimeInfo::GetName();
447
448  // Retrieve host name
449  char buffer[258];
450  if (gethostname(buffer, 257))
451  {
452    this_peer.uuid.host_name = "No host name@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count());
453    FINROC_LOG_PRINT(ERROR, "Error retrieving host name.");
454  }
455  else if (std::string(buffer) == "localhost")
456  {
457    this_peer.uuid.host_name = "localhost@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count());
458    FINROC_LOG_PRINT(ERROR, "The hostname of this system is 'localhost' (according to the hostname() function). When using the finroc_tcp plugin, this is not allowed (a unique identifier for this Finroc runtime environment is derived from the hostname). Ideally, the hostname is the name under which the system can be found in the network using DNS lookup. Otherwise, please set it to a unique name in the network. For now, the current time is appended for the uuid: '", this_peer.uuid.host_name, "'.");
459  }
460  else
461  {
462    this_peer.uuid.host_name = buffer;
463  }
464
465  // Create server
466  if (this_peer.peer_type != tPeerType::CLIENT_ONLY)
467  {
468    server = new tServer(*this);
469  }
470}
471
472void tPeerImplementation::OnStartServingStructure()
473{
474  Connect();
475  StartServer();
476}
477
478void tPeerImplementation::ProcessIncomingPeerInfo(const tPeerInfo& peer_info)
479{
480  tPeerInfo* existing_peer = nullptr;
481  if (peer_info.uuid == this_peer.uuid)
482  {
483    existing_peer = &this_peer;
484  }
485
486  for (auto & it : other_peers)
487  {
488    if (peer_info.uuid == it->uuid)
489    {
490      existing_peer = &(*it);
491    }
492  }
493
494  if (existing_peer)
495  {
496    if (existing_peer->peer_type != peer_info.peer_type)
497    {
498      FINROC_LOG_PRINT(WARNING, "Peer type of existing peer has changed, will not update it.");
499    }
500    AddPeerAddresses(*existing_peer, peer_info.addresses);
501  }
502  else
503  {
504    other_peers.emplace_back(new tPeerInfo(peer_info.peer_type));
505    tPeerInfo& info = **(other_peers.end() - 1);
506    info.addresses = peer_info.addresses;
507    info.uuid = peer_info.uuid;
508    info.name = peer_info.name;
509  }
510
511}
512
513void tPeerImplementation::ProcessEvents()
514{
515  rrlib::time::tTimestamp time_now = rrlib::time::Now();
516  ProcessLocalRuntimeCallsToSend();
517  ProcessLocalRuntimePortDataChanges();
518  ProcessLocalRuntimeStructureChanges();
519
520  // Process active connections
521  for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
522  {
523    if ((*it)->remote_runtime)
524    {
525      (*it)->remote_runtime->SendPendingMessages(time_now);
526    }
527  }
528}
529
530void tPeerImplementation::ProcessLowPriorityTasks()
531{
532  FINROC_LOG_PRINT(DEBUG_VERBOSE_2, "Alive ", rrlib::time::Now().time_since_epoch().count());
533
534  // connect to other peers
535  if (actively_connect)
536  {
537    if (par_connect_to.HasChanged())
538    {
539      auto current_connect_to = par_connect_to.GetPointer();
540      for (const std::string & address : (*current_connect_to))
541      {
542        if (connected_to.count(address) == 0)
543        {
544          connect_to.push_back(address);
545          connected_to.insert(address);
546        }
547      }
548      par_connect_to.ResetChanged();
549    }
550
551    for (auto it = connect_to.begin(); it != connect_to.end(); ++it)
552    {
553      FINROC_LOG_PRINT(DEBUG, "Connecting to ", *it);
554      tAddressConnectorTask connector_task(*this, *it);
555    }
556    connect_to.clear();
557
558    for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
559    {
560      tPeerInfo& peer = **it;
561      if ((!peer.remote_runtime) && (!peer.connecting) && (peer.peer_type != tPeerType::CLIENT_ONLY) &&
562          (par_auto_connect_to_all_peers.Get() || peer.never_forget))
563      {
564        tConnectorTask connector_task(*this, peer);
565      }
566    }
567  }
568
569  // send peer list if needed
570  if (peer_list_changed)
571  {
572    for (auto & remote_runtime : ConnectedRuntimes())
573    {
574      tConnection& connection = static_cast<tConnection&>(*remote_runtime->GetPrimaryConnection());
575      if (connection.IsReady())
576      {
577        auto& stream = connection.CurrentWriteStream();
578        network_transport::generic_protocol::tPeerInfoMessage::Serialize(false, true, stream);
579        SerializePeerInfo(stream, this_peer);
580        for (auto it = other_peers.begin(); it != other_peers.end(); ++it)
581        {
582          SerializePeerInfo(stream, **it);
583        }
584        stream.WriteBoolean(false);
585        network_transport::generic_protocol::tPeerInfoMessage::FinishMessage(stream);
586      }
587    }
588
589    peer_list_changed = false;
590  }
591}
592
593void tPeerImplementation::RunEventLoop()
594{
595  if (!event_loop_running)
596  {
597    event_loop_running = true;
598    event_processing_timer.expires_from_now(ToBoostPosixTime(par_process_events_call_interval.Get()));
599    event_processing_timer.async_wait(tProcessEventsCaller(*this));
600  }
601}
602
603void tPeerImplementation::SerializePeerInfo(rrlib::serialization::tOutputStream& stream, const tPeerInfo& peer)
604{
605  if ((&peer == &this_peer || peer.remote_runtime) && peer.peer_type != tPeerType::CLIENT_ONLY)
606  {
607    stream << true;
608    stream << peer.uuid;
609    stream << peer.peer_type;
610    stream << peer.name;
611
612    // count non-loopback addresses
613    int address_count = 0;
614    for (auto & it : peer.addresses)
615    {
616      // if (!it.is_loopback()) FIXME: replace in next Finroc version
617      if (!IsLoopbackAddress(it))
618      {
619        address_count++;
620      }
621    }
622    stream.WriteInt(address_count);
623
624    // serialize non-loopback addresses
625    for (auto & it : peer.addresses)
626    {
627      // if (!it.is_loopback()) FIXME: replace in next Finroc version
628      if (!IsLoopbackAddress(it))
629      {
630        stream << it;
631      }
632    }
633  }
634}
635
636void tPeerImplementation::StartServer()
637{
638  // Start TCP Thread
639  StartThread();
640}
641
642void tPeerImplementation::StartThread()
643{
644  assert(!thread);
645  thread = (new tTCPThread(*this))->GetSharedPtr();
646  thread->SetAutoDelete();
647  thread->Start();
648}
649
650
651//----------------------------------------------------------------------
652// End of namespace declaration
653//----------------------------------------------------------------------
654}
655}
656}
Note: See TracBrowser for help on using the repository browser.