Changeset 26:9f93ea63d74e in finroc_plugins_network_transport


Ignore:
Timestamp:
08.06.2017 02:40:59 (20 months ago)
Author:
Max Reichardt <mreichardt@…>
Branch:
17.03
Phase:
public
Message:

Adapts to removal of reverse pushing - by adding a parameter to CONNECT_PORTS opcode as to whether connection is used for publishing or subscribing

Location:
generic_protocol
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • generic_protocol/definitions.h

    r20 r26  
    211211 
    212212 
    213 // Parameters: [connection handle == client port handle] after message: [static connector parameters][dynamic connection data] 
    214 typedef tMessage <tOpCode, tOpCode::CONNECT_PORTS, tMessageSize::VARIABLE_UP_TO_4GB, tFrameworkElementHandle> tConnectPortsMessage; 
     213// Parameters: [connection handle == client port handle][connect direction: connection for publishing data to server?] after message: [static connector parameters][dynamic connection data] 
     214typedef tMessage <tOpCode, tOpCode::CONNECT_PORTS, tMessageSize::VARIABLE_UP_TO_4GB, tFrameworkElementHandle, bool> tConnectPortsMessage; 
    215215 
    216216// Parameters: [connection handle == client port handle] after message: [string: error message] 
  • generic_protocol/tDynamicConnectionData.h

    r19 r26  
    7272  uint16_t strategy = -1; 
    7373 
    74   /*! bool  */ 
    7574}; 
    7675 
  • generic_protocol/tNetworkPortInfo.cpp

    r23 r26  
    6666// Const values 
    6767//---------------------------------------------------------------------- 
     68__thread tNetworkPortInfo* tNetworkPortInfo::current_publishing_network_port = nullptr; 
    6869 
    6970//---------------------------------------------------------------------- 
     
    9293  if (data_ports::IsDataFlowType(port->GetDataType())) 
    9394  { 
    94     if (port->IsOutputPort()) 
    95     { 
    96       values_to_send.SetMaxLength(port->GetFlag(core::tFrameworkElement::tFlag::PUSH_STRATEGY_REVERSE) ? 1 : 0); 
    97     } 
    98     else 
     95    if (!port->IsOutputPort()) 
    9996    { 
    10097      values_to_send.SetMaxLength(strategy < 0 ? 0 : strategy); 
  • generic_protocol/tNetworkPortInfo.h

    r23 r26  
    164164  { 
    165165    //FINROC_LOG_PRINT(DEBUG, "Port Changed ", port.GetWrapped()->GetQualifiedName(), " ", rrlib::serialization::Serialize(*value)); 
    166     if (strategy > 0) 
     166    if (strategy > 0 && (!(current_publishing_network_port && current_publishing_network_port->connection_handle == -connection_handle && (&current_publishing_network_port->remote_runtime == &remote_runtime)))) 
    167167    { 
    168168      remote_runtime.GetLocalRuntimeInfo().OnPortChange(value, this, change_context); 
     
    293293  std::unique_ptr<tNetworkPortInfoClient> client_info; 
    294294 
     295  /*! Network port that currently publishes data using the current thread (null if current thread does not publish data from the net) */ 
     296  static __thread tNetworkPortInfo* current_publishing_network_port; 
     297 
    295298 
    296299  virtual void OnManagedDelete() override; 
  • generic_protocol/tNetworkPortInfoClient.cpp

    r19 r26  
    122122  { 
    123123    tStaticNetworkConnectorParameters static_parameters(used_by_connectors[0]->StaticParameters(), NetworkPortInfo().remote_port_handle); 
    124     tConnectPortsMessage::Serialize(false, true, stream, port->GetHandle()); 
     124    tConnectPortsMessage::Serialize(false, true, stream, port->GetHandle(), port->IsInputPort()); 
    125125    stream << static_parameters << new_connection_data; 
    126126    tConnectPortsMessage::FinishMessage(stream); 
  • generic_protocol/tNetworkTransportPlugin.cpp

    r19 r26  
    201201    { 
    202202      connector.temporary_conversion_port.reset(new data_ports::tGenericPort(remote_runtime.client_ports, rrlib::uri::tURI(connector.StaticParameters().server_port_id.path).ToString(), connector.OwnerPort().GetDataType(), tFlag::ACCEPTS_DATA | tFlag::PUSH_STRATEGY | tFlag::EMITS_DATA | (created_port.GetWrapped()->IsOutputPort() ? tFlag::OUTPUT_PORT : tFlag::PORT)), core::tPortWrapperBase::tDeleter()); 
     203      connector.temporary_conversion_port->ConnectTo(connector.OwnerPort(), core::tConnectionFlag::NON_PRIMARY_CONNECTOR); // Do this first so that initial pushing works (does not send default first) for remote input ports 
    203204      connector.temporary_conversion_port->ConnectTo(connector.temporary_connector_port->GetPort(), core::tConnectOptions(connector.LocalConversionOperations(), core::tConnectionFlag::NON_PRIMARY_CONNECTOR)); 
    204       connector.temporary_conversion_port->ConnectTo(connector.OwnerPort(), core::tConnectionFlag::NON_PRIMARY_CONNECTOR); 
    205205    } 
    206206    else 
  • generic_protocol/tRemoteRuntime.cpp

    r24 r26  
    302302    if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType())) 
    303303    { 
     304      tNetworkPortInfo::current_publishing_network_port = port.second; 
    304305      data_ports::tGenericPort generic_port = data_ports::tGenericPort::Wrap(*port.first); 
    305306 
     
    346347      while (another_value); 
    347348 
     349      tNetworkPortInfo::current_publishing_network_port = nullptr; 
    348350      message.FinishDeserialize(stream); 
    349351      connection.received_data_after_last_connect = true; 
     
    542544        if (lock.TryLock()) 
    543545        { 
     546          bool publish_connection = message.Get<1>(); 
     547          bool tool_connection = GetDesiredStructureInfo() != runtime_info::tStructureExchange::SHARED_PORTS; 
     548 
    544549          // Read subscription data 
    545550          tStaticNetworkConnectorParameters static_subscription_parameters; 
     
    556561 
    557562          tFlags flags = tFlag::NETWORK_ELEMENT | tFlag::VOLATILE; 
    558           if (port->IsOutputPort()) 
     563          if ((!tool_connection) && port->IsOutputPort() && publish_connection) 
     564          { 
     565            throw std::runtime_error("Cannot publish to output ports with basic (SHARED_PORTS) connection"); 
     566          } 
     567          if (!publish_connection) 
    559568          { 
    560569            flags |= tFlag::ACCEPTS_DATA; // create input port 
     
    562571          else 
    563572          { 
    564             flags |= tFlag::OUTPUT_PORT | tFlag::EMITS_DATA; // create output io port 
    565           } 
    566           if (GetDesiredStructureInfo() != runtime_info::tStructureExchange::SHARED_PORTS) 
     573            flags |= tFlag::OUTPUT_PORT | tFlag::EMITS_DATA | tFlag::NO_INITIAL_PUSHING; // create output io port 
     574          } 
     575          if (tool_connection) 
    567576          { 
    568577            flags |= tFlag::TOOL_PORT; 
     
    572581            flags |= tFlag::PUSH_STRATEGY; 
    573582          } 
    574           if (static_subscription_parameters.reverse_push) 
    575           { 
    576             flags |= tFlag::PUSH_STRATEGY_REVERSE; 
    577           } 
    578  
     583 
     584          std::string port_name = rrlib::uri::tURI(port->GetPath()).ToString() + (publish_connection ? " (Publish)" : " (Subscribe)"); 
    579585          core::tAbstractPort* port_to_connect_to = port; 
    580586          if (!static_subscription_parameters.server_side_conversion.NoConversion()) 
     
    583589 
    584590            // Check whether port already exists 
    585             if (port->IsOutputPort()) 
     591            if (!publish_connection) 
    586592            { 
    587593              for (auto it = port->OutgoingConnectionsBegin(); it != port->OutgoingConnectionsEnd(); ++it) 
     
    647653 
    648654              // Create and connect new conversion port 
    649               data_ports::tGenericPort created_port(rrlib::uri::tURI(port->GetPath()).ToString(), &GetServerPortsElement(), destination_type, tFlag::NETWORK_ELEMENT | tFlag::VOLATILE | tFlag::EMITS_DATA | tFlag::ACCEPTS_DATA | (port->IsOutputPort() ? tFlag::OUTPUT_PORT : tFlag::PORT)); 
    650               if (created_port.ConnectTo(port, core::tConnectOptions(conversion, core::tConnectionFlag::NON_PRIMARY_CONNECTOR))) 
     655              data_ports::tGenericPort created_port(port_name + " to " + destination_type.GetName(), &GetServerPortsElement(), destination_type, tFlag::NETWORK_ELEMENT | tFlag::VOLATILE | tFlag::EMITS_DATA | tFlag::ACCEPTS_DATA | (publish_connection ? (tFlag::OUTPUT_PORT | tFlag::NO_INITIAL_PUSHING) : tFlag::PORT) | (tool_connection ? tFlag::TOOL_PORT : tFlag::PORT)); 
     656              if (created_port.ConnectTo(port, core::tConnectOptions(conversion, core::tConnectionFlag::NON_PRIMARY_CONNECTOR | (publish_connection ? core::tConnectionFlag::DIRECTION_TO_DESTINATION : core::tConnectionFlag::DIRECTION_TO_SOURCE)))) 
    651657              { 
    652658                created_port.GetWrapped()->EmplaceAnnotation<tServerSideConversionAnnotation>(static_subscription_parameters.server_side_conversion); 
     
    662668          } 
    663669 
    664           data_ports::tGenericPort created_port(rrlib::uri::tURI(port->GetPath()).ToString(), &GetServerPortsElement(), &GetServerPortsElement(), port_to_connect_to->GetDataType(), flags); 
     670          data_ports::tGenericPort created_port(port_name, &GetServerPortsElement(), &GetServerPortsElement(), port_to_connect_to->GetDataType(), flags); 
    665671          tNetworkPortInfo* network_port_info = new tNetworkPortInfo(*this, message.Get<0>(), message.Get<0>(), dynamic_connection_data.strategy, *created_port.GetWrapped(), port->GetHandle()); 
    666672          network_port_info->SetDesiredEncoding(static_subscription_parameters.server_side_conversion.encoding); 
     
    669675          created_port.SetPullRequestHandler(this); 
    670676          created_port.Init(); 
    671           created_port.ConnectTo(*port_to_connect_to, core::tConnectionFlag::NON_PRIMARY_CONNECTOR); 
     677          created_port.ConnectTo(*port_to_connect_to, core::tConnectionFlag::NON_PRIMARY_CONNECTOR | (publish_connection ? core::tConnectionFlag::DIRECTION_TO_DESTINATION : core::tConnectionFlag::DIRECTION_TO_SOURCE)); 
    672678          server_port_map.emplace(message.Get<0>(), network_port_info); 
    673679          FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Created server port ", created_port.GetWrapped()); 
     
    784790          data_ports::common::tAbstractDataPort& data_port = static_cast<data_ports::common::tAbstractDataPort&>(*client_port->GetPort()); 
    785791          data_port.SetPushStrategy(dynamic_info.strategy > 0); 
    786           //data_port.SetReversePushStrategy(dynamic_info.flags.Get(tFlag::PUSH_STRATEGY_REVERSE)); 
    787792          //data_port.SetMinNetUpdateIntervalRaw(dynamic_info.min_net_update_time); 
    788793          //tNetworkPortInfo* network_port_info = data_port.GetAnnotation<tNetworkPortInfo>(); 
  • generic_protocol/tStaticConnectorParameters.h

    r19 r26  
    7777  TServerPortId server_port_id; 
    7878 
    79   /*! Whether to subscribe at input port for reverse pushing */ 
    80   bool reverse_push; 
    8179 
    8280  tStaticConnectorParameters() : 
    83     server_port_id(TServerPortId()), 
    84     reverse_push(false) 
     81    server_port_id(TServerPortId()) 
    8582  {} 
    8683 
     
    8986  tStaticConnectorParameters(const tStaticConnectorParameters<TOtherId>& other, const TServerPortId& this_id) : 
    9087    server_side_conversion(other.server_side_conversion), 
    91     server_port_id(this_id), 
    92     reverse_push(other.reverse_push) 
     88    server_port_id(this_id) 
    9389  {} 
    9490 
     
    9995inline bool operator==(const tStaticConnectorParameters<TServerPortId>& lhs, const tStaticConnectorParameters<TServerPortId>& rhs) 
    10096{ 
    101   return lhs.server_port_id == rhs.server_port_id && lhs.reverse_push == rhs.reverse_push && lhs.server_side_conversion == rhs.server_side_conversion; 
     97  return lhs.server_port_id == rhs.server_port_id && lhs.server_side_conversion == rhs.server_side_conversion; 
    10298} 
    10399 
     
    111107inline rrlib::serialization::tOutputStream& operator << (rrlib::serialization::tOutputStream& stream, const tStaticConnectorParameters<TServerPortId>& data) 
    112108{ 
    113   stream << data.server_port_id << data.reverse_push << data.server_side_conversion; 
     109  stream << data.server_port_id << data.server_side_conversion; 
    114110  return stream; 
    115111} 
     
    118114inline rrlib::serialization::tInputStream& operator >> (rrlib::serialization::tInputStream& stream, tStaticConnectorParameters<TServerPortId>& data) 
    119115{ 
    120   stream >> data.server_port_id >> data.reverse_push >> data.server_side_conversion; 
     116  stream >> data.server_port_id >> data.server_side_conversion; 
    121117  return stream; 
    122118} 
Note: See TracChangeset for help on using the changeset viewer.