Changeset 114:8b171b809697 in finroc_plugins_data_ports
- Timestamp:
- 07.06.2017 22:48:17 (6 years ago)
- Branch:
- 17.03
- Phase:
- public
- Rebase:
- 34343030376135656463383463303563336530363139663765366161613634643038333935346330
- Files:
-
- 13 edited
Legend:
- Unmodified
- Added
- Removed
-
api/tGenericPortImplementation.cpp
r106 r114 200 200 buffer->GetObject().DeepCopyFrom(data); 201 201 common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataThreadLocalBuffer> publish_operation(buffer.release(), true); 202 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port));202 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port)); 203 203 } 204 204 else … … 208 208 buffer->GetObject().DeepCopyFrom(data); 209 209 common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataGlobalBuffer> publish_operation(buffer); 210 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port));210 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tPortBase&>(port)); 211 211 } 212 212 #else -
api/tGenericPortImplementation.h
r106 r114 193 193 { 194 194 common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataThreadLocalBuffer> publish_operation(static_cast<optimized::tThreadLocalBufferManager*>(data_buffer.implementation.Release()), true); 195 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(cc_port);195 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(cc_port); 196 196 } 197 197 else … … 199 199 optimized::tCheapCopyPort::tUnusedManagerPointer pointer(static_cast<optimized::tCheaplyCopiedBufferManager*>(data_buffer.implementation.Release())); 200 200 common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataGlobalBuffer> publish_operation(pointer); 201 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(cc_port);201 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(cc_port); 202 202 } 203 203 #else -
api/tPortImplementation.h
r101 r114 198 198 tBase::Assign(buffer->GetObject().GetData<typename tBase::tPortBuffer>(), data); 199 199 common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataThreadLocalBuffer> publish_operation(buffer.release(), true); 200 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(port);200 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(port); 201 201 } 202 202 else … … 206 206 tBase::Assign(buffer->GetObject().GetData<typename tBase::tPortBuffer>(), data); 207 207 common::tPublishOperation<optimized::tCheapCopyPort, typename optimized::tCheapCopyPort::tPublishingDataGlobalBuffer> publish_operation(buffer); 208 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(port);208 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(port); 209 209 } 210 210 } -
common/tAbstractDataPort.cpp
r107 r114 102 102 } 103 103 104 void tAbstractDataPort::ConsiderInitialReversePush(tAbstractDataPort& target)105 {106 if (IsReady() && target.IsReady())107 {108 if (ReversePushStrategy() && CountOutgoingConnections() == 1)109 {110 FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", target, " to ", (*this));111 target.InitialPushTo(*this, true);112 }113 }114 }115 116 104 core::tConnector* tAbstractDataPort::CreateConnector(tAbstractPort& destination, const core::tConnectOptions& connect_options) 117 105 { … … 165 153 { 166 154 (static_cast<tAbstractDataPort&>(partner)).PropagateStrategy(nullptr, this); 167 168 // check whether we need an initial reverse push169 this->ConsiderInitialReversePush(static_cast<tAbstractDataPort&>(partner));170 155 } 171 156 } … … 224 209 { 225 210 bool source_port = (strategy >= 1 && max >= 1) || (!HasIncomingConnections()); 226 if (!source_port)227 {228 bool all_sources_reverse_pushers = true;229 for (auto it = IncomingConnectionsBegin(); it != IncomingConnectionsEnd(); ++it)230 {231 tAbstractDataPort& port = static_cast<tAbstractDataPort&>(it->Source());232 if (port.IsReady() && (!port.ReversePushStrategy()))233 {234 all_sources_reverse_pushers = false;235 break;236 }237 }238 source_port = all_sources_reverse_pushers;239 }240 211 if (source_port) 241 212 { … … 243 214 { 244 215 FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial push from ", (*this), " to ", (*push_wanter)); 245 InitialPushTo(*push_wanter, false); 216 217 // find connector 218 bool pushed = false; 219 for (auto it = OutgoingConnectionsBegin(); it != OutgoingConnectionsEnd(); ++it) 220 { 221 if (&it->Destination() == push_wanter) 222 { 223 InitialPushTo(*it); 224 pushed = true; 225 break; 226 } 227 } 228 if (!pushed) 229 { 230 FINROC_LOG_PRINT(DEBUG_WARNING, "Connector not found -> not initial pushing (programming error)"); 231 } 232 246 233 } 247 push_wanter = NULL;234 push_wanter = nullptr; 248 235 } 249 236 } … … 251 238 // okay... do we wish to receive a push? 252 239 // yes if... 253 // 1) we are target of a new connection, have a push strategy, no other sources, and partner is no reverse push source240 // 1) we are target of a new connection, have a push strategy, and no other sources 254 241 // 2) our strategy changed to push, and exactly one source 255 242 int other_sources = 0; … … 261 248 } 262 249 } 263 bool request_push = (new_connection_partner && (max >= 1) && (other_sources == 0) && (!new_connection_partner->ReversePushStrategy())) || ((max >= 1 && strategy < 1) && (other_sources == 1));250 bool request_push = (new_connection_partner && (max >= 1) && (other_sources == 0)) || ((max >= 1 && strategy < 1) && (other_sources == 1)); 264 251 265 252 // register strategy change … … 301 288 } 302 289 303 void tAbstractDataPort::SetReversePushStrategy(bool push)304 {305 if (push == GetFlag(tFlag::PUSH_STRATEGY_REVERSE))306 {307 return;308 }309 310 tLock lock(GetStructureMutex());311 SetFlag(tFlag::PUSH_STRATEGY_REVERSE, push);312 if (push && IsReady()) // strategy change313 {314 for (auto it = OutgoingConnectionsBegin(); it != OutgoingConnectionsEnd(); ++it)315 {316 tAbstractDataPort& port = static_cast<tAbstractDataPort&>(it->Destination());317 if (port.IsReady())318 {319 FINROC_LOG_PRINT(DEBUG_VERBOSE_1, "Performing initial reverse push from ", port, " to ", (*this));320 port.InitialPushTo(*this, true);321 break;322 }323 }324 }325 this->PublishUpdatedInfo(core::tRuntimeListener::tEvent::CHANGE);326 }327 328 290 void tAbstractDataPort::UpdateEdgeStatistics(tAbstractPort& source, tAbstractPort& target, rrlib::rtti::tGenericObject& data) 329 291 { -
common/tAbstractDataPort.h
r107 r114 168 168 169 169 /*! 170 * \return Is data to this port pushed or pulled (in reverse direction)?171 */172 inline bool ReversePushStrategy() const173 {174 return GetFlag(tFlag::PUSH_STRATEGY_REVERSE);175 }176 177 /*!178 170 * \param new_value New value for custom changed flag (for use by custom API - not used/accessed by core port classes.) 179 171 */ … … 209 201 void SetPushStrategy(bool push); 210 202 211 /*!212 * Set whether data should be pushed or pulled in reverse direction213 *214 * \param push Push data?215 */216 void SetReversePushStrategy(bool push);217 218 203 //---------------------------------------------------------------------- 219 204 // Protected methods … … 257 242 * Does this port "want" to receive a value via push strategy? 258 243 * 259 * \param cReverse direction? (typically we push forward) 260 * \param change_constant If this is about an initial push, this should be CHANGED_INITIAL - otherwise CHANGED 244 * \tparam cCHANGE_CONSTANT change_constant If this is about an initial push, this should be CHANGED_INITIAL - otherwise CHANGED 261 245 * \return Answer 262 246 * … … 264 248 * (Standard implementation for this) 265 249 */ 266 template < bool cREVERSE,tChangeStatus cCHANGE_CONSTANT>250 template <tChangeStatus cCHANGE_CONSTANT> 267 251 inline bool WantsPush() const 268 252 { 269 // The compiler should optimize branches away 270 if (cREVERSE) 271 { 272 if (cCHANGE_CONSTANT == tChangeStatus::CHANGED_INITIAL) 273 { 274 return GetFlag(tFlag::PUSH_STRATEGY_REVERSE) && CountOutgoingConnections() <= 1; 275 } 276 else 277 { 278 return GetFlag(tFlag::PUSH_STRATEGY_REVERSE); 279 } 280 } 281 else if (cCHANGE_CONSTANT == tChangeStatus::CHANGED_INITIAL) 253 if (cCHANGE_CONSTANT == tChangeStatus::CHANGED_INITIAL) 282 254 { 283 255 // We don't want initial pushes to ports with multiple inputs … … 333 305 334 306 /*! 335 * Should be called in situations where there might need to be an initial push336 * (e.g. connecting or strategy change)337 *338 * \param target Potential Target port339 */340 void ConsiderInitialReversePush(tAbstractDataPort& target);341 342 /*!343 307 * Forward current strategy to source ports (helper for above - and possibly variations of above) 344 308 * … … 360 324 361 325 /*! 362 * Push initial value to the specified port 363 * (checks etc. have been done by AbstractDataPort class) 364 * 365 * \param target Port to push data to 366 * \param reverse Is this a reverse push? 367 */ 368 virtual void InitialPushTo(tAbstractPort& target, bool reverse) = 0; 326 * Push initial value to destination port 327 * (checks etc. have been done by tAbstractDataPort class) 328 * 329 * \param connector Connector connecting both ports 330 */ 331 virtual void InitialPushTo(core::tConnector& connector) = 0; 369 332 370 333 virtual void OnConnect(tAbstractPort& partner, bool partner_is_destination) override; -
common/tPublishOperation.h
r107 r114 92 92 * 93 93 * \param port (Output) port to perform publishing operation on 94 * \tparam REVERSE Publish in reverse direction? (typical is forward)95 94 * \tparam CHANGE_CONSTANT changedConstant to use 96 * \tparam BROWSER_PUBLISH Inform this port's listeners on change and also publish in reverse direction? (only set from BrowserPublish())95 * \tparam BROWSER_PUBLISH Inform this port's listeners on change? (only set from BrowserPublish()) 97 96 */ 98 template < bool REVERSE,tChangeStatus CHANGE_CONSTANT, bool BROWSER_PUBLISH, bool NOTIFY_LISTENER_ON_THIS_PORT>97 template <tChangeStatus CHANGE_CONSTANT, bool BROWSER_PUBLISH, bool NOTIFY_LISTENER_ON_THIS_PORT> 99 98 inline void Execute(TPort& port) 100 99 { … … 128 127 #endif 129 128 130 if (!REVERSE)131 { 132 for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it)129 for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it) 130 { 131 if (static_cast<const common::tAbstractDataPort&>(it->Destination()).template WantsPush<CHANGE_CONSTANT>()) 133 132 { 134 if (static_cast<const common::tAbstractDataPort&>(it->Destination()).template WantsPush<REVERSE, CHANGE_CONSTANT>()) 135 { 136 if (it->Flags().Get(core::tConnectionFlag::CONVERSION)) 137 { 138 static_cast<const tConversionConnector&>(*it).Publish(this->GetObject(), CHANGE_CONSTANT); 139 } 140 else 141 { 142 TPort& destination_port = static_cast<TPort&>(it->Destination()); 143 Receive<REVERSE, CHANGE_CONSTANT>(*this, destination_port, port); 144 } 145 } 146 } 147 } 148 149 if (REVERSE || BROWSER_PUBLISH) 150 { 151 // reverse 152 for (auto it = port.IncomingConnectionsBegin(); it != port.IncomingConnectionsEnd(); ++it) 153 { 154 if (static_cast<const common::tAbstractDataPort&>(it->Source()).template WantsPush<true, CHANGE_CONSTANT>() && (!it->Flags().Get(core::tConnectionFlag::CONVERSION))) 155 { 156 TPort& destination_port = static_cast<TPort&>(it->Source()); 157 Receive<true, CHANGE_CONSTANT>(*this, destination_port, port); 133 if (it->Flags().Get(core::tConnectionFlag::CONVERSION)) 134 { 135 static_cast<const tConversionConnector&>(*it).Publish(this->GetObject(), CHANGE_CONSTANT); 136 } 137 else 138 { 139 TPort& destination_port = static_cast<TPort&>(it->Destination()); 140 Receive<CHANGE_CONSTANT>(*this, destination_port, port); 158 141 } 159 142 } … … 165 148 * \param port Port that receives data 166 149 * \param origin Port that value was received from 167 * \tparam REVERSE Publish in reverse direction? (typical is forward)168 150 * \tparam CHANGE_CONSTANT changedConstant to use 169 151 */ 170 template < bool REVERSE,tChangeStatus CHANGE_CONSTANT>152 template <tChangeStatus CHANGE_CONSTANT> 171 153 static inline void Receive(typename std::conditional<TPublishingData::cCOPY_ON_RECEIVE, TPublishingData, TPublishingData&>::type publishing_data, TPort& port, TPort& origin) 172 154 { … … 179 161 port.UpdateStatistics(publishing_data, origin, port); 180 162 181 if (!REVERSE)182 {183 // forward184 for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it)163 // forward 164 for (auto it = port.OutgoingConnectionsBegin(); it != port.OutgoingConnectionsEnd(); ++it) 165 { 166 if (static_cast<const common::tAbstractDataPort&>(it->Destination()).template WantsPush<CHANGE_CONSTANT>()) 185 167 { 186 if (static_cast<const common::tAbstractDataPort&>(it->Destination()).template WantsPush<REVERSE, CHANGE_CONSTANT>()) 187 { 188 if (it->Flags().Get(core::tConnectionFlag::CONVERSION)) 189 { 190 static_cast<const tConversionConnector&>(*it).Publish(publishing_data.GetObject(), CHANGE_CONSTANT); 191 } 192 else 193 { 194 TPort& destination_port = static_cast<TPort&>(it->Destination()); 195 Receive<false, CHANGE_CONSTANT>(publishing_data, destination_port, port); 196 } 197 } 198 } 199 200 // reverse 201 for (auto it = port.IncomingConnectionsBegin(); it != port.IncomingConnectionsEnd(); ++it) 202 { 203 if (&it->Source() != &origin && static_cast<const common::tAbstractDataPort&>(it->Source()).template WantsPush<true, CHANGE_CONSTANT>() && (!it->Flags().Get(core::tConnectionFlag::CONVERSION))) 204 { 205 TPort& destination_port = static_cast<TPort&>(it->Source()); 206 Receive<true, CHANGE_CONSTANT>(publishing_data, destination_port, port); 168 if (it->Flags().Get(core::tConnectionFlag::CONVERSION)) 169 { 170 static_cast<const tConversionConnector&>(*it).Publish(publishing_data.GetObject(), CHANGE_CONSTANT); 171 } 172 else 173 { 174 TPort& destination_port = static_cast<TPort&>(it->Destination()); 175 Receive<CHANGE_CONSTANT>(publishing_data, destination_port, port); 207 176 } 208 177 } -
optimized/tCheapCopyPort.cpp
r106 r114 170 170 if (change_constant == tChangeStatus::CHANGED_INITIAL) 171 171 { 172 data.Execute< false,tChangeStatus::CHANGED_INITIAL, true, true>(*this);172 data.Execute<tChangeStatus::CHANGED_INITIAL, true, true>(*this); 173 173 } 174 174 else 175 175 { 176 data.Execute< false,tChangeStatus::CHANGED, true, true>(*this);176 data.Execute<tChangeStatus::CHANGED, true, true>(*this); 177 177 } 178 178 } … … 181 181 if (change_constant == tChangeStatus::CHANGED_INITIAL) 182 182 { 183 data.Execute< false,tChangeStatus::CHANGED_INITIAL, true, false>(*this);183 data.Execute<tChangeStatus::CHANGED_INITIAL, true, false>(*this); 184 184 } 185 185 else 186 186 { 187 data.Execute< false,tChangeStatus::CHANGED, true, false>(*this);187 data.Execute<tChangeStatus::CHANGED, true, false>(*this); 188 188 } 189 189 } … … 196 196 if (change_constant == tChangeStatus::CHANGED_INITIAL) 197 197 { 198 data.Execute< false,tChangeStatus::CHANGED_INITIAL, true, true>(*this);198 data.Execute<tChangeStatus::CHANGED_INITIAL, true, true>(*this); 199 199 } 200 200 else 201 201 { 202 data.Execute< false,tChangeStatus::CHANGED, true, true>(*this);202 data.Execute<tChangeStatus::CHANGED, true, true>(*this); 203 203 } 204 204 } … … 207 207 if (change_constant == tChangeStatus::CHANGED_INITIAL) 208 208 { 209 data.Execute< false,tChangeStatus::CHANGED_INITIAL, true, false>(*this);209 data.Execute<tChangeStatus::CHANGED_INITIAL, true, false>(*this); 210 210 } 211 211 else 212 212 { 213 data.Execute< false,tChangeStatus::CHANGED, true, false>(*this);213 data.Execute<tChangeStatus::CHANGED, true, false>(*this); 214 214 } 215 215 } … … 278 278 { 279 279 common::tPublishOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer> data(static_cast<tThreadLocalBufferManager*>(current_buffer.GetPointer()), false); 280 data.Execute< false,tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other));280 data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 281 281 return; 282 282 } … … 294 294 { 295 295 common::tPublishOperation<tCheapCopyPort, tPublishingDataThreadLocalBuffer> data(static_cast<tThreadLocalBufferManager*>(current_buffer.GetPointer()), true); 296 data.Execute< false,tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other));296 data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 297 297 return; 298 298 } … … 305 305 CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL); 306 306 common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(unused_manager); 307 data.Execute< false,tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other));307 data.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tCheapCopyPort&>(other)); 308 308 } 309 309 } … … 379 379 // 380 380 381 void tCheapCopyPort::InitialPushTo( tAbstractPort& target, bool reverse)381 void tCheapCopyPort::InitialPushTo(core::tConnector& connector) 382 382 { 383 383 // this is a one-time event => use global buffer … … 385 385 CopyCurrentValueToManager(*unused_manager, tStrategy::NEVER_PULL); 386 386 common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer> data(unused_manager); 387 tCheapCopyPort& target_port = static_cast<tCheapCopyPort&>(target); 388 if (reverse) 389 { 390 common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer>::Receive<true, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 391 } 392 else 393 { 394 common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer>::Receive<false, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 395 } 387 tCheapCopyPort& target_port = static_cast<tCheapCopyPort&>(connector.Destination()); 388 common::tPublishOperation<tCheapCopyPort, tPublishingDataGlobalBuffer>::Receive<tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 396 389 } 397 390 -
optimized/tCheapCopyPort.h
r107 r114 819 819 // } 820 820 821 virtual void InitialPushTo( tAbstractPort& target, bool reverse) override;821 virtual void InitialPushTo(core::tConnector& connector) override; 822 822 823 823 /*! -
optimized/tSingleThreadedCheapCopyPortGeneric.cpp
r106 r114 126 126 if (change_constant == tChangeStatus::CHANGED_INITIAL) 127 127 { 128 publish_operation.Execute< false,tChangeStatus::CHANGED_INITIAL, true, true>(*this);128 publish_operation.Execute<tChangeStatus::CHANGED_INITIAL, true, true>(*this); 129 129 } 130 130 else 131 131 { 132 publish_operation.Execute< false,tChangeStatus::CHANGED, true, true>(*this);132 publish_operation.Execute<tChangeStatus::CHANGED, true, true>(*this); 133 133 } 134 134 } … … 137 137 if (change_constant == tChangeStatus::CHANGED_INITIAL) 138 138 { 139 publish_operation.Execute< false,tChangeStatus::CHANGED_INITIAL, true, false>(*this);139 publish_operation.Execute<tChangeStatus::CHANGED_INITIAL, true, false>(*this); 140 140 } 141 141 else 142 142 { 143 publish_operation.Execute< false,tChangeStatus::CHANGED, true, false>(*this);143 publish_operation.Execute<tChangeStatus::CHANGED, true, false>(*this); 144 144 } 145 145 } … … 152 152 153 153 common::tPublishOperation<tSingleThreadedCheapCopyPortGeneric, tPublishingData> publish_operation(current_value); 154 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(static_cast<tSingleThreadedCheapCopyPortGeneric&>(other));154 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(static_cast<tSingleThreadedCheapCopyPortGeneric&>(other)); 155 155 } 156 156 … … 160 160 } 161 161 162 void tSingleThreadedCheapCopyPortGeneric::InitialPushTo( tAbstractPort& target, bool reverse)162 void tSingleThreadedCheapCopyPortGeneric::InitialPushTo(core::tConnector& connector) 163 163 { 164 164 common::tPublishOperation<tSingleThreadedCheapCopyPortGeneric, tPublishingData> data(current_value); 165 tSingleThreadedCheapCopyPortGeneric& target_port = static_cast<tSingleThreadedCheapCopyPortGeneric&>(target); 166 if (reverse) 167 { 168 common::tPublishOperation<tSingleThreadedCheapCopyPortGeneric, tPublishingData>::Receive<true, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 169 } 170 else 171 { 172 common::tPublishOperation<tSingleThreadedCheapCopyPortGeneric, tPublishingData>::Receive<false, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 173 } 165 tSingleThreadedCheapCopyPortGeneric& target_port = static_cast<tSingleThreadedCheapCopyPortGeneric&>(connector.Destination()); 166 common::tPublishOperation<tSingleThreadedCheapCopyPortGeneric, tPublishingData>::Receive<tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 174 167 } 175 168 … … 186 179 current_value.timestamp = timestamp; 187 180 common::tPublishOperation<tSingleThreadedCheapCopyPortGeneric, tPublishingData> publish_operation(current_value); 188 publish_operation.Execute< false,tChangeStatus::CHANGED, false, false>(*this);181 publish_operation.Execute<tChangeStatus::CHANGED, false, false>(*this); 189 182 } 190 183 } -
optimized/tSingleThreadedCheapCopyPortGeneric.h
r107 r114 294 294 295 295 virtual int GetMaxQueueLengthImplementation() const override; 296 virtual void InitialPushTo( tAbstractPort& target, bool reverse) override;296 virtual void InitialPushTo(core::tConnector& connector) override; 297 297 298 298 /*! -
standard/tStandardPort.cpp
r107 r114 179 179 if (change_constant == tChangeStatus::CHANGED_INITIAL) 180 180 { 181 PublishImplementation< false,tChangeStatus::CHANGED_INITIAL, true, true>(data);181 PublishImplementation<tChangeStatus::CHANGED_INITIAL, true, true>(data); 182 182 } 183 183 else 184 184 { 185 PublishImplementation< false,tChangeStatus::CHANGED, true, true>(data);185 PublishImplementation<tChangeStatus::CHANGED, true, true>(data); 186 186 } 187 187 } … … 190 190 if (change_constant == tChangeStatus::CHANGED_INITIAL) 191 191 { 192 PublishImplementation< false,tChangeStatus::CHANGED_INITIAL, true, false>(data);192 PublishImplementation<tChangeStatus::CHANGED_INITIAL, true, false>(data); 193 193 } 194 194 else 195 195 { 196 PublishImplementation< false,tChangeStatus::CHANGED, true, false>(data);196 PublishImplementation<tChangeStatus::CHANGED, true, false>(data); 197 197 } 198 198 } … … 267 267 } 268 268 269 void tStandardPort::InitialPushTo( tAbstractPort& target, bool reverse)269 void tStandardPort::InitialPushTo(core::tConnector& connector) 270 270 { 271 271 tLockingManagerPointer manager = GetCurrentValueRaw(tStrategy::NEVER_PULL); … … 273 273 274 274 common::tPublishOperation<tStandardPort, tPublishingData> data(manager, 1000); 275 tStandardPort& target_port = static_cast<tStandardPort&>(target); 276 if (reverse) 277 { 278 common::tPublishOperation<tStandardPort, tPublishingData>::Receive<true, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 279 } 280 else 281 { 282 common::tPublishOperation<tStandardPort, tPublishingData>::Receive<false, tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 283 } 275 tStandardPort& target_port = static_cast<tStandardPort&>(connector.Destination()); 276 common::tPublishOperation<tStandardPort, tPublishingData>::Receive<tChangeStatus::CHANGED_INITIAL>(data, target_port, *this); 284 277 } 285 278 -
standard/tStandardPort.h
r107 r114 256 256 inline void Publish(tUnusedManagerPointer& data) 257 257 { 258 PublishImplementation< false,tChangeStatus::CHANGED, false, false>(data);258 PublishImplementation<tChangeStatus::CHANGED, false, false>(data); 259 259 } 260 260 inline void Publish(tLockingManagerPointer& data) 261 261 { 262 PublishImplementation< false,tChangeStatus::CHANGED, false, false>(data);262 PublishImplementation<tChangeStatus::CHANGED, false, false>(data); 263 263 } 264 264 … … 490 490 virtual int GetMaxQueueLengthImplementation() const override; 491 491 492 // quite similar to publish 493 virtual void InitialPushTo(tAbstractPort& target, bool reverse) override; 492 virtual void InitialPushTo(core::tConnector& connector) override; 494 493 495 494 /*! … … 538 537 * 539 538 * \param data Data to publish 540 * \param reverse Value received in reverse direction?541 539 * \param changed_constant changedConstant to use 542 540 */ 543 inline void Publish(tUnusedManagerPointer& data, bool reverse, tChangeStatus changed_constant) 544 { 545 if (!reverse) 546 { 547 if (changed_constant == tChangeStatus::CHANGED) 548 { 549 PublishImplementation<false, tChangeStatus::CHANGED, false, false>(data); 550 } 551 else 552 { 553 PublishImplementation<false, tChangeStatus::CHANGED_INITIAL, false, false>(data); 554 } 541 inline void Publish(tUnusedManagerPointer& data, tChangeStatus changed_constant) 542 { 543 if (changed_constant == tChangeStatus::CHANGED) 544 { 545 PublishImplementation<tChangeStatus::CHANGED, false, false>(data); 555 546 } 556 547 else 557 548 { 558 if (changed_constant == tChangeStatus::CHANGED) 559 { 560 PublishImplementation<true, tChangeStatus::CHANGED, false, false>(data); 561 } 562 else 563 { 564 PublishImplementation<true, tChangeStatus::CHANGED_INITIAL, false, false>(data); 565 } 549 PublishImplementation<tChangeStatus::CHANGED_INITIAL, false, false>(data); 566 550 } 567 551 } … … 575 559 * 576 560 * \param data Data buffer 577 * \tparam REVERSE Publish in reverse direction? (typical is forward)578 561 * \tparam CHANGE_CONSTANT changedConstant to use 579 562 * \tparam BROWSER_PUBLISH Inform this port's listeners on change and also publish in reverse direction? (only set from BrowserPublish()) 580 563 */ 581 template < bool REVERSE,tChangeStatus CHANGE_CONSTANT, bool BROWSER_PUBLISH, bool NOTIFY_LISTENER_ON_THIS_PORT, typename TDeleter>564 template <tChangeStatus CHANGE_CONSTANT, bool BROWSER_PUBLISH, bool NOTIFY_LISTENER_ON_THIS_PORT, typename TDeleter> 582 565 inline void PublishImplementation(std::unique_ptr<tPortBufferManager, TDeleter>& data) 583 566 { … … 589 572 590 573 common::tPublishOperation<tStandardPort, tPublishingData> publish_operation(data, 1000); 591 publish_operation.Execute< REVERSE,CHANGE_CONSTANT, BROWSER_PUBLISH, NOTIFY_LISTENER_ON_THIS_PORT>(*this);574 publish_operation.Execute<CHANGE_CONSTANT, BROWSER_PUBLISH, NOTIFY_LISTENER_ON_THIS_PORT>(*this); 592 575 } 593 576 -
tests/initial_pushing.cpp
r106 r114 97 97 tOutputPort<T> output_port("Output Port", parent); 98 98 tInputPort<T> input_port("Input Port", parent); 99 tOutputPort<T> output_port_ reverse("Output Port with reverse pushing", core::tFrameworkElement::tFlag::PUSH_STRATEGY_REVERSE, parent);99 tOutputPort<T> output_port_2("Another output port", parent); 100 100 core::tFrameworkElement::InitAll(); 101 101 … … 105 105 // Connect to other ports and check their values 106 106 output_port.ConnectTo(input_port); 107 output_port_ reverse.ConnectTo(input_port);107 output_port_2.ConnectTo(input_port); 108 108 CheckPortValue(input_port, test_values[0]); 109 CheckPortValue(output_port_reverse, test_values[0]);110 109 111 110 // Change strategy and see if everything behaves as expected … … 114 113 input_port.SetPushStrategy(true); 115 114 CheckPortValue(input_port, test_values[0]); // expects old value because we have two sources => no push 116 CheckPortValue(output_port_reverse, test_values[0]);117 output_port_reverse.SetReversePushStrategy(false);118 115 output_port.Publish(test_values[2]); 119 CheckPortValue(output_port_reverse, test_values[0]);120 output_port_reverse.SetReversePushStrategy(true);121 CheckPortValue(output_port_reverse, test_values[2]);122 116 123 117 // now for a complex net
Note: See TracChangeset
for help on using the changeset viewer.