source: finroc_plugins_network_transport/generic_protocol/tRemoteRuntime.cpp @ 24:cddabeebd261

17.03
Last change on this file since 24:cddabeebd261 was 24:cddabeebd261, checked in by Max Reichardt <mreichardt@…>, 7 years ago

Fixes robustness issue (potential crash) with closed connections and improves inline documentation

File size: 39.3 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/network_transport/generic_protocol/tRemoteRuntime.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2017-02-28
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/network_transport/generic_protocol/tRemoteRuntime.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "core/tRuntimeEnvironment.h"
36#include "plugins/rpc_ports/internal/tRPCInterfaceTypeInfo.h"
37#include "plugins/rpc_ports/tPromise.h"
38
39//----------------------------------------------------------------------
40// Internal includes with ""
41//----------------------------------------------------------------------
42#include "plugins/network_transport/generic_protocol/tNetworkPortInfo.h"
43#include "plugins/network_transport/generic_protocol/tNetworkTransportPlugin.h"
44
45//----------------------------------------------------------------------
46// Debugging
47//----------------------------------------------------------------------
48#include <cassert>
49
50//----------------------------------------------------------------------
51// Namespace usage
52//----------------------------------------------------------------------
53
54//----------------------------------------------------------------------
55// Namespace declaration
56//----------------------------------------------------------------------
57namespace finroc
58{
59namespace network_transport
60{
61namespace generic_protocol
62{
63
64//----------------------------------------------------------------------
65// Forward declarations / typedefs / enums
66//----------------------------------------------------------------------
67
68//----------------------------------------------------------------------
69// Const values
70//----------------------------------------------------------------------
71
72//----------------------------------------------------------------------
73// Implementation
74//----------------------------------------------------------------------
75
76namespace
77{
78
79/*!
80 * Deserialization scope for RPC calls.
81 * Buffer pool is created/provided when needed
82 */
83class tRPCDeserializationScope : public data_ports::api::tDeserializationScope
84{
85public:
86  tRPCDeserializationScope(core::tFrameworkElement::tHandle local_port_handle,
87                           std::map<core::tFrameworkElement::tHandle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>>& rpc_call_buffer_pools) :
88    tDeserializationScope(),
89    local_port_handle(local_port_handle),
90    rpc_call_buffer_pools(rpc_call_buffer_pools)
91  {}
92
93private:
94  virtual data_ports::standard::tMultiTypePortBufferPool& ObtainBufferPool() override
95  {
96    auto it = rpc_call_buffer_pools.find(local_port_handle);
97    if (it == rpc_call_buffer_pools.end())
98    {
99      it = rpc_call_buffer_pools.insert(std::pair<core::tFrameworkElement::tHandle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>>(
100                                          local_port_handle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>(new data_ports::standard::tMultiTypePortBufferPool()))).first;
101    }
102    return *(it->second);
103  }
104
105  core::tFrameworkElement::tHandle local_port_handle;
106  std::map<core::tFrameworkElement::tHandle, std::unique_ptr<data_ports::standard::tMultiTypePortBufferPool>>& rpc_call_buffer_pools;
107};
108
109class tServerSideConversionAnnotation : public core::tAnnotation
110{
111public:
112
113  tServerSideConversionAnnotation(const tServerSideConversionInfo& conversion_info) :
114    conversion_info(conversion_info)
115  {}
116
117  const tServerSideConversionInfo conversion_info;
118};
119
120/*!
121 * \param server_port_map Server port map remote runtime
122 * \param connection_handle Connection handle (== client port handle)
123 * \param server_port Handle is from message to server (connection_handle != port handle)
124 * \return Network connector port associated to connector handle (abstract port and tNetworkPortInfo pointers)
125 */
126std::pair<core::tAbstractPort*, tNetworkPortInfo*> GetNetworkConnectorPort(const std::map<tFrameworkElementHandle, tNetworkPortInfo*>& server_port_map, tHandle connection_handle, bool server_port)
127{
128  typedef std::pair<core::tAbstractPort*, tNetworkPortInfo*> tReturn;
129  if (server_port)
130  {
131    auto it = server_port_map.find(connection_handle);
132    if (it != server_port_map.end())
133    {
134      return tReturn(it->second->GetAnnotated<core::tAbstractPort>(), it->second);
135    }
136  }
137  else
138  {
139    core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(connection_handle);
140    if (port)
141    {
142      tNetworkPortInfo* info = port->GetAnnotation<tNetworkPortInfo>();
143      if (info)
144      {
145        return tReturn(port, info);
146      }
147    }
148  }
149  return tReturn(nullptr, nullptr);
150}
151
152}
153
154
155tRemoteRuntime::tRemoteRuntime(tNetworkTransportPlugin& network_transport, const std::shared_ptr<tConnection>& primary_connection, core::tFrameworkElement& parent, const std::string& name) :
156  tFrameworkElement(&parent, name, tFlag::NETWORK_ELEMENT),
157  network_transport(network_transport),
158  shared_connection_info(primary_connection->shared_connection_info),
159  client_ports(new core::tFrameworkElement(this, "Client Ports", tFlag::NETWORK_ELEMENT | tFlag::AUTO_RENAME)),
160  server_ports(new core::tFrameworkElement(this, "Server Ports", tFlag::NETWORK_ELEMENT | tFlag::AUTO_RENAME)),
161  server_port_map(),
162  remote_port_map(),
163  not_ready_calls(),
164  calls_awaiting_response(),
165  next_call_id(0),
166  pull_calls_awaiting_response(),
167  rpc_call_buffer_pools()
168{
169  connections[0] = primary_connection;
170  FINROC_LOG_PRINT(DEBUG, "Connected to " + GetName());
171}
172
173tRemoteRuntime::~tRemoteRuntime()
174{}
175
176bool tRemoteRuntime::AddConnection(std::shared_ptr<tConnection> connection, bool primary_connection)
177{
178  if ((!primary_connection) && (!connections[0]))
179  {
180    FINROC_LOG_PRINT(WARNING, "Primary connection must be added first");
181    return false;
182  }
183
184  size_t index = primary_connection ? 0 : 1;
185  if (connections[index])
186  {
187    return false;
188  }
189  connections[index] = connection;
190
191  if ((!primary_connection) && (connections[0]->shared_connection_info != connections[1]->shared_connection_info))
192  {
193    connections[1]->shared_connection_info = connections[0]->shared_connection_info;
194  }
195
196  return true;
197}
198
199void tRemoteRuntime::AddRemotePort(network_transport::runtime_info::tRemoteFrameworkElementInfo& info)
200{
201  if (info.static_info.link_count == 0)
202  {
203    FINROC_LOG_PRINT(WARNING, "Remote shared port has no links. Ignoring.");
204    return;
205  }
206  if (!info.static_info.type)
207  {
208    FINROC_LOG_PRINT(WARNING, "Remote shared port '", info.static_info.link_data[0].name, "' has unknown type. Ignoring.");
209    return;
210  }
211  if (remote_port_map.find(info.id.handle) != remote_port_map.end())
212  {
213    FINROC_LOG_PRINT(WARNING, "Received info on remote shared port '", info.static_info.link_data[0].name, "' twice.");
214    return;
215  }
216  auto result = remote_port_map.emplace(info.id.handle, info);
217  if (!result.second)
218  {
219    throw std::runtime_error("Remote port already in remote_port_map");
220  }
221  FINROC_LOG_PRINT(DEBUG, "Received remote shared port info: ", info.static_info.link_data[0].path);
222  network_transport.OnNewRemotePort(*this, *result.first);
223}
224
225void tRemoteRuntime::OnInitialization()
226{
227  network_transport.connected_runtimes.push_back(this);
228}
229
230void tRemoteRuntime::OnManagedDelete()
231{
232  {
233    auto& vector = network_transport.connected_runtimes;
234    vector.erase(std::remove(vector.begin(), vector.end(), this), vector.end());
235  }
236
237  for (auto & connection : connections)
238  {
239    if (connection)
240    {
241      connection->Close();
242      connection.reset();
243    }
244  }
245  not_ready_calls.clear();
246  calls_awaiting_response.clear();
247
248  // Update connectors (important: called before children are deleted)
249  for (auto & port_info : remote_port_map)
250  {
251    for (tNetworkPortInfoClient * client_port : port_info.second.client_ports)
252    {
253      for (tNetworkConnector * connector : client_port->used_by_connectors)
254      {
255        connector->UpdateStatus(core::tUriConnector::tStatus::DISCONNECTED);
256        connector->temporary_connector_port.reset();
257        connector->temporary_conversion_port.reset();
258      }
259      client_port->used_by_connectors.clear();
260    }
261  }
262}
263
264data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject> tRemoteRuntime::OnPullRequest(data_ports::tGenericPort& origin)
265{
266  return network_transport.LocalRuntimeInfo()->OnPullRequest(origin, *this);
267}
268
269void tRemoteRuntime::PortDeleted(tNetworkPortInfo& deleted_port)
270{
271  for (auto & connection : connections)
272  {
273    if (connection)
274    {
275      auto& vector = connection->ports_with_data_to_send;
276      vector.erase(std::remove(vector.begin(), vector.end(), &deleted_port), vector.end());
277    }
278  }
279
280  if (deleted_port.IsServerPort())
281  {
282    size_t erased = server_port_map.erase(deleted_port.GetConnectionHandle());
283    if (!erased)
284    {
285      FINROC_LOG_PRINT(ERROR, "Deleted server port was not im map (This is a programming error)");
286    }
287  }
288}
289
290bool tRemoteRuntime::ProcessMessage(tOpCode opcode, rrlib::serialization::tMemoryBuffer& buffer, tConnection& connection)
291{
292  FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Processing message ", make_builder::GetEnumString(opcode));
293  rrlib::serialization::tInputStream stream(buffer, shared_connection_info->input_stream_prototype);
294
295  if (opcode == tOpCode::PORT_VALUE_CHANGE || opcode == tOpCode::SMALL_PORT_VALUE_CHANGE || opcode == tOpCode::SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP)
296  {
297    tPortValueChange message;
298    message.Deserialize(stream, false);
299    uint8_t flags = message.Get<1>();
300    message_flags::tDataEncoding encoding = static_cast<message_flags::tDataEncoding>(flags & 0x3);
301    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), flags & message_flags::cTO_SERVER);
302    if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType()))
303    {
304      data_ports::tGenericPort generic_port = data_ports::tGenericPort::Wrap(*port.first);
305
306      bool another_value = false;
307      do
308      {
309        rrlib::time::tTimestamp timestamp = rrlib::time::cNO_TIME;
310        data_ports::tChangeStatus change_type;
311        stream >> change_type;
312        if (opcode != tOpCode::SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP)
313        {
314          stream >> timestamp;
315        }
316        data_ports::tPortDataPointer<rrlib::rtti::tGenericObject> buffer = generic_port.GetUnusedBuffer();
317        buffer.SetTimestamp(timestamp);
318        if (encoding == message_flags::tDataEncoding::cBINARY_COMPRESSED_ENCODING)
319        {
320          char compression_format_buffer[100];
321          if (stream.ReadString(compression_format_buffer, true) >= sizeof(compression_format_buffer))
322          {
323            FINROC_LOG_PRINT(WARNING, "Compression format string exceeds max length");
324            return false;
325          }
326          if (compression_format_buffer[0])
327          {
328            size_t data_size = stream.ReadInt();
329            size_t data_end_position = stream.GetAbsoluteReadPosition() + data_size;
330            FINROC_LOG_PRINT(WARNING, "Decompressing data from network failed: finroc_plugins_data_compression is superseded by rrlib_rtti_conversion");
331            stream.Seek(data_end_position);
332          }
333          else
334          {
335            buffer->Deserialize(stream);
336            generic_port.BrowserPublish(buffer, false, change_type);
337          }
338        }
339        else
340        {
341          buffer->Deserialize(stream, static_cast<rrlib::serialization::tDataEncoding>(encoding));
342          generic_port.BrowserPublish(buffer, false, change_type);
343        }
344        another_value = stream.ReadBoolean();
345      }
346      while (another_value);
347
348      message.FinishDeserialize(stream);
349      connection.received_data_after_last_connect = true;
350    }
351  }
352  else if (opcode == tOpCode::RPC_CALL)
353  {
354    tRPCCall message;
355    message.Deserialize(stream, false);
356    rpc_ports::tCallType call_type = message.Get<1>();
357
358    const runtime_info::tRemoteType& remote_rpc_interface_type = stream.ReadRegisterEntry<runtime_info::tRemoteType>();
359    uint8_t function_index;
360    stream >> function_index;
361    rrlib::rtti::tType rpc_interface_type = remote_rpc_interface_type.GetLocalDataType();
362    if (!rpc_interface_type)
363    {
364      FINROC_LOG_PRINT(ERROR, "Remote type ", remote_rpc_interface_type.GetName(), " is not known here. Ignoring call.");
365      return false;
366    }
367
368    const rpc_ports::internal::tRPCInterfaceTypeInfo* type_info = rpc_ports::internal::tRPCInterfaceTypeInfo::Get(rpc_interface_type);
369    if ((!type_info) || (!rpc_ports::IsRPCType(rpc_interface_type)))
370    {
371      FINROC_LOG_PRINT(ERROR, "Type ", rpc_interface_type.GetName(), " is no RPC type. Ignoring call.");
372      return false;
373    }
374    FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Received ", make_builder::GetEnumString(call_type));
375
376    if (call_type == rpc_ports::tCallType::RPC_MESSAGE || call_type == rpc_ports::tCallType::RPC_REQUEST)
377    {
378      core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(message.Get<0>());
379      if (port && rpc_interface_type == port->GetDataType())
380      {
381        tRPCDeserializationScope deserialization_scope(message.Get<0>(), rpc_call_buffer_pools);
382        if (call_type == rpc_ports::tCallType::RPC_MESSAGE)
383        {
384          type_info->DeserializeMessage(stream, static_cast<rpc_ports::internal::tRPCPort&>(*port), function_index);
385        }
386        else
387        {
388          type_info->DeserializeRequest(stream, static_cast<rpc_ports::internal::tRPCPort&>(*port), function_index, *this);
389        }
390      }
391    }
392    else // type is RPC response
393    {
394      tCallId call_id;
395      stream >> call_id;
396
397      tCallPointer call_awaiting_this_response;
398      for (auto it = calls_awaiting_response.begin(); it != calls_awaiting_response.end(); ++it)
399      {
400        if (it->second->GetCallId() == call_id)
401        {
402          call_awaiting_this_response = std::move(it->second);
403          calls_awaiting_response.erase(it);
404          break;
405        }
406      }
407      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Call awaiting: ", call_awaiting_this_response.get());
408      if (call_awaiting_this_response)
409      {
410        core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(call_awaiting_this_response->GetLocalPortHandle());
411        if (port)
412        {
413          tRPCDeserializationScope deserialization_scope(call_awaiting_this_response->GetLocalPortHandle(), rpc_call_buffer_pools);
414          type_info->DeserializeResponse(stream, function_index, *this, call_awaiting_this_response.get());
415          message.FinishDeserialize(stream);
416          return false;
417        }
418      }
419      call_awaiting_this_response.reset();
420      type_info->DeserializeResponse(stream, function_index, *this, call_awaiting_this_response.get());
421    }
422    message.FinishDeserialize(stream);
423  }
424  else if (opcode == tOpCode::PULLCALL)
425  {
426    tPullCall message;
427    message.Deserialize(stream);
428    uint8_t flags = message.Get<2>();
429    message_flags::tDataEncoding encoding = static_cast<message_flags::tDataEncoding>(flags & 0x3);
430    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), flags & message_flags::cTO_SERVER);
431    rrlib::serialization::tOutputStream& write_stream = GetConnection(flags & message_flags::cHIGH_PRIORITY)->CurrentWriteStream();
432    if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType()))
433    {
434      tPullCallReturn::Serialize(false, true, write_stream, message.Get<1>(), false);
435      data_ports::tGenericPort data_port = data_ports::tGenericPort::Wrap(*port.first);
436      data_ports::tPortDataPointer<const rrlib::rtti::tGenericObject> pulled_buffer = data_port.GetPointer(data_ports::tStrategy::PULL_IGNORING_HANDLER_ON_THIS_PORT);
437      write_stream << pulled_buffer->GetType() << pulled_buffer.GetTimestamp();
438      pulled_buffer->Serialize(write_stream, static_cast<rrlib::serialization::tDataEncoding>(encoding));
439      tPullCallReturn::FinishMessage(write_stream);
440    }
441    else
442    {
443      tPullCallReturn::Serialize(true, true, write_stream, message.Get<1>(), true);
444    }
445  }
446  else if (opcode == tOpCode::PULLCALL_RETURN)
447  {
448    tPullCallReturn message;
449    message.Deserialize(stream, false);
450
451    for (auto it = pull_calls_awaiting_response.begin(); it != pull_calls_awaiting_response.end(); ++it)
452    {
453      if (it->call_id == message.Get<0>())
454      {
455        bool failed = message.Get<1>();
456        core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(it->local_port_handle);
457        if ((!failed) && port && port->IsReady() && data_ports::IsDataFlowType(port->GetDataType()))
458        {
459          rrlib::rtti::tType data_type;
460          rrlib::time::tTimestamp timestamp;
461          stream >> data_type >> timestamp;
462
463          data_ports::tGenericPort data_port = data_ports::tGenericPort::Wrap(*port);
464          data_ports::tPortDataPointer<rrlib::rtti::tGenericObject> pulled_buffer = data_port.GetUnusedBuffer();
465          if (pulled_buffer->GetType() != data_type)
466          {
467            FINROC_LOG_PRINT(WARNING, "Port data pulled via ", port, " has invalid type.");
468            it->promise->SetException(rpc_ports::tFutureStatus::INVALID_DATA_RECEIVED);
469          }
470          else
471          {
472            pulled_buffer.SetTimestamp(timestamp);
473            pulled_buffer->Deserialize(stream);
474            message.FinishDeserialize(stream);
475            it->promise->SetValue(std::move(pulled_buffer));
476          }
477        }
478        else
479        {
480          it->promise->SetException(rpc_ports::tFutureStatus::NO_CONNECTION);
481        }
482        pull_calls_awaiting_response.erase(it); // remove pull call from list
483        break;
484      }
485    }
486  }
487  else if (opcode == tOpCode::UPDATE_CONNECTION)
488  {
489    tUpdateConnectionMessage message;
490    message.Deserialize(stream);
491
492    // Get server port
493    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), true);
494    if (port.first && port.first->IsReady() && data_ports::IsDataFlowType(port.first->GetDataType()))
495    {
496      tDynamicConnectionData data;
497      data.minimal_update_interval = message.Get<1>();
498      data.high_priority = message.Get<2>();
499      data.strategy = message.Get<3>();
500      port.second->SetServerSideDynamicConnectionData(data);
501      bool push_strategy = data.strategy > 0;
502      data_ports::common::tAbstractDataPort& data_port = static_cast<data_ports::common::tAbstractDataPort&>(*port.first);
503      if (data_port.PushStrategy() != push_strategy)
504      {
505        // flags need to be changed
506        rrlib::thread::tLock lock(GetStructureMutex(), false);
507        if (lock.TryLock())
508        {
509          if (data_port.PushStrategy() != push_strategy)
510          {
511            data_port.SetPushStrategy(push_strategy);
512          }
513        }
514        else
515        {
516          return true; // We could not obtain lock - try again later
517        }
518      }
519    }
520    else
521    {
522      FINROC_LOG_PRINT(WARNING, "Cannot find connection to update (requested handle: ", message.Get<0>(), ")");
523    }
524  }
525
526  else if (opcode == tOpCode::CONNECT_PORTS)
527  {
528    tConnectPortsMessage message;
529    message.Deserialize(stream, false);
530
531    try
532    {
533      // Get or create server port
534      auto it = server_port_map.find(message.Get<0>());
535      if (it != server_port_map.end())
536      {
537        throw std::runtime_error("Connection handle already occupied. Ignoring CONNECT_PORTS message.");
538      }
539      else
540      {
541        rrlib::thread::tLock lock(GetStructureMutex(), false);
542        if (lock.TryLock())
543        {
544          // Read subscription data
545          tStaticNetworkConnectorParameters static_subscription_parameters;
546          tDynamicConnectionData dynamic_connection_data;
547          stream >> static_subscription_parameters >> dynamic_connection_data;
548          message.FinishDeserialize(stream);
549
550          // Create server port
551          core::tAbstractPort* port = core::tRuntimeEnvironment::GetInstance().GetPort(static_subscription_parameters.server_port_id);
552          if ((!port) || (!port->IsReady()))
553          {
554            throw std::runtime_error("Port for subscription not available");
555          }
556
557          tFlags flags = tFlag::NETWORK_ELEMENT | tFlag::VOLATILE;
558          if (port->IsOutputPort())
559          {
560            flags |= tFlag::ACCEPTS_DATA; // create input port
561          }
562          else
563          {
564            flags |= tFlag::OUTPUT_PORT | tFlag::EMITS_DATA; // create output io port
565          }
566          if (GetDesiredStructureInfo() != runtime_info::tStructureExchange::SHARED_PORTS)
567          {
568            flags |= tFlag::TOOL_PORT;
569          }
570          if (dynamic_connection_data.strategy > 0)
571          {
572            flags |= tFlag::PUSH_STRATEGY;
573          }
574          if (static_subscription_parameters.reverse_push)
575          {
576            flags |= tFlag::PUSH_STRATEGY_REVERSE;
577          }
578
579          core::tAbstractPort* port_to_connect_to = port;
580          if (!static_subscription_parameters.server_side_conversion.NoConversion())
581          {
582            port_to_connect_to = nullptr;
583
584            // Check whether port already exists
585            if (port->IsOutputPort())
586            {
587              for (auto it = port->OutgoingConnectionsBegin(); it != port->OutgoingConnectionsEnd(); ++it)
588              {
589                tServerSideConversionAnnotation* info = it->Destination().GetAnnotation<tServerSideConversionAnnotation>();
590                if (info && info->conversion_info == static_subscription_parameters.server_side_conversion)
591                {
592                  port_to_connect_to = &it->Destination();
593                }
594              }
595            }
596            else
597            {
598              for (auto it = port->IncomingConnectionsBegin(); it != port->IncomingConnectionsEnd(); ++it)
599              {
600                tServerSideConversionAnnotation* info = it->Source().GetAnnotation<tServerSideConversionAnnotation>();
601                if (info && info->conversion_info == static_subscription_parameters.server_side_conversion)
602                {
603                  port_to_connect_to = &it->Source();
604                }
605              }
606            }
607
608            if (!port_to_connect_to)
609            {
610              // Resolve conversion
611              rrlib::rtti::tType destination_type = rrlib::rtti::tType::FindType(static_subscription_parameters.server_side_conversion.destination_type);
612              if (!destination_type)
613              {
614                throw std::runtime_error("Server-side conversion to unknown type" + static_subscription_parameters.server_side_conversion.destination_type);
615              }
616              size_t size = static_subscription_parameters.server_side_conversion.operation_1.length() == 0 ? 0 : (static_subscription_parameters.server_side_conversion.operation_2.length() == 0 ? 1 : 2);
617              rrlib::rtti::tType intermediate_type;
618              if (size == 2 || static_subscription_parameters.server_side_conversion.intermediate_type.length())
619              {
620                intermediate_type = rrlib::rtti::tType::FindType(static_subscription_parameters.server_side_conversion.intermediate_type);
621                if (!intermediate_type)
622                {
623                  throw std::runtime_error("Server-side conversion with unknown type " + static_subscription_parameters.server_side_conversion.intermediate_type);
624                }
625              }
626
627              const rrlib::rtti::conversion::tRegisteredConversionOperation* operation1 = nullptr;
628              const rrlib::rtti::conversion::tRegisteredConversionOperation* operation2 = nullptr;
629              if (size >= 1)
630              {
631                operation1 = &rrlib::rtti::conversion::tRegisteredConversionOperation::Find(static_subscription_parameters.server_side_conversion.operation_1, port->GetDataType(), size == 1 ? destination_type : intermediate_type);
632                if (size >= 2)
633                {
634                  operation2 = &rrlib::rtti::conversion::tRegisteredConversionOperation::Find(static_subscription_parameters.server_side_conversion.operation_2, intermediate_type, destination_type);
635                }
636              }
637
638              rrlib::rtti::conversion::tConversionOperationSequence conversion;
639              if (size == 1)
640              {
641                conversion = rrlib::rtti::conversion::tConversionOperationSequence(*operation1, intermediate_type);
642              }
643              else if (size == 2)
644              {
645                conversion = rrlib::rtti::conversion::tConversionOperationSequence(*operation1, *operation2, intermediate_type);
646              }
647
648              // Create and connect new conversion port
649              data_ports::tGenericPort created_port(rrlib::uri::tURI(port->GetPath()).ToString(), &GetServerPortsElement(), destination_type, tFlag::NETWORK_ELEMENT | tFlag::VOLATILE | tFlag::EMITS_DATA | tFlag::ACCEPTS_DATA | (port->IsOutputPort() ? tFlag::OUTPUT_PORT : tFlag::PORT));
650              if (created_port.ConnectTo(port, core::tConnectOptions(conversion, core::tConnectionFlag::NON_PRIMARY_CONNECTOR)))
651              {
652                created_port.GetWrapped()->EmplaceAnnotation<tServerSideConversionAnnotation>(static_subscription_parameters.server_side_conversion);
653                created_port.Init();
654                port_to_connect_to = created_port.GetWrapped();
655              }
656              else
657              {
658                created_port.ManagedDelete();
659                throw std::runtime_error("Conversion could not be applied");
660              }
661            }
662          }
663
664          data_ports::tGenericPort created_port(rrlib::uri::tURI(port->GetPath()).ToString(), &GetServerPortsElement(), &GetServerPortsElement(), port_to_connect_to->GetDataType(), flags);
665          tNetworkPortInfo* network_port_info = new tNetworkPortInfo(*this, message.Get<0>(), message.Get<0>(), dynamic_connection_data.strategy, *created_port.GetWrapped(), port->GetHandle());
666          network_port_info->SetDesiredEncoding(static_subscription_parameters.server_side_conversion.encoding);
667          network_port_info->SetServerSideDynamicConnectionData(dynamic_connection_data);
668          created_port.AddPortListenerForPointer(*network_port_info);
669          created_port.SetPullRequestHandler(this);
670          created_port.Init();
671          created_port.ConnectTo(*port_to_connect_to, core::tConnectionFlag::NON_PRIMARY_CONNECTOR);
672          server_port_map.emplace(message.Get<0>(), network_port_info);
673          FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Created server port ", created_port.GetWrapped());
674        }
675        else
676        {
677          return true; // We could not obtain lock - try again later
678        }
679      }
680    }
681    catch (const std::exception& e)
682    {
683      FINROC_LOG_PRINT(WARNING, "Error connecting ports (notifying client also): ", e.what());
684      auto& stream = GetPrimaryConnection()->CurrentWriteStream();
685      tConnectPortsErrorMessage::Serialize(false, true, stream, message.Get<0>());
686      stream << e.what();
687      tConnectPortsErrorMessage::FinishMessage(stream);
688    }
689  }
690  else if (opcode == tOpCode::DISCONNECT_PORTS)
691  {
692    tDisconnectPortsMessage message;
693    message.Deserialize(stream);
694    auto it = server_port_map.find(message.Get<0>());
695    if (it != server_port_map.end())
696    {
697      rrlib::thread::tLock lock(GetStructureMutex(), false);
698      if (lock.TryLock())
699      {
700        // delete conversion ports
701        core::tAbstractPort& network_port = *it->second->GetAnnotated<core::tAbstractPort>();
702        if (network_port.IsOutputPort())
703        {
704          for (auto connector = network_port.OutgoingConnectionsBegin(); connector != network_port.OutgoingConnectionsEnd(); ++connector)
705          {
706            tServerSideConversionAnnotation* info = connector->Destination().GetAnnotation<tServerSideConversionAnnotation>();
707            if (info && connector->Destination().CountIncomingConnections() == 1)
708            {
709              connector->Destination().ManagedDelete();
710            }
711          }
712        }
713        else
714        {
715          for (auto connector = network_port.IncomingConnectionsBegin(); connector != network_port.IncomingConnectionsEnd(); ++connector)
716          {
717            tServerSideConversionAnnotation* info = connector->Source().GetAnnotation<tServerSideConversionAnnotation>();
718            if (info && connector->Source().CountOutgoingConnections() == 1)
719            {
720              connector->Source().ManagedDelete();
721            }
722          }
723        }
724
725        // Delete port (this will also
726        network_port.ManagedDelete();
727      }
728      else
729      {
730        return true; // We could not obtain lock - try again later
731      }
732    }
733    else
734    {
735      FINROC_LOG_PRINT(DEBUG_WARNING, "Port for disconnecting not available (", message.Get<0>(), ")");
736      return false;
737    }
738  }
739  else if (opcode == tOpCode::TYPE_UPDATE)
740  {
741    tTypeUpdateMessage message;
742    message.Deserialize(stream, false);
743    rrlib::rtti::tType type;
744    stream >> type;
745    stream.ReadShort(); // Discard remote network update time default for data type (legacy)
746    message.FinishDeserialize(stream);
747  }
748  else if (opcode == tOpCode::STRUCTURE_CREATED)
749  {
750    rrlib::thread::tLock lock(GetStructureMutex(), false);
751    if (lock.TryLock())
752    {
753      tStructureCreatedMessage message;
754      message.Deserialize(stream, false);
755      network_transport::runtime_info::tRemoteFrameworkElementInfo framework_element_info;
756      framework_element_info.id.handle = message.Get<0>();
757      stream >> framework_element_info;
758      message.FinishDeserialize(stream);
759      AddRemotePort(framework_element_info);
760    }
761    else
762    {
763      return true; // We could not obtain lock - try again later
764    }
765  }
766  else if (opcode == tOpCode::STRUCTURE_CHANGED)
767  {
768    rrlib::thread::tLock lock(GetStructureMutex(), false);
769    if (lock.TryLock())
770    {
771      tStructureChangedMessage message;
772      message.Deserialize(stream, false);
773      runtime_info::tRemoteFrameworkElementInfo::tDynamicInfo dynamic_info;
774      stream >> dynamic_info;
775      message.FinishDeserialize(stream);
776
777      auto port_to_change = remote_port_map.find(message.Get<0>());
778      if (port_to_change != remote_port_map.end())
779      {
780        //port_to_change
781        port_to_change->second.dynamic_info.strategy = dynamic_info.strategy;
782        for (tNetworkPortInfoClient * client_port : port_to_change->second.client_ports)
783        {
784          data_ports::common::tAbstractDataPort& data_port = static_cast<data_ports::common::tAbstractDataPort&>(*client_port->GetPort());
785          data_port.SetPushStrategy(dynamic_info.strategy > 0);
786          //data_port.SetReversePushStrategy(dynamic_info.flags.Get(tFlag::PUSH_STRATEGY_REVERSE));
787          //data_port.SetMinNetUpdateIntervalRaw(dynamic_info.min_net_update_time);
788          //tNetworkPortInfo* network_port_info = data_port.GetAnnotation<tNetworkPortInfo>();
789          client_port->NetworkPortInfo().current_dynamic_connection_data.strategy = dynamic_info.strategy;
790          client_port->NetworkPortInfo().ChangeStrategy(dynamic_info.strategy);
791        }
792      }
793      else
794      {
795        FINROC_LOG_PRINT(WARNING, "There is no port to change with handle ", message.Get<0>());
796      }
797    }
798    else
799    {
800      return true; // We could not obtain lock - try again later
801    }
802  }
803  else if (opcode == tOpCode::STRUCTURE_DELETED)
804  {
805    rrlib::thread::tLock lock(GetStructureMutex(), false);
806    if (lock.TryLock())
807    {
808      tStructureDeletedMessage message;
809      message.Deserialize(stream);
810
811      auto port_to_delete = remote_port_map.find(message.Get<0>());
812      if (port_to_delete != remote_port_map.end())
813      {
814        for (tNetworkPortInfoClient * client_port : port_to_delete->second.client_ports)
815        {
816          for (tNetworkConnector * connector : client_port->used_by_connectors)
817          {
818            connector->UpdateStatus(core::tUriConnector::tStatus::DISCONNECTED);
819            connector->temporary_connector_port.reset();
820            connector->temporary_conversion_port.reset();
821          }
822        }
823        remote_port_map.erase(message.Get<0>());
824      }
825      else
826      {
827        FINROC_LOG_PRINT(WARNING, "There is no port to delete with handle ", message.Get<0>());
828      }
829    }
830    else
831    {
832      return true; // We could not obtain lock - try again later
833    }
834  }
835  else if (opcode == tOpCode::CONNECT_PORTS_ERROR)
836  {
837    tConnectPortsErrorMessage message;
838    message.Deserialize(stream, false);
839    std::string error = stream.ReadString();
840    message.FinishDeserialize(stream);
841
842    std::pair<core::tAbstractPort*, tNetworkPortInfo*> port = GetNetworkConnectorPort(server_port_map, message.Get<0>(), false);
843    if (port.first && port.first->IsReady() && port.second->GetClientInfo())
844    {
845      FINROC_LOG_PRINT(WARNING, "Could not connect to remote port '", port.second->GetClientInfo()->GetPort()->GetName(), "'. Reason: ", error);
846      port.second->GetClientInfo()->connected = false;
847      for (tNetworkConnector * connector : port.second->GetClientInfo()->used_by_connectors)
848      {
849        connector->UpdateStatus(core::tUriConnector::tStatus::ERROR);
850        connector->temporary_connector_port.reset();
851        connector->temporary_conversion_port.reset();
852      }
853    }
854    else
855    {
856      FINROC_LOG_PRINT(WARNING, "Received CONNECT_PORTS_ERROR message for unknown connection handle");
857    }
858  }
859  else if (opcode == tOpCode::SUBSCRIBE_LEGACY || opcode == tOpCode::UNSUBSCRIBE_LEGACY)
860  {
861    FINROC_LOG_PRINT(WARNING, "OpCode ", make_builder::GetEnumString(opcode), " is superseded and no longer served by this peer");
862    throw std::runtime_error("Superseded OpCode");
863  }
864  else
865  {
866    FINROC_LOG_PRINT(WARNING, "OpCode ", make_builder::GetEnumString(opcode), " is not served by this peer");
867    throw std::runtime_error("Invalid OpCode");
868  }
869
870  return false;
871}
872
873void tRemoteRuntime::ProcessStructurePacket(rrlib::serialization::tInputStream& stream)
874{
875  try
876  {
877    const runtime_info::tRemoteType& type = stream.ReadRegisterEntry<runtime_info::tRemoteType>();
878    if (type.GetName() != rrlib::rtti::tDataType<std::string>().GetName())
879    {
880      FINROC_LOG_PRINT(ERROR, "Type encoding does not seem to work");
881      return;
882    }
883
884    network_transport::runtime_info::tRemoteFrameworkElementInfo info;
885    while (stream.Remaining())
886    {
887      stream >> info.id.handle;
888      stream >> info;
889      AddRemotePort(info);
890    }
891  }
892  catch (const std::exception& e)
893  {
894    FINROC_LOG_PRINT(ERROR, "Error processing structure packet:", e);
895  }
896}
897
898void tRemoteRuntime::PublishStructureChange(const tLocalRuntimeInfo::tStructureChangeEventToPublish& structure_change_event)
899{
900  if (shared_connection_info->initial_reading_complete && shared_connection_info->initial_writing_complete)
901  {
902    if (shared_connection_info->initial_structure_writing_complete || structure_change_event.local_handle < shared_connection_info->framework_elements_in_full_structure_exchange_sent_until_handle)
903    {
904      structure_change_event.WriteToStream(GetPrimaryConnection()->CurrentWriteStream());
905    }
906  }
907}
908
909void tRemoteRuntime::SendCall(tCallPointer& call_to_send, const rrlib::time::tTimestamp& time_now)
910{
911  if (!call_to_send->ReadyForSending())
912  {
913    //FINROC_LOG_PRINT(ERROR, "Emplacing ", call_to_send.get());
914    not_ready_calls.emplace_back(std::move(call_to_send));
915  }
916  else
917  {
918    SendCallImplementation(call_to_send, time_now);
919  }
920}
921
922void tRemoteRuntime::SendCallImplementation(tCallPointer& call_to_send, const rrlib::time::tTimestamp& time_now)
923{
924  tConnection& connection = *GetExpressConnection();  // TODO: Should there be a possibility to specify which connection to use RPC calls? problem: message does not
925
926  FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Sending Call ", make_builder::GetEnumString(call_to_send->GetCallType()));
927  bool expects_response = call_to_send->ExpectsResponse();
928  if (expects_response)
929  {
930    call_to_send->SetCallId(next_call_id);
931    next_call_id++;
932  }
933  tRPCCall::Serialize(false, true, connection.CurrentWriteStream(), call_to_send->GetRemotePortHandle(), call_to_send->GetCallType());
934  call_to_send->GetCall()->Serialize(connection.CurrentWriteStream());
935  tRPCCall::FinishMessage(connection.CurrentWriteStream());
936  if (expects_response)
937  {
938    rrlib::time::tDuration timeout = call_to_send->ResponseTimeout();
939    calls_awaiting_response.emplace_back(time_now + timeout, std::move(call_to_send));
940  }
941}
942
943void tRemoteRuntime::SendPendingMessages(const rrlib::time::tTimestamp& time_now)
944{
945  if ((!GetPrimaryConnection()) || GetPrimaryConnection()->IsClosed())
946  {
947    return;
948  }
949
950  for (auto it = not_ready_calls.begin(); it < not_ready_calls.end();)
951  {
952    if ((*it)->ReadyForSending())
953    {
954      tCallPointer call_pointer = std::move(*it);
955      SendCallImplementation(call_pointer, time_now);
956      it = not_ready_calls.erase(it);
957    }
958    else
959    {
960      ++it;
961    }
962  }
963  for (auto it = calls_awaiting_response.begin(); it < calls_awaiting_response.end();)
964  {
965    if (time_now > it->first) // Did call time out?
966    {
967      it = calls_awaiting_response.erase(it);
968    }
969    else
970    {
971      ++it;
972    }
973  }
974  for (auto it = pull_calls_awaiting_response.begin(); it < pull_calls_awaiting_response.end();)
975  {
976    if (time_now > it->timeout_time) // Did call time out?
977    {
978      it = pull_calls_awaiting_response.erase(it);
979    }
980    else
981    {
982      ++it;
983    }
984  }
985
986  for (auto & connection : connections)
987  {
988    if (connection && (!connection->IsClosed()))
989    {
990      connection->SendPendingMessages(time_now);
991    }
992  }
993}
994
995void tRemoteRuntime::SendPullRequest(tLocalRuntimeInfo::tPullCallInfo& pull_call_info)
996{
997  // We do this here, because this is the TCP thread now (and next_call_id is not thread-safe)
998  pull_call_info.call_id = next_call_id;
999  next_call_id++;
1000  pull_calls_awaiting_response.push_back(pull_call_info);
1001
1002  // Send call
1003  rrlib::serialization::tOutputStream& stream = GetExpressConnection()->CurrentWriteStream();  // pull request is small -> we can always use express connection
1004  bool legacy = stream.GetTargetInfo().revision == 0;
1005  uint8_t message_flags = legacy ? message_flags::cBINARY_ENCODING : pull_call_info.message_flags;
1006  tPullCall::Serialize(true, true, stream, legacy ? pull_call_info.remote_port_handle : pull_call_info.connection_handle, pull_call_info.call_id, message_flags);
1007  // Send immediately? (pull calls are somewhat outdated -> no); ex-code: this->SendPendingMessages(rrlib::time::Now(true));
1008}
1009
1010void tRemoteRuntime::SendResponse(typename tResponseSender::tCallPointer && response_to_send)
1011{
1012  if (response_to_send->ReadyForSending())
1013  {
1014    rrlib::time::tTimestamp time_now = rrlib::time::Now();
1015    SendCallImplementation(response_to_send, time_now);
1016    SendPendingMessages(time_now);
1017  }
1018  else
1019  {
1020    not_ready_calls.emplace_back(std::move(response_to_send));
1021  }
1022}
1023
1024//----------------------------------------------------------------------
1025// End of namespace declaration
1026//----------------------------------------------------------------------
1027}
1028}
1029}
Note: See TracBrowser for help on using the repository browser.