source: rrlib_serialization/tOutputStream.cpp @ 161:7c780e113dd5

17.03
Last change on this file since 161:7c780e113dd5 was 161:7c780e113dd5, checked in by Max Reichardt <mreichardt@…>, 7 years ago

Fixes bug in register auto-publishing mechanism (possibly serialized entries from wrong register)

File size: 9.5 KB
Line 
1//
2// You received this file as part of RRLib
3// Robotics Research Library
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    rrlib/serialization/tOutputStream.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2013-05-17
27 *
28 */
29//----------------------------------------------------------------------
30#include "rrlib/serialization/tOutputStream.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35
36//----------------------------------------------------------------------
37// Internal includes with ""
38//----------------------------------------------------------------------
39#include "rrlib/serialization/tInputStream.h"
40#include "rrlib/serialization/PublishedRegisters.h"
41
42//----------------------------------------------------------------------
43// Debugging
44//----------------------------------------------------------------------
45#include <cassert>
46
47//----------------------------------------------------------------------
48// Namespace usage
49//----------------------------------------------------------------------
50
51//----------------------------------------------------------------------
52// Namespace declaration
53//----------------------------------------------------------------------
54namespace rrlib
55{
56namespace serialization
57{
58
59//----------------------------------------------------------------------
60// Forward declarations / typedefs / enums
61//----------------------------------------------------------------------
62
63//----------------------------------------------------------------------
64// Const values
65//----------------------------------------------------------------------
66const double tOutputStream::cBUFFER_COPY_FRACTION = 0.25;
67
68//----------------------------------------------------------------------
69// Implementation
70//----------------------------------------------------------------------
71
72void tOutputStream::Close()
73{
74  if (!closed)
75  {
76    Flush();
77    sink->Close(*this, buffer);
78  }
79  closed = true;
80}
81
82void tOutputStream::CommitData(int add_size_hint)
83{
84  if (GetPosition() > 0)
85  {
86    if (sink->Write(*this, buffer, add_size_hint))
87    {
88      assert((cur_skip_offset_placeholder < 0));
89    }
90    assert(add_size_hint < 0 || buffer.Remaining() >= 8);
91    buffer_copy_fraction = static_cast<size_t>((buffer.Capacity() * cBUFFER_COPY_FRACTION));
92  }
93}
94
95void tOutputStream::Println(const std::string& s)
96{
97  WriteString(s, false);
98  WriteByte('\n');
99  CheckFlush();
100}
101
102void tOutputStream::Reset(tSink& sink, const tSerializationInfo& serialization_target)
103{
104  Close();
105  this->sink = &sink;
106  this->shared_serialization_info.serialization_target = serialization_target;
107  // TODO: a possible optimization would be to not remove/add listeners if set of registers to be published on_change did not change (seems negligible currently)
108  if (!this->shared_serialization_info.published_register_status.unique())
109  {
110    this->shared_serialization_info.published_register_status.reset(serialization_target.HasPublishedRegisters() ? new tPublishedRegisterStatus() : nullptr);
111  }
112  else if (serialization_target.HasPublishedRegisters())
113  {
114    this->shared_serialization_info.published_register_status->Reset();
115  }
116  else
117  {
118    this->shared_serialization_info.published_register_status.reset();
119  }
120  if (serialization_target.HasPublishedRegisters())
121  {
122    tPublishedRegisterStatus& status = *shared_serialization_info.published_register_status;
123    std::function<void()> change_function = std::bind(&tPublishedRegisterStatus::OnRegisterUpdate, &status);
124    for (uint i = 0; i < cMAX_PUBLISHED_REGISTERS; i++)
125    {
126      if (serialization_target.GetRegisterEntryEncoding(i) == tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE)
127      {
128        PublishedRegisters::AddListener(i, change_function, &status);
129        status.OnRegisterUpdate();
130        status.registered_listeners.set(i, true);
131      }
132    }
133  }
134  Reset();
135}
136
137void tOutputStream::Reset(tSink& sink, tOutputStream& shared_serialization_info_from)
138{
139  Close();
140  this->sink = &sink;
141  this->shared_serialization_info = shared_serialization_info_from.shared_serialization_info;
142  Reset();
143}
144
145void tOutputStream::Reset()
146{
147  sink->Reset(*this, buffer);
148  assert((buffer.Remaining() >= 8));
149  closed = false;
150  buffer_copy_fraction = static_cast<size_t>((buffer.Capacity() * cBUFFER_COPY_FRACTION));
151  direct_write_support = sink->DirectWriteSupport();
152}
153
154void tOutputStream::Seek(size_t position)
155{
156  size_t desired_position = buffer.start + position;
157  if (desired_position > buffer.end)
158  {
159    throw std::invalid_argument("Position is out of bounds");
160  }
161  buffer.position = desired_position;
162}
163
164void tOutputStream::SkipTargetHere()
165{
166  assert(cur_skip_offset_placeholder >= 0);
167  if (short_skip_offset)
168  {
169    buffer.buffer->PutByte(cur_skip_offset_placeholder, buffer.position - cur_skip_offset_placeholder - 1);
170  }
171  else
172  {
173    buffer.buffer->PutInt(cur_skip_offset_placeholder, buffer.position - cur_skip_offset_placeholder - 4);
174  }
175  cur_skip_offset_placeholder = -1;
176}
177
178void tOutputStream::Write(const tFixedBuffer& bb, size_t off, size_t len)
179{
180  if ((Remaining() >= len) && (len < GetCopyFraction() || cur_skip_offset_placeholder >= 0))
181  {
182    buffer.buffer->Put(buffer.position, bb, off, len);
183    buffer.position += len;
184  }
185  else
186  {
187    if (direct_write_support && cur_skip_offset_placeholder < 0)
188    {
189      CommitData(-1);
190      sink->DirectWrite(*this, bb, off, len);
191    }
192    else
193    {
194      while (len > 0)
195      {
196        int write = std::min(len, Remaining());
197        buffer.buffer->Put(buffer.position, bb, off, write);
198        buffer.position += write;
199        len -= write;
200        off += write;
201        assert((len >= 0));
202        if (len == 0)
203        {
204          return;
205        }
206        CommitData(len);
207      }
208    }
209  }
210}
211
212void tOutputStream::WriteAllAvailable(tInputStream& input_stream)
213{
214  while (input_stream.MoreDataAvailable())
215  {
216    input_stream.EnsureAvailable(1u);
217    Write(*input_stream.cur_buffer->buffer, input_stream.cur_buffer->position, input_stream.cur_buffer->Remaining());
218    input_stream.cur_buffer->position = input_stream.cur_buffer->end;
219  }
220}
221
222void tOutputStream::WriteSkipOffsetPlaceholder(bool short_skip_offset)
223{
224  assert((cur_skip_offset_placeholder < 0));
225  cur_skip_offset_placeholder = buffer.position;
226  this->short_skip_offset = short_skip_offset;
227
228  if (short_skip_offset)
229  {
230    WriteByte(0x80);
231  }
232  else
233  {
234    WriteInt(0x80000000);
235  }
236}
237
238void tOutputStream::WriteRegisterUpdatesImplementation(uint register_uid, uint entry_handle, size_t handle_size)
239{
240  bool escape_signal_written = false;
241  uint escape_signal = 0xFFFFFFFF;
242
243  // Update on_change registers
244  for (size_t i = 0; i < cMAX_PUBLISHED_REGISTERS; i++)
245  {
246    tPublishedRegisterStatus& status = *shared_serialization_info.published_register_status;
247    bool may_require_update = i == register_uid || shared_serialization_info.serialization_target.GetRegisterEntryEncoding(i) == tRegisterEntryEncoding::PUBLISH_REGISTER_ON_CHANGE;
248    if (may_require_update)
249    {
250      size_t current_size = PublishedRegisters::Size(i);
251      if (current_size > status.elements_written[i])
252      {
253        if (!escape_signal_written)
254        {
255          Write(&escape_signal, handle_size);
256          escape_signal_written = true;
257        }
258
259        if (GetTargetInfo().revision == 0)
260        {
261          if (status.elements_written[i] == 0)
262          {
263            WriteShort(40);
264          }
265
266          // compatibility with legacy parts
267          PublishedRegisters::SerializeEntries(*this, i, status.elements_written[i], current_size);
268          WriteShort(-1);
269        }
270        else
271        {
272          WriteByte(i);
273          WriteInt(current_size - status.elements_written[i]);
274          PublishedRegisters::SerializeEntries(*this, i, status.elements_written[i], current_size);
275        }
276        status.elements_written[i] = current_size;
277      }
278    }
279  }
280  if (escape_signal_written && GetTargetInfo().revision) // non-legacy parts require this terminator
281  {
282    WriteByte(-1);
283  }
284}
285
286void tOutputStream::WriteString(const std::string& s, bool terminate)
287{
288  size_t len = terminate ? (s.size() + 1) : s.size();
289  Write(tFixedBuffer((char*)s.c_str(), len));
290}
291
292void tOutputStream::tPublishedRegisterStatus::Reset()
293{
294  for (int i = 0; i < cMAX_PUBLISHED_REGISTERS; i++)
295  {
296    if (registered_listeners[i])
297    {
298      bool removed __attribute__((unused)) = PublishedRegisters::RemoveListener(i, this);
299      assert(removed);
300    }
301  }
302  registered_listeners.reset();
303  elements_written.fill(0);
304  on_register_change_update_counter = 0;
305  counter_on_last_update = 0;
306}
307
308
309//----------------------------------------------------------------------
310// End of namespace declaration
311//----------------------------------------------------------------------
312}
313}
Note: See TracBrowser for help on using the repository browser.