Changeset 46:ea5236122831 in finroc_plugins_scheduling


Ignore:
Timestamp:
16.07.2019 17:10:01 (2 months ago)
Author:
Max Reichardt <max.reichardt@…>
Branch:
17.03
Phase:
public
Message:

Adds 'pause thread' port to thread containers - and tidies code around implementation

Files:
4 edited

Legend:

Unmodified
Added
Removed
  • tThreadContainerBase.cpp

    r45 r46  
    7979  execution_duration("Execution Duration", new core::tPortGroup(component, "Profiling", core::tFrameworkElementFlag::INTERFACE | core::tFrameworkElementFlag::INTERFACE_FOR_DATA_PORTS | core::tFrameworkElementFlag::INTERFACE_FOR_OUTPUTS, data_ports::cDEFAULT_OUTPUT_PORT_FLAGS)), 
    8080  execution_details("Details", execution_duration.GetParent(), IsProfilingEnabled() ? core::tFrameworkElementFlag::PORT : core::tFrameworkElementFlag::DELETED), 
     81  pause_thread("Pause Thread", new core::tPortGroup(component, "Thread Control", core::tFrameworkElementFlag::INTERFACE | core::tFrameworkElementFlag::INTERFACE_FOR_DATA_PORTS | core::tFrameworkElementFlag::INTERFACE_FOR_INPUTS, data_ports::cDEFAULT_INPUT_PORT_FLAGS)), 
    8182  cycle_time("Cycle Time", component, std::chrono::milliseconds(40), data_ports::tBounds<rrlib::time::tDuration>(rrlib::time::tDuration::zero(), std::chrono::seconds(60))), 
    8283  thread(), 
     
    8586{ 
    8687  component->AddAnnotation(*new tExecutionControl(*this)); 
     88  pause_thread.AddPortListenerSimple(*this); 
    8789} 
    8890 
     
    9294tThreadContainerBase::~tThreadContainerBase() 
    9395{ 
    94   if (thread.get()) 
     96  if (thread) 
    9597  { 
    9698    StopThread(); 
     
    101103void tThreadContainerBase::ExecuteCycle() 
    102104{ 
    103   if (!thread.get()) 
     105  if (!thread) 
    104106  { 
    105107    rrlib::thread::tLock l(mutex); 
    106     tThreadContainerThread* thread_tmp = new tThreadContainerThread(component, cycle_time.Get(), warn_on_cycle_time_exceed.Get(), execution_duration, execution_details); 
     108    tThreadContainerThread* thread_tmp = new tThreadContainerThread(*this, component); 
    107109    thread_tmp->SetAutoDelete(); 
    108110    thread = std::static_pointer_cast<tThreadContainerThread>(thread_tmp->GetSharedPtr()); 
     
    121123{ 
    122124  rrlib::thread::tLock l(mutex); 
    123   if (thread.get() != NULL) 
     125  if (thread) 
    124126  { 
    125127    thread->Join(); 
     
    127129  } 
    128130} 
     131 
     132void tThreadContainerBase::OnPortChange(data_ports::tChangeContext& change_context) 
     133{ 
     134  rrlib::thread::tLock l(mutex); 
     135  if (thread && (!pause_thread.Get()) && (thread->IsPausing())) 
     136  { 
     137    thread->ContinueThread(); 
     138  } 
     139} 
     140 
     141void tThreadContainerBase::PauseExecution() 
     142{ 
     143  rrlib::thread::tLock l(mutex); 
     144  StopThread(); 
     145  JoinThread(); 
     146} 
     147 
    129148 
    130149void tThreadContainerBase::StartExecution() 
     
    136155    return; 
    137156  } 
    138   tThreadContainerThread* thread_tmp = new tThreadContainerThread(component, cycle_time.Get(), warn_on_cycle_time_exceed.Get(), execution_duration, execution_details); 
     157  tThreadContainerThread* thread_tmp = new tThreadContainerThread(*this, component); 
    139158  thread_tmp->SetAutoDelete(); 
    140159  thread = std::static_pointer_cast<tThreadContainerThread>(thread_tmp->GetSharedPtr()); 
  • tThreadContainerBase.h

    r45 r46  
    9191  data_ports::tOutputPort<std::vector<tTaskProfile>> execution_details; 
    9292 
     93  /*! Port to pause thread execution */ 
     94  data_ports::tInputPort<bool> pause_thread; 
     95 
    9396 
    9497  tThreadContainerBase(core::tFrameworkElement* component); 
     
    123126  void JoinThread(); 
    124127 
    125   virtual void PauseExecution() override 
     128  rrlib::thread::tRecursiveMutex& Mutex() 
    126129  { 
    127     StopThread(); 
    128     JoinThread(); 
     130    return mutex; 
    129131  } 
     132 
     133  void OnPortChange(data_ports::tChangeContext& change_context); 
     134 
     135  virtual void PauseExecution() override; 
    130136 
    131137  /*! 
     
    159165 
    160166  /*! Mutex for operations on thread container */ 
    161   rrlib::thread::tOrderedMutex mutex; 
     167  rrlib::thread::tRecursiveMutex mutex; 
    162168 
    163169  core::tFrameworkElement& component; 
  • tThreadContainerThread.cpp

    r44 r46  
    4242#include "plugins/scheduling/tExecutionControl.h" 
    4343#include "plugins/scheduling/tPeriodicFrameworkElementTask.h" 
     44#include "plugins/scheduling/tThreadContainerBase.h" 
    4445 
    4546//---------------------------------------------------------------------- 
     
    116117 
    117118 
    118 tThreadContainerThread::tThreadContainerThread(core::tFrameworkElement& thread_container, rrlib::time::tDuration default_cycle_time, 
    119     bool warn_on_cycle_time_exceed, data_ports::tOutputPort<rrlib::time::tDuration> execution_duration, 
    120     data_ports::tOutputPort<std::vector<tTaskProfile>> execution_details) : 
    121   tLoopThread(default_cycle_time, true, warn_on_cycle_time_exceed), 
     119tThreadContainerThread::tThreadContainerThread(tThreadContainerBase& thread_container_base, core::tFrameworkElement& thread_container_element) : 
     120  tLoopThread(thread_container_base.GetCycleTime(), true, thread_container_base.warn_on_cycle_time_exceed.Get()), 
    122121  tWatchDogTask(true), 
    123   thread_container(thread_container), 
     122  thread_container_base(thread_container_base), 
     123  thread_container_element(thread_container_element), 
    124124  reschedule(true), 
    125125  schedule(), 
    126126  task_set_first_index { 0, 0, 0, 0 }, 
    127                      execution_duration(execution_duration), 
    128                      execution_details(execution_details), 
    129127                     total_execution_duration(0), 
    130128                     max_execution_duration(0), 
     
    133131                     current_cycle_start_application_time(rrlib::time::cNO_TIME) 
    134132{ 
    135   this->SetName("ThreadContainer " + thread_container.GetName()); 
    136   this->thread_container.GetRuntime().AddListener(*this); 
     133  this->SetName("ThreadContainer " + thread_container_element.GetName()); 
     134  this->thread_container_element.GetRuntime().AddListener(*this); 
    137135#ifdef RRLIB_SINGLE_THREADED 
    138136  assert(single_thread_container == nullptr); 
     
    143141tThreadContainerThread::~tThreadContainerThread() 
    144142{ 
    145   this->thread_container.GetRuntime().RemoveListener(*this); 
     143  this->thread_container_element.GetRuntime().RemoveListener(*this); 
    146144  single_thread_container = nullptr; 
    147145} 
     
    215213#endif 
    216214    tExecutionControl* execution_control = dest_aggregator ? tExecutionControl::Find(*dest_aggregator) : nullptr; 
    217     if ((!execution_control) || execution_control->GetAnnotated<core::tFrameworkElement>() != &thread_container) 
     215    if ((!execution_control) || execution_control->GetAnnotated<core::tFrameworkElement>() != &thread_container_element) 
    218216    { 
    219217      continue; 
     
    358356    reschedule = false; 
    359357    { 
    360       tLock lock(this->thread_container.GetStructureMutex()); 
     358      tLock lock(this->thread_container_element.GetStructureMutex()); 
    361359      schedule.clear(); 
    362360      rrlib::time::tTimestamp start_time = rrlib::time::Now(); 
     
    369367 
    370368      // find tasks and classified interfaces 
    371       for (auto it = thread_container.SubElementsBegin(true); it != thread_container.SubElementsEnd(); ++it) 
    372       { 
    373         if ((!it->IsReady()) || tExecutionControl::Find(*it)->GetAnnotated<core::tFrameworkElement>() != &thread_container)    // don't handle elements in nested thread containers 
     369      for (auto it = thread_container_element.SubElementsBegin(true); it != thread_container_element.SubElementsEnd(); ++it) 
     370      { 
     371        if ((!it->IsReady()) || tExecutionControl::Find(*it)->GetAnnotated<core::tFrameworkElement>() != &thread_container_element)    // don't handle elements in nested thread containers 
    374372        { 
    375373          continue; 
     
    608606  } 
    609607 
     608  if (thread_container_base.pause_thread.Get()) 
     609  { 
     610    rrlib::thread::tLock lock(thread_container_base.Mutex(), false); 
     611    if (lock.TryLock())  // this avoids dead-lock if ThreadContainerBase already wants to join; if it fails, thread can simply go to sleep on the next cycle 
     612    { 
     613      if (thread_container_base.pause_thread.Get()) 
     614      { 
     615        this->PauseThread(); 
     616      } 
     617    } 
     618    return; 
     619  } 
     620 
    610621  // execute tasks 
    611   SetDeadLine(rrlib::time::Now() + GetCycleTime() * 4 + std::chrono::seconds(4)); 
    612  
    613   if (execution_details.GetWrapped() == nullptr || execution_count == 0) // we skip profiling the first/initial execution 
     622  tWatchDogTask::SetDeadLine(rrlib::time::Now() + GetCycleTime() * 4 + std::chrono::seconds(4)); 
     623 
     624  if (thread_container_base.execution_details.GetWrapped() == nullptr || execution_count == 0) // we skip profiling the first/initial execution 
    614625  { 
    615626    current_cycle_start_application_time = IsUsingApplicationTime() && this->IsAlive() ? tLoopThread::GetCurrentCycleStartTime() : rrlib::time::Now(); 
    616627 
    617     execution_duration.Publish(GetLastCycleTime()); 
     628    thread_container_base.execution_duration.Publish(GetLastCycleTime()); 
    618629    for (size_t i = 0u; i < schedule.size(); i++) 
    619630    { 
     
    626637  else 
    627638  { 
    628     data_ports::tPortDataPointer<std::vector<tTaskProfile>> details = execution_details.GetUnusedBuffer(); 
     639    data_ports::tPortDataPointer<std::vector<tTaskProfile>> details = thread_container_base.execution_details.GetUnusedBuffer(); 
    629640    details->resize(schedule.size() + 1); 
    630641    rrlib::time::tTimestamp start = rrlib::time::Now(true); 
     
    672683    // Fill thread profile to publish 
    673684    tTaskProfile& profile = (*details)[0]; 
    674     profile.handle = thread_container.GetHandle(); 
     685    profile.handle = thread_container_element.GetHandle(); 
    675686    profile.last_execution_duration = duration; 
    676687    profile.max_execution_duration = this->max_execution_duration; 
     
    687698      } 
    688699    } 
    689     execution_duration.Publish(duration); 
    690     execution_details.Publish(details); 
     700    thread_container_base.execution_duration.Publish(duration); 
     701    thread_container_base.execution_details.Publish(details); 
    691702  } 
    692703 
     
    696707void tThreadContainerThread::OnConnectorChange(core::tRuntimeListener::tEvent change_type, core::tConnector& connector) 
    697708{ 
    698   if (connector.Source().IsChildOf(this->thread_container) && connector.Destination().IsChildOf(this->thread_container)) 
     709  if (connector.Source().IsChildOf(this->thread_container_element) && connector.Destination().IsChildOf(this->thread_container_element)) 
    699710  { 
    700711    reschedule = true; 
     
    704715void tThreadContainerThread::OnFrameworkElementChange(core::tRuntimeListener::tEvent change_type, core::tFrameworkElement& element) 
    705716{ 
    706   if (element.GetAnnotation<tPeriodicFrameworkElementTask>() && element.IsChildOf(this->thread_container, true)) 
     717  if (element.GetAnnotation<tPeriodicFrameworkElementTask>() && element.IsChildOf(this->thread_container_element, true)) 
    707718  { 
    708719    reschedule = true; 
  • tThreadContainerThread.h

    r44 r46  
    6262//---------------------------------------------------------------------- 
    6363struct tPeriodicFrameworkElementTask; 
     64class tThreadContainerBase; 
    6465 
    6566//---------------------------------------------------------------------- 
     
    7879public: 
    7980 
    80   tThreadContainerThread(core::tFrameworkElement& thread_container, rrlib::time::tDuration default_cycle_time, 
    81                          bool warn_on_cycle_time_exceed, data_ports::tOutputPort<rrlib::time::tDuration> execution_duration, 
    82                          data_ports::tOutputPort<std::vector<tTaskProfile>> execution_details); 
     81  tThreadContainerThread(tThreadContainerBase& thread_container_base, core::tFrameworkElement& thread_container_element); 
    8382 
    8483  virtual ~tThreadContainerThread(); 
     
    124123  virtual void Run() override; 
    125124 
     125 
    126126//---------------------------------------------------------------------- 
    127127// Private fields and methods 
     
    130130 
    131131  /*! Thread container that thread belongs to */ 
    132   core::tFrameworkElement& thread_container; 
     132  tThreadContainerBase& thread_container_base; 
     133  core::tFrameworkElement& thread_container_element; 
    133134 
    134135  /*! true, when thread needs to make a new schedule before next run */ 
     
    143144  /*! Indices where the different sets of tasks start in the schedule */ 
    144145  size_t task_set_first_index[4]; 
    145  
    146   /*! Port to publish time spent in last call to MainLoopCallback() */ 
    147   data_ports::tOutputPort<rrlib::time::tDuration> execution_duration; 
    148  
    149   /*! 
    150    * Port to publish details on execution (port is only created if profiling is enabled) 
    151    * The first element contains the profile the whole thread container. 
    152    * The other elements contain the profile the executed tasks - in the order of their execution 
    153    */ 
    154   data_ports::tOutputPort<std::vector<tTaskProfile>> execution_details; 
    155146 
    156147  /*! Total execution duration of thread */ 
Note: See TracChangeset for help on using the changeset viewer.