Changeset 10566
- Timestamp:
- 02/07/10 03:24:00 (7 months ago)
- Location:
- trunk/Padre/lib/Padre
- Files:
-
- 3 modified
-
SlaveDriver.pm (modified) (8 diffs)
-
Task.pm (modified) (2 diffs)
-
TaskManager.pm (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/Padre/lib/Padre/SlaveDriver.pm
r10564 r10566 47 47 use Thread::Queue 2.11; 48 48 49 # This event is triggered by the worker thread main loop after 50 # finishing a task. 51 our $TASK_DONE_EVENT : shared; 52 BEGIN { $TASK_DONE_EVENT = Wx::NewEventType; } 53 54 # This event is triggered by the worker thread main loop before 55 # running a task. 56 our $TASK_START_EVENT : shared; 57 BEGIN { $TASK_START_EVENT = Wx::NewEventType; } 58 49 59 =head3 C<new> 50 60 … … 55 65 =cut 56 66 57 {67 SCOPE: { 58 68 my $SlaveDriver; 59 69 sub new { 60 70 my $class = shift; 61 71 return $SlaveDriver if defined $SlaveDriver; 62 $SlaveDriver = bless {} => $class;63 72 @_ = (); 64 $SlaveDriver->{cmd_queue} = Thread::Queue->new(); 65 $SlaveDriver->{tid_queue} = Thread::Queue->new(); 73 $SlaveDriver = bless { 74 cmd_queue => Thread::Queue->new(), 75 tid_queue => Thread::Queue->new(), 76 task_queue => Thread::Queue->new(), 77 } => $class; 66 78 $SlaveDriver->{master} = threads->create( 67 79 \&_slave_driver_loop, … … 73 85 74 86 END { 75 if (defined $SlaveDriver and defined $SlaveDriver->{master} and defined $SlaveDriver->{cmd_queue}) { 76 $SlaveDriver->{cmd_queue}->enqueue('STOP'); 77 $SlaveDriver->{master}->join(); 78 } 79 } 80 87 $SlaveDriver->cleanup(), undef $SlaveDriver if defined $SlaveDriver; 88 } 81 89 } 82 90 … … 95 103 my $task_manager = shift; 96 104 require Storable; 97 $self->{cmd_queue}->enqueue(Storable::freeze([$ main, $task_manager->task_queue]));105 $self->{cmd_queue}->enqueue(Storable::freeze([$task_manager->task_queue])); 98 106 my $tid = $self->{tid_queue}->dequeue(); 99 107 return threads->object($tid); 108 } 109 110 111 =head3 task_queue 112 113 Returns the task queue (C<Thread::Queue> object) for use by the 114 L<Padre::TaskManager> for passing processing tasks to the worker 115 threads. 116 117 This queue is instantiated by the slave driver because it needs to be available 118 early for passing to the master thread. 119 120 =cut 121 122 sub task_queue { 123 my $self = shift; 124 return $self->{task_queue}; 125 } 126 127 =head3 cleanup 128 129 Reaps the master thread. Will be called by the TaskManager on shutdown and 130 on global destruction. 131 132 =cut 133 134 sub cleanup { 135 my $self = shift; 136 if (defined $self->{master} and defined $self->{cmd_queue}) { 137 $self->{cmd_queue}->enqueue('STOP'); 138 require Time::HiRes; 139 Time::HiRes::usleep(5000); # 5 milli-sec 140 if ($self->{master}->is_joinable) { 141 $self->{master}->join; 142 } 143 } 144 # TaskManager does handle thread *killing* 145 } 146 147 sub DESTROY { 148 my $self = shift; 149 $self->cleanup(); 100 150 } 101 151 … … 103 153 # Worker thread main loop 104 154 sub _worker_loop { 105 my ( $ main, $queue ) = @_; @_ = (); # hack to avoid "Scalars leaked"155 my ( $queue ) = @_; @_ = (); # hack to avoid "Scalars leaked" 106 156 require Storable; 107 157 require Padre::TaskManager; 108 158 109 159 # Set the thread-specific main-window pointer 160 my $main = Wx::wxTheApp->GetTopWindow(); 110 161 $Padre::TaskManager::_main = $main; 111 162 … … 123 174 124 175 my $thread_start_event = 125 Wx::PlThreadEvent->new( -1, $ Padre::TaskManager::TASK_START_EVENT, $task->{__thread_id} . ";" . ref($task) );176 Wx::PlThreadEvent->new( -1, $TASK_START_EVENT, $task->{__thread_id} . ";" . ref($task) ); 126 177 Wx::PostEvent( $main, $thread_start_event ); 127 178 … … 133 184 $task->serialize( \$frozen_task ); 134 185 135 my $thread_done_event = Wx::PlThreadEvent->new( -1, $ Padre::TaskManager::TASK_DONE_EVENT, $frozen_task );186 my $thread_done_event = Wx::PlThreadEvent->new( -1, $TASK_DONE_EVENT, $frozen_task ); 136 187 Wx::PostEvent( $main, $thread_done_event ); 137 188 … … 150 201 last if $args eq 'STOP'; 151 202 152 my $worker_thread = threads->create(\&_worker_loop, @{Storable::thaw($args)}); 203 my $task_queue = Padre::SlaveDriver->new->task_queue; 204 my $worker_thread = threads->create(\&_worker_loop, $task_queue); 153 205 my $tid = $worker_thread->tid(); 154 206 $outqueue->enqueue($tid); -
trunk/Padre/lib/Padre/Task.pm
r10436 r10566 133 133 134 134 # set up the stdout/stderr printing events 135 our $STDOUT_EVENT : shared = Wx::NewEventType(); 136 our $STDERR_EVENT : shared = Wx::NewEventType(); 135 our $STDOUT_EVENT : shared; 136 BEGIN { $STDOUT_EVENT = Wx::NewEventType(); } 137 our $STDERR_EVENT : shared; 138 BEGIN { $STDERR_EVENT = Wx::NewEventType(); } 137 139 138 140 =pod … … 486 488 like this: 487 489 488 our $FUN_EVENT_TYPE = : shared = Wx::NewEventType(); 490 our $FUN_EVENT_TYPE : shared; 491 BEGIN { $FUN_EVENT_TYPE = Wx::NewEventType(); } 489 492 490 493 Then you have to setup the event handler (for example in the -
trunk/Padre/lib/Padre/TaskManager.pm
r10564 r10566 93 93 }; 94 94 95 # This event is triggered by the worker thread main loop after96 # finishing a task.97 our $TASK_DONE_EVENT : shared = Wx::NewEventType;98 99 # This event is triggered by the worker thread main loop before100 # running a task.101 our $TASK_START_EVENT : shared = Wx::NewEventType;102 103 95 # This event is triggered by a worker thread DURING ->run to incrementally 104 96 # communicate to the main thread over the life of a service. 105 our $SERVICE_POLL_EVENT : shared = Wx::NewEventType; 97 our $SERVICE_POLL_EVENT : shared; 98 BEGIN { $SERVICE_POLL_EVENT = Wx::NewEventType; } 106 99 107 100 # remember whether the event handlers were initialized... … … 129 122 @_, 130 123 workers => [], 131 task_queue => undef, 124 # grab a copy of the task_queue that's now handled by the slave driver 125 task_queue => Padre::SlaveDriver->new()->task_queue, 132 126 running_tasks => {}, 133 127 }, $class; … … 141 135 _init_events($main); 142 136 143 $self->{task_queue} = Thread::Queue->new; 137 # To be removed: Old task queue instantiation => Padre::SlaveDriver 138 #$self->{task_queue} = Thread::Queue->new; 144 139 145 140 # Set up a regular action for reaping dead workers … … 174 169 Wx::Event::EVT_COMMAND( 175 170 $main, -1, 176 $ TASK_DONE_EVENT,171 $Padre::SlaveDriver::TASK_DONE_EVENT, 177 172 \&on_task_done_event, 178 173 ); 179 174 Wx::Event::EVT_COMMAND( 180 175 $main, -1, 181 $ TASK_START_EVENT,176 $Padre::SlaveDriver::TASK_START_EVENT, 182 177 \&on_task_start_event, 183 178 ); … … 294 289 return unless $self->use_threads; 295 290 296 @_ = (); # avoid "Scalars leaked" 291 292 # To be removed: Old worker thread cration. => Padre::SlaveDriver 293 # @_ = (); # avoid "Scalars leaked" 297 294 # my $worker = threads->create( 298 295 # { 'exit' => 'thread_only' }, \&worker_loop, … … 413 410 my @workers = $self->workers; 414 411 $self->task_queue->insert( 0, ("STOP") x scalar(@workers) ); 415 while ( threads->list(threads::running) >= 1) {412 while ( threads->list(threads::running) >= 2 ) { 416 413 $_->join for threads->list(threads::joinable); 417 414 } … … 420 417 $thread->join; 421 418 } 419 420 # cleanup master thread, too 421 Padre::SlaveDriver->new->cleanup(); 422 422 423 423 # didn't work the nice way?
