source: finroc_plugins_tcp/internal/tConnection.cpp @ 157:e7bd4484bb70

17.03
Last change on this file since 157:e7bd4484bb70 was 157:e7bd4484bb70, checked in by Max Reichardt <mreichardt@…>, 7 years ago

Applies rework on how different connection partners agree on serialization in input and output streams

File size: 27.1 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/tcp/internal/tConnection.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2013-01-04
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/tcp/internal/tConnection.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include <boost/asio.hpp>
36#include "core/tRuntimeEnvironment.h"
37
38//----------------------------------------------------------------------
39// Internal includes with ""
40//----------------------------------------------------------------------
41#include "plugins/tcp/internal/tPeerImplementation.h"
42
43//----------------------------------------------------------------------
44// Debugging
45//----------------------------------------------------------------------
46#include <cassert>
47
48//----------------------------------------------------------------------
49// Namespace usage
50//----------------------------------------------------------------------
51
52//----------------------------------------------------------------------
53// Namespace declaration
54//----------------------------------------------------------------------
55namespace finroc
56{
57namespace tcp
58{
59namespace internal
60{
61
62//----------------------------------------------------------------------
63// Forward declarations / typedefs / enums
64//----------------------------------------------------------------------
65typedef network_transport::runtime_info::tStructureExchange tStructureExchange;
66
67//----------------------------------------------------------------------
68// Const values
69//----------------------------------------------------------------------
/*!
 * Number of bytes read for the first part of the initial message:
 * the serialized greet message plus 7 further bytes. The write side asserts
 * that greet message and 2-byte major protocol version occupy this length minus 4,
 * the remaining 4 bytes being the length field of the second part.
 */
static const size_t c1ST_INITIAL_MESSAGE_LENGTH = strlen(cGREET_MESSAGE) + 7;

/*! Initial capacity (in bytes) of a connection's read buffer */
static const size_t cINITIAL_READ_BUFFER_SIZE = 32768;
/*! Initial write buffer size in bytes (not referenced in the code visible in this file) */
static const size_t cINITIAL_WRITE_BUFFER_SIZE = 32768;

/*!
 * Maximum accepted size of an incoming packet in bytes; larger announcements close the connection.
 * NOTE(review): the value is 300,000,000 bytes (roughly 300 MB); an earlier comment said "more than 30 MB" - confirm which one is intended.
 */
static const size_t cMAX_MESSAGE_BATCH_SIZE = 300000000;

/** Serialization info used for writing and reading the initial message (before the partners have agreed on anything else) */
static const rrlib::serialization::tSerializationInfo cINITIAL_SERIALIZATION_INFO(0, rrlib::serialization::tRegisterEntryEncoding::UID, network_transport::runtime_info::cDEBUG_PROTOCOL);
79
80//----------------------------------------------------------------------
81// Implementation
82//----------------------------------------------------------------------
83
84namespace
85{
86
87rrlib::serialization::tMemoryBuffer SerializeSharedPorts(const network_transport::generic_protocol::tLocalRuntimeInfo::tSharedPortInfo& info, rrlib::serialization::tOutputStream& stream_prototype)
88{
89  rrlib::thread::tLock lock(core::tRuntimeEnvironment::GetInstance().GetStructureMutex());
90  rrlib::serialization::tMemoryBuffer buffer(info.size() * 200);
91  rrlib::serialization::tOutputStream stream(buffer, stream_prototype);
92  stream.WriteInt(0); // Placeholder for size
93  stream << rrlib::rtti::tDataType<std::string>(); // write a data type for initialization (only necessary in legacy mode - kept for backward-compatibility)
94  for (auto & entry : info)
95  {
96    stream << entry.second;
97  }
98  stream.WriteInt(0); // size of next packet
99  stream.Close();
100  buffer.GetBuffer().PutInt(0, buffer.GetSize() - 8);
101  return buffer;
102}
103
104}
105
106/*!
107 * Writes things necessary for connection initialization to socket.
108 * Connections are initialized as follows:
109 *
110 * 1st part: [GREET_MESSAGE][2 byte protocol version major][4 byte 2nd part message length]
111 * 2nd part: [my UUID][peer type][peer name][structure exchange][connection flags][your address]
112 */
113class tInitialWriteHandler
114{
115public:
116
117  tInitialWriteHandler(std::shared_ptr<tConnection>& connection) :
118    initial_message_buffers(new std::array<rrlib::serialization::tStackMemoryBuffer<200>, 2>()),
119    initial_message_asio_buffers(),
120    connection(connection)
121  {
122    rrlib::serialization::tOutputStream stream1((*initial_message_buffers)[0]);
123    rrlib::serialization::tOutputStream stream2((*initial_message_buffers)[1], cINITIAL_SERIALIZATION_INFO);
124    stream1 << cGREET_MESSAGE;
125    stream1.WriteShort(network_transport::generic_protocol::cPROTOCOL_VERSION_MAJOR);
126    uint32_t flags = connection->flags | (core::internal::tFrameworkElementRegister::cSTAMP_BIT_WIDTH << 8) | ((network_transport::generic_protocol::cPROTOCOL_VERSION_MINOR & 0xFFFF) << 16) | (connection->peer.par_debug_tcp.Get() ? 0 : (static_cast<uint>(tConnectionFlag::NO_DEBUG)));
127    tConnectionInitMessage::Serialize(true, false, stream2, connection->peer.GetPeerInfo().uuid, connection->peer.GetPeerInfo().peer_type,
128                                      connection->peer.GetPeerInfo().name, tStructureExchange::SHARED_PORTS, flags, connection->socket->remote_endpoint().address());
129
130    stream2.Close();
131    stream1.Close();
132
133    initial_message_asio_buffers[0] = boost::asio::const_buffer((*initial_message_buffers)[0].GetBufferPointer(0), (*initial_message_buffers)[0].GetSize());
134    initial_message_asio_buffers[1] = boost::asio::const_buffer((*initial_message_buffers)[1].GetBufferPointer(0), (*initial_message_buffers)[1].GetSize());
135
136    assert((*initial_message_buffers)[0].GetSize() == c1ST_INITIAL_MESSAGE_LENGTH - 4);
137
138    boost::asio::async_write(*(connection->socket), initial_message_asio_buffers, *this);
139  }
140
141  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
142  {
143    if (error)
144    {
145      connection->Close();
146    }
147    else
148    {
149      connection->SharedConnectionInfo().initial_writing_complete = true;
150      if (connection->SharedConnectionInfo().initial_reading_complete && connection->SharedConnectionInfo().initial_writing_complete)
151      {
152        connection->DoInitialStructureExchange(connection);
153      }
154    }
155  }
156
157private:
158
159  std::shared_ptr<std::array<rrlib::serialization::tStackMemoryBuffer<200>, 2>> initial_message_buffers;
160  std::array<boost::asio::const_buffer, 2> initial_message_asio_buffers;
161  std::shared_ptr<tConnection> connection;
162};
163
164/*!
165 * Reads and interprets initial message from connection partner
166 */
167class tInitialReadHandler
168{
169public:
170
171  tInitialReadHandler(std::shared_ptr<tConnection>& connection) :
172    initial_message_buffer(new std::array<uint8_t, 300>()),
173    connection(connection),
174    first_message_received(false)
175  {
176    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(initial_message_buffer->begin(), c1ST_INITIAL_MESSAGE_LENGTH), *this);
177  }
178
179  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
180  {
181    if (error)
182    {
183      connection->Close();
184      return;
185    }
186    try
187    {
188      rrlib::serialization::tMemoryBuffer read_buffer(initial_message_buffer->begin(), bytes_transferred);
189      rrlib::serialization::tInputStream stream(read_buffer, cINITIAL_SERIALIZATION_INFO);
190      if (!first_message_received)
191      {
192        if (strcmp(stream.ReadString().c_str(), cGREET_MESSAGE) != 0)
193        {
194          connection->Close();
195          return;
196        }
197        if (stream.ReadShort() != network_transport::generic_protocol::cPROTOCOL_VERSION_MAJOR)
198        {
199          connection->Close();
200          return;
201        }
202        int size = stream.ReadInt();
203        if (size > 300)
204        {
205          connection->Close();
206          return;
207        }
208
209        first_message_received = true;
210
211        boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(initial_message_buffer->begin(), size), *this);
212      }
213      else if (!connection->IsClosed())
214      {
215        tConnectionInitMessage message;
216        message.Deserialize(stream);
217
218        if (!network_transport::generic_protocol::tLocalRuntimeInfo::IsServingStructure())
219        {
220          connection->Close(); // Not serving structure yet
221          return;
222        }
223
224        connection->peer.AddAddress(message.Get<5>());
225        connection->SharedConnectionInfo().remote_runtime = connection->peer.GetRemoteRuntime(connection, message.Get<0>(), message.Get<1>(), message.Get<2>(),
226            connection->socket->remote_endpoint().address(), connection->never_forget);
227        connection->flags |= message.Get<4>() & 0xFF;
228
229        // Setup shared connection info stream prototypes
230        bool java_partner = connection->flags & static_cast<uint>(tConnectionFlag::JAVA_PEER);
231        bool primary_connection = connection->flags & static_cast<uint>(tConnectionFlag::PRIMARY_CONNECTION);
232        bool express_only_connection = (connection->flags & static_cast<uint>(tConnectionFlag::EXPRESS_DATA)) && (!primary_connection);
233        uint serialization_revision = std::min<uint>(network_transport::generic_protocol::cPROTOCOL_VERSION_MINOR, static_cast<uint>(message.Get<4>()) >> 16);
234        bool debug_tcp = serialization_revision || (!((!connection->peer.par_debug_tcp.Get()) || (connection->flags & static_cast<uint>(tConnectionFlag::NO_DEBUG))));
235        rrlib::serialization::tSerializationInfo serialization_info(serialization_revision, rrlib::serialization::tRegisterEntryEncoding::UID, static_cast<int>(message.Get<3>()) | (java_partner ? network_transport::runtime_info::cJAVA_CLIENT : 0) | (debug_tcp ? network_transport::runtime_info::cDEBUG_PROTOCOL : 0));
236        rrlib::serialization::tSerializationInfo deserialization_info(serialization_revision, rrlib::serialization::tRegisterEntryEncoding::UID, static_cast<int>(tStructureExchange::SHARED_PORTS) | (debug_tcp ? network_transport::runtime_info::cDEBUG_PROTOCOL : 0));
237        serialization_info.SetRegisterEntryEncoding(static_cast<uint>(network_transport::runtime_info::tRegisterUIDs::TYPE), rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE);
238        deserialization_info.SetRegisterEntryEncoding(static_cast<uint>(network_transport::runtime_info::tRegisterUIDs::TYPE), rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE);
239
240        auto& unused_initialization_buffer = connection->peer.UnusedInitializationBuffer();
241
242        if (java_partner)
243        {
244          serialization_info.SetRegisterEntryEncoding(static_cast<uint>(network_transport::runtime_info::tRegisterUIDs::CONVERSION_OPERATION), rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE);
245          serialization_info.SetRegisterEntryEncoding(static_cast<uint>(network_transport::runtime_info::tRegisterUIDs::STATIC_CAST), rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE);
246          serialization_info.SetRegisterEntryEncoding(static_cast<uint>(network_transport::runtime_info::tRegisterUIDs::SCHEME_HANDLER), rrlib::serialization::tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE);
247        }
248        connection->SharedConnectionInfo().input_stream_prototype.Reset(unused_initialization_buffer, deserialization_info);
249        connection->SharedConnectionInfo().output_stream_prototype.Reset(unused_initialization_buffer, serialization_info);
250
251        connection->InitFrontBuffer();
252
253        if (primary_connection && connection->SharedConnectionInfo().remote_runtime->GetPrimaryConnection() != connection)
254        {
255          connection->SharedConnectionInfo().remote_runtime = nullptr;
256          connection->Close(); // we already have a connection of this type
257          return;
258        }
259        else if (express_only_connection)
260        {
261          if (!connection->SharedConnectionInfo().remote_runtime->AddConnection(connection, true))
262          {
263            connection->SharedConnectionInfo().remote_runtime = nullptr;
264            connection->Close(); // we already have a connection of this type
265            return;
266          }
267        }
268        connection->peer.RunEventLoop();
269
270        connection->SharedConnectionInfo().initial_reading_complete = true;
271        if (connection->SharedConnectionInfo().initial_reading_complete && connection->SharedConnectionInfo().initial_writing_complete)
272        {
273          connection->DoInitialStructureExchange(connection);
274        }
275      }
276    }
277    catch (const std::exception& exception)
278    {
279      FINROC_LOG_PRINT_STATIC(WARNING, "Rejected TCP connection because invalid connection initialization data was received.");
280      connection->Close();
281    }
282  }
283
284private:
285
286  std::shared_ptr<std::array<uint8_t, 300>> initial_message_buffer;
287  std::shared_ptr<tConnection> connection;
288  bool first_message_received;
289};
290
291/*!
292 * Writes a complete batch of message to stream
293 */
294class tMessageBatchWriteHandler
295{
296public:
297  tMessageBatchWriteHandler(std::shared_ptr<tConnection::tBase>& connection_base, const rrlib::serialization::tMemoryBuffer& buffer_to_send, bool& back_buffer_lock_variable) :
298    connection(std::static_pointer_cast<tConnection>(connection_base)),
299    buffer_to_send(&buffer_to_send),
300    back_buffer_lock_variable(&back_buffer_lock_variable)
301  {
302    back_buffer_lock_variable = true;
303    boost::asio::async_write(*(connection->socket), boost::asio::const_buffers_1(buffer_to_send.GetBufferPointer(0), buffer_to_send.GetSize()), *this);
304  }
305
306  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
307  {
308    if (error || bytes_transferred != buffer_to_send->GetSize())
309    {
310      connection->Close();
311    }
312    else
313    {
314      *back_buffer_lock_variable = false;
315    }
316  }
317
318private:
319  std::shared_ptr<tConnection> connection;
320  const rrlib::serialization::tMemoryBuffer* buffer_to_send;
321  bool* back_buffer_lock_variable;
322};
323
324/*!
325 * Reads a complete batch of messages from stream and processes them
326 */
327class tMessageBatchReadHandler
328{
329public:
330
331  tMessageBatchReadHandler(std::shared_ptr<tConnection>& connection) : connection(connection), continue_at(0)
332  {
333    // read size of next structure buffer
334    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
335  }
336
337  void operator()(const boost::system::error_code& error)
338  {
339    if (error)
340    {
341      connection->Close();
342      return;
343    }
344    ProcessMessageBatch();
345  }
346
347  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
348  {
349    if (error)
350    {
351      FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Closing connection: ", error.message());
352      connection->Close();
353      return;
354    }
355    size_t bytes_required = connection->bytes_to_read ? connection->bytes_to_read : 4;
356    if (bytes_transferred != bytes_required)
357    {
358      FINROC_LOG_PRINT(DEBUG_WARNING, "Not enough bytes transferred");
359      connection->Close();
360      return;
361    }
362
363    if (!connection->bytes_to_read)
364    {
365      size_t bytes = connection->GetBytesToRead();
366
367      if (connection->read_buffer.Capacity() < bytes)
368      {
369        if (bytes > cMAX_MESSAGE_BATCH_SIZE)
370        {
371          FINROC_LOG_PRINT(WARNING, "Client wanted to send packet of size ", bytes, "! Closing connection.");
372          connection->Close();
373          return;
374        }
375
376        connection->read_buffer = rrlib::serialization::tFixedBuffer(bytes + 5000);
377      }
378      if (!connection->IsClosed())
379      {
380        boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_buffer.GetPointer(), bytes), *this);
381      }
382    }
383    else
384    {
385      ProcessMessageBatch();
386    }
387  }
388
389  void ProcessMessageBatch()
390  {
391    continue_at = connection->ProcessIncomingMessageBatch(rrlib::serialization::tFixedBuffer(connection->read_buffer.GetPointer(), connection->GetBytesToRead()), continue_at);
392
393    if (continue_at == 0)
394    {
395      // read size of next structure buffer
396      connection->bytes_to_read = 0;
397      boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
398    }
399    else
400    {
401      // Defer processing 2ms
402      connection->defer_read_timer.expires_from_now(boost::posix_time::milliseconds(2));
403      connection->defer_read_timer.async_wait(*this);
404    }
405  }
406
407private:
408
409  std::shared_ptr<tConnection> connection;
410  size_t continue_at;
411};
412
413/*!
414 * Writes structure to stream
415 */
416class tStructureWriteHandler
417{
418public:
419
420  tStructureWriteHandler(std::shared_ptr<tConnection> connection) :
421    connection(connection),
422    buffer(),
423    ready_after_write(false)
424  {
425    if (connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::SHARED_PORTS)
426    {
427      buffer.reset(new rrlib::serialization::tMemoryBuffer(0));
428      *buffer = SerializeSharedPorts(connection->peer.LocalRuntimeInfo()->SharedPortInfo(), connection->SharedConnectionInfo().output_stream_prototype);
429      connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle = std::numeric_limits<core::tFrameworkElement::tHandle>::max(); // we want to receive any updates now
430      ready_after_write = true;
431      boost::asio::async_write(*(connection->socket), boost::asio::const_buffers_1(buffer->GetBufferPointer(0), buffer->GetSize()), *this);
432    }
433    else
434    {
435      SerializeNextElements();
436    }
437  }
438
439  void operator()(const boost::system::error_code& error)
440  {
441    if (error)
442    {
443      connection->Close();
444      return;
445    }
446    SerializeNextElements();
447  }
448
449  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
450  {
451    if (error || bytes_transferred != buffer->GetSize())
452    {
453      connection->Close();
454    }
455    else if (ready_after_write)
456    {
457      connection->SharedConnectionInfo().initial_structure_writing_complete = true;
458      if (connection->SharedConnectionInfo().initial_structure_reading_complete)
459      {
460        connection->ready = true;
461        connection->peer.SetPeerListChanged();
462        tMessageBatchReadHandler handler(connection);
463        return;
464      }
465    }
466    else
467    {
468      SerializeNextElements();
469    }
470  }
471
472  void SerializeNextElements()
473  {
474    typedef core::tFrameworkElement::tHandle tHandle;
475    assert(connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::FINSTRUCT ||
476           connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::COMPLETE_STRUCTURE);
477    rrlib::thread::tLock lock(core::tRuntimeEnvironment::GetInstance().GetStructureMutex(), false);
478    if (lock.TryLock())
479    {
480      connection->peer.ProcessLocalRuntimeStructureChanges();  // to make sure we don't get any shared port events twice
481      core::tFrameworkElement* framework_element_buffer[1000];
482      size_t element_count = 0;
483      if (connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle <= std::numeric_limits<tHandle>::max())
484      {
485        element_count = core::tRuntimeEnvironment::GetInstance().GetAllElements(framework_element_buffer, 1000,
486                        connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle);
487      }
488      if (!buffer)
489      {
490        buffer.reset(new rrlib::serialization::tMemoryBuffer(element_count * 200 + 4));
491      }
492      rrlib::serialization::tOutputStream stream(*buffer, connection->SharedConnectionInfo().output_stream_prototype);
493      stream.WriteInt(0); // placeholder for size
494      for (size_t i = 0; i < element_count; i++)
495      {
496        bool relevant = framework_element_buffer[i]->IsReady() &&
497                        ((connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::FINSTRUCT) ||
498                         (!framework_element_buffer[i]->GetFlag(core::tFrameworkElement::tFlag::NETWORK_ELEMENT)));
499        if (relevant)
500        {
501          network_transport::runtime_info::tLocalFrameworkElementInfo::Serialize(stream, *framework_element_buffer[i], connection->GetRemoteRuntime()->GetDesiredStructureInfo() == tStructureExchange::FINSTRUCT);
502          FINROC_LOG_PRINT(DEBUG_VERBOSE_2, "Serializing ", *framework_element_buffer[i]);
503        }
504        connection->SharedConnectionInfo().framework_elements_in_full_structure_exchange_sent_until_handle = framework_element_buffer[i]->GetHandle() + 1;
505      }
506      stream.Close();
507      size_t payload_size = buffer->GetSize() - 4;
508      buffer->GetBuffer().PutInt(0, payload_size);
509      ready_after_write = (payload_size == 0);
510      boost::asio::async_write(*(connection->socket), boost::asio::const_buffers_1(buffer->GetBufferPointer(0), buffer->GetSize()), *this);
511    }
512    else
513    {
514      connection->defer_write_timer.expires_from_now(boost::posix_time::milliseconds(2));
515      connection->defer_write_timer.async_wait(*this);
516    }
517  }
518
519private:
520
521  std::shared_ptr<tConnection> connection;
522  std::shared_ptr<rrlib::serialization::tMemoryBuffer> buffer;
523  bool ready_after_write;
524};
525
526/*!
527 * Reads and interprets initial message from connection partner
528 */
529class tStructureReadHandler
530{
531public:
532
533  tStructureReadHandler(std::shared_ptr<tConnection> connection) : connection(connection), read_buffer()
534  {
535    // read size of next structure buffer
536    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
537  }
538
539  void operator()(const boost::system::error_code& error)
540  {
541    if (error)
542    {
543      connection->Close();
544      return;
545    }
546    ProcessPacket();
547  }
548
549  void operator()(const boost::system::error_code& error, size_t bytes_transferred)
550  {
551    if (error)
552    {
553      connection->Close();
554      return;
555    }
556    size_t bytes_required = connection->bytes_to_read ? connection->bytes_to_read : 4;
557    if (bytes_transferred != bytes_required)
558    {
559      FINROC_LOG_PRINT(DEBUG_WARNING, "Not enough bytes transferred");
560      connection->Close();
561      return;
562    }
563
564    if (!connection->bytes_to_read)
565    {
566      size_t bytes = connection->GetBytesToRead();
567      if (bytes == 0)
568      {
569        // ready
570        connection->SharedConnectionInfo().initial_structure_reading_complete = true;
571        if (connection->SharedConnectionInfo().initial_structure_writing_complete)
572        {
573          connection->ready = true;
574          connection->peer.SetPeerListChanged();
575          FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Reading structure complete, waiting for message batch");
576          tMessageBatchReadHandler handler(connection);
577        }
578        return;
579      }
580
581      if (bytes > cMAX_MESSAGE_BATCH_SIZE)
582      {
583        FINROC_LOG_PRINT(WARNING, "Client wanted to send packet of size ", bytes, "! Closing connection.");
584        connection->Close();
585        return;
586      }
587
588      if ((!read_buffer) || read_buffer->Capacity() < bytes)
589      {
590        read_buffer.reset(new rrlib::serialization::tFixedBuffer(bytes));
591      }
592      boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(read_buffer->GetPointer(), bytes), *this);
593    }
594    else
595    {
596      ProcessPacket();
597    }
598  }
599
600  void ProcessPacket()
601  {
602    // try to create structure
603    rrlib::serialization::tMemoryBuffer packet_buffer(read_buffer->GetPointer(), connection->bytes_to_read);
604    rrlib::serialization::tInputStream stream(packet_buffer, connection->SharedConnectionInfo().input_stream_prototype);
605    connection->GetRemoteRuntime()->ProcessStructurePacket(stream);
606
607    // read size of next structure buffer
608    connection->bytes_to_read = 0;
609    boost::asio::async_read(*(connection->socket), boost::asio::mutable_buffers_1(connection->read_size_buffer.GetPointer(), 4), *this);
610  }
611
612private:
613
614  std::shared_ptr<tConnection> connection;
615  std::shared_ptr<rrlib::serialization::tFixedBuffer> read_buffer;
616};
617
618
/*!
 * Creates a connection object for the provided socket.
 * No data is read or written here - see TryToEstablishConnection().
 *
 * \param peer Peer implementation that this connection belongs to (also provides the IO service for the defer timers)
 * \param socket Socket to use for communication
 * \param flags Initial connection flags
 * \param active_connect_indicator Stored in the connection as is
 * \param never_forget Stored in the connection; later passed to the peer when the remote runtime is looked up
 */
tConnection::tConnection(tPeerImplementation& peer, std::shared_ptr<boost::asio::ip::tcp::socket>& socket, int flags,
                         std::shared_ptr<tPeerInfo::tActiveConnect> active_connect_indicator, bool never_forget) :
  network_transport::generic_protocol::tConnection(*peer.LocalRuntimeInfo(), true),
  peer(peer),
  flags(flags),
  socket(socket),
  ready(false),                            // set as soon as the initial structure exchange is complete
  read_size_buffer(4),                     // holds the 4 byte size field preceding every packet
  read_buffer(cINITIAL_READ_BUFFER_SIZE),  // replaced by a larger buffer if a bigger message batch is announced
  bytes_to_read(0),                        // zero means: size field is read next
  active_connect_indicator(active_connect_indicator),
  never_forget(never_forget),
  defer_read_timer(peer.IOService()),
  defer_write_timer(peer.IOService())
{
}
635
636tConnection::~tConnection()
637{}
638
639void tConnection::CloseImplementation()
640{
641  try
642  {
643    // socket->shutdown(); TODO: Add with newer boost version (?)
644    socket->close();
645  }
646  catch (const std::exception& ex)
647  {
648    FINROC_LOG_PRINT(WARNING, "Closing connection failed: ", ex);
649  }
650  if (ready)
651  {
652    peer.SetPeerListChanged();
653  }
654}
655
656void tConnection::DoInitialStructureExchange(std::shared_ptr<tConnection>& connection)
657{
658  if (connection == connection->GetRemoteRuntime()->GetPrimaryConnection())
659  {
660    assert(this == connection.get());
661    tStructureReadHandler structure_read_handler(connection);
662    if (GetRemoteRuntime()->GetDesiredStructureInfo() != tStructureExchange::NONE)
663    {
664      tStructureWriteHandler structure_write_handler(connection);
665    }
666    else
667    {
668      SharedConnectionInfo().initial_structure_writing_complete = true;
669    }
670  }
671  else
672  {
673    SharedConnectionInfo().initial_structure_reading_complete = true;
674    SharedConnectionInfo().initial_structure_writing_complete = true;
675    ready = true;
676    tMessageBatchReadHandler handler(connection);
677  }
678}
679
680size_t tConnection::GetBytesToRead()
681{
682  rrlib::serialization::tMemoryBuffer buffer(read_size_buffer.GetPointer(), read_size_buffer.Capacity());
683  rrlib::serialization::tInputStream stream(buffer);
684  bytes_to_read = stream.ReadInt();
685  return bytes_to_read;
686}
687
688void tConnection::ProcessPeerInfoMessage(rrlib::serialization::tMemoryBuffer& buffer)
689{
690  network_transport::generic_protocol::tPeerInfoMessage message;
691  rrlib::serialization::tInputStream stream(buffer, SharedConnectionInfo().input_stream_prototype);
692  message.Deserialize(stream, false);
693  while (stream.ReadBoolean())
694  {
695    tPeerInfo peer(tPeerType::UNSPECIFIED);
696    this->peer.DeserializePeerInfo(stream, peer);
697    RRLIB_LOG_PRINT(DEBUG_VERBOSE_1, "Deserialized peer ", peer.ToString());
698    this->peer.ProcessIncomingPeerInfo(peer);
699  }
700  message.FinishDeserialize(stream);
701}
702
703void tConnection::SendMessagePacket(std::shared_ptr<tBase>& self, const rrlib::serialization::tMemoryBuffer& buffer_to_send, bool& back_buffer_lock_variable)
704{
705  tMessageBatchWriteHandler write_handler(self, buffer_to_send, back_buffer_lock_variable);
706}
707
708void tConnection::TryToEstablishConnection(tPeerImplementation& peer, std::shared_ptr<boost::asio::ip::tcp::socket>& socket, int flags,
709    std::shared_ptr<tPeerInfo::tActiveConnect> active_connect_indicator, bool never_forget)
710{
711  std::shared_ptr<tConnection> connection(new tConnection(peer, socket, flags, active_connect_indicator, never_forget));
712
713  tInitialWriteHandler write_handler(connection);
714  tInitialReadHandler read_handler(connection);
715}
716
717
718//----------------------------------------------------------------------
719// End of namespace declaration
720//----------------------------------------------------------------------
721}
722}
723}
Note: See TracBrowser for help on using the repository browser.