Changeset 26:9f93ea63d74e in finroc_plugins_network_transport
- Timestamp:
- 08.06.2017 02:40:59 (4 years ago)
- Branch:
- 17.03
- Phase:
- public
- Location:
- generic_protocol
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
generic_protocol/definitions.h
r20 r26 211 211 212 212 213 // Parameters: [connection handle == client port handle] after message: [static connector parameters][dynamic connection data]214 typedef tMessage <tOpCode, tOpCode::CONNECT_PORTS, tMessageSize::VARIABLE_UP_TO_4GB, tFrameworkElementHandle > tConnectPortsMessage;213 // Parameters: [connection handle == client port handle][connect direction: connection for publishing data to server?] after message: [static connector parameters][dynamic connection data] 214 typedef tMessage <tOpCode, tOpCode::CONNECT_PORTS, tMessageSize::VARIABLE_UP_TO_4GB, tFrameworkElementHandle, bool> tConnectPortsMessage; 215 215 216 216 // Parameters: [connection handle == client port handle] after message: [string: error message] -
generic_protocol/tDynamicConnectionData.h
r19 r26 72 72 uint16_t strategy = -1; 73 73 74 /*! bool */75 74 }; 76 75 -
generic_protocol/tNetworkPortInfo.cpp
r23 r26 66 66 // Const values 67 67 //---------------------------------------------------------------------- 68 __thread tNetworkPortInfo* tNetworkPortInfo::current_publishing_network_port = nullptr; 68 69 69 70 //---------------------------------------------------------------------- … … 92 93 if (data_ports::IsDataFlowType(port->GetDataType())) 93 94 { 94 if (port->IsOutputPort()) 95 { 96 values_to_send.SetMaxLength(port->GetFlag(core::tFrameworkElement::tFlag::PUSH_STRATEGY_REVERSE) ? 1 : 0); 97 } 98 else 95 if (!port->IsOutputPort()) 99 96 { 100 97 values_to_send.SetMaxLength(strategy < 0 ? 0 : strategy); -
generic_protocol/tNetworkPortInfo.h
r23 r26 164 164 { 165 165 //FINROC_LOG_PRINT(DEBUG, "Port Changed ", port.GetWrapped()->GetQualifiedName(), " ", rrlib::serialization::Serialize(*value)); 166 if (strategy > 0 )166 if (strategy > 0 && (!(current_publishing_network_port && current_publishing_network_port->connection_handle == -connection_handle && (¤t_publishing_network_port->remote_runtime == &remote_runtime)))) 167 167 { 168 168 remote_runtime.GetLocalRuntimeInfo().OnPortChange(value, this, change_context); … … 293 293 std::unique_ptr<tNetworkPortInfoClient> client_info; 294 294 295 /*! Network port that currently publishes data using the current thread (null if current thread does not publish data from the net) */ 296 static __thread tNetworkPortInfo* current_publishing_network_port; 297 295 298 296 299 virtual void OnManagedDelete() override; -
generic_protocol/tNetworkPortInfoClient.cpp
r19 r26 122 122 { 123 123 tStaticNetworkConnectorParameters static_parameters(used_by_connectors[0]->StaticParameters(), NetworkPortInfo().remote_port_handle); 124 tConnectPortsMessage::Serialize(false, true, stream, port->GetHandle() );124 tConnectPortsMessage::Serialize(false, true, stream, port->GetHandle(), port->IsInputPort()); 125 125 stream << static_parameters << new_connection_data; 126 126 tConnectPortsMessage::FinishMessage(stream); -
generic_protocol/tNetworkTransportPlugin.cpp
r19 r26 201 201 { 202 202 connector.temporary_conversion_port.reset(new data_ports::tGenericPort(remote_runtime.client_ports, rrlib::uri::tURI(connector.StaticParameters().server_port_id.path).ToString(), connector.OwnerPort().GetDataType(), tFlag::ACCEPTS_DATA | tFlag::PUSH_STRATEGY | tFlag::EMITS_DATA | (created_port.GetWrapped()->IsOutputPort() ? tFlag::OUTPUT_PORT : tFlag::PORT)), core::tPortWrapperBase::tDeleter()); 203 connector.temporary_conversion_port->ConnectTo(connector.OwnerPort(), core::tConnectionFlag::NON_PRIMARY_CONNECTOR); // Do this first so that initial pushing works (does not send default first) for remote input ports 203 204 connector.temporary_conversion_port->ConnectTo(connector.temporary_connector_port->GetPort(), core::tConnectOptions(connector.LocalConversionOperations(), core::tConnectionFlag::NON_PRIMARY_CONNECTOR)); 204 connector.temporary_conversion_port->ConnectTo(connector.OwnerPort(), core::tConnectionFlag::NON_PRIMARY_CONNECTOR);205 205 } 206 206 else -
generic_protocol/tRemoteRuntime.cpp
r24 r26 302 302 if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType())) 303 303 { 304 tNetworkPortInfo::current_publishing_network_port = port.second; 304 305 data_ports::tGenericPort generic_port = data_ports::tGenericPort::Wrap(*port.first); 305 306 … … 346 347 while (another_value); 347 348 349 tNetworkPortInfo::current_publishing_network_port = nullptr; 348 350 message.FinishDeserialize(stream); 349 351 connection.received_data_after_last_connect = true; … … 542 544 if (lock.TryLock()) 543 545 { 546 bool publish_connection = message.Get<1>(); 547 bool tool_connection = GetDesiredStructureInfo() != runtime_info::tStructureExchange::SHARED_PORTS; 548 544 549 // Read subscription data 545 550 tStaticNetworkConnectorParameters static_subscription_parameters; … … 556 561 557 562 tFlags flags = tFlag::NETWORK_ELEMENT | tFlag::VOLATILE; 558 if (port->IsOutputPort()) 563 if ((!tool_connection) && port->IsOutputPort() && publish_connection) 564 { 565 throw std::runtime_error("Cannot publish to output ports with basic (SHARED_PORTS) connection"); 566 } 567 if (!publish_connection) 559 568 { 560 569 flags |= tFlag::ACCEPTS_DATA; // create input port … … 562 571 else 563 572 { 564 flags |= tFlag::OUTPUT_PORT | tFlag::EMITS_DATA ; // create output io port565 } 566 if ( GetDesiredStructureInfo() != runtime_info::tStructureExchange::SHARED_PORTS)573 flags |= tFlag::OUTPUT_PORT | tFlag::EMITS_DATA | tFlag::NO_INITIAL_PUSHING; // create output io port 574 } 575 if (tool_connection) 567 576 { 568 577 flags |= tFlag::TOOL_PORT; … … 572 581 flags |= tFlag::PUSH_STRATEGY; 573 582 } 574 if (static_subscription_parameters.reverse_push) 575 { 576 flags |= tFlag::PUSH_STRATEGY_REVERSE; 577 } 578 583 584 std::string port_name = rrlib::uri::tURI(port->GetPath()).ToString() + (publish_connection ? " (Publish)" : " (Subscribe)"); 579 585 core::tAbstractPort* port_to_connect_to = port; 580 586 if (!static_subscription_parameters.server_side_conversion.NoConversion()) … … 583 589 584 590 // Check whether port already exists 585 if ( port->IsOutputPort())591 if (!publish_connection) 586 592 { 587 593 for (auto it = port->OutgoingConnectionsBegin(); it != port->OutgoingConnectionsEnd(); ++it) … … 647 653 648 654 // Create and connect new conversion port 649 data_ports::tGenericPort created_port( rrlib::uri::tURI(port->GetPath()).ToString(), &GetServerPortsElement(), destination_type, tFlag::NETWORK_ELEMENT | tFlag::VOLATILE | tFlag::EMITS_DATA | tFlag::ACCEPTS_DATA | (port->IsOutputPort() ? tFlag::OUTPUT_PORT : tFlag::PORT));650 if (created_port.ConnectTo(port, core::tConnectOptions(conversion, core::tConnectionFlag::NON_PRIMARY_CONNECTOR )))655 data_ports::tGenericPort created_port(port_name + " to " + destination_type.GetName(), &GetServerPortsElement(), destination_type, tFlag::NETWORK_ELEMENT | tFlag::VOLATILE | tFlag::EMITS_DATA | tFlag::ACCEPTS_DATA | (publish_connection ? (tFlag::OUTPUT_PORT | tFlag::NO_INITIAL_PUSHING) : tFlag::PORT) | (tool_connection ? tFlag::TOOL_PORT : tFlag::PORT)); 656 if (created_port.ConnectTo(port, core::tConnectOptions(conversion, core::tConnectionFlag::NON_PRIMARY_CONNECTOR | (publish_connection ? core::tConnectionFlag::DIRECTION_TO_DESTINATION : core::tConnectionFlag::DIRECTION_TO_SOURCE)))) 651 657 { 652 658 created_port.GetWrapped()->EmplaceAnnotation<tServerSideConversionAnnotation>(static_subscription_parameters.server_side_conversion); … … 662 668 } 663 669 664 data_ports::tGenericPort created_port( rrlib::uri::tURI(port->GetPath()).ToString(), &GetServerPortsElement(), &GetServerPortsElement(), port_to_connect_to->GetDataType(), flags);670 data_ports::tGenericPort created_port(port_name, &GetServerPortsElement(), &GetServerPortsElement(), port_to_connect_to->GetDataType(), flags); 665 671 tNetworkPortInfo* network_port_info = new tNetworkPortInfo(*this, message.Get<0>(), message.Get<0>(), dynamic_connection_data.strategy, *created_port.GetWrapped(), port->GetHandle()); 666 672 network_port_info->SetDesiredEncoding(static_subscription_parameters.server_side_conversion.encoding); … … 669 675 created_port.SetPullRequestHandler(this); 670 676 created_port.Init(); 671 created_port.ConnectTo(*port_to_connect_to, core::tConnectionFlag::NON_PRIMARY_CONNECTOR );677 created_port.ConnectTo(*port_to_connect_to, core::tConnectionFlag::NON_PRIMARY_CONNECTOR | (publish_connection ? core::tConnectionFlag::DIRECTION_TO_DESTINATION : core::tConnectionFlag::DIRECTION_TO_SOURCE)); 672 678 server_port_map.emplace(message.Get<0>(), network_port_info); 673 679 FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Created server port ", created_port.GetWrapped()); … … 784 790 data_ports::common::tAbstractDataPort& data_port = static_cast<data_ports::common::tAbstractDataPort&>(*client_port->GetPort()); 785 791 data_port.SetPushStrategy(dynamic_info.strategy > 0); 786 //data_port.SetReversePushStrategy(dynamic_info.flags.Get(tFlag::PUSH_STRATEGY_REVERSE));787 792 //data_port.SetMinNetUpdateIntervalRaw(dynamic_info.min_net_update_time); 788 793 //tNetworkPortInfo* network_port_info = data_port.GetAnnotation<tNetworkPortInfo>(); -
generic_protocol/tStaticConnectorParameters.h
r19 r26 77 77 TServerPortId server_port_id; 78 78 79 /*! Whether to subscribe at input port for reverse pushing */80 bool reverse_push;81 79 82 80 tStaticConnectorParameters() : 83 server_port_id(TServerPortId()), 84 reverse_push(false) 81 server_port_id(TServerPortId()) 85 82 {} 86 83 … … 89 86 tStaticConnectorParameters(const tStaticConnectorParameters<TOtherId>& other, const TServerPortId& this_id) : 90 87 server_side_conversion(other.server_side_conversion), 91 server_port_id(this_id), 92 reverse_push(other.reverse_push) 88 server_port_id(this_id) 93 89 {} 94 90 … … 99 95 inline bool operator==(const tStaticConnectorParameters<TServerPortId>& lhs, const tStaticConnectorParameters<TServerPortId>& rhs) 100 96 { 101 return lhs.server_port_id == rhs.server_port_id && lhs. reverse_push == rhs.reverse_push && lhs.server_side_conversion == rhs.server_side_conversion;97 return lhs.server_port_id == rhs.server_port_id && lhs.server_side_conversion == rhs.server_side_conversion; 102 98 } 103 99 … … 111 107 inline rrlib::serialization::tOutputStream& operator << (rrlib::serialization::tOutputStream& stream, const tStaticConnectorParameters<TServerPortId>& data) 112 108 { 113 stream << data.server_port_id << data. reverse_push << data.server_side_conversion;109 stream << data.server_port_id << data.server_side_conversion; 114 110 return stream; 115 111 } … … 118 114 inline rrlib::serialization::tInputStream& operator >> (rrlib::serialization::tInputStream& stream, tStaticConnectorParameters<TServerPortId>& data) 119 115 { 120 stream >> data.server_port_id >> data. reverse_push >> data.server_side_conversion;116 stream >> data.server_port_id >> data.server_side_conversion; 121 117 return stream; 122 118 }
Note: See TracChangeset
for help on using the changeset viewer.