Changeset 167:b27ce10f0427 in finroc_plugins_tcp


Ignore:
Timestamp:
18.02.2019 08:31:58 (3 years ago)
Author:
Max Reichardt <mreichardt@…>
Branch:
17.03
Children:
168:47366d998abf, 169:e94a71638fc7
Parents:
166:876f4328a2da (diff), 165:36e703e3df51 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Phase:
public
Message:

Merge with 14.08

Location:
internal
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • internal/tPeerImplementation.cpp

    r165 r167  
    3434//---------------------------------------------------------------------- 
    3535#include "core/tRuntimeEnvironment.h" 
    36 #include "plugins/network_transport/tNetworkConnections.h" 
    3736 
    3837//---------------------------------------------------------------------- 
    3938// Internal includes with "" 
    4039//---------------------------------------------------------------------- 
    41 #include "plugins/tcp/internal/tNetworkPortInfo.h" 
    42 #include "plugins/tcp/internal/tRemotePart.h" 
     40#include "plugins/tcp/internal/tConnection.h" 
    4341#include "plugins/tcp/internal/tServer.h" 
    4442#include "plugins/tcp/internal/util.h" 
     
    6664// Forward declarations / typedefs / enums 
    6765//---------------------------------------------------------------------- 
     66typedef network_transport::runtime_info::tStructureExchange tStructureExchange; 
     67typedef network_transport::generic_protocol::tRemoteRuntime tRemoteRuntime; 
    6868 
    6969//---------------------------------------------------------------------- 
     
    7575//---------------------------------------------------------------------- 
    7676 
    77 const int cLOW_PRIORITY_TASK_CALL_INTERVAL = 500; // milliseconds 
    78 const int cPROCESS_EVENTS_CALL_INTERVAL = 5; // milliseconds 
     77namespace 
     78{ 
     79 
     80class tRemoteRuntimeRemover : public core::tAnnotation 
     81{ 
     82public: 
     83  tRemoteRuntimeRemover(std::shared_ptr<tPeerInfo>& peer_pointer) : 
     84    peer_pointer(peer_pointer) 
     85  {} 
     86 
     87  virtual void OnManagedDelete() override 
     88  { 
     89    if (peer_pointer->remote_runtime) 
     90    { 
     91      FINROC_LOG_PRINT(DEBUG, "Disconnected from ", peer_pointer->remote_runtime->GetName()); 
     92      peer_pointer->remote_runtime = nullptr; 
     93    } 
     94  } 
     95 
     96  std::shared_ptr<tPeerInfo> peer_pointer; 
     97}; 
     98 
     99boost::posix_time::milliseconds ToBoostPosixTime(const rrlib::time::tDuration& d) 
     100{ 
     101  return boost::posix_time::milliseconds(std::chrono::duration_cast<std::chrono::milliseconds>(d).count()); 
     102} 
     103 
     104} 
    79105 
    80106template <bool REGULAR> 
     
    92118      if (REGULAR) 
    93119      { 
    94         implementation->low_priority_tasks_timer.expires_from_now(boost::posix_time::milliseconds(cLOW_PRIORITY_TASK_CALL_INTERVAL)); 
     120        implementation->low_priority_tasks_timer.expires_from_now(ToBoostPosixTime(implementation->par_process_low_priority_tasks_call_interval.Get())); 
    95121        implementation->low_priority_tasks_timer.async_wait(*this); 
    96122      } 
     
    110136    { 
    111137      implementation->ProcessEvents(); 
    112       implementation->event_processing_timer.expires_from_now(boost::posix_time::milliseconds(cPROCESS_EVENTS_CALL_INTERVAL)); 
     138      implementation->event_processing_timer.expires_from_now(ToBoostPosixTime(implementation->par_process_events_call_interval.Get())); 
    113139      implementation->event_processing_timer.async_wait(*this); 
    114140    } 
     
    171197    else 
    172198    { 
    173       FINROC_LOG_PRINT(DEBUG, "Connected to ", connect_to, " (", endpoints[current_endpoint_index].address().to_string(), ")"); 
    174       tConnection::InitConnection(*implementation, socket, 0x7, NULL, true); // TODO: possibly use multiple connections 
     199      tConnection::TryToEstablishConnection(*implementation, socket, 0x7, NULL, true); // TODO: possibly use multiple connections 
    175200    } 
    176201  } 
     
    224249    else 
    225250    { 
    226       tConnection::InitConnection(*implementation, socket, 0x7, active_connect_indicator); // TODO: possibly use multiple connections 
     251      tConnection::TryToEstablishConnection(*implementation, socket, 0x7, active_connect_indicator); // TODO: possibly use multiple connections 
    227252    } 
    228253  } 
     
    257282    catch (const std::exception& ex) 
    258283    { 
    259       FINROC_LOG_PRINT(WARNING, "Thread exited with exception ", ex); 
     284      FINROC_LOG_PRINT(WARNING, "TCP thread exited with exception ", ex); 
    260285    } 
    261286  } 
     
    269294}; 
    270295 
    271  
    272 tPeerImplementation::tPeerImplementation(core::tFrameworkElement& framework_element, const tOptions& options) : 
    273   framework_element(framework_element), 
    274   create_options(options), 
    275   connect_to(), 
    276   this_peer(options.peer_type), 
     296//FIXME: remove in next Finroc version 
     297bool IsLoopbackAddress(const boost::asio::ip::address& address) 
     298{ 
     299  return address.is_v4() ? (address.to_v4() == boost::asio::ip::address_v4::loopback()) : (address.to_v6() == boost::asio::ip::address_v6::loopback()); 
     300} 
     301 
     302 
     303tPeerImplementation::tPeerImplementation() : 
     304  this_peer(tPeerType::UNSPECIFIED), 
    277305  other_peers(), 
    278306  //peer_list_revision(0), 
     
    280308  thread(), 
    281309  io_service(new boost::asio::io_service()), 
    282   low_priority_tasks_timer(*io_service, boost::posix_time::milliseconds(cLOW_PRIORITY_TASK_CALL_INTERVAL)), 
     310  low_priority_tasks_timer(*io_service, boost::posix_time::milliseconds(500)),  // this is only the initial wait 
    283311  event_processing_timer(*io_service, boost::posix_time::milliseconds(5)), 
    284   server(NULL), 
    285   shared_ports(), 
    286   shared_ports_mutex(), 
    287   serve_structure(false), 
    288   incoming_structure_changes(), 
     312  server(nullptr), 
    289313  actively_connect(false), 
    290   pending_subscription_checks(), 
    291   pending_subscription_checks_mutex(), 
    292   pending_subscription_checks_copy(), 
    293   event_loop_running(false), 
    294   incoming_port_buffer_changes(), 
    295   port_buffer_change_event_buffers(), 
    296   deleted_rpc_ports(), 
    297   deleted_rpc_ports_mutex() 
    298 { 
    299   // initialize and adjust TCP settings 
    300   tSettings::GetInstance().critical_ping_threshold.Set(options.critical_ping_threshold); 
    301   tSettings::GetInstance().max_not_acknowledged_packets_bulk.Set(options.max_not_acknowledged_packets_bulk); 
    302   tSettings::GetInstance().max_not_acknowledged_packets_express.Set(options.max_not_acknowledged_packets_express); 
    303   tSettings::GetInstance().min_update_interval_bulk.Set(options.min_update_interval_bulk); 
    304   tSettings::GetInstance().min_update_interval_express.Set(options.min_update_interval_express); 
    305  
    306   this_peer.name = options.peer_name; 
    307  
    308   // Retrieve host name 
    309   char buffer[258]; 
    310   if (gethostname(buffer, 257)) 
    311   { 
    312     this_peer.uuid.host_name = "No host name@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count()); 
    313     FINROC_LOG_PRINT(ERROR, "Error retrieving host name."); 
    314   } 
    315   else if (std::string(buffer) == "localhost") 
    316   { 
    317     this_peer.uuid.host_name = "localhost@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count()); 
    318     FINROC_LOG_PRINT(ERROR, "The hostname of this system is 'localhost' (according to the hostname() function). When using the finroc_tcp plugin, this is not allowed (a unique identifier for this Finroc runtime environment is derived from the hostname). Ideally, the hostname is the name under which the system can be found in the network using DNS lookup. Otherwise, please set it to a unique name in the network. For now, the current time is appended for the uuid: '", this_peer.uuid.host_name, "'."); 
    319   } 
    320   else 
    321   { 
    322     this_peer.uuid.host_name = buffer; 
    323   } 
    324  
    325   // Create server 
    326   if (options.peer_type != tPeerType::CLIENT_ONLY) 
    327   { 
    328     server = new tServer(*this); 
    329   } 
     314  event_loop_running(false) 
     315{ 
    330316} 
    331317 
     
    338324    thread->Join(); 
    339325  } 
    340  
    341   core::tRuntimeEnvironment::GetInstance().RemoveListener(*this); 
    342326} 
    343327 
     
    381365{ 
    382366  actively_connect = true; 
    383  
    384   for (const std::string & address : create_options.connect_to) 
    385   { 
    386     connect_to.push_back(address); 
    387   } 
    388  
    389367  low_priority_tasks_timer.async_wait(tProcessLowPriorityTasksCaller<false>(*this)); // immediately trigger connecting 
    390368} 
    391  
    392 std::string tPeerImplementation::Connect(core::tAbstractPort& local_port, const std::string& remote_runtime_uuid, 
    393     int remote_port_handle, const std::string remote_port_link, bool disconnect) 
    394 { 
    395   for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
    396   { 
    397     if ((*it)->remote_part && (*it)->uuid.ToString() == remote_runtime_uuid) 
    398     { 
    399       tRemotePart& part = *((*it)->remote_part); 
    400       auto remote_port = part.remote_port_map.find(remote_port_handle); 
    401       if (remote_port != part.remote_port_map.end()) 
    402       { 
    403         if (!disconnect) 
    404         { 
    405           local_port.ConnectTo(remote_port->second->GetQualifiedLink(), core::tAbstractPort::tConnectDirection::AUTO, true); 
    406           if (!local_port.IsConnectedTo(*remote_port->second)) 
    407           { 
    408             return "Could not connect ports (see console output for reasons)"; 
    409           } 
    410           return ""; 
    411         } 
    412         else 
    413         { 
    414           local_port.DisconnectFrom(remote_port->second->GetQualifiedLink()); 
    415           if (local_port.IsConnectedTo(*remote_port->second)) 
    416           { 
    417             return "Could not disconnect ports (see console output for reasons)"; 
    418           } 
    419           return ""; 
    420         } 
    421       } 
    422       else 
    423       { 
    424         return "No remote port with handle " + std::to_string(remote_port_handle) + "found"; 
    425       } 
    426     } 
    427   } 
    428   return "No remote runtime with UUID " + remote_runtime_uuid + " found"; 
    429 } 
    430  
    431369 
    432370void tPeerImplementation::DeserializePeerInfo(rrlib::serialization::tInputStream& stream, tPeerInfo& peer) 
     
    445383} 
    446384 
    447 tRemotePart* tPeerImplementation::GetRemotePart(const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget) 
     385network_transport::generic_protocol::tRemoteRuntime* tPeerImplementation::GetRemoteRuntime(std::shared_ptr<tConnection>& connection, const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget) 
    448386{ 
    449387  if (uuid == this->this_peer.uuid) 
     
    457395    if ((*it)->uuid == uuid) 
    458396    { 
    459       if (!(*it)->remote_part) 
    460       { 
    461         (*it)->remote_part = new tRemotePart(**it, framework_element, *this); 
    462         (*it)->remote_part->Init(); 
     397      if (!(*it)->remote_runtime) 
     398      { 
     399        (*it)->remote_runtime = new tRemoteRuntime(*this, connection, *GetPluginRootFrameworkElement(), uuid.ToString()); 
     400        (*it)->remote_runtime->EmplaceAnnotation<tRemoteRuntimeRemover>(*it); 
     401        (*it)->remote_runtime->Init(); 
    463402      } 
    464403      (*it)->AddAddress(address); 
     
    471410      (*it)->name = peer_name; 
    472411      (*it)->never_forget |= never_forget; 
    473       return (*it)->remote_part; 
     412      return (*it)->remote_runtime; 
    474413    } 
    475414  } 
    476415 
    477416  other_peers.emplace_back(new tPeerInfo(peer_type)); 
    478   tPeerInfo& info = **(other_peers.end() - 1); 
     417  tPeerInfo& info = *other_peers.back(); 
    479418  info.addresses.push_back(address); 
    480419  info.uuid = uuid; 
    481420  info.name = peer_name; 
    482421  info.never_forget = never_forget; 
    483   info.remote_part = new tRemotePart(info, framework_element, *this); 
    484   info.remote_part->Init(); 
     422  info.remote_runtime = new tRemoteRuntime(*this, connection, *GetPluginRootFrameworkElement(), uuid.ToString()); 
     423  info.remote_runtime->EmplaceAnnotation<tRemoteRuntimeRemover>(other_peers.back()); 
     424  info.remote_runtime->Init(); 
    485425  //peer_list_revision++; 
    486   return info.remote_part; 
     426  return info.remote_runtime; 
    487427} 
    488428 
     
    504444} 
    505445 
    506 bool tPeerImplementation::IsSharedPort(core::tFrameworkElement& framework_element) 
    507 { 
    508   return framework_element.IsPort() && framework_element.GetFlag(core::tFrameworkElement::tFlag::SHARED) && 
    509          (!framework_element.GetFlag(core::tFrameworkElement::tFlag::NETWORK_ELEMENT)); 
    510 } 
    511  
    512 void tPeerImplementation::OnEdgeChange(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target) 
    513 { 
    514   // Maintain network connection info for finstruct 
    515   bool target_port_changed = false; 
    516   UpdateNetworkConnectionInfo(change_type, source, target, target_port_changed); 
    517  
    518   // Forward change to clients 
    519   if (source.IsReady()) 
    520   { 
    521     ProcessRuntimeChange(core::tRuntimeListener::tEvent::CHANGE, source, true); 
    522   } 
    523   if (target.IsReady() && target_port_changed) 
    524   { 
    525     ProcessRuntimeChange(core::tRuntimeListener::tEvent::CHANGE, target, true); 
    526   } 
    527  
    528   // Check subscriptions? 
    529   if (source.IsReady()) 
    530   { 
    531     tNetworkPortInfo* network_port_info = source.GetAnnotation<tNetworkPortInfo>(); 
    532     if (network_port_info) 
    533     { 
    534       network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex); 
    535     } 
    536   } 
    537   if (target.IsReady()) 
    538   { 
    539     tNetworkPortInfo* network_port_info = target.GetAnnotation<tNetworkPortInfo>(); 
    540     if (network_port_info) 
    541     { 
    542       network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex); 
    543     } 
    544   } 
    545 } 
    546  
    547 void tPeerImplementation::OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element) 
    548 { 
    549   // Maintain network connection info for finstruct 
    550   if (change_type == core::tRuntimeListener::ADD && element.IsPort()) 
    551   { 
    552     core::tAbstractPort& port = static_cast<core::tAbstractPort&>(element); 
    553     for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it) 
    554     { 
    555       bool target_port_changed = false; 
    556       UpdateNetworkConnectionInfo(change_type, port, *it, target_port_changed); 
    557     } 
    558   } 
    559  
    560   ProcessRuntimeChange(change_type, element, false); 
    561  
    562   // Check subscriptions? 
    563   if (change_type == core::tRuntimeListener::tEvent::CHANGE) 
    564   { 
    565     tNetworkPortInfo* network_port_info = element.GetAnnotation<tNetworkPortInfo>(); 
    566     if (network_port_info) 
    567     { 
    568       network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex); 
    569     } 
    570   } 
    571   if (change_type == core::tRuntimeListener::tEvent::ADD && element.IsPort()) 
    572   { 
    573     // Check for any connected remote destination ports: 
    574     // Network input port may already be initialized, while local connected output port is initialized now. 
    575     // => Trigger subscription check in this case 
    576     core::tAbstractPort& port = static_cast<core::tAbstractPort&>(element); 
    577     for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it) 
    578     { 
    579       tNetworkPortInfo* network_port_info = it->GetAnnotation<tNetworkPortInfo>(); 
    580       if (network_port_info) 
    581       { 
    582         network_port_info->CheckSubscription(pending_subscription_checks, pending_subscription_checks_mutex); 
    583       } 
    584     } 
    585   } 
    586  
    587  
    588   // RPC port deletion? 
    589   if (change_type == core::tRuntimeListener::tEvent::REMOVE && element.IsPort() && 
    590       rpc_ports::IsRPCType(static_cast<core::tAbstractPort&>(element).GetDataType())) 
    591   { 
    592     rrlib::thread::tLock lock(deleted_rpc_ports_mutex); 
    593     deleted_rpc_ports.push_back(element.GetHandle()); 
    594   } 
     446void tPeerImplementation::Init(tXmlNode* config_node) 
     447{ 
     448  tNetworkTransportPlugin::Init(config_node); 
     449  this_peer.peer_type = par_peer_type.Get(); 
     450  this_peer.name = network_transport::generic_protocol::tLocalRuntimeInfo::GetName(); 
     451 
     452  // Retrieve host name 
     453  char buffer[258]; 
     454  if (gethostname(buffer, 257)) 
     455  { 
     456    this_peer.uuid.host_name = "No host name@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count()); 
     457    FINROC_LOG_PRINT(ERROR, "Error retrieving host name."); 
     458  } 
     459  else if (std::string(buffer) == "localhost") 
     460  { 
     461    this_peer.uuid.host_name = "localhost@" + std::to_string(rrlib::time::Now(true).time_since_epoch().count()); 
     462    FINROC_LOG_PRINT(ERROR, "The hostname of this system is 'localhost' (according to the hostname() function). When using the finroc_tcp plugin, this is not allowed (a unique identifier for this Finroc runtime environment is derived from the hostname). Ideally, the hostname is the name under which the system can be found in the network using DNS lookup. Otherwise, please set it to a unique name in the network. For now, the current time is appended for the uuid: '", this_peer.uuid.host_name, "'."); 
     463  } 
     464  else 
     465  { 
     466    this_peer.uuid.host_name = buffer; 
     467  } 
     468 
     469  // Create server 
     470  if (this_peer.peer_type != tPeerType::CLIENT_ONLY) 
     471  { 
     472    server = new tServer(*this); 
     473  } 
     474} 
     475 
     476void tPeerImplementation::OnStartServingStructure() 
     477{ 
     478  Connect(); 
     479  StartServer(); 
    595480} 
    596481 
    597482void tPeerImplementation::ProcessIncomingPeerInfo(const tPeerInfo& peer_info) 
    598483{ 
    599   tPeerInfo* existing_peer = NULL; 
     484  tPeerInfo* existing_peer = nullptr; 
    600485  if (peer_info.uuid == this_peer.uuid) 
    601486  { 
     
    633518{ 
    634519  rrlib::time::tTimestamp time_now = rrlib::time::Now(); 
    635   //FINROC_LOG_PRINT(DEBUG, "Called"); 
    636  
    637   // Process incoming structure changes 
    638   ProcessRuntimeChangeEvents(); 
    639  
    640   // Process pending subscription checks 
    641   { 
    642     rrlib::thread::tLock lock(pending_subscription_checks_mutex); 
    643     pending_subscription_checks_copy = pending_subscription_checks; 
    644     pending_subscription_checks.clear(); 
    645   } 
    646   for (auto it = pending_subscription_checks_copy.begin(); it != pending_subscription_checks_copy.end(); ++it) 
    647   { 
    648     core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(*it); 
    649     if (port && port->IsReady()) 
    650     { 
    651       tNetworkPortInfo* network_port_info = port->GetAnnotation<tNetworkPortInfo>(); 
    652       if (network_port_info) 
    653       { 
    654         network_port_info->DoSubscriptionCheck(); 
    655       } 
    656     } 
    657   } 
    658  
    659   // Process incoming port buffer changes 
    660   rrlib::concurrent_containers::tQueueFragment<tChangeEventPointer> port_changes = incoming_port_buffer_changes.DequeueAll(); 
    661   while (!port_changes.Empty()) 
    662   { 
    663     tChangeEventPointer change_event = port_changes.PopFront(); 
    664     change_event->network_port_info->ProcessIncomingBuffer(change_event); 
    665   } 
     520  ProcessLocalRuntimeCallsToSend(); 
     521  ProcessLocalRuntimePortDataChanges(); 
     522  ProcessLocalRuntimeStructureChanges(); 
    666523 
    667524  // Process active connections 
    668525  for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
    669526  { 
    670     if ((*it)->remote_part) 
    671     { 
    672       (*it)->remote_part->SendPendingMessages(time_now); 
     527    if ((*it)->remote_runtime) 
     528    { 
     529      (*it)->remote_runtime->SendPendingMessages(time_now); 
    673530    } 
    674531  } 
     
    678535{ 
    679536  FINROC_LOG_PRINT(DEBUG_VERBOSE_2, "Alive ", rrlib::time::Now().time_since_epoch().count()); 
    680  
    681   // delete buffer pools created for RPC ports 
    682   std::vector<core::tFrameworkElement::tHandle> deleted_ports; 
    683   { 
    684     rrlib::thread::tLock lock(deleted_rpc_ports_mutex); 
    685     if (!deleted_rpc_ports.empty()) 
    686     { 
    687       std::swap(deleted_ports, deleted_rpc_ports); // Move deleted ports to local variable and unlock 
    688     } 
    689   } 
    690   if (!deleted_ports.empty()) 
    691   { 
    692     for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
    693     { 
    694       tPeerInfo& peer = **it; 
    695       if (peer.remote_part) 
    696       { 
    697         peer.remote_part->RpcPortsDeleted(deleted_ports); 
    698       } 
    699     } 
    700   } 
    701537 
    702538  // connect to other peers 
    703539  if (actively_connect) 
    704540  { 
     541    if (par_connect_to.HasChanged()) 
     542    { 
     543      auto current_connect_to = par_connect_to.GetPointer(); 
     544      for (const std::string & address : (*current_connect_to)) 
     545      { 
     546        if (connected_to.count(address) == 0) 
     547        { 
     548          connect_to.push_back(address); 
     549          connected_to.insert(address); 
     550        } 
     551      } 
     552      par_connect_to.ResetChanged(); 
     553    } 
     554 
    705555    std::vector<std::string> connect_to_copy = std::move(connect_to); 
    706556    for (auto & address : connect_to_copy) 
     
    713563    { 
    714564      tPeerInfo& peer = **it; 
    715       if ((!peer.connected) && (!peer.connecting) && (peer.peer_type != tPeerType::CLIENT_ONLY) && 
    716           (create_options.auto_connect_to_all_peers || peer.never_forget)) 
     565      if ((!peer.remote_runtime) && (!peer.connecting) && (peer.peer_type != tPeerType::CLIENT_ONLY) && 
     566          (par_auto_connect_to_all_peers.Get() || peer.never_forget)) 
    717567      { 
    718568        tConnectorTask connector_task(*this, peer); 
     
    724574  if (peer_list_changed) 
    725575  { 
    726     for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
    727     { 
    728       tPeerInfo& peer = **it; 
    729       tRemotePart *remote_part = peer.remote_part; 
    730       if (remote_part && peer.connected) 
    731       { 
    732         std::shared_ptr<tConnection> management_connection = remote_part->GetManagementConnection(); 
    733         if (management_connection && management_connection->IsReady()) 
     576    for (auto & remote_runtime : ConnectedRuntimes()) 
     577    { 
     578      tConnection& connection = static_cast<tConnection&>(*remote_runtime->GetPrimaryConnection()); 
     579      if (connection.IsReady()) 
     580      { 
     581        auto& stream = connection.CurrentWriteStream(); 
     582        network_transport::generic_protocol::tPeerInfoMessage::Serialize(false, true, stream); 
     583        SerializePeerInfo(stream, this_peer); 
     584        for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
    734585        { 
    735  
    736           tPeerInfoMessage::Serialize(false, management_connection->CurrentWriteStream()); 
    737  
    738           SerializePeerInfo(management_connection->CurrentWriteStream(), this_peer); 
    739  
    740           for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
    741           { 
    742             SerializePeerInfo(management_connection->CurrentWriteStream(), **it); 
    743           } 
    744  
    745           management_connection->CurrentWriteStream().WriteBoolean(false); 
    746           tPeerInfoMessage::FinishMessage(management_connection->CurrentWriteStream()); 
    747  
     586          SerializePeerInfo(stream, **it); 
    748587        } 
     588        stream.WriteBoolean(false); 
     589        network_transport::generic_protocol::tPeerInfoMessage::FinishMessage(stream); 
    749590      } 
    750591    } 
     
    754595} 
    755596 
    756 void tPeerImplementation::ProcessRuntimeChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element, bool edge_change) 
    757 { 
    758   bool shared_port = IsSharedPort(element); 
    759   bool serve_structure_copy = serve_structure.load(); 
    760  
    761   bool relevant_for_shared_port_client = shared_port && (!edge_change); 
    762   bool relevant_for_structure_client = !element.GetFlag(core::tFrameworkElement::tFlag::NETWORK_ELEMENT) && (!edge_change); 
    763  
    764   if ((relevant_for_shared_port_client || serve_structure_copy) && (change_type != core::tRuntimeListener::tEvent::PRE_INIT)) 
    765   { 
    766     std::unique_ptr<tSerializedStructureChange> change(new tSerializedStructureChange(change_type, element, serve_structure_copy, 
    767         relevant_for_shared_port_client ? network_transport::tStructureExchange::SHARED_PORTS : 
    768         (relevant_for_structure_client ? network_transport::tStructureExchange::COMPLETE_STRUCTURE : network_transport::tStructureExchange::FINSTRUCT))); 
    769  
    770     if (relevant_for_shared_port_client) 
    771     { 
    772       rrlib::thread::tLock lock(shared_ports_mutex, false); 
    773       if (change_type == core::tRuntimeListener::tEvent::ADD || change_type == core::tRuntimeListener::tEvent::CHANGE) 
    774       { 
    775         rrlib::serialization::tStackMemoryBuffer<2048> buffer; 
    776         rrlib::serialization::tOutputStream stream(buffer); 
    777         std::string string_buffer; 
    778         network_transport::tFrameworkElementInfo::Serialize(stream, element, network_transport::tStructureExchange::SHARED_PORTS, string_buffer); 
    779         stream.Flush(); 
    780         lock.Lock(); 
    781         shared_ports[element.GetHandle()] = CopyToNewFixedBuffer(buffer); 
    782       } 
    783       else if (change_type == core::tRuntimeListener::tEvent::REMOVE) 
    784       { 
    785         lock.Lock(); 
    786         shared_ports.erase(element.GetHandle()); 
    787       } 
    788       incoming_structure_changes.Enqueue(std::move(change)); // do this with lock - to avoid inconsistencies 
    789     } 
    790     else 
    791     { 
    792       //FINROC_LOG_PRINT(DEBUG, "Enqueuing ", change.get(), " ", element.GetQualifiedName()); 
    793       incoming_structure_changes.Enqueue(std::move(change)); 
    794     } 
    795   } 
    796 } 
    797  
    798 void tPeerImplementation::ProcessRuntimeChangeEvents() 
    799 { 
    800   rrlib::concurrent_containers::tQueueFragment<std::unique_ptr<tSerializedStructureChange>> incoming_structure_changes_fragment = incoming_structure_changes.DequeueAll(); 
    801   while (!incoming_structure_changes_fragment.Empty()) 
    802   { 
    803     std::unique_ptr<tSerializedStructureChange> incoming_structure_change = incoming_structure_changes_fragment.PopFront(); 
    804     //FINROC_LOG_PRINT(DEBUG, "Dequeuing ", incoming_structure_change.get()); 
    805     for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
    806     { 
    807       if ((*it)->remote_part) 
    808       { 
    809         if (static_cast<size_t>((*it)->remote_part->GetDesiredStructureInfo()) >= static_cast<size_t>(incoming_structure_change->MinimumRelevantLevel())) 
    810         { 
    811           (*it)->remote_part->SendStructureChange(*incoming_structure_change); 
    812         } 
    813       } 
    814     } 
    815   } 
    816 } 
    817  
    818597void tPeerImplementation::RunEventLoop() 
    819598{ 
     
    821600  { 
    822601    event_loop_running = true; 
    823     event_processing_timer.expires_from_now(boost::posix_time::milliseconds(cPROCESS_EVENTS_CALL_INTERVAL)); 
     602    event_processing_timer.expires_from_now(ToBoostPosixTime(par_process_events_call_interval.Get())); 
    824603    event_processing_timer.async_wait(tProcessEventsCaller(*this)); 
    825604  } 
    826605} 
    827606 
    828 //FIXME: remove in next Finroc version 
    829 bool IsLoopbackAddress(const boost::asio::ip::address& address) 
    830 { 
    831   return address.is_v4() ? (address.to_v4() == boost::asio::ip::address_v4::loopback()) : (address.to_v6() == boost::asio::ip::address_v6::loopback()); 
    832 } 
    833  
    834607void tPeerImplementation::SerializePeerInfo(rrlib::serialization::tOutputStream& stream, const tPeerInfo& peer) 
    835608{ 
    836   if ((&peer == &this_peer || peer.connected) && peer.peer_type != tPeerType::CLIENT_ONLY) 
     609  if ((&peer == &this_peer || peer.remote_runtime) && peer.peer_type != tPeerType::CLIENT_ONLY) 
    837610  { 
    838611    stream << true; 
     
    865638} 
    866639 
    867 rrlib::serialization::tMemoryBuffer tPeerImplementation::SerializeSharedPorts(common::tRemoteTypes& connection_type_encoder) 
    868 { 
    869   rrlib::thread::tLock lock(shared_ports_mutex); 
    870   ProcessRuntimeChangeEvents();  // to make sure we don't get any shared port events twice 
    871   rrlib::serialization::tMemoryBuffer buffer(shared_ports.size() * 200); 
    872   rrlib::serialization::tOutputStream stream(buffer, connection_type_encoder); 
    873   stream.WriteInt(0); // Placeholder for size 
    874   stream << rrlib::rtti::tDataType<std::string>(); // write a data type for initialization 
    875   for (auto it = shared_ports.begin(); it != shared_ports.end(); ++it) 
    876   { 
    877     stream << it->first; 
    878     stream.Write(it->second); 
    879   } 
    880   stream.WriteInt(0); // size of next packet 
    881   stream.Close(); 
    882   buffer.GetBuffer().PutInt(0, buffer.GetSize() - 8); 
    883   return buffer; 
    884 } 
    885  
    886640void tPeerImplementation::StartServer() 
    887641{ 
    888   core::tRuntimeEnvironment::GetInstance().AddListener(*this); 
    889  
    890   // Collect existing shared ports and store serialized information about them 
    891   rrlib::serialization::tStackMemoryBuffer<2048> buffer; 
    892   rrlib::serialization::tOutputStream stream(buffer); 
    893   std::string string_buffer; 
    894   enum { cPORT_BUFFER_SIZE = 2048 }; 
    895   core::tAbstractPort* port_buffer[cPORT_BUFFER_SIZE]; 
    896   typename core::tFrameworkElement::tHandle start_handle = 0; 
    897   while (true) 
    898   { 
    899     size_t port_count = core::tRuntimeEnvironment::GetInstance().GetAllPorts(port_buffer, cPORT_BUFFER_SIZE, start_handle); 
    900     for (size_t i = 0; i < port_count; i++) 
    901     { 
    902       core::tAbstractPort& port = *port_buffer[i]; 
    903       if (IsSharedPort(port)) 
    904       { 
    905         stream.Reset(); 
    906         network_transport::tFrameworkElementInfo::Serialize(stream, port, network_transport::tStructureExchange::SHARED_PORTS, string_buffer); 
    907         stream.Flush(); 
    908         shared_ports.insert(std::pair<core::tFrameworkElement::tHandle, rrlib::serialization::tFixedBuffer>(port.GetHandle(), CopyToNewFixedBuffer(buffer))); 
    909       } 
    910     } 
    911     if (port_count < cPORT_BUFFER_SIZE) 
    912     { 
    913       break; 
    914     } 
    915     start_handle = port_buffer[cPORT_BUFFER_SIZE - 1]->GetHandle() + 1; 
    916   }; 
    917  
    918642  // Start TCP Thread 
    919643  StartThread(); 
     
    928652} 
    929653 
    930 void tPeerImplementation::UpdateNetworkConnectionInfo(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target, bool& target_port_changed) 
    931 { 
    932   tNetworkPortInfo* target_port_info = target.GetAnnotation<tNetworkPortInfo>(); 
    933   bool destination_is_source = false; 
    934   core::tAbstractPort* connection_annotated = &source; 
    935   if ((!target_port_info) || target_port_info->IsServerPort()) 
    936   { 
    937     target_port_info = source.GetAnnotation<tNetworkPortInfo>(); 
    938     destination_is_source = true; 
    939     connection_annotated = &target; 
    940   } 
    941  
    942   if (target_port_info && (!target_port_info->IsServerPort())) 
    943   { 
    944     target_port_changed = destination_is_source; 
    945     network_transport::tNetworkConnections* connections_annotation = connection_annotated->GetAnnotation<network_transport::tNetworkConnections>(); 
    946     if (change_type == core::tRuntimeListener::tEvent::ADD) 
    947     { 
    948       if (!connections_annotation) 
    949       { 
    950         connections_annotation = &connection_annotated->EmplaceAnnotation<network_transport::tNetworkConnections>(); 
    951       } 
    952       connections_annotation->Add(network_transport::tNetworkConnection(target_port_info->GetRemotePart().peer_info.uuid.ToString(), target_port_info->GetRemoteHandle(), destination_is_source)); 
    953     } 
    954     else if (change_type == core::tRuntimeListener::tEvent::REMOVE && connections_annotation) 
    955     { 
    956       connections_annotation->Remove(network_transport::tNetworkConnection(target_port_info->GetRemotePart().peer_info.uuid.ToString(), target_port_info->GetRemoteHandle(), destination_is_source)); 
    957     } 
    958   } 
    959 } 
    960654 
    961655//---------------------------------------------------------------------- 
  • internal/tPeerImplementation.cpp

    r166 r167  
    162162      Connect("no endpoints resolved"); 
    163163    } 
     164    catch (const std::invalid_argument &e) 
     165    { 
     166      FINROC_LOG_PRINT(ERROR, "Could not connect to invalid address '", connect_to, "'. Reason: ", e.what()); 
     167    } 
    164168    catch (const std::exception &e) 
    165169    { 
     
    549553    } 
    550554 
    551     for (auto it = connect_to.begin(); it != connect_to.end(); ++it) 
    552     { 
    553       FINROC_LOG_PRINT(DEBUG, "Connecting to ", *it); 
    554       tAddressConnectorTask connector_task(*this, *it); 
    555     } 
    556     connect_to.clear(); 
     555    std::vector<std::string> connect_to_copy = std::move(connect_to); 
     556    for (auto & address : connect_to_copy) 
     557    { 
     558      FINROC_LOG_PRINT(DEBUG, "Connecting to ", address); 
     559      tAddressConnectorTask connector_task(*this, address); 
     560    } 
    557561 
    558562    for (auto it = other_peers.begin(); it != other_peers.end(); ++it) 
  • internal/tPeerImplementation.h

    r164 r167  
    4040// External includes (system with <>, local with "") 
    4141//---------------------------------------------------------------------- 
    42 #include <map> 
    4342#include <boost/asio/ip/tcp.hpp> 
    4443#include <boost/asio/deadline_timer.hpp> 
     44#include <set> 
     45 
    4546#include "rrlib/thread/tThread.h" 
    46 #include "core/tFrameworkElement.h" 
    4747 
    4848//---------------------------------------------------------------------- 
    4949// Internal includes with "" 
    5050//---------------------------------------------------------------------- 
    51 #include "plugins/network_transport/structure_info/tFrameworkElementInfo.h" 
    52 #include "plugins/tcp/common/tRemoteTypes.h" 
    53 //#include "plugins/tcp/internal/tNetworkPortInfo.h" 
    5451#include "plugins/tcp/internal/tPeerInfo.h" 
    55 #include "plugins/tcp/internal/tPortBufferChangeEvent.h" 
    56 #include "plugins/tcp/internal/tSerializedStructureChange.h" 
    5752 
    5853//---------------------------------------------------------------------- 
     
    6964// Forward declarations / typedefs / enums 
    7065//---------------------------------------------------------------------- 
    71  
    72 class tPlugin; 
    7366class tServer; 
     67class tConnection; 
    7468 
    7569//---------------------------------------------------------------------- 
     
    8074 * Implementation of different variants of TCP peer. 
    8175 */ 
    82 class tPeerImplementation : public core::tRuntimeListener 
     76class tPeerImplementation : public tTCPPlugin 
    8377{ 
    8478 
    85   typedef rrlib::buffer_pools::tBufferPool < tPortBufferChangeEvent, rrlib::concurrent_containers::tConcurrency::MULTIPLE_READERS, 
    86           rrlib::buffer_pools::management::QueueBased, rrlib::buffer_pools::deleting::CollectGarbage, 
    87           rrlib::buffer_pools::recycling::UseOwnerStorageInBuffer > tPortBufferChangeEventPool; 
    88  
    8979//---------------------------------------------------------------------- 
    9080// Public methods and typedefs 
    9181//---------------------------------------------------------------------- 
    9282public: 
    93  
    94   typedef typename tPortBufferChangeEventPool::tPointer tChangeEventPointer; 
    9583 
    9684  /*! 
     
    9886   * \param Options for peer creation 
    9987   */ 
    100   tPeerImplementation(core::tFrameworkElement& framework_element, const tOptions& options); 
     88  tPeerImplementation(); 
    10189 
    10290  /*! Shuts peer down */ 
     
    11098  /*! Starts actively connecting to the specified network */ 
    11199  void Connect(); 
    112  
    113   /*! 
    114    * Connect local port to port in remote runtime environment using TCP network transport plugin. 
    115    * 
    116    * \param local_port Local port to connect 
    117    * \param remote_runtime_uuid UUID of remote runtime 
    118    * \param remote_port_handle Handle of remote port 
    119    * \param remote_port_link Link of port in remote runtime environment 
    120    * \param disconnect If 'false' the ports are connected - if 'true' the ports are disconnected 
    121    * \return Returns error message if connecting failed. On success an empty string is returned. 
    122    */ 
    123   std::string Connect(core::tAbstractPort& local_port, const std::string& remote_runtime_uuid, int remote_port_handle, const std::string remote_port_link, bool disconnect); 
    124100 
    125101  /*! 
     
    147123 
    148124  /*! 
    149    * Gets remote part with specified UUID. 
    150    * If no such part has been registered yet, creates a new one. 
     125   * Gets remote runtime with specified UUID. 
     126   * If no such runtime has been registered yet, creates a new one. 
    151127   * 
    152    * \param uuid UUID of remote part 
     128   * \param uuid UUID of remote runtime 
    153129   * \param peer_type Peer type 
    154130   * \param peer_name Name of peer. Will be displayed in tooling and status messages. Does not need to be unique. Typically the program/process name. 
    155    * \param address IP address of remote part 
     131   * \param address IP address of remote runtime 
    156132   * \param never_forget Is this a remote peer to never forget? 
    157    * \return Pointer to remote part 
    158    */ 
    159   tRemotePart* GetRemotePart(const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget); 
     133   * \return Pointer to remote runtime 
     134   */ 
     135  network_transport::generic_protocol::tRemoteRuntime* GetRemoteRuntime(std::shared_ptr<tConnection>& connection, const tUUID& uuid, tPeerType peer_type, const std::string& peer_name, const boost::asio::ip::address& address, bool never_forget); 
    160136 
    161137  /*! 
     
    167143  } 
    168144 
     145  virtual void Init(tXmlNode* config_node) override; 
     146 
    169147  /*! 
    170148   * Reference to Boost asio IO service 
     
    173151  { 
    174152    return *io_service; 
    175   } 
    176  
    177   void PortChanged(data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject>& value, tNetworkPortInfo* info, data_ports::tChangeContext& change_context) 
    178   { 
    179     tChangeEventPointer event_buffer = port_buffer_change_event_buffers.GetUnusedBuffer(); 
    180     if (!event_buffer) 
    181     { 
    182       event_buffer = port_buffer_change_event_buffers.AddBuffer(std::unique_ptr<tPortBufferChangeEvent>(new tPortBufferChangeEvent())); 
    183     } 
    184     event_buffer->new_value = std::move(value); 
    185     event_buffer->network_port_info = info; 
    186     event_buffer->change_type = change_context.ChangeType(); 
    187     incoming_port_buffer_changes.Enqueue(event_buffer); 
    188153  } 
    189154 
     
    201166 
    202167  /*! 
     168   * Processes incoming structure changes from local runtime 
     169   */ 
     170  void ProcessLocalRuntimeStructureChanges() 
     171  { 
     172    tTCPPlugin::ProcessLocalRuntimeStructureChanges(); 
     173  } 
     174 
     175  /*! 
    203176   * Called in a regular interval to do things like establishing new connections 
    204177   */ 
    205178  void ProcessLowPriorityTasks(); 
    206  
    207   /*! 
    208    * Processes all enqueued runtime change events in TCP thread 
    209    * and distributes them to all connections that are interested. 
    210    */ 
    211   void ProcessRuntimeChangeEvents(); 
    212179 
    213180  /*! 
     
    218185 
    219186  /*! 
    220    * Serializes shared ports and returns that in memory buffer 
    221    * 
    222    * \param connection_type_encoder Type encoder object of connection to serialize shared ports for 
    223    */ 
    224   rrlib::serialization::tMemoryBuffer SerializeSharedPorts(common::tRemoteTypes& connection_type_encoder); 
    225  
    226  
    227   /*! 
    228    * \return True as soon as peer also serves clients interested in complete application structure 
    229    */ 
    230   bool ServesStructure() const 
    231   { 
    232     return serve_structure.load(); 
    233   } 
    234  
    235   /*! 
    236187   * Mark the peer list as changed. 
    237188   * This will cause the peer list to be sent to all connected peers. 
     
    247198  void StartServer(); 
    248199 
    249   void StartServingStructure() 
    250   { 
    251     serve_structure.store(true); 
     200  /*! 
     201   * \return Unused buffer for initialization of prototype streams 
     202   */ 
     203  rrlib::serialization::tStackMemoryBuffer<16>& UnusedInitializationBuffer() 
     204  { 
     205    return unused_initialization_buffer; 
    252206  } 
    253207 
     
    257211private: 
    258212 
     213  friend class tTCPPluginInstance; 
    259214  friend class tServer; 
    260215  friend class tTCPThread; 
     
    268223  friend struct tConnectorTask; 
    269224 
    270   /*! Framework element associated with server */ 
    271   core::tFrameworkElement& framework_element; 
    272  
    273   /*! Options that peer was created with */ 
    274   tOptions create_options; 
    275  
    276   /*! Vector containing all network addresses this peer should try to connect to */ 
     225  /*! Unused buffer for initialization of prototype streams (must not be destroyed before connections are) */ 
     226  rrlib::serialization::tStackMemoryBuffer<16> unused_initialization_buffer; 
     227 
     228  /*! Vector containing all network addresses this peer should currently try to connect to */ 
    277229  std::vector<std::string> connect_to; 
     230 
     231  /*! Vector containing all network addresses this peer already connected to (should not be added to connect_to again) */ 
     232  std::set<std::string> connected_to; 
    278233 
    279234  /*! Info on this peer */ 
     
    285240   * connection to remote part is established. 
    286241   */ 
    287   std::vector<std::unique_ptr<tPeerInfo>> other_peers; 
     242  std::vector<std::shared_ptr<tPeerInfo>> other_peers; 
    288243 
    289244  /*! Revision of peer information */ 
     
    305260  tServer* server; 
    306261 
    307   /*! Cached info on ports shared by this peer */ 
    308   std::map<core::tFrameworkElement::tHandle, rrlib::serialization::tFixedBuffer> shared_ports; 
    309  
    310   /*! Mutex for 'shared_ports' access */ 
    311   rrlib::thread::tMutex shared_ports_mutex; 
    312  
    313   /*! 
    314    * True as soon as peer also serves clients interested in complete application structure. 
    315    * This may be enabled later, as this can cause quite a lot of overhead if done during 
    316    * construction and startup of large (MCA2) applications. 
    317    */ 
    318   std::atomic<bool> serve_structure; 
    319  
    320   /*! 
    321    * Concurrent queue with incoming structure changes. 
    322    * Queue is filled when runtime changes occur (structure mutex is acquired by this thread). 
    323    * Queue is processed when TCP thread calls ProcessIncomingStructureChanges() 
    324    */ 
    325   rrlib::concurrent_containers::tQueue < std::unique_ptr<tSerializedStructureChange>, rrlib::concurrent_containers::tConcurrency::SINGLE_READER_AND_WRITER, 
    326         rrlib::concurrent_containers::tDequeueMode::ALL > incoming_structure_changes; 
    327  
    328262  /*! 
    329263   * Actively connect to specified network? 
     
    332266  bool actively_connect; 
    333267 
    334   /*! Ports to check subscriptions for */ 
    335   std::vector<tFrameworkElementHandle> pending_subscription_checks; 
    336  
    337   /*! Mutex for 'pending_subscription_checks' access */ 
    338   rrlib::thread::tMutex pending_subscription_checks_mutex; 
    339  
    340   /*! Copy for TCP thread */ 
    341   std::vector<tFrameworkElementHandle> pending_subscription_checks_copy; 
    342  
    343268  /*! True when event loop is running (ProcessEvents() is regularly called by TCP thread) */ 
    344269  bool event_loop_running; 
    345  
    346   /*! Concurrent queue with incoming port value changes */ 
    347   rrlib::concurrent_containers::tQueue < tChangeEventPointer, rrlib::concurrent_containers::tConcurrency::MULTIPLE_WRITERS, 
    348         rrlib::concurrent_containers::tDequeueMode::ALL > incoming_port_buffer_changes; 
    349  
    350   /*! Buffer pool with port value change event buffers */ 
    351   tPortBufferChangeEventPool port_buffer_change_event_buffers; 
    352  
    353   /*! List with deleted RPC ports - so that buffer pools created in connection for these ports can be deleted */ 
    354   std::vector<core::tFrameworkElement::tHandle> deleted_rpc_ports; 
    355  
    356   /*! Mutex for 'deleted_rpc_ports' access */ 
    357   rrlib::thread::tMutex deleted_rpc_ports_mutex; 
    358270 
    359271 
     
    373285  void InferMissingAddresses(); 
    374286 
    375   /*! Is provided element a shared port (to be announced to other peers)? */ 
    376   static bool IsSharedPort(core::tFrameworkElement& framework_element); 
    377  
    378   virtual void OnEdgeChange(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target) override; 
    379   virtual void OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element) override; 
    380  
    381   /*! Handles runtime changes from callbacks - and forwards info to 'incoming_structure_changes' queue */ 
    382   void ProcessRuntimeChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element, bool edge_change); 
     287  virtual void OnStartServingStructure() override; 
    383288 
    384289  /*! 
     
    392297  void StartThread(); 
    393298 
    394   /*! 
    395    * Called whenever a new edge is added and updates network connection info if necessary 
    396    */ 
    397   void UpdateNetworkConnectionInfo(core::tRuntimeListener::tEvent change_type, core::tAbstractPort& source, core::tAbstractPort& target, bool& target_port_changed); 
    398299}; 
    399300 
  • internal/tPeerImplementation.h

    r166 r167  
    9898  /*! Starts actively connecting to the specified network */ 
    9999  void Connect(); 
     100 
     101  /*! 
     102   * Connects to part with specified address 
     103   */ 
     104  void ConnectTo(const std::string& address) 
     105  { 
     106    connect_to.push_back(address); 
     107  } 
    100108 
    101109  /*! 
  • internal/util.h

    r165 r167  
    4444// Internal includes with "" 
    4545//---------------------------------------------------------------------- 
     46#include "plugins/tcp/internal/protocol_definitions.h" 
    4647 
    4748//---------------------------------------------------------------------- 
     
    6364//---------------------------------------------------------------------- 
    6465 
    65 /*! 
    66  * Utility function. 
    67  * Copies contents of specified memory buffer to newly allocated fixed buffer of exactly the contents' size 
    68  * 
    69  * \param memory_buffer Memory buffer whose data to copy 
    70  * \return Fixed buffer with copied data 
    71  */ 
    72 inline rrlib::serialization::tFixedBuffer CopyToNewFixedBuffer(rrlib::serialization::tMemoryBuffer& memory_buffer) 
    73 { 
    74   rrlib::serialization::tFixedBuffer fixed_buffer(memory_buffer.GetSize()); 
    75   memcpy(fixed_buffer.GetPointer(), memory_buffer.GetBufferPointer(0), fixed_buffer.Capacity()); 
    76   return fixed_buffer; 
    77 } 
    7866 
    7967/*! 
  • internal/util.h

    r155 r167  
    6767/*! 
    6868 * Parses and resolves network address in specified string 
    69  * Throws exception if address cannot be parsed or resolved 
    7069 * 
    7170 * \param network_address Network address (e.g. 'localhost:4444') 
    7271 * \return std::vector of TCP endpoints corresponding to this address 
     72 * 
     73 * \throw Throws std::invalid_argument if address cannot be parsed 
     74 * \throw Throws std::runtime_error if address cannot be resolved 
    7375 */ 
    7476std::vector<boost::asio::ip::tcp::endpoint> ParseAndResolveNetworkAddress(const std::string& network_address); 
Note: See TracChangeset for help on using the changeset viewer.