source: finroc_plugins_tcp/internal/tConnection.cpp @ 156:95ed8ff21d92

17.03
Last change on this file since 156:95ed8ff21d92 was 156:95ed8ff21d92, checked in by Max Reichardt <mreichardt@…>, 7 years ago

Makes connectors to be sent to finstruct on initial structure exchange and sends info on handles (stamp width) to connection partner

File size: 26.7 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/tcp/internal/tConnection.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2013-01-04
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/tcp/internal/tConnection.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include <boost/asio.hpp>
36#include "core/tRuntimeEnvironment.h"
37
38//----------------------------------------------------------------------
39// Internal includes with ""
40//----------------------------------------------------------------------
41#include "plugins/tcp/internal/tPeerImplementation.h"
42
43//----------------------------------------------------------------------
44// Debugging
45//----------------------------------------------------------------------
46#include <cassert>
47
48//----------------------------------------------------------------------
49// Namespace usage
50//----------------------------------------------------------------------
51
52//----------------------------------------------------------------------
53// Namespace declaration
54//----------------------------------------------------------------------
55namespace finroc
56{
57namespace tcp
58{
59namespace internal
60{
61
62//----------------------------------------------------------------------
63// Forward declarations / typedefs / enums
64//----------------------------------------------------------------------
65typedef network_transport::runtime_info::tStructureExchange tStructureExchange;
66
67//----------------------------------------------------------------------
68// Const values
69//----------------------------------------------------------------------
70static const size_t c1ST_INITIAL_MESSAGE_LENGTH = strlen(cGREET_MESSAGE) + 7;
71
72static const size_t cINITIAL_READ_BUFFER_SIZE = 32768;
73static const size_t cINITIAL_WRITE_BUFFER_SIZE = 32768;
74
75static const size_t cMAX_MESSAGE_BATCH_SIZE = 300000000; // more than 30 MB
76
77//----------------------------------------------------------------------
78// Implementation
79//----------------------------------------------------------------------
80
81namespace
82{
83
84rrlib::serialization::tMemoryBuffer SerializeSharedPorts(const network_transport::generic_protocol::tLocalRuntimeInfo::tSharedPortInfo& info, rrlib::serialization::tOutputStream& stream_prototype)
85{
86  rrlib::thread::tLock lock(core::tRuntimeEnvironment::GetInstance().GetStructureMutex());
87  rrlib::serialization::tMemoryBuffer buffer(info.size() * 200);
88  rrlib::serialization::tOutputStream stream(buffer, stream_prototype);
89  stream.WriteInt(0); // Placeholder for size
90  stream << rrlib::rtti::tDataType<std::string>(); // write a data type for initialization (only necessary in legacy mode - kept for backward-compatibility)
91  for (auto & entry : info)
92  {
93    stream << entry.second;
94  }
95  stream.WriteInt(0); // size of next packet
96  stream.Close();
97  buffer.GetBuffer().PutInt(0, buffer.GetSize() - 8);
98  return buffer;
99}
100
101}
102
103/*!
104 * Writes things necessary for connection initialization to socket.
105 * Connections are initialized as follows:
106 *
107 * 1st part: [GREET_MESSAGE][2 byte protocol version major][4 byte 2nd part message length]
108 * 2nd part: [my UUID][peer type][peer name][structure exchange][connection flags][your address]
109 */
110class tInitialWriteHandler
111{
112public:
113
114  tInitialWriteHandler(std::shared_ptr<tConnection>& connection) :
115    initial_message_buffers(new std::array<rrlib::serialization::tStackMemoryBuffer<200>, 2>()),
116    initial_message_asio_buffers(),
117    connection(connection)
118  {
119    rrlib::serialization::tOutputStream stream1((*initial_message_buffers)[0]);
120    rrlib::serialization::tOutputStream stream2((*initial_message_buffers)[1]);
121    stream1 << cGREET_MESSAGE;
122    stream1.WriteShort(network_transport::generic_protocol::cPROTOCOL_VERSION_MAJOR);
123    uint32_t flags = connection->flags | (core::internal::tFrameworkElementRegister::cSTAMP_BIT_WIDTH << 8) | ((network_transport::generic_protocol::cPROTOCOL_VERSION_MINOR & 0xFFFF) << 16) | (connection->peer.par_debug_tcp.Get() ? 0 : (static_cast<uint>(tConnectionFlag::NO_DEBUG)));
124    tConnectionInitMessage::Serialize(true, false, stream2, connection->peer.GetPeerInfo().uuid, connection->peer.GetPeerInfo().peer_type,
125                                      connection->peer.GetPeerInfo().name, tStructureExchange::SHARED_PORTS, flags, connection->socket->remote_endpoint().address());
126
127    stream2.Close();
128    stream1.Close();
129
130    initial_message_asio_buffers[0] = boost::asio::const_buffer((*initial_message_buffers)[0].GetBufferPointer(0), (*initial_message_buffers)[0].GetSize());
131    initial_message_asio_buffers[1] = boost::asio::const_buffer((*initial_message_buffers)[1].GetBufferPointer(0), (*initial_message_buffers)[1].GetSize());
132
133    assert((*initial_message_buffers)[0].GetSize() == c1ST_INITIAL_MESSAGE_LENGTH - 4);
134
135    boost::asio::async_write(*(connection->socket), initial_message_asio_buffers, *this);
136  }
137
138  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
139  {
140    if (error)
141    {
142      connection->Close();
143    }
144    else
145    {
146      connection->SharedConnectionInfo().initial_writing_complete = true;
147      if (connection->SharedConnectionInfo().initial_reading_complete && connection->SharedConnectionInfo().initial_writing_complete)
148      {
149        connection->DoInitialStructureExchange(connection);
150      }
151    }
152  }
153
154private:
155
156  std::shared_ptr<std::array<rrlib::serialization::tStackMemoryBuffer<200>, 2>> initial_message_buffers;
157  std::array<boost::asio::const_buffer, 2> initial_message_asio_buffers;
158  std::shared_ptr<tConnection> connection;
159};
160
161/*!
162 * Reads and interprets initial message from connection partner
163 */
164class tInitialReadHandler
165{
166public:
167
168  tInitialReadHandler(std::shared_ptr<tConnection>& connection) :
169    initial_message_buffer(new std::array<uint8_t, 300>()),
170    connection(connection),
171    first_message_received(false)
172  {
173    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(initial_message_buffer->begin(), c1ST_INITIAL_MESSAGE_LENGTH), *this);
174  }
175
176  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
177  {
178    if (error)
179    {
180      connection->Close();
181      return;
182    }
183    try
184    {
185      rrlib::serialization::tMemoryBuffer read_buffer(initial_message_buffer->begin(), bytes_transferred);
186      rrlib::serialization::tInputStream stream(read_buffer, connection->SharedConnectionInfo().input_stream_prototype);
187      if (!first_message_received)
188      {
189        if (strcmp(stream.ReadString().c_str(), cGREET_MESSAGE) != 0)
190        {
191          connection->Close();
192          return;
193        }
194        if (stream.ReadShort() != network_transport::generic_protocol::cPROTOCOL_VERSION_MAJOR)
195        {
196          connection->Close();
197          return;
198        }
199        int size = stream.ReadInt();
200        if (size > 300)
201        {
202          connection->Close();
203          return;
204        }
205
206        first_message_received = true;
207
208        boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(initial_message_buffer->begin(), size), *this);
209      }
210      else if (!connection->IsClosed())
211      {
212        tConnectionInitMessage message;
213        message.Deserialize(stream);
214
215        if (!network_transport::generic_protocol::tLocalRuntimeInfo::IsServingStructure())
216        {
217          connection->Close(); // Not serving structure yet
218          return;
219        }
220
221        connection->peer.AddAddress(message.Get<5>());
222        connection->SharedConnectionInfo().remote_runtime = connection->peer.GetRemoteRuntime(connection, message.Get<0>(), message.Get<1>(), message.Get<2>(),
223            connection->socket->remote_endpoint().address(), connection->never_forget);
224        connection->flags |= message.Get<4>() & 0xFF;
225
226        // Setup shared connection info stream prototypes
227        bool java_partner = connection->flags & static_cast<uint>(tConnectionFlag::JAVA_PEER);
228        bool primary_connection = connection->flags & static_cast<uint>(tConnectionFlag::PRIMARY_CONNECTION);
229        bool express_only_connection = (connection->flags & static_cast<uint>(tConnectionFlag::EXPRESS_DATA)) && (!primary_connection);
230        rrlib::serialization::tSerializationInfo input_info(std::min<uint>(network_transport::generic_protocol::cPROTOCOL_VERSION_MINOR, static_cast<uint>(message.Get<4>()) >> 16),
231            rrlib::serialization::tRegisterEntryEncoding::UID,
232            static_cast<int>(message.Get<3>()) | (java_partner ? network_transport::runtime_info::cJAVA_CLIENT : 0) | (connection->flags & static_cast<uint>(tConnectionFlag::NO_DEBUG) ? 0 : network_transport::runtime_info::cDEBUG_PROTOCOL));
233        input_info.SetRegisterEntryEncoding(static_cast<uint>(network_transport::runtime_info::tRegisterUIDs::TYPE), rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE);
234        auto& unused_initialization_buffer = connection->peer.UnusedInitializationBuffer();
235        connection->SharedConnectionInfo().input_stream_prototype.Reset(unused_initialization_buffer, input_info);
236
237        if (java_partner)
238        {
239          rrlib::serialization::tSerializationInfo output_info(input_info.revision, rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE, input_info.custom_info);
240          input_info.SetRegisterEntryEncoding(static_cast<uint>(network_transport::runtime_info::tRegisterUIDs::CREATE_ACTION), rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_DEMAND);
241          connection->SharedConnectionInfo().output_stream_prototype.Reset(unused_initialization_buffer, output_info);
242        }
243        else if (input_info.revision == 0)
244        {
245          input_info.custom_info = static_cast<uint>(network_transport::runtime_info::tStructureExchange::SHARED_PORTS) | network_transport::runtime_info::cDEBUG_PROTOCOL;
246          connection->SharedConnectionInfo().output_stream_prototype.Reset(unused_initialization_buffer, input_info);
247        }
248        else
249        {
250          input_info.custom_info = static_cast<uint>(network_transport::runtime_info::tStructureExchange::SHARED_PORTS) | (connection->peer.par_debug_tcp.Get() ? network_transport::runtime_info::cDEBUG_PROTOCOL : 0);
251          connection->SharedConnectionInfo().output_stream_prototype.Reset(unused_initialization_buffer, input_info);
252        }
253        connection->InitFrontBuffer();
254
255        if (primary_connection && connection->SharedConnectionInfo().remote_runtime->GetPrimaryConnection() != connection)
256        {
257          connection->SharedConnectionInfo().remote_runtime = nullptr;
258          connection->Close(); // we already have a connection of this type
259          return;
260        }
261        else if (express_only_connection)
262        {
263          if (!connection->SharedConnectionInfo().remote_runtime->AddConnection(connection, true))
264          {
265            connection->SharedConnectionInfo().remote_runtime = nullptr;
266            connection->Close(); // we already have a connection of this type
267            return;
268          }
269        }
270        connection->peer.RunEventLoop();
271
272        connection->SharedConnectionInfo().initial_reading_complete = true;
273        if (connection->SharedConnectionInfo().initial_reading_complete && connection->SharedConnectionInfo().initial_writing_complete)
274        {
275          connection->DoInitialStructureExchange(connection);
276        }
277      }
278    }
279    catch (const std::exception& exception)
280    {
281      FINROC_LOG_PRINT_STATIC(WARNING, "Rejected TCP connection because invalid connection initialization data was received.");
282      connection->Close();
283    }
284  }
285
286private:
287
288  std::shared_ptr<std::array<uint8_t, 300>> initial_message_buffer;
289  std::shared_ptr<tConnection> connection;
290  bool first_message_received;
291};
292
293/*!
294 * Writes a complete batch of message to stream
295 */
296class tMessageBatchWriteHandler
297{
298public:
299  tMessageBatchWriteHandler(std::shared_ptr<tConnection::tBase>& connection_base, const rrlib::serialization::tMemoryBuffer& buffer_to_send, bool& back_buffer_lock_variable) :
300    connection(std::static_pointer_cast<tConnection>(connection_base)),
301    buffer_to_send(&buffer_to_send),
302    back_buffer_lock_variable(&back_buffer_lock_variable)
303  {
304    back_buffer_lock_variable = true;
305    boost::asio::async_write(*(connection->socket), boost::asio::const_buffers_1(buffer_to_send.GetBufferPointer(0), buffer_to_send.GetSize()), *this);
306  }
307
308  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
309  {
310    if (error || bytes_transferred != buffer_to_send->GetSize())
311    {
312      connection->Close();
313    }
314    else
315    {
316      *back_buffer_lock_variable = false;
317    }
318  }
319
320private:
321  std::shared_ptr<tConnection> connection;
322  const rrlib::serialization::tMemoryBuffer* buffer_to_send;
323  bool* back_buffer_lock_variable;
324};
325
326/*!
327 * Reads a complete batch of messages from stream and processes them
328 */
329class tMessageBatchReadHandler
330{
331public:
332
333  tMessageBatchReadHandler(std::shared_ptr<tConnection>& connection) : connection(connection), continue_at(0)
334  {
335    // read size of next structure buffer
336    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
337  }
338
339  void operator()(const boost::system::error_code& error)
340  {
341    if (error)
342    {
343      connection->Close();
344      return;
345    }
346    ProcessMessageBatch();
347  }
348
349  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
350  {
351    if (error)
352    {
353      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Closing connection: ", error.message());
354      connection->Close();
355      return;
356    }
357    size_t bytes_required = connection->bytes_to_read ? connection->bytes_to_read : 4;
358    if (bytes_transferred != bytes_required)
359    {
360      FINROC_LOG_PRINT(DEBUG_WARNING, "Not enough bytes transferred");
361      connection->Close();
362      return;
363    }
364
365    if (!connection->bytes_to_read)
366    {
367      size_t bytes = connection->GetBytesToRead();
368
369      if (connection->read_buffer.Capacity() < bytes)
370      {
371        if (bytes > cMAX_MESSAGE_BATCH_SIZE)
372        {
373          FINROC_LOG_PRINT(WARNING, "Client wanted to send packet of size ", bytes, "! Closing connection.");
374          connection->Close();
375          return;
376        }
377
378        connection->read_buffer = rrlib::serialization::tFixedBuffer(bytes + 5000);
379      }
380      if (!connection->IsClosed())
381      {
382        boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_buffer.GetPointer(), bytes), *this);
383      }
384    }
385    else
386    {
387      ProcessMessageBatch();
388    }
389  }
390
391  void ProcessMessageBatch()
392  {
393    continue_at = connection->ProcessIncomingMessageBatch(rrlib::serialization::tFixedBuffer(connection->read_buffer.GetPointer(), connection->GetBytesToRead()), continue_at);
394
395    if (continue_at == 0)
396    {
397      // read size of next structure buffer
398      connection->bytes_to_read = 0;
399      boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
400    }
401    else
402    {
403      // Defer processing 2ms
404      connection->defer_read_timer.expires_from_now(boost::posix_time::milliseconds(2));
405      connection->defer_read_timer.async_wait(*this);
406    }
407  }
408
409private:
410
411  std::shared_ptr<tConnection> connection;
412  size_t continue_at;
413};
414
415/*!
416 * Writes structure to stream
417 */
418class tStructureWriteHandler
419{
420public:
421
422  tStructureWriteHandler(std::shared_ptr<tConnection> connection) :
423    connection(connection),
424    buffer(),
425    ready_after_write(false)
426  {
427    if (connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::SHARED_PORTS)
428    {
429      buffer.reset(new rrlib::serialization::tMemoryBuffer(0));
430      *buffer = SerializeSharedPorts(connection->peer.LocalRuntimeInfo()->SharedPortInfo(), connection->SharedConnectionInfo().output_stream_prototype);
431      connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle = std::numeric_limits<core::tFrameworkElement::tHandle>::max(); // we want to receive any updates now
432      ready_after_write = true;
433      boost::asio::async_write(*(connection->socket), boost::asio::const_buffers_1(buffer->GetBufferPointer(0), buffer->GetSize()), *this);
434    }
435    else
436    {
437      SerializeNextElements();
438    }
439  }
440
441  void operator()(const boost::system::error_code& error)
442  {
443    if (error)
444    {
445      connection->Close();
446      return;
447    }
448    SerializeNextElements();
449  }
450
451  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
452  {
453    if (error || bytes_transferred != buffer->GetSize())
454    {
455      connection->Close();
456    }
457    else if (ready_after_write)
458    {
459      connection->SharedConnectionInfo().initial_structure_writing_complete = true;
460      if (connection->SharedConnectionInfo().initial_structure_reading_complete)
461      {
462        connection->ready = true;
463        connection->peer.SetPeerListChanged();
464        tMessageBatchReadHandler handler(connection);
465        return;
466      }
467    }
468    else
469    {
470      SerializeNextElements();
471    }
472  }
473
474  void SerializeNextElements()
475  {
476    typedef core::tFrameworkElement::tHandle tHandle;
477    assert(connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::FINSTRUCT ||
478           connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::COMPLETE_STRUCTURE);
479    rrlib::thread::tLock lock(core::tRuntimeEnvironment::GetInstance().GetStructureMutex(), false);
480    if (lock.TryLock())
481    {
482      connection->peer.ProcessLocalRuntimeStructureChanges();  // to make sure we don't get any shared port events twice
483      core::tFrameworkElement* framework_element_buffer[1000];
484      size_t element_count = 0;
485      if (connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle <= std::numeric_limits<tHandle>::max())
486      {
487        element_count = core::tRuntimeEnvironment::GetInstance().GetAllElements(framework_element_buffer, 1000,
488                        connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle);
489      }
490      if (!buffer)
491      {
492        buffer.reset(new rrlib::serialization::tMemoryBuffer(element_count * 200 + 4));
493      }
494      rrlib::serialization::tOutputStream stream(*buffer, connection->SharedConnectionInfo().output_stream_prototype);
495      stream.WriteInt(0); // placeholder for size
496      for (size_t i = 0; i < element_count; i++)
497      {
498        bool relevant = framework_element_buffer[i]->IsReady() &&
499                        ((connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::FINSTRUCT) ||
500                         (!framework_element_buffer[i]->GetFlag(core::tFrameworkElement::tFlag::NETWORK_ELEMENT)));
501        if (relevant)
502        {
503          stream.WriteInt(framework_element_buffer[i]->GetHandle());
504          network_transport::runtime_info::tLocalFrameworkElementInfo::Serialize(stream, *framework_element_buffer[i], connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::FINSTRUCT);
505          FINROC_LOG_PRINT(DEBUG_VERBOSE_2, "Serializing ", *framework_element_buffer[i]);
506        }
507        connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle = framework_element_buffer[i]->GetHandle() + 1;
508      }
509      stream.Close();
510      size_t payload_size = buffer->GetSize() - 4;
511      buffer->GetBuffer().PutInt(0, payload_size);
512      ready_after_write = (payload_size == 0);
513      boost::asio::async_write(*(connection->socket), boost::asio::const_buffers_1(buffer->GetBufferPointer(0), buffer->GetSize()), *this);
514    }
515    else
516    {
517      connection->defer_write_timer.expires_from_now(boost::posix_time::milliseconds(2));
518      connection->defer_write_timer.async_wait(*this);
519    }
520  }
521
522private:
523
524  std::shared_ptr<tConnection> connection;
525  std::shared_ptr<rrlib::serialization::tMemoryBuffer> buffer;
526  bool ready_after_write;
527};
528
529/*!
530 * Reads and interprets initial message from connection partner
531 */
532class tStructureReadHandler
533{
534public:
535
536  tStructureReadHandler(std::shared_ptr<tConnection> connection) : connection(connection), read_buffer()
537  {
538    // read size of next structure buffer
539    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
540  }
541
542  void operator()(const boost::system::error_code& error)
543  {
544    if (error)
545    {
546      connection->Close();
547      return;
548    }
549    ProcessPacket();
550  }
551
552  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
553  {
554    if (error)
555    {
556      connection->Close();
557      return;
558    }
559    size_t bytes_required = connection->bytes_to_read ? connection->bytes_to_read : 4;
560    if (bytes_transferred != bytes_required)
561    {
562      FINROC_LOG_PRINT(DEBUG_WARNING, "Not enough bytes transferred");
563      connection->Close();
564      return;
565    }
566
567    if (!connection->bytes_to_read)
568    {
569      size_t bytes = connection->GetBytesToRead();
570      if (bytes == 0)
571      {
572        // ready
573        connection->SharedConnectionInfo().initial_structure_reading_complete = true;
574        if (connection->SharedConnectionInfo().initial_structure_writing_complete)
575        {
576          connection->ready = true;
577          connection->peer.SetPeerListChanged();
578          FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Reading structure complete, waiting for message batch");
579          tMessageBatchReadHandler handler(connection);
580        }
581        return;
582      }
583
584      if (bytes > cMAX_MESSAGE_BATCH_SIZE)
585      {
586        FINROC_LOG_PRINT(WARNING, "Client wanted to send packet of size ", bytes, "! Closing connection.");
587        connection->Close();
588        return;
589      }
590
591      if ((!read_buffer) || read_buffer->Capacity() < bytes)
592      {
593        read_buffer.reset(new rrlib::serialization::tFixedBuffer(bytes));
594      }
595      boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(read_buffer->GetPointer(), bytes), *this);
596    }
597    else
598    {
599      ProcessPacket();
600    }
601  }
602
603  void ProcessPacket()
604  {
605    // try to create structure
606    rrlib::serialization::tMemoryBuffer packet_buffer(read_buffer->GetPointer(), connection->bytes_to_read);
607    rrlib::serialization::tInputStream stream(packet_buffer, connection->SharedConnectionInfo().input_stream_prototype);
608    connection->GetRemoteRuntime()->ProcessStructurePacket(stream);
609
610    // read size of next structure buffer
611    connection->bytes_to_read = 0;
612    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
613  }
614
615private:
616
617  std::shared_ptr<tConnection> connection;
618  std::shared_ptr<rrlib::serialization::tFixedBuffer> read_buffer;
619};
620
621
622tConnection::tConnection(tPeerImplementation& peer, std::shared_ptr<boost::asio::ip::tcp::socket>& socket, int flags,
623                         std::shared_ptr<tPeerInfo::tActiveConnect> active_connect_indicator, bool never_forget) :
624  network_transport::generic_protocol::tConnection(*peer.LocalRuntimeInfo(), true),
625  peer(peer),
626  flags(flags),
627  socket(socket),
628  ready(false),
629  read_size_buffer(4),
630  read_buffer(cINITIAL_READ_BUFFER_SIZE),
631  bytes_to_read(0),
632  active_connect_indicator(active_connect_indicator),
633  never_forget(never_forget),
634  defer_read_timer(peer.IOService()),
635  defer_write_timer(peer.IOService())
636{
637}
638
639tConnection::~tConnection()
640{}
641
642void tConnection::CloseImplementation()
643{
644  try
645  {
646    // socket->shutdown(); TODO: Add with newer boost version (?)
647    socket->close();
648  }
649  catch (const std::exception& ex)
650  {
651    FINROC_LOG_PRINT(WARNING, "Closing connection failed: ", ex);
652  }
653  if (ready)
654  {
655    peer.SetPeerListChanged();
656  }
657}
658
659void tConnection::DoInitialStructureExchange(std::shared_ptr<tConnection>& connection)
660{
661  if (connection == connection->GetRemoteRuntime()->GetPrimaryConnection())
662  {
663    assert(this == connection.get());
664    tStructureReadHandler structure_read_handler(connection);
665    if (GetRemoteRuntime()->GetDesiredStructureInfo() != tStructureExchange::NONE)
666    {
667      tStructureWriteHandler structure_write_handler(connection);
668    }
669    else
670    {
671      SharedConnectionInfo().initial_structure_writing_complete = true;
672    }
673  }
674  else
675  {
676    SharedConnectionInfo().initial_structure_reading_complete = true;
677    SharedConnectionInfo().initial_structure_writing_complete = true;
678    ready = true;
679    tMessageBatchReadHandler handler(connection);
680  }
681}
682
683size_t tConnection::GetBytesToRead()
684{
685  rrlib::serialization::tMemoryBuffer buffer(read_size_buffer.GetPointer(), read_size_buffer.Capacity());
686  rrlib::serialization::tInputStream stream(buffer);
687  bytes_to_read = stream.ReadInt();
688  return bytes_to_read;
689}
690
691void tConnection::ProcessPeerInfoMessage(rrlib::serialization::tMemoryBuffer& buffer)
692{
693  network_transport::generic_protocol::tPeerInfoMessage message;
694  rrlib::serialization::tInputStream stream(buffer, SharedConnectionInfo().input_stream_prototype);
695  message.Deserialize(stream, false);
696  while (stream.ReadBoolean())
697  {
698    tPeerInfo peer(tPeerType::UNSPECIFIED);
699    this->peer.DeserializePeerInfo(stream, peer);
700    RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Deserialized peer ", peer.ToString());
701    this->peer.ProcessIncomingPeerInfo(peer);
702  }
703  message.FinishDeserialize(stream);
704}
705
706void tConnection::SendMessagePacket(std::shared_ptr<tBase>& self, const rrlib::serialization::tMemoryBuffer& buffer_to_send, bool& back_buffer_lock_variable)
707{
708  tMessageBatchWriteHandler write_handler(self, buffer_to_send, back_buffer_lock_variable);
709}
710
711void tConnection::TryToEstablishConnection(tPeerImplementation& peer, std::shared_ptr<boost::asio::ip::tcp::socket>& socket, int flags,
712    std::shared_ptr<tPeerInfo::tActiveConnect> active_connect_indicator, bool never_forget)
713{
714  std::shared_ptr<tConnection> connection(new tConnection(peer, socket, flags, active_connect_indicator, never_forget));
715
716  tInitialWriteHandler write_handler(connection);
717  tInitialReadHandler read_handler(connection);
718}
719
720
721//----------------------------------------------------------------------
722// End of namespace declaration
723//----------------------------------------------------------------------
724}
725}
726}
Note: See TracBrowser for help on using the repository browser.