source: finroc_plugins_network_transport/generic_protocol/tRemoteRuntime.cpp @ 72:d0276f64c6d0

17.03
Last change on this file since 72:d0276f64c6d0 was 72:d0276f64c6d0, checked in by Max Reichardt <max.reichardt@…>, 20 months ago

Replaces thread-local variable with ScopedContext from finroc_plugins_data_ports

File size: 40.2 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/network_transport/generic_protocol/tRemoteRuntime.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2017-02-28
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/network_transport/generic_protocol/tRemoteRuntime.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "core/tRuntimeEnvironment.h"
36#include "plugins/rpc_ports/internal/tRPCInterfaceTypeInfo.h"
37#include "plugins/rpc_ports/tPromise.h"
38
39//----------------------------------------------------------------------
40// Internal includes with ""
41//----------------------------------------------------------------------
42#include "plugins/network_transport/generic_protocol/tNetworkPortInfo.h"
43#include "plugins/network_transport/generic_protocol/tNetworkTransportPlugin.h"
44
45//----------------------------------------------------------------------
46// Debugging
47//----------------------------------------------------------------------
48#include <cassert>
49
50//----------------------------------------------------------------------
51// Namespace usage
52//----------------------------------------------------------------------
53
54//----------------------------------------------------------------------
55// Namespace declaration
56//----------------------------------------------------------------------
57namespace finroc
58{
59namespace network_transport
60{
61namespace generic_protocol
62{
63
64//----------------------------------------------------------------------
65// Forward declarations / typedefs / enums
66//----------------------------------------------------------------------
67
68//----------------------------------------------------------------------
69// Const values
70//----------------------------------------------------------------------
71
72//----------------------------------------------------------------------
73// Implementation
74//----------------------------------------------------------------------
75
76namespace
77{
78
79/*!
80 * Deserialization scope for RPC calls.
81 * Buffer pool is created/provided when needed
82 */
83class tRPCDeserializationScope : public data_ports::api::tDeserializationScope
84{
85public:
86  tRPCDeserializationScope(core::tFrameworkElement::tHandle local_port_handle,
87                           std::map<core::tFrameworkElement::tHandle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>>& rpc_call_buffer_pools) :
88    tDeserializationScope(),
89    local_port_handle(local_port_handle),
90    rpc_call_buffer_pools(rpc_call_buffer_pools)
91  {}
92
93private:
94  virtual data_ports::standard::tMultiTypePortBufferPool& ObtainBufferPool() override
95  {
96    auto it = rpc_call_buffer_pools.find(local_port_handle);
97    if (it == rpc_call_buffer_pools.end())
98    {
99      it = rpc_call_buffer_pools.insert(std::pair<core::tFrameworkElement::tHandle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>>(
100                                          local_port_handle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>(new data_ports::standard::tMultiTypePortBufferPool()))).first;
101    }
102    return *(it->second);
103  }
104
105  core::tFrameworkElement::tHandle local_port_handle;
106  std::map<core::tFrameworkElement::tHandle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>>& rpc_call_buffer_pools;
107};
108
109class tServerSideConversionAnnotation : public core::tAnnotation
110{
111public:
112
113  tServerSideConversionAnnotation(const tServerSideConversionInfo& conversion_info) :
114    conversion_info(conversion_info)
115  {}
116
117  const tServerSideConversionInfo conversion_info;
118};
119
120/*!
121 * \param server_port_map Server port map remote runtime
122 * \param connection_handle Connection handle (== client port handle)
123 * \param server_port Handle is from message to server (connection_handle != port handle)
124 * \return Network connector port associated to connector handle (abstract port and tNetworkPortInfo pointers)
125 */
126std::pair<core::tAbstractPort*, tNetworkPortInfo*> GetNetworkConnectorPort(const std::map<tFrameworkElementHandle, tNetworkPortInfo*>& server_port_map, tHandle connection_handle, bool server_port)
127{
128  typedef std::pair<core::tAbstractPort*, tNetworkPortInfo*> tReturn;
129  if (server_port)
130  {
131    auto it = server_port_map.find(connection_handle);
132    if (it != server_port_map.end())
133    {
134      return tReturn(it->second->GetAnnotated<core::tAbstractPort>(), it->second);
135    }
136  }
137  else
138  {
139    core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(connection_handle);
140    if (port)
141    {
142      tNetworkPortInfo* info = port->GetAnnotation<tNetworkPortInfo>();
143      if (info)
144      {
145        return tReturn(port, info);
146      }
147    }
148  }
149  return tReturn(nullptr, nullptr);
150}
151
152}
153
154
155tRemoteRuntime::tRemoteRuntime(tNetworkTransportPlugin& network_transport, const std::shared_ptr<tConnection>& primary_connection, core::tFrameworkElement& parent, const std::string& name) :
156  tFrameworkElement(&parent, name, tFlag::NETWORK_ELEMENT),
157  network_transport(network_transport),
158  shared_connection_info(primary_connection->shared_connection_info),
159  client_ports(new core::tFrameworkElement(this, "Client Ports", tFlag::NETWORK_ELEMENT | tFlag::AUTO_RENAME)),
160  server_ports(new core::tFrameworkElement(this, "Server Ports", tFlag::NETWORK_ELEMENT | tFlag::AUTO_RENAME)),
161  server_port_map(),
162  remote_port_map(),
163  not_ready_calls(),
164  calls_awaiting_response(),
165  next_call_id(0),
166  pull_calls_awaiting_response(),
167  rpc_call_buffer_pools()
168{
169  connections[0] = primary_connection;
170  FINROC_LOG_PRINT(DEBUG, "Connected to " + GetName());
171}
172
173tRemoteRuntime::~tRemoteRuntime()
174{}
175
176bool tRemoteRuntime::AddConnection(std::shared_ptr<tConnection> connection, bool primary_connection)
177{
178  if ((!primary_connection) && (!connections[0]))
179  {
180    FINROC_LOG_PRINT(WARNING, "Primary connection must be added first");
181    return false;
182  }
183
184  size_t index = primary_connection ? 0 : 1;
185  if (connections[index])
186  {
187    return false;
188  }
189  connections[index] = connection;
190
191  if ((!primary_connection) && (connections[0]->shared_connection_info != connections[1]->shared_connection_info))
192  {
193    connections[1]->shared_connection_info = connections[0]->shared_connection_info;
194  }
195
196  return true;
197}
198
199void tRemoteRuntime::AddRemotePort(network_transport::runtime_info::tRemoteFrameworkElementInfo& info)
200{
201  if (info.static_info.link_count == 0)
202  {
203    FINROC_LOG_PRINT(WARNING, "Remote shared port has no links. Ignoring.");
204    return;
205  }
206  if (!info.static_info.type)
207  {
208    FINROC_LOG_PRINT(WARNING, "Remote shared port '", info.static_info.link_data[0].name, "' has unknown type. Ignoring.");
209    return;
210  }
211  if (remote_port_map.find(info.id.handle) != remote_port_map.end())
212  {
213    FINROC_LOG_PRINT(WARNING, "Received info on remote shared port '", info.static_info.link_data[0].name, "' twice.");
214    return;
215  }
216  auto result = remote_port_map.emplace(info.id.handle, info);
217  if (!result.second)
218  {
219    throw std::runtime_error("Remote port already in remote_port_map");
220  }
221  FINROC_LOG_PRINT(DEBUG, "Received remote shared port info: ", info.static_info.link_data[0].path);
222  network_transport.OnNewRemotePort(*this, *result.first);
223}
224
225void tRemoteRuntime::OnInitialization()
226{
227  network_transport.connected_runtimes.push_back(this);
228}
229
230void tRemoteRuntime::OnManagedDelete()
231{
232  {
233    auto& vector = network_transport.connected_runtimes;
234    vector.erase(std::remove(vector.begin(), vector.end(), this), vector.end());
235  }
236
237  for (auto & connection : connections)
238  {
239    if (connection)
240    {
241      connection->Close();
242      connection.reset();
243    }
244  }
245  not_ready_calls.clear();
246  calls_awaiting_response.clear();
247
248  // Update connectors (important: called before children are deleted)
249  for (auto & port_info : remote_port_map)
250  {
251    for (tNetworkPortInfoClient * client_port : port_info.second.client_ports)
252    {
253      for (tNetworkConnector * connector : client_port->used_by_connectors)
254      {
255        connector->UpdateStatus(core::tUriConnector::tStatus::DISCONNECTED);
256        connector->temporary_connector_port.reset();
257        connector->temporary_conversion_port.reset();
258      }
259      client_port->used_by_connectors.clear();
260    }
261  }
262}
263
264data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject> tRemoteRuntime::OnPullRequest(data_ports::tGenericPort& origin)
265{
266  return network_transport.LocalRuntimeInfo()->OnPullRequest(origin, *this);
267}
268
269void tRemoteRuntime::PortDeleted(tNetworkPortInfo& deleted_port)
270{
271  for (auto & connection : connections)
272  {
273    if (connection)
274    {
275      auto& vector = connection->ports_with_data_to_send;
276      vector.erase(std::remove(vector.begin(), vector.end(), &deleted_port), vector.end());
277    }
278  }
279
280  if (deleted_port.IsServerPort())
281  {
282    size_t erased = server_port_map.erase(deleted_port.GetConnectionHandle());
283    if (!erased)
284    {
285      FINROC_LOG_PRINT(ERROR, "Deleted server port was not im map (This is a programming error)");
286    }
287  }
288}
289
290bool tRemoteRuntime::ProcessMessage(tOpCode opcode, rrlib::serialization::tMemoryBuffer& buffer, tConnection& connection)
291{
292  FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Processing message ", make_builder::GetEnumString(opcode));
293  rrlib::serialization::tInputStream stream(buffer, shared_connection_info->input_stream_prototype);
294
295  if (opcode == tOpCode::PORT_VALUE_CHANGE || opcode == tOpCode::SMALL_PORT_VALUE_CHANGE || opcode == tOpCode::SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP)
296  {
297    tPortValueChange message;
298    message.Deserialize(stream, false);
299    uint8_t flags = message.Get<1>();
300    message_flags::tDataEncoding encoding = static_cast<message_flags::tDataEncoding>(flags & 0x7);
301    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), flags & message_flags::cTO_SERVER);
302    if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType()))
303    {
304      tNetworkPortInfo::tNetworkPortPublishingContext scoped_context(*port.second);
305      data_ports::tGenericPort generic_port = data_ports::tGenericPort::Wrap(*port.first);
306
307      bool another_value = false;
308      do
309      {
310        rrlib::time::tTimestamp timestamp = rrlib::time::cNO_TIME;
311        data_ports::tChangeStatus change_type;
312        stream >> change_type;
313        if (opcode != tOpCode::SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP)
314        {
315          stream >> timestamp;
316        }
317        data_ports::tPortDataPointer<rrlib::rtti::tGenericObject> buffer = generic_port.GetUnusedBuffer();
318        buffer.SetTimestamp(timestamp);
319        if (encoding == message_flags::tDataEncoding::cBINARY_COMPRESSED_ENCODING)
320        {
321          char compression_format_buffer[100];
322          if (stream.ReadString(compression_format_buffer, true) >= sizeof(compression_format_buffer))
323          {
324            FINROC_LOG_PRINT(WARNING, "Compression format string exceeds max length");
325            return false;
326          }
327          if (compression_format_buffer[0])
328          {
329            size_t data_size = stream.ReadInt();
330            size_t data_end_position = stream.GetAbsoluteReadPosition() + data_size;
331            FINROC_LOG_PRINT(WARNING, "Decompressing data from network failed: finroc_plugins_data_compression is superseded by rrlib_rtti_conversion");
332            stream.Seek(data_end_position);
333          }
334          else
335          {
336            buffer->Deserialize(stream);
337            generic_port.BrowserPublish(buffer, false, change_type);
338          }
339        }
340        else
341        {
342          buffer->Deserialize(stream, static_cast<rrlib::serialization::tDataEncoding>(encoding));
343          generic_port.BrowserPublish(buffer, false, change_type);
344        }
345        another_value = stream.ReadBoolean();
346      }
347      while (another_value);
348
349      message.FinishDeserialize(stream);
350      connection.received_data_after_last_connect = true;
351    }
352  }
353  else if (opcode == tOpCode::RPC_CALL)
354  {
355    tRPCCall message;
356    message.Deserialize(stream, false);
357    rpc_ports::tCallType call_type = message.Get<1>();
358
359    const runtime_info::tRemoteType& remote_rpc_interface_type = stream.ReadRegisterEntry<runtime_info::tRemoteType>();
360    uint8_t function_index;
361    stream >> function_index;
362    rrlib::rtti::tType rpc_interface_type = remote_rpc_interface_type.GetLocalDataType();
363    if (!rpc_interface_type)
364    {
365      FINROC_LOG_PRINT(ERROR, "Remote type ", remote_rpc_interface_type.GetName(), " is not known here. Ignoring call.");
366      return false;
367    }
368
369    const rpc_ports::internal::tRPCInterfaceTypeInfo* type_info = rpc_ports::internal::tRPCInterfaceTypeInfo::Get(rpc_interface_type);
370    if ((!type_info) || (!rpc_ports::IsRPCType(rpc_interface_type)))
371    {
372      FINROC_LOG_PRINT(ERROR, "Type ", rpc_interface_type.GetName(), " is no RPC type. Ignoring call.");
373      return false;
374    }
375    FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Received ", make_builder::GetEnumString(call_type));
376
377    if (call_type == rpc_ports::tCallType::RPC_MESSAGE || call_type == rpc_ports::tCallType::RPC_REQUEST)
378    {
379      core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(message.Get<0>());
380      if (port && rpc_interface_type == port->GetDataType())
381      {
382        tRPCDeserializationScope deserialization_scope(message.Get<0>(), rpc_call_buffer_pools);
383        if (call_type == rpc_ports::tCallType::RPC_MESSAGE)
384        {
385          type_info->DeserializeMessage(stream, static_cast<rpc_ports::internal::tRPCPort&>(*port), function_index);
386        }
387        else
388        {
389          type_info->DeserializeRequest(stream, static_cast<rpc_ports::internal::tRPCPort&>(*port), function_index, *this);
390        }
391      }
392    }
393    else // type is RPC response
394    {
395      tCallId call_id;
396      stream >> call_id;
397
398      tCallPointer call_awaiting_this_response;
399      for (auto it = calls_awaiting_response.begin(); it != calls_awaiting_response.end(); ++it)
400      {
401        if (it->second->GetCallId() == call_id)
402        {
403          call_awaiting_this_response = std::move(it->second);
404          calls_awaiting_response.erase(it);
405          break;
406        }
407      }
408      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Call awaiting: ", call_awaiting_this_response.get());
409      if (call_awaiting_this_response)
410      {
411        core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(call_awaiting_this_response->GetLocalPortHandle());
412        if (port)
413        {
414          tRPCDeserializationScope deserialization_scope(call_awaiting_this_response->GetLocalPortHandle(), rpc_call_buffer_pools);
415          type_info->DeserializeResponse(stream, function_index, *this, call_awaiting_this_response.get());
416          message.FinishDeserialize(stream);
417          return false;
418        }
419      }
420      call_awaiting_this_response.reset();
421      type_info->DeserializeResponse(stream, function_index, *this, call_awaiting_this_response.get());
422    }
423    message.FinishDeserialize(stream);
424  }
425  else if (opcode == tOpCode::PULLCALL)
426  {
427    tPullCall message;
428    message.Deserialize(stream);
429    uint8_t flags = message.Get<2>();
430    message_flags::tDataEncoding encoding = static_cast<message_flags::tDataEncoding>(flags & 0x7);
431    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), flags & message_flags::cTO_SERVER);
432    rrlib::serialization::tOutputStream& write_stream = GetConnection(flags & message_flags::cHIGH_PRIORITY)->CurrentWriteStream();
433    if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType()))
434    {
435      tPullCallReturn::Serialize(false, true, write_stream, message.Get<1>(), false);
436      data_ports::tGenericPort data_port = data_ports::tGenericPort::Wrap(*port.first);
437      data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject> pulled_buffer = data_port.GetPointer(data_ports::tStrategy::PULL_IGNORING_HANDLER_ON_THIS_PORT);
438      write_stream << pulled_buffer->GetType() << pulled_buffer.GetTimestamp();
439      pulled_buffer->Serialize(write_stream, static_cast<rrlib::serialization::tDataEncoding>(encoding));
440      tPullCallReturn::FinishMessage(write_stream);
441    }
442    else
443    {
444      tPullCallReturn::Serialize(true, true, write_stream, message.Get<1>(), true);
445    }
446  }
447  else if (opcode == tOpCode::PULLCALL_RETURN)
448  {
449    tPullCallReturn message;
450    message.Deserialize(stream, false);
451
452    for (auto it = pull_calls_awaiting_response.begin(); it != pull_calls_awaiting_response.end(); ++it)
453    {
454      if (it->call_id == message.Get<0>())
455      {
456        bool failed = message.Get<1>();
457        core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(it->local_port_handle);
458        if ((!failed) && port && port->IsReady() && data_ports::IsDataFlowType(port->GetDataType()))
459        {
460          rrlib::rtti::tType data_type;
461          rrlib::time::tTimestamp timestamp;
462          stream >> data_type >> timestamp;
463
464          data_ports::tGenericPort data_port = data_ports::tGenericPort::Wrap(*port);
465          data_ports::tPortDataPointer<rrlib::rtti::tGenericObject> pulled_buffer = data_port.GetUnusedBuffer();
466          if (pulled_buffer->GetType() != data_type)
467          {
468            FINROC_LOG_PRINT(WARNING, "Port data pulled via ", port, " has invalid type.");
469            it->promise->SetException(rpc_ports::tFutureStatus::INVALID_DATA_RECEIVED);
470          }
471          else
472          {
473            pulled_buffer.SetTimestamp(timestamp);
474            pulled_buffer->Deserialize(stream);
475            message.FinishDeserialize(stream);
476            it->promise->SetValue(std::move(pulled_buffer));
477          }
478        }
479        else
480        {
481          it->promise->SetException(rpc_ports::tFutureStatus::NO_CONNECTION);
482        }
483        pull_calls_awaiting_response.erase(it); // remove pull call from list
484        break;
485      }
486    }
487  }
488  else if (opcode == tOpCode::UPDATE_CONNECTION)
489  {
490    tUpdateConnectionMessage message;
491    message.Deserialize(stream);
492
493    // Get server port
494    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), true);
495    if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType()))
496    {
497      tDynamicConnectionData data;
498      data.minimal_update_interval = message.Get<1>();
499      data.high_priority = message.Get<2>();
500      data.strategy = message.Get<3>();
501      port.second->SetServerSideDynamicConnectionData(data);
502      bool push_strategy = data.strategy > 0;
503      data_ports::common::tAbstractDataPort& data_port = static_cast<data_ports::common::tAbstractDataPort&>(*port.first);
504      if (data_port.PushStrategy() != push_strategy)
505      {
506        // flags need to be changed
507        rrlib::thread::tLock lock(GetStructureMutex(), false);
508        if (lock.TryLock())
509        {
510          if (data_port.PushStrategy() != push_strategy)
511          {
512            data_port.SetPushStrategy(push_strategy);
513          }
514        }
515        else
516        {
517          return true; // We could not obtain lock - try again later
518        }
519      }
520    }
521    else
522    {
523      FINROC_LOG_PRINT(WARNING, "Cannot find connection to update (requested handle: ", message.Get<0>(), ")");
524    }
525  }
526
527  else if (opcode == tOpCode::CONNECT_PORTS)
528  {
529    tConnectPortsMessage message;
530    message.Deserialize(stream, false);
531
532    try
533    {
534      // Get or create server port
535      auto it = server_port_map.find(message.Get<0>());
536      if (it != server_port_map.end())
537      {
538        throw std::runtime_error("Connection handle already occupied. Ignoring CONNECT_PORTS message.");
539      }
540      else
541      {
542        rrlib::thread::tLock lock(GetStructureMutex(), false);
543        if (lock.TryLock())
544        {
545          bool publish_connection = message.Get<1>();
546          bool tool_connection = GetDesiredStructureInfo() != runtime_info::tStructureExchange::SHARED_PORTS;
547
548          // Read subscription data
549          tStaticNetworkConnectorParameters static_subscription_parameters;
550          tDynamicConnectionData dynamic_connection_data;
551          stream >> static_subscription_parameters >> dynamic_connection_data;
552          message.FinishDeserialize(stream);
553
554          // Create server port
555          core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(static_subscription_parameters.server_port_id);
556          if ((!port) || (!port->IsReady()))
557          {
558            throw std::runtime_error("Port for subscription not available");
559          }
560
561          tFlags flags = tFlag::NETWORK_ELEMENT | tFlag::VOLATILE;
562          if ((!tool_connection) && port->IsOutputPort() && publish_connection)
563          {
564            throw std::runtime_error("Cannot publish to output ports with basic (SHARED_PORTS) connection");
565          }
566          if (!publish_connection)
567          {
568            flags |= tFlag::ACCEPTS_DATA; // create input port
569          }
570          else
571          {
572            flags |= tFlag::OUTPUT_PORT | tFlag::EMITS_DATA | tFlag::NO_INITIAL_PUSHING; // create output io port
573          }
574          if (tool_connection)
575          {
576            flags |= tFlag::TOOL_PORT;
577          }
578          if (dynamic_connection_data.strategy > 0)
579          {
580            flags |= tFlag::PUSH_STRATEGY;
581          }
582
583          std::string port_name = rrlib::uri::tURI(port->GetPath()).ToString() + (publish_connection ? " (Publish)" : " (Subscribe)");
584          core::tAbstractPort* port_to_connect_to = port;
585          if (!static_subscription_parameters.server_side_conversion.NoConversion())
586          {
587            port_to_connect_to = nullptr;
588
589            // Check whether port already exists
590            if (!publish_connection)
591            {
592              for (auto it = port->OutgoingConnectionsBegin(); it != port->OutgoingConnectionsEnd(); ++it)
593              {
594                tServerSideConversionAnnotation* info = it->Destination().GetAnnotation<tServerSideConversionAnnotation>();
595                if (info && info->conversion_info == static_subscription_parameters.server_side_conversion)
596                {
597                  port_to_connect_to = &it->Destination();
598                }
599              }
600            }
601            else
602            {
603              for (auto it = port->IncomingConnectionsBegin(); it != port->IncomingConnectionsEnd(); ++it)
604              {
605                tServerSideConversionAnnotation* info = it->Source().GetAnnotation<tServerSideConversionAnnotation>();
606                if (info && info->conversion_info == static_subscription_parameters.server_side_conversion)
607                {
608                  port_to_connect_to = &it->Source();
609                }
610              }
611            }
612
613            if (!port_to_connect_to)
614            {
615              // Resolve conversion
616              rrlib::rtti::tType destination_type = rrlib::rtti::tType::FindType(static_subscription_parameters.server_side_conversion.destination_type);
617              if (!destination_type)
618              {
619                throw std::runtime_error("Server-side conversion to unknown type" + static_subscription_parameters.server_side_conversion.destination_type);
620              }
621              size_t size = static_subscription_parameters.server_side_conversion.operation_1.length() == 0 ? 0 : (static_subscription_parameters.server_side_conversion.operation_2.length() == 0 ? 1 : 2);
622              rrlib::rtti::tType intermediate_type;
623              if (size == 2 || static_subscription_parameters.server_side_conversion.intermediate_type.length())
624              {
625                intermediate_type = rrlib::rtti::tType::FindType(static_subscription_parameters.server_side_conversion.intermediate_type);
626                if (!intermediate_type)
627                {
628                  throw std::runtime_error("Server-side conversion with unknown type " + static_subscription_parameters.server_side_conversion.intermediate_type);
629                }
630              }
631
632              const rrlib::rtti::conversion::tRegisteredConversionOperation* operation1 = nullptr;
633              const rrlib::rtti::conversion::tRegisteredConversionOperation* operation2 = nullptr;
634              if (size >= 1)
635              {
636                operation1 = &rrlib::rtti::conversion::tRegisteredConversionOperation::Find(static_subscription_parameters.server_side_conversion.operation_1, port->GetDataType(), size == 1 ? destination_type : intermediate_type);
637                if (size >= 2)
638                {
639                  operation2 = &rrlib::rtti::conversion::tRegisteredConversionOperation::Find(static_subscription_parameters.server_side_conversion.operation_2, intermediate_type, destination_type);
640                }
641              }
642
643              rrlib::rtti::conversion::tConversionOperationSequence conversion;
644              if (size == 1)
645              {
646                conversion = rrlib::rtti::conversion::tConversionOperationSequence(*operation1, intermediate_type);
647              }
648              else if (size == 2)
649              {
650                conversion = rrlib::rtti::conversion::tConversionOperationSequence(*operation1, *operation2, intermediate_type);
651              }
652
653              // Create and connect new conversion port
654              data_ports::tGenericPort created_port(port_name + " to " + destination_type.GetName(), &GetServerPortsElement(), destination_type, tFlag::NETWORK_ELEMENT | tFlag::VOLATILE | tFlag::EMITS_DATA | tFlag::ACCEPTS_DATA | (publish_connection ? (tFlag::OUTPUT_PORT | tFlag::NO_INITIAL_PUSHING) : tFlag::PORT) | (tool_connection ? tFlag::TOOL_PORT : tFlag::PORT));
655              if (created_port.ConnectTo(port, core::tConnectOptions(conversion, core::tConnectionFlag::NON_PRIMARY_CONNECTOR | (publish_connection ? core::tConnectionFlag::DIRECTION_TO_DESTINATION : core::tConnectionFlag::DIRECTION_TO_SOURCE))))
656              {
657                created_port.GetWrapped()->EmplaceAnnotation<tServerSideConversionAnnotation>(static_subscription_parameters.server_side_conversion);
658                created_port.Init();
659                port_to_connect_to = created_port.GetWrapped();
660              }
661              else
662              {
663                created_port.ManagedDelete();
664                throw std::runtime_error("Conversion could not be applied");
665              }
666            }
667          }
668
669          data_ports::tGenericPort created_port(port_name, &GetServerPortsElement(), &GetServerPortsElement(), port_to_connect_to->GetDataType(), flags);
670          tNetworkPortInfo* network_port_info = new tNetworkPortInfo(*this, message.Get<0>(), message.Get<0>(), dynamic_connection_data.strategy, *created_port.GetWrapped(), port->GetHandle());
671          network_port_info->SetDesiredEncoding(static_subscription_parameters.server_side_conversion.encoding);
672          network_port_info->SetServerSideDynamicConnectionData(dynamic_connection_data);
673          created_port.AddPortListenerForPointer(*network_port_info);
674          created_port.SetPullRequestHandler(this);
675          created_port.Init();
676          created_port.ConnectTo(*port_to_connect_to, core::tConnectionFlag::NON_PRIMARY_CONNECTOR | (publish_connection ? core::tConnectionFlag::DIRECTION_TO_DESTINATION : core::tConnectionFlag::DIRECTION_TO_SOURCE));
677          server_port_map.emplace(message.Get<0>(), network_port_info);
678          FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Created server port ", created_port.GetWrapped());
679        }
680        else
681        {
682          return true; // We could not obtain lock - try again later
683        }
684      }
685    }
686    catch (const std::exception& e)
687    {
688      FINROC_LOG_PRINT(WARNING, "Error connecting ports (notifying client also): ", e.what());
689      auto& stream = GetPrimaryConnection()->CurrentWriteStream();
690      tConnectPortsErrorMessage::Serialize(false, true, stream, message.Get<0>());
691      stream << e.what();
692      tConnectPortsErrorMessage::FinishMessage(stream);
693    }
694  }
695  else if (opcode == tOpCode::DISCONNECT_PORTS)
696  {
697    tDisconnectPortsMessage message;
698    message.Deserialize(stream);
699    auto it = server_port_map.find(message.Get<0>());
700    if (it != server_port_map.end())
701    {
702      rrlib::thread::tLock lock(GetStructureMutex(), false);
703      if (lock.TryLock())
704      {
705        // delete conversion ports
706        core::tAbstractPort& network_port = *it->second->GetAnnotated<core::tAbstractPort>();
707        if (network_port.IsOutputPort())
708        {
709          for (auto connector = network_port.OutgoingConnectionsBegin(); connector != network_port.OutgoingConnectionsEnd(); ++connector)
710          {
711            tServerSideConversionAnnotation* info = connector->Destination().GetAnnotation<tServerSideConversionAnnotation>();
712            if (info && connector->Destination().CountIncomingConnections() == 1)
713            {
714              connector->Destination().ManagedDelete();
715            }
716          }
717        }
718        else
719        {
720          for (auto connector = network_port.IncomingConnectionsBegin(); connector != network_port.IncomingConnectionsEnd(); ++connector)
721          {
722            tServerSideConversionAnnotation* info = connector->Source().GetAnnotation<tServerSideConversionAnnotation>();
723            if (info && connector->Source().CountOutgoingConnections() == 1)
724            {
725              connector->Source().ManagedDelete();
726            }
727          }
728        }
729
730        // Delete port (this will also
731        network_port.ManagedDelete();
732      }
733      else
734      {
735        return true; // We could not obtain lock - try again later
736      }
737    }
738    else
739    {
740      FINROC_LOG_PRINT(DEBUG_WARNING, "Port for disconnecting not available (", message.Get<0>(), ")");
741      return false;
742    }
743  }
744  else if (opcode == tOpCode::TYPE_UPDATE)
745  {
746    tTypeUpdateMessage message;
747    message.Deserialize(stream, false);
748    rrlib::rtti::tType type;
749    stream >> type;
750    stream.ReadShort(); // Discard remote network update time default for data type (legacy)
751    message.FinishDeserialize(stream);
752  }
753  else if (opcode == tOpCode::STRUCTURE_CREATED)
754  {
755    rrlib::thread::tLock lock(GetStructureMutex(), false);
756    if (lock.TryLock())
757    {
758      tStructureCreatedMessage message;
759      message.Deserialize(stream, false);
760      network_transport::runtime_info::tRemoteFrameworkElementInfo framework_element_info;
761      framework_element_info.id.handle = message.Get<0>();
762      stream >> framework_element_info;
763      message.FinishDeserialize(stream);
764      AddRemotePort(framework_element_info);
765    }
766    else
767    {
768      return true; // We could not obtain lock - try again later
769    }
770  }
771  else if (opcode == tOpCode::STRUCTURE_CHANGED)
772  {
773    rrlib::thread::tLock lock(GetStructureMutex(), false);
774    if (lock.TryLock())
775    {
776      tStructureChangedMessage message;
777      message.Deserialize(stream, false);
778      if (stream.GetSourceInfo().revision == 0)
779      {
780        stream.ReadInt(); // new flags (flag changes are obsolete)
781      }
782      runtime_info::tRemoteFrameworkElementInfo::tDynamicInfo dynamic_info;
783      stream >> dynamic_info;
784      message.FinishDeserialize(stream);
785
786      auto port_to_change = remote_port_map.find(message.Get<0>());
787      if (port_to_change != remote_port_map.end())
788      {
789        //port_to_change
790        port_to_change->second.dynamic_info.strategy = dynamic_info.strategy;
791        for (tNetworkPortInfoClient * client_port : port_to_change->second.client_ports)
792        {
793          data_ports::common::tAbstractDataPort& data_port = static_cast<data_ports::common::tAbstractDataPort&>(*client_port->GetPort());
794          data_port.SetPushStrategy(dynamic_info.strategy > 0);
795          //data_port.SetMinNetUpdateIntervalRaw(dynamic_info.min_net_update_time);
796          //tNetworkPortInfo* network_port_info = data_port.GetAnnotation<tNetworkPortInfo>();
797          client_port->NetworkPortInfo().current_dynamic_connection_data.strategy = dynamic_info.strategy;
798          client_port->NetworkPortInfo().ChangeStrategy(dynamic_info.strategy);
799        }
800      }
801      else
802      {
803        FINROC_LOG_PRINT(WARNING, "There is no port to change with handle ", message.Get<0>());
804      }
805    }
806    else
807    {
808      return true; // We could not obtain lock - try again later
809    }
810  }
811  else if (opcode == tOpCode::STRUCTURE_DELETED)
812  {
813    rrlib::thread::tLock lock(GetStructureMutex(), false);
814    if (lock.TryLock())
815    {
816      tStructureDeletedMessage message;
817      message.Deserialize(stream);
818
819      auto port_to_delete = remote_port_map.find(message.Get<0>());
820      if (port_to_delete != remote_port_map.end())
821      {
822        for (tNetworkPortInfoClient * client_port : port_to_delete->second.client_ports)
823        {
824          for (tNetworkConnector * connector : client_port->used_by_connectors)
825          {
826            connector->UpdateStatus(core::tUriConnector::tStatus::DISCONNECTED);
827            connector->temporary_connector_port.reset();
828            connector->temporary_conversion_port.reset();
829          }
830        }
831        remote_port_map.erase(message.Get<0>());
832      }
833      else
834      {
835        FINROC_LOG_PRINT(WARNING, "There is no port to delete with handle ", message.Get<0>());
836      }
837    }
838    else
839    {
840      return true; // We could not obtain lock - try again later
841    }
842  }
843  else if (opcode == tOpCode::CONNECT_PORTS_ERROR)
844  {
845    tConnectPortsErrorMessage message;
846    message.Deserialize(stream, false);
847    std::string error = stream.ReadString();
848    message.FinishDeserialize(stream);
849
850    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), false);
851    if (port.first && port.first->IsReady() && port.second->GetClientInfo())
852    {
853      FINROC_LOG_PRINT(WARNING, "Could not connect to remote port '", port.second->GetClientInfo()->GetPort()->GetName(), "'. Reason: ", error);
854      port.second->GetClientInfo()->connected = false;
855      for (tNetworkConnector * connector : port.second->GetClientInfo()->used_by_connectors)
856      {
857        connector->UpdateStatus(core::tUriConnector::tStatus::ERROR);
858        connector->temporary_connector_port.reset();
859        connector->temporary_conversion_port.reset();
860      }
861    }
862    else
863    {
864      FINROC_LOG_PRINT(WARNING, "Received CONNECT_PORTS_ERROR message for unknown connection handle");
865    }
866  }
867  else if (opcode == tOpCode::SUBSCRIBE_LEGACY || opcode == tOpCode::UNSUBSCRIBE_LEGACY)
868  {
869    FINROC_LOG_PRINT(WARNING, "OpCode ", make_builder::GetEnumString(opcode), " is superseded and no longer served by this peer");
870    throw std::runtime_error("Superseded OpCode");
871  }
872  else
873  {
874    FINROC_LOG_PRINT(WARNING, "OpCode ", make_builder::GetEnumString(opcode), " is not served by this peer");
875    throw std::runtime_error("Invalid OpCode");
876  }
877
878  return false;
879}
880
881void tRemoteRuntime::ProcessStructurePacket(rrlib::serialization::tInputStream& stream)
882{
883  try
884  {
885    const runtime_info::tRemoteType& type = stream.ReadRegisterEntry<runtime_info::tRemoteType>();
886    if (type.GetName() != rrlib::rtti::tDataType<std::string>().GetName())
887    {
888      FINROC_LOG_PRINT(ERROR, "Type encoding does not seem to work");
889      return;
890    }
891
892    network_transport::runtime_info::tRemoteFrameworkElementInfo info;
893    while (stream.Remaining())
894    {
895      stream >> info.id.handle;
896      stream >> info;
897      AddRemotePort(info);
898    }
899  }
900  catch (const std::exception& e)
901  {
902    FINROC_LOG_PRINT(ERROR, "Error processing structure packet:", e);
903  }
904}
905
906void tRemoteRuntime::PublishStructureChange(const tLocalRuntimeInfo::tStructureChangeEventToPublish& structure_change_event)
907{
908  if (shared_connection_info->initial_reading_complete && shared_connection_info->initial_writing_complete && shared_connection_info->output_stream_prototype.GetTargetInfo().revision != 0) // do not publish to legacy runtimes
909  {
910    if (shared_connection_info->initial_structure_writing_complete || structure_change_event.local_handle < shared_connection_info->framework_elements_in_full_structure_exchange_sent_until_handle)
911    {
912      structure_change_event.WriteToStream(GetPrimaryConnection()->CurrentWriteStream());
913    }
914  }
915}
916
917void tRemoteRuntime::SendCall(tCallPointer& call_to_send, const rrlib::time::tTimestamp& time_now)
918{
919  if (!call_to_send->ReadyForSending())
920  {
921    //FINROC_LOG_PRINT(ERROR, "Emplacing ", call_to_send.get());
922    not_ready_calls.emplace_back(std::move(call_to_send));
923  }
924  else
925  {
926    SendCallImplementation(call_to_send, time_now);
927  }
928}
929
930void tRemoteRuntime::SendCallImplementation(tCallPointer& call_to_send, const rrlib::time::tTimestamp& time_now)
931{
932  tConnection& connection = *GetExpressConnection();  // TODO: Should there be a possibility to specify which connection to use RPC calls? problem: message does not
933
934  FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Sending Call ", make_builder::GetEnumString(call_to_send->GetCallType()));
935  bool expects_response = call_to_send->ExpectsResponse();
936  if (expects_response)
937  {
938    call_to_send->SetCallId(next_call_id);
939    next_call_id++;
940  }
941  tRPCCall::Serialize(false, true, connection.CurrentWriteStream(), call_to_send->GetRemotePortHandle(), call_to_send->GetCallType());
942  call_to_send->GetCall()->Serialize(connection.CurrentWriteStream());
943  tRPCCall::FinishMessage(connection.CurrentWriteStream());
944  if (expects_response)
945  {
946    rrlib::time::tDuration timeout = call_to_send->ResponseTimeout();
947    calls_awaiting_response.emplace_back(time_now + timeout, std::move(call_to_send));
948  }
949}
950
951void tRemoteRuntime::SendPendingMessages(const rrlib::time::tTimestamp& time_now)
952{
953  if ((!GetPrimaryConnection()) || GetPrimaryConnection()->IsClosed())
954  {
955    return;
956  }
957
958  for (auto it = not_ready_calls.begin(); it < not_ready_calls.end();)
959  {
960    if ((*it)->ReadyForSending())
961    {
962      tCallPointer call_pointer = std::move(*it);
963      SendCallImplementation(call_pointer, time_now);
964      it = not_ready_calls.erase(it);
965    }
966    else
967    {
968      ++it;
969    }
970  }
971  for (auto it = calls_awaiting_response.begin(); it < calls_awaiting_response.end();)
972  {
973    if (time_now > it->first) // Did call time out?
974    {
975      it = calls_awaiting_response.erase(it);
976    }
977    else
978    {
979      ++it;
980    }
981  }
982  for (auto it = pull_calls_awaiting_response.begin(); it < pull_calls_awaiting_response.end();)
983  {
984    if (time_now > it->timeout_time) // Did call time out?
985    {
986      it = pull_calls_awaiting_response.erase(it);
987    }
988    else
989    {
990      ++it;
991    }
992  }
993
994  for (auto & connection : connections)
995  {
996    if (connection && (!connection->IsClosed()))
997    {
998      connection->SendPendingMessages(time_now);
999    }
1000  }
1001}
1002
1003void tRemoteRuntime::SendPullRequest(tLocalRuntimeInfo::tPullCallInfo& pull_call_info)
1004{
1005  // We do this here, because this is the TCP thread now (and next_call_id is not thread-safe)
1006  pull_call_info.call_id = next_call_id;
1007  next_call_id++;
1008  pull_calls_awaiting_response.push_back(pull_call_info);
1009
1010  // Send call
1011  rrlib::serialization::tOutputStream& stream = GetExpressConnection()->CurrentWriteStream();  // pull request is small -> we can always use express connection
1012  bool legacy = stream.GetTargetInfo().revision == 0;
1013  uint8_t message_flags = legacy ? static_cast<uint8_t>(message_flags::cBINARY_ENCODING) : pull_call_info.message_flags;
1014  tPullCall::Serialize(true, true, stream, legacy ? pull_call_info.remote_port_handle : pull_call_info.connection_handle, pull_call_info.call_id, message_flags);
1015  // Send immediately? (pull calls are somewhat outdated -> no); ex-code: this->SendPendingMessages(rrlib::time::Now(true));
1016}
1017
1018void tRemoteRuntime::SendResponse(typename tResponseSender::tCallPointer && response_to_send)
1019{
1020  if (response_to_send->ReadyForSending())
1021  {
1022    rrlib::time::tTimestamp time_now = rrlib::time::Now();
1023    SendCallImplementation(response_to_send, time_now);
1024    SendPendingMessages(time_now);
1025  }
1026  else
1027  {
1028    not_ready_calls.emplace_back(std::move(response_to_send));
1029  }
1030}
1031
1032//----------------------------------------------------------------------
1033// End of namespace declaration
1034//----------------------------------------------------------------------
1035}
1036}
1037}
Note: See TracBrowser for help on using the repository browser.