source: finroc_plugins_data_ports/optimized/tCheapCopyPort.cpp @ 115:fd7b0e5ec9ea

17.03
Last change on this file since 115:fd7b0e5ec9ea was 115:fd7b0e5ec9ea, checked in by Max Reichardt <mreichardt@…>, 23 months ago

Makes initial pushing mechanism use/respect type conversion

File size: 18.6 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/data_ports/optimized/tCheapCopyPort.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-10-24
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/data_ports/optimized/tCheapCopyPort.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35
36//----------------------------------------------------------------------
37// Internal includes with ""
38//----------------------------------------------------------------------
39#include "plugins/data_ports/type_traits.h"
40#include "plugins/data_ports/common/tPullOperation.h"
41
42//----------------------------------------------------------------------
43// Debugging
44//----------------------------------------------------------------------
45#include <cassert>
46
47//----------------------------------------------------------------------
48// Namespace usage
49//----------------------------------------------------------------------
50
51//----------------------------------------------------------------------
52// Namespace declaration
53//----------------------------------------------------------------------
54namespace finroc
55{
56namespace data_ports
57{
58namespace optimized
59{
60
61//----------------------------------------------------------------------
62// Forward declarations / typedefs / enums
63//----------------------------------------------------------------------
64
65//----------------------------------------------------------------------
66// Const values
67//----------------------------------------------------------------------
68
69//----------------------------------------------------------------------
70// Implementation
71//----------------------------------------------------------------------
72#ifndef RRLIB_SINGLE_THREADED
73
74namespace internal
75{
76
77/*! Creates default value */
78static rrlib::rtti::tGenericObject* CreateDefaultValue(const common::tAbstractDataPortCreationInfo& creation_info)
79{
80  if (creation_info.DefaultValueSet() || creation_info.flags.Get(core::tFrameworkElement::tFlag::DEFAULT_ON_DISCONNECT))
81  {
82    rrlib::rtti::tGenericObject* result = creation_info.data_type.CreateGenericObject();
83    if (creation_info.DefaultValueSet())
84    {
85      rrlib::serialization::tInputStream input(creation_info.GetDefaultGeneric());
86      result->Deserialize(input);
87    }
88    return result;
89  }
90  return NULL;
91}
92
93} // namespace internal
94
95tCheapCopyPort::tCheapCopyPort(common::tAbstractDataPortCreationInfo creation_info) :
96  common::tAbstractDataPort(creation_info),
97  cheaply_copyable_type_index(RegisterPort(creation_info.data_type)),
98  default_value(internal::CreateDefaultValue(creation_info)),
99  current_value(0),
100  standard_assign(!GetFlag(tFlag::NON_STANDARD_ASSIGN) && (!GetFlag(tFlag::HAS_QUEUE))),
101  input_queue(),
102  pull_request_handler(NULL)
103{
104  if ((!IsDataFlowType(GetDataType())) || (!IsCheaplyCopiedType(GetDataType())))
105  {
106    FINROC_LOG_PRINT(ERROR, "Data type ", GetDataType().GetName(), " is not suitable for cheap copy port implementation.");
107    abort();
108  }
109
110  // Initialize value
111  tCheaplyCopiedBufferManager* initial = tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release();
112  assert(initial->GetObject().GetType() == GetDataType());
113  initial->InitReferenceCounter(1);
114  int pointer_tag = initial->GetPointerTag();
115  current_value.store(tTaggedBufferPointer(initial, pointer_tag));
116
117  // set initial value to default?
118  if (creation_info.DefaultValueSet())
119  {
120    initial->GetObject().DeepCopyFrom(*default_value);
121  }
122  else
123  {
124    std::unique_ptr<rrlib::rtti::tGenericObject> object_with_default_value(GetDataType().CreateGenericObject());
125    initial->GetObject().DeepCopyFrom(*object_with_default_value);
126  }
127
128  // Initialize queue
129  if (GetFlag(tFlag::HAS_QUEUE))
130  {
131    input_queue.reset(new common::tPortQueue<tLockingManagerPointer>(!GetFlag(tFlag::HAS_DEQUEUE_ALL_QUEUE)));
132    if (creation_info.max_queue_size > 0)
133    {
134      input_queue->SetMaxQueueLength(creation_info.max_queue_size);
135    }
136  }
137
138  PropagateStrategy(NULL, NULL);  // initialize strategy
139}
140
141tCheapCopyPort::~tCheapCopyPort()
142{
143  tTaggedBufferPointer cur_pointer = current_value.exchange(0);
144  tPortBufferUnlocker unlocker;
145  unlocker(cur_pointer.GetPointer());
146}
147
148void tCheapCopyPort::ApplyDefaultValue()
149{
150  if (!default_value)
151  {
152    FINROC_LOG_PRINT(ERROR, "No default value has been set. Doing nothing.");
153    return;
154  }
155
156  tUnusedManagerPointer buffer(tGlobalBufferPools::Instance().GetUnusedBuffer(GetCheaplyCopyableTypeIndex()).release());
157  buffer->GetObject().DeepCopyFrom(*default_value);
158  buffer->SetTimestamp(rrlib::time::cNO_TIME);
159  BrowserPublishRaw(buffer, true);
160}
161
162std::string tCheapCopyPort::BrowserPublishRaw(tUnusedManagerPointer& buffer, bool notify_listener_on_this_port, tChangeStatus change_constant)
163{
164  if (buffer->GetThreadLocalOrigin()) // Is current thread the owner?
165  {
166    assert(buffer->GetThreadLocalOrigin() == tThreadLocalBufferPools::Get() && "Thread must be owner of thread-local buffer");
167    common::tPublishOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer> data(buffer);
168    if (notify_listener_on_this_port)
169    {
170      if (change_constant == tChangeStatus::CHANGED_INITIAL)
171      {
172        data.Execute<tChangeStatus::CHANGED_INITIAL, true, true>(*this);
173      }
174      else
175      {
176        data.Execute<tChangeStatus::CHANGED, true, true>(*this);
177      }
178    }
179    else
180    {
181      if (change_constant == tChangeStatus::CHANGED_INITIAL)
182      {
183        data.Execute<tChangeStatus::CHANGED_INITIAL, true, false>(*this);
184      }
185      else
186      {
187        data.Execute<tChangeStatus::CHANGED, true, false>(*this);
188      }
189    }
190  }
191  else
192  {
193    common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(buffer);
194    if (notify_listener_on_this_port)
195    {
196      if (change_constant == tChangeStatus::CHANGED_INITIAL)
197      {
198        data.Execute<tChangeStatus::CHANGED_INITIAL, true, true>(*this);
199      }
200      else
201      {
202        data.Execute<tChangeStatus::CHANGED, true, true>(*this);
203      }
204    }
205    else
206    {
207      if (change_constant == tChangeStatus::CHANGED_INITIAL)
208      {
209        data.Execute<tChangeStatus::CHANGED_INITIAL, true, false>(*this);
210      }
211      else
212      {
213        data.Execute<tChangeStatus::CHANGED, true, false>(*this);
214      }
215    }
216  }
217  return "";
218}
219
220void tCheapCopyPort::CallPullRequestHandler(tPublishingDataGlobalBuffer& publishing_data)
221{
222  tUnusedManagerPointer result = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release());
223  if (pull_request_handler->RawPullRequest(*this, *result))
224  {
225    publishing_data.Init(result);
226  }
227}
228
229void tCheapCopyPort::CallPullRequestHandler(tPublishingDataThreadLocalBuffer& publishing_data)
230{
231  tUnusedManagerPointer result = tUnusedManagerPointer(tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index).release());
232  if (pull_request_handler->RawPullRequest(*this, *result))
233  {
234    publishing_data.Init(result);
235  }
236}
237
238//bool tCheapCopyPort::ContainsDefaultValue()
239//{
240//  tCCPortDataManager* c = GetInInterThreadContainer();
241//  bool result = c->GetObject()->GetType() == default_value->GetObject()->GetType() && c->GetObject()->Equals(*default_value->GetObject());
242//  c->Recycle2();
243//  return result;
244//}
245
246void tCheapCopyPort::CopyCurrentValueToGenericObject(rrlib::rtti::tGenericObject& buffer, rrlib::time::tTimestamp& timestamp, tStrategy strategy)
247{
248  if ((strategy == tStrategy::DEFAULT && PushStrategy()) || strategy == tStrategy::NEVER_PULL)
249  {
250    for (; ;)
251    {
252      tTaggedBufferPointer current = current_value.load();
253      buffer.DeepCopyFrom(current->GetObject());
254      timestamp = current->GetTimestamp();
255      tTaggedBufferPointer::tStorage current_raw = current;
256      if (current_raw == current_value.load())    // still valid??
257      {
258        return;
259      }
260    }
261  }
262  else
263  {
264    tLockingManagerPointer dc = PullValueRaw(strategy == tStrategy::PULL_IGNORING_HANDLER_ON_THIS_PORT);
265    buffer.DeepCopyFrom(dc->GetObject());
266    timestamp = dc->GetTimestamp();
267  }
268}
269
270void tCheapCopyPort::ForwardData(tAbstractDataPort& other)
271{
272  assert(IsDataFlowType(other.GetDataType()) && (IsCheaplyCopiedType(other.GetDataType())));
273
274  if (tThreadLocalBufferPools::Get())
275  {
276    tTaggedBufferPointer current_buffer = current_value.load();
277    if (tThreadLocalBufferPools::Get() == current_buffer->GetThreadLocalOrigin()) // Is current thread the owner thread?
278    {
279      common::tPublishOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer> data(static_cast<tThreadLocalBufferManager*>(current_buffer.GetPointer()), false);
280      data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other));
281      return;
282    }
283
284    // there obviously will not arrive any buffer from current thread in the meantime
285
286    auto unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index);
287    for (; ;)
288    {
289      unused_manager->GetObject().DeepCopyFrom(current_buffer->GetObject());
290      unused_manager->SetTimestamp(current_buffer->GetTimestamp());
291      typename tTaggedBufferPointer::tStorage current_raw = current_buffer;
292      typename tTaggedBufferPointer::tStorage current_raw_2 = current_value.load();
293      if (current_raw == current_raw_2)    // still valid??
294      {
295        common::tPublishOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer> data(static_cast<tThreadLocalBufferManager*>(current_buffer.GetPointer()), true);
296        data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other));
297        return;
298      }
299      current_buffer = current_raw_2;
300    }
301  }
302  else
303  {
304    tUnusedManagerPointer unused_manager = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release());
305    CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL);
306    common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(unused_manager);
307    data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other));
308  }
309}
310
311//const rrlib::rtti::tGenericObject* tCheapCopyPort::GetAutoLockedRaw()
312//{
313//  tThreadLocalCache* tc = tThreadLocalCache::Get();
314//
315//  if (PushStrategy())
316//  {
317//    tCCPortDataManagerTL* mgr = GetLockedUnsafeInContainer();
318//    tc->AddAutoLock(mgr);
319//    return mgr->GetObject();
320//  }
321//  else
322//  {
323//    tCCPortDataManager* mgr = GetInInterThreadContainer();
324//    tc->AddAutoLock(mgr);
325//    return mgr->GetObject();
326//  }
327//}
328
329int tCheapCopyPort::GetMaxQueueLengthImplementation() const
330{
331  return input_queue ? input_queue->GetMaxQueueLength() : -1;
332}
333
334//tCCPortDataManager* tCheapCopyPort::GetInInterThreadContainer(bool dont_pull)
335//{
336//  tCCPortDataManager* ccitc = tThreadLocalCache::Get()->GetUnusedInterThreadBuffer(GetDataType());
337//  rrlib::time::tTimestamp timestamp;
338//  GetRaw(*ccitc->GetObject(), timestamp, dont_pull);
339//  ccitc->SetTimestamp(timestamp);
340//  return ccitc;
341//}
342//
343//tCCPortDataManagerTL* tCheapCopyPort::GetLockedUnsafeInContainer()
344//{
345//  tCCPortDataRef* val = value;
346//  tCCPortDataManagerTL* val_c = val->GetContainer();
347//  if (val_c->GetOwnerThread() == rrlib::thread::tThread::CurrentThreadId())    // if same thread: simply add read lock
348//  {
349//    val_c->AddLock();
350//    return val_c;
351//  }
352//
353//  // not the same thread: create auto-locked new container
354//  tThreadLocalCache* tc = tThreadLocalCache::Get();
355//  tCCPortDataManagerTL* ccitc = tc->GetUnusedBuffer(this->data_type);
356//  ccitc->ref_counter = 1;
357//  for (; ;)
358//  {
359//    ccitc->GetObject()->DeepCopyFrom(val_c->GetObject(), NULL);
360//    ccitc->SetTimestamp(val_c->GetTimestamp());
361//    if (val == value)    // still valid??
362//    {
363//      return ccitc;
364//    }
365//    val = value;
366//    val_c = val->GetContainer();
367//  }
368//}
369//
370//tCCPortDataManager* tCheapCopyPort::GetPullInInterthreadContainerRaw(bool ignore_pull_request_handler_on_this_port)
371//{
372//  tCCPortDataManagerTL* tmp = PullValueRaw(ignore_pull_request_handler_on_this_port);
373//  tCCPortDataManager* ret = tThreadLocalCache::GetFast()->GetUnusedInterThreadBuffer(this->data_type);
374//  ret->GetObject()->DeepCopyFrom(tmp->GetObject(), NULL);
375//  ret->SetTimestamp(tmp->GetTimestamp());
376//  tmp->ReleaseLock();
377//  return ret;
378//}
379//
380
381void tCheapCopyPort::InitialPushTo(core::tConnector& connector)
382{
383  // this is a one-time event => use global buffer
384  tUnusedManagerPointer unused_manager(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release());
385  CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL);
386  if (typeid(connector) == typeid(common::tConversionConnector))
387  {
388    static_cast<common::tConversionConnector&>(connector).Publish(unused_manager->GetObject(), tChangeStatus::CHANGED_INITIAL);
389  }
390  else
391  {
392    common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(unused_manager);
393    tCheapCopyPort& target_port = static_cast<tCheapCopyPort&>(connector.Destination());
394    common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer>::Receive<tChangeStatus::CHANGED_INITIAL>(data, target_port, *this);
395  }
396}
397
398void tCheapCopyPort::LockCurrentValueForPublishing(tPublishingDataGlobalBuffer& publishing_data)
399{
400  while (true)
401  {
402    tTaggedBufferPointer current_buffer = current_value.load();
403    if (current_buffer->GetThreadLocalOrigin())
404    {
405      tUnusedManagerPointer unused_manager = tUnusedManagerPointer(tGlobalBufferPools::Instance().GetUnusedBuffer(cheaply_copyable_type_index).release());
406      CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL);
407      publishing_data.Init(unused_manager);
408      return;
409    }
410    else
411    {
412      if (current_buffer->TryLock(tPublishingDataGlobalBuffer::cADD_LOCKS, current_buffer.GetStamp()))
413      {
414        // successful
415        publishing_data.InitSuccessfullyLocked(current_buffer.GetPointer());
416        return;
417      }
418    }
419  }
420}
421
422void tCheapCopyPort::LockCurrentValueForPublishing(tPublishingDataThreadLocalBuffer& publishing_data)
423{
424  assert(tThreadLocalBufferPools::Get());
425  tTaggedBufferPointer current_buffer = current_value.load();
426  if (tThreadLocalBufferPools::Get() == current_buffer->GetThreadLocalOrigin()) // Is current thread the owner thread?
427  {
428    publishing_data.Init(static_cast<tThreadLocalBufferManager*>(current_buffer.GetPointer()), false);
429    return;
430  }
431
432  // there obviously will not arrive any buffer from current thread in the meantime
433
434  auto unused_manager = tThreadLocalBufferPools::Get()->GetUnusedBuffer(cheaply_copyable_type_index);
435  for (; ;)
436  {
437    unused_manager->GetObject().DeepCopyFrom(current_buffer->GetObject());
438    unused_manager->SetTimestamp(current_buffer->GetTimestamp());
439    typename tTaggedBufferPointer::tStorage current_raw = current_buffer;
440    typename tTaggedBufferPointer::tStorage current_raw_2 = current_value.load();
441    if (current_raw == current_raw_2)    // still valid??
442    {
443      publishing_data.Init(unused_manager.release(), true);
444      return;
445    }
446    current_buffer = current_raw_2;
447  }
448}
449
450bool tCheapCopyPort::NonStandardAssign(tPublishingDataGlobalBuffer& publishing_data, tChangeStatus change_constant)
451{
452  if (GetFlag(tFlag::USES_QUEUE) && change_constant != tChangeStatus::CHANGED_INITIAL)
453  {
454    assert(GetFlag(tFlag::HAS_QUEUE));
455
456    // enqueue
457    publishing_data.AddLock();
458    input_queue->Enqueue(tLockingManagerPointer(publishing_data.published_buffer));
459  }
460  return true;
461}
462
463bool tCheapCopyPort::NonStandardAssign(tPublishingDataThreadLocalBuffer& publishing_data, tChangeStatus change_constant)
464{
465  if (GetFlag(tFlag::USES_QUEUE) && change_constant != tChangeStatus::CHANGED_INITIAL)
466  {
467    assert(GetFlag(tFlag::HAS_QUEUE));
468
469    // Enqueue
470    publishing_data.AddLock();
471    input_queue->Enqueue(tLockingManagerPointer(publishing_data.published_buffer));
472  }
473  return true;
474}
475
476tCheapCopyPort::tLockingManagerPointer tCheapCopyPort::PullValueRaw(bool ignore_pull_request_handler_on_this_port)
477{
478  if (tThreadLocalBufferPools::Get())
479  {
480    common::tPullOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer, tThreadLocalBufferManager> pull_operation;
481    pull_operation.Execute(*this);
482    return tLockingManagerPointer(pull_operation.published_buffer);
483  }
484  else
485  {
486    common::tPullOperation<tCheapCopyPort, tPublishingDataGlobalBuffer, tCheaplyCopiedBufferManager> pull_operation;
487    pull_operation.Execute(*this);
488    return tLockingManagerPointer(pull_operation.published_buffer);
489  }
490}
491
492void tCheapCopyPort::SetDefault(rrlib::rtti::tGenericObject& new_default)
493{
494  if (IsReady())
495  {
496    FINROC_LOG_PRINT(ERROR, "Please set default value _before_ initializing port");
497    abort();
498  }
499  if (new_default.GetType() != this->GetDataType())
500  {
501    FINROC_LOG_PRINT(ERROR, "New default value has wrong type: ", new_default.GetType().GetName());
502    abort();
503  }
504
505  if (!default_value)
506  {
507    default_value.reset(new_default.GetType().CreateGenericObject());
508  }
509  else if (default_value->GetType() != new_default.GetType())
510  {
511    FINROC_LOG_PRINT(ERROR, "Provided default value has wrong type. Ignoring.");
512    return;
513  }
514  default_value->DeepCopyFrom(new_default);
515
516  tTaggedBufferPointer cur_pointer = current_value.load();
517  cur_pointer->GetObject().DeepCopyFrom(*default_value);
518}
519
520//void tCheapCopyPort::SetMaxQueueLengthImpl(int length)
521//{
522//  assert((GetFlag(tFlag::HAS_QUEUE) && queue != NULL));
523//  assert((!IsOutputPort()));
524//  assert((length >= 1));
525//  queue->SetMaxLength(length);
526//}
527
528void tCheapCopyPort::SetPullRequestHandler(tPullRequestHandlerRaw* pull_request_handler_)
529{
530  if (pull_request_handler_ != NULL)
531  {
532    this->pull_request_handler = pull_request_handler_;
533  }
534}
535
536#endif // RRLIB_SINGLE_THREADED
537
538//----------------------------------------------------------------------
539// End of namespace declaration
540//----------------------------------------------------------------------
541}
542}
543}
Note: See TracBrowser for help on using the repository browser.