source: finroc_plugins_data_ports/standard/tStandardPort.cpp @ 114:8b171b809697

17.03
Last change on this file since 114:8b171b809697 was 114:8b171b809697, checked in by Max Reichardt <mreichardt@…>, 2 years ago

Removes obsolete reverse pushing mechanism (as this is an unused feature in C++ that comes with a performance penalty, makes the implementation more complex, and does not properly work with the new type conversion feature).

File size: 10.8 KB
Line 
1//
2// You received this file as part of Finroc
3// A framework for intelligent robot control
4//
5// Copyright (C) Finroc GbR (finroc.org)
6//
7// This program is free software; you can redistribute it and/or modify
8// it under the terms of the GNU General Public License as published by
9// the Free Software Foundation; either version 2 of the License, or
10// (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU General Public License along
18// with this program; if not, write to the Free Software Foundation, Inc.,
19// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20//
21//----------------------------------------------------------------------
22/*!\file    plugins/data_ports/standard/tStandardPort.cpp
23 *
24 * \author  Max Reichardt
25 *
26 * \date    2012-10-23
27 *
28 */
29//----------------------------------------------------------------------
30#include "plugins/data_ports/standard/tStandardPort.h"
31
32//----------------------------------------------------------------------
33// External includes (system with <>, local with "")
34//----------------------------------------------------------------------
35#include "core/port/tPortFactory.h"
36#include "core/port/tPortWrapperBase.h"
37
38//----------------------------------------------------------------------
39// Internal includes with ""
40//----------------------------------------------------------------------
41#include "plugins/data_ports/type_traits.h"
42#include "plugins/data_ports/common/tPullOperation.h"
43#include "plugins/data_ports/standard/tMultiTypePortBufferPool.h"
44#include "plugins/data_ports/optimized/tCheapCopyPort.h"
45#include "plugins/data_ports/optimized/tSingleThreadedCheapCopyPortGeneric.h"
46#include "plugins/data_ports/standard/tPullRequestHandlerRaw.h"
47
48//----------------------------------------------------------------------
49// Debugging
50//----------------------------------------------------------------------
51#include <cassert>
52
53//----------------------------------------------------------------------
54// Namespace usage
55//----------------------------------------------------------------------
56
57//----------------------------------------------------------------------
58// Namespace declaration
59//----------------------------------------------------------------------
60namespace finroc
61{
62namespace data_ports
63{
64namespace standard
65{
66
67//----------------------------------------------------------------------
68// Forward declarations / typedefs / enums
69//----------------------------------------------------------------------
70
71//----------------------------------------------------------------------
72// Const values
73//----------------------------------------------------------------------
74
75//----------------------------------------------------------------------
76// Implementation
77//----------------------------------------------------------------------
78namespace internal
79{
80class tDataPortFactory : public core::tPortFactory
81{
82  virtual core::tAbstractPort& CreatePortImplementation(const std::string& port_name, core::tFrameworkElement& parent,
83      const rrlib::rtti::tType& type, core::tFrameworkElement::tFlags flags) override
84  {
85    core::tPortWrapperBase::tConstructorArguments<common::tAbstractDataPortCreationInfo> creation_info(port_name, &parent, type, flags);
86    return IsCheaplyCopiedType(type) ? // TODO: put it
87#ifndef RRLIB_SINGLE_THREADED
88           * static_cast<core::tAbstractPort*>(new optimized::tCheapCopyPort(creation_info)) :
89#else
90           *static_cast<core::tAbstractPort*>(new optimized::tSingleThreadedCheapCopyPortGeneric(creation_info)) :
91#endif
92           *static_cast<core::tAbstractPort*>(new tStandardPort(creation_info));
93    //return *static_cast<core::tAbstractPort*>(new tStandardPort(creation_info));
94  }
95
96  virtual bool HandlesDataType(const rrlib::rtti::tType& dt) override
97  {
98    return IsDataFlowType(dt);
99  }
100};
101
102tDataPortFactory default_data_port_factory;
103
104} // namespace internal
105
106
107tStandardPort::tStandardPort(common::tAbstractDataPortCreationInfo creation_info) :
108  common::tAbstractDataPort(creation_info),
109  buffer_pool(this->GetDataType(), IsOutputPort() ? 2 : 0),
110  multi_type_buffer_pool(GetFlag(tFlag::MULTI_TYPE_BUFFER_POOL) ? new tMultiTypePortBufferPool(buffer_pool, GetDataType()) : NULL),
111  default_value(CreateDefaultValue(creation_info, buffer_pool)),
112  current_value(0),
113  standard_assign(!GetFlag(tFlag::NON_STANDARD_ASSIGN) && (!GetFlag(tFlag::HAS_QUEUE))),
114  compression_active_status(-2),
115  data_compressor_mutex("tStandardPort data compressor"),
116  input_queue(),
117  pull_request_handler(NULL)
118{
119  if ((!IsDataFlowType(GetDataType())) || IsCheaplyCopiedType(GetDataType()))
120  {
121    FINROC_LOG_PRINT(ERROR, "Data type ", GetDataType().GetName(), " is not suitable for standard port implementation.");
122    abort();
123  }
124
125  // Initialize value
126  tPortBufferManager* initial = default_value.get();
127  if (!initial)
128  {
129    initial = GetUnusedBufferRaw().release();
130    initial->InitReferenceCounter(0);
131  }
132  int pointer_tag = initial->GetPointerTag();
133  initial->AddLocks(1, pointer_tag);
134  initial->SetUnused(false);
135  current_value.store(tTaggedBufferPointer(initial, pointer_tag));
136
137  // Initialize queue
138  if (GetFlag(tFlag::HAS_QUEUE))
139  {
140    input_queue.reset(new common::tPortQueue<tLockingManagerPointer>(!GetFlag(tFlag::HAS_DEQUEUE_ALL_QUEUE)));
141    if (creation_info.max_queue_size > 0)
142    {
143      input_queue->SetMaxQueueLength(creation_info.max_queue_size);
144    }
145  }
146
147  PropagateStrategy(NULL, NULL);  // initialize strategy
148}
149
150
151tStandardPort::~tStandardPort()
152{
153  tTaggedBufferPointer cur_pointer = current_value.load();
154  tPortBufferUnlocker unlocker;
155  unlocker(cur_pointer.GetPointer()); // thread safe, since nobody should publish to port anymore
156
157  delete multi_type_buffer_pool;
158}
159
160/*!
161 * Set current value to default value
162 */
163void tStandardPort::ApplyDefaultValue()
164{
165  if (!default_value)
166  {
167    FINROC_LOG_PRINT(ERROR, "No default value has been set. Doing nothing.");
168    return;
169  }
170  default_value->AddLocks(1);
171  tLockingManagerPointer pointer(default_value.get());
172  Publish(pointer);
173}
174
175void tStandardPort::BrowserPublish(tUnusedManagerPointer& data, bool notify_listener_on_this_port, tChangeStatus change_constant)
176{
177  if (notify_listener_on_this_port)
178  {
179    if (change_constant == tChangeStatus::CHANGED_INITIAL)
180    {
181      PublishImplementation<tChangeStatus::CHANGED_INITIAL, true, true>(data);
182    }
183    else
184    {
185      PublishImplementation<tChangeStatus::CHANGED, true, true>(data);
186    }
187  }
188  else
189  {
190    if (change_constant == tChangeStatus::CHANGED_INITIAL)
191    {
192      PublishImplementation<tChangeStatus::CHANGED_INITIAL, true, false>(data);
193    }
194    else
195    {
196      PublishImplementation<tChangeStatus::CHANGED, true, false>(data);
197    }
198  }
199}
200
201void tStandardPort::CallPullRequestHandler(tPublishingData& publishing_data)
202{
203  tUniversalManagerPointer mgr = pull_request_handler->RawPullRequest(*this);
204  if (mgr)
205  {
206    if (mgr->IsUnused())
207    {
208      mgr->InitReferenceCounter(publishing_data.added_locks);
209    }
210    else
211    {
212      mgr->AddLocks(publishing_data.added_locks - 1); // -1 because we release lock of this pointer later
213    }
214    publishing_data.Init(mgr.release());
215  }
216}
217
218tPortBufferManager* tStandardPort::CreateDefaultValue(const common::tAbstractDataPortCreationInfo& creation_info, tBufferPool& buffer_pool)
219{
220  if (creation_info.DefaultValueSet() || creation_info.flags.Get(tFlag::DEFAULT_ON_DISCONNECT))
221  {
222    tPortBufferManager* pdm = buffer_pool.GetUnusedBuffer(creation_info.data_type).release();
223    pdm->InitReferenceCounter(1);
224    if (creation_info.DefaultValueSet())
225    {
226      rrlib::serialization::tInputStream input(creation_info.GetDefaultGeneric());
227      pdm->GetObject().Deserialize(input);
228    }
229    return pdm;
230  }
231  return NULL;
232}
233
234void tStandardPort::ForwardData(tAbstractDataPort& other)
235{
236  assert(IsDataFlowType(other.GetDataType()) && (!IsCheaplyCopiedType(other.GetDataType())));
237  tLockingManagerPointer pointer = GetCurrentValueRaw();
238  (static_cast<tStandardPort&>(other)).Publish(pointer);
239}
240
241rrlib::rtti::tGenericObject& tStandardPort::GetDefaultBufferRaw()
242{
243  if (IsReady())
244  {
245    FINROC_LOG_PRINT(ERROR, "Please set default value _before_ initializing port");
246    abort();
247  }
248  if (!default_value)
249  {
250    default_value.reset(GetUnusedBufferRaw().release());
251    default_value->InitReferenceCounter(1);
252  }
253  return default_value->GetObject();
254}
255
256int tStandardPort::GetMaxQueueLengthImplementation() const
257{
258  return input_queue ? input_queue->GetMaxQueueLength() : -1;
259}
260
261tStandardPort::tUnusedManagerPointer tStandardPort::GetUnusedBufferRaw(const rrlib::rtti::tType& dt)
262{
263  assert(multi_type_buffer_pool);
264  tUnusedManagerPointer buffer = multi_type_buffer_pool->GetUnusedBuffer(dt);
265  buffer->SetUnused(true);
266  return buffer;
267}
268
269void tStandardPort::InitialPushTo(core::tConnector& connector)
270{
271  tLockingManagerPointer manager = GetCurrentValueRaw(tStrategy::NEVER_PULL);
272  assert(IsReady());
273
274  common::tPublishOperation<tStandardPort, tPublishingData> data(manager, 1000);
275  tStandardPort& target_port = static_cast<tStandardPort&>(connector.Destination());
276  common::tPublishOperation<tStandardPort, tPublishingData>::Receive<tChangeStatus::CHANGED_INITIAL>(data, target_port, *this);
277}
278
279void tStandardPort::LockCurrentValueForPublishing(tPublishingData& publishing_data)
280{
281  tLockingManagerPointer locked_buffer = LockCurrentValueForRead(publishing_data.added_locks);
282  publishing_data.Init(locked_buffer.release());
283}
284
285void tStandardPort::NonStandardAssign(tPublishingData& publishing_data, tChangeStatus change_constant)
286{
287  if (GetFlag(tFlag::USES_QUEUE) && change_constant != tChangeStatus::CHANGED_INITIAL)
288  {
289    assert(GetFlag(tFlag::HAS_QUEUE) && input_queue);
290
291    // enqueue
292    publishing_data.AddLock();
293    input_queue->Enqueue(tLockingManagerPointer(publishing_data.published_buffer));
294  }
295}
296
297tStandardPort::tLockingManagerPointer tStandardPort::PullValueRaw(bool ignore_pull_request_handler_on_this_port)
298{
299  common::tPullOperation<tStandardPort, tPublishingData, tPortBufferManager> pull_operation(200);
300  pull_operation.Execute(*this);
301  return tLockingManagerPointer(pull_operation.published_buffer);
302}
303
304//void tStandardPort::SetMaxQueueLengthImplementation(int length)
305//{
306//  assert((GetFlag(tFlag::HAS_QUEUE) && queue != NULL));
307//  assert((!IsOutputPort()));
308//  assert((length >= 1));
309//  queue->SetMaxLength(length);
310//}
311
312void tStandardPort::SetPullRequestHandler(tPullRequestHandlerRaw* pull_request_handler_)
313{
314  if (pull_request_handler_ != NULL)
315  {
316    this->pull_request_handler = pull_request_handler_;
317  }
318}
319
320//----------------------------------------------------------------------
321// End of namespace declaration
322//----------------------------------------------------------------------
323}
324}
325}
Note: See TracBrowser for help on using the repository browser.