Changeset 5824
- Timestamp:
- 07/04/09 07:51:16 (3 years ago)
- Location:
- trunk/Padre
- Files:
-
- 5 edited
- 1 copied
-
lib/Padre/Service.pm (modified) (9 diffs)
-
lib/Padre/Task.pm (modified) (2 diffs)
-
lib/Padre/TaskManager.pm (modified) (15 diffs)
-
lib/Padre/Wx/Main.pm (modified) (3 diffs)
-
t/85-task-manager.t (modified) (2 diffs)
-
t/86-service.t (copied) (copied from branches/Padre-Taskmanager/t/86-service.t)
Legend:
- Unmodified
- Added
- Removed
-
trunk/Padre/lib/Padre/Service.pm
r5750 r5824 2 2 use strict; 3 3 use warnings; 4 4 use Carp qw( croak ); 5 6 use threads; 7 use threads::shared; 8 9 use Padre::Wx (); 10 use Padre::Task (); 11 use Thread::Queue (); 5 12 our @ISA = 'Padre::Task'; 6 13 … … 11 18 =head1 NAME 12 19 13 Padre::Service - API for non trivial Padre::Task20 Padre::Service - persistent Padre::Task API 14 21 15 22 =head2 SYNOPSIS … … 17 24 # Create your service, default implementation warns to output 18 25 # sleeps 1 second and loops over. 19 my $service = Padre::Service->new( 20 main_thread_only => \&show_my_dialog, 26 my $service = Padre::Service->new(); 27 Wx::Event::EVT_COMMAND( 28 $main , -1 , $service->event , 29 \&receive_data_from_service 21 30 ); 22 31 $service->schedule; 32 $service-> 33 23 34 24 35 # Later … … 36 47 37 48 C<service_loop> should not block forever. If there is no work for the service to do 38 then return immediately, allowing the C<<Task->run>> loop to 49 then return immediately, allowing the C<<Task->run>> loop to continue. 39 50 40 51 package Padre::Service::HTTPD … … 52 63 53 64 sub terminate { # Stop everything, brutally } 54 55 sub service_results { # Returned as the task return to Padre, }56 65 57 66 =head1 METHODS … … 66 75 67 76 =cut 77 { 78 my $running = 0; 79 sub running { $running }; 80 81 sub stop { $running = 0 }; 82 sub start{ $running = 1 }; #?? 68 83 69 84 sub run { 70 my ( $self, $queue ) = @_; 71 72 my $tid = threads->tid; 73 my $event = $self->{service_event}; 74 $self->post_event( $event, "$tid;ALIVE" ); 75 my $running = 1; 76 85 croak "Already running!" if $running; 86 87 my ($self) = @_; 88 my $queue = $self->queue; 89 Padre::Util::debug( "Running queue $queue" ); 90 my $tid = threads->tid; 91 my $event = $self->event; 92 93 94 # Now we're in the worker thread, start our service 95 # and begin the select orbit around the manager's queue 96 # , the service_loop and throwing ->event back at the main thread 97 $self->start; 98 $running = 1; 99 $self->post_event( $event , "ALIVE" ); 77 100 while ($running) { 101 # Let the service provider have first chance. 102 # and if nothing is waiting in the queue - tight loop. 78 103 $self->service_loop; 79 104 next unless $queue->pending; 80 105 81 my $incoming = $queue->peek(0); 82 83 # Peek at the queue - for something addressed to us 84 # YUK - how about a dedicated queue per service 85 # peek and poke went out in the dark ages didn't they 86 if ( $incoming =~ m{$tid;} ) { 87 my $instruction = $queue->dequeue; 88 my ($command) = $instruction =~ m{^$tid;(HANGUP|TERMINATE)}; 106 my $command = $queue->dequeue; 107 Padre::Util::debug( "Service dequeued input" ); 108 109 # Respond to HANGUP TERMINATE and PING - 110 if ( ref($command) ) { 111 $self->service_loop($command); 112 } 113 114 # Or possibly a signal from the main thread 115 else { 116 Padre::Util::debug( "Caught command signal '$command'" ); 89 117 if ( $command eq 'HANGUP' ) { 90 $self->hangup; 91 $running = 0; 118 $self->hangup( \$running ); 92 119 } elsif ( $command eq 'TERMINATE' ) { 93 $self->terminate; 94 $running = 0; 120 $self->terminate( \$running ); 95 121 } elsif ( $command eq 'PING' ) { 96 $self->post_event( $event, " $tid;ALIVE" );122 $self->post_event( $event, "ALIVE" ); 97 123 } else { 98 $self->task_warn( "$self : Unrecognised command event $command");124 Padre::Util::debug("Service does not recognise '$command' signal"); 99 125 } 100 101 126 } 102 103 127 } 128 129 # Loop broken - cleanup 130 #$self->shutdown; 104 131 return; 105 132 } 133 134 } 135 136 =head2 start 137 138 consider start the background_thread analog of C<prepare> and will be called 139 in the service thread immediatly prior to the service loop starting. 140 141 142 =cut 143 106 144 107 145 =head2 hangup … … 113 151 114 152 sub hangup { 115 my ($self ) = @_;116 153 my ($self,$running) = @_; 154 $$running = 0; 117 155 } 118 156 … … 126 164 127 165 sub terminate { 128 my ($self ) = @_;129 166 my ($self,$running) = @_; 167 $$running = 0; 130 168 } 131 169 … … 139 177 140 178 { 141 my $i = 0;179 142 180 143 181 sub service_loop { 144 my ($self) = @_; 182 my ($self,$incoming) = @_; 183 $self->{iterator} = 0 184 unless exists $self->{iterator}; 145 185 my $tid = threads->tid; 146 $self->task_print("Service ($tid) Looped [$i]\n"); 147 $i++; 186 $self->task_print('ok - entered service loop') 187 || print "ok - entered service loop\n"; 188 189 $self->task_print("# Service ($tid) Looped $self->{iterator}\n"); 190 if (defined $incoming) { 191 $self->task_print("ok - got incoming service data '$incoming'"); 192 } 193 # Tell the main thread some progress. 194 $self->post_event( $self->event , "$self->{iterator}" ); 195 196 $self->{iterator}++; 197 $self->tell('HANGUP') if $self->{iterator} > 10; 148 198 sleep 1; 149 199 } 150 200 } 201 202 =head2 event 203 204 Accessor for this service's instance event, in the running service 205 data may be posted to this event and the Wx subscribers will be notified 206 207 =cut 208 209 { 210 our %ServiceEvents : shared = (); 211 sub event { 212 my $self = shift; 213 if ( exists $ServiceEvents{$self->{__service_refid}} ) { 214 return $ServiceEvents{ $self->{__service_refid} } ; 215 } 216 else { 217 croak "Cannot lookup shared event for $self"; 218 } 219 } 220 221 222 my %Queues : shared; 223 sub prepare { 224 my $self = shift; 225 my $queue : shared; 226 $queue = new Thread::Queue; 227 $Queues{"$self"} = $queue; 228 $self->{_refid} = "$self"; 229 $self->SUPER::prepare(@_); 230 } 231 232 =head2 queue 233 234 accessor for the shared queue the service thread is polling for input. 235 Calling C<enqueue> on reference sends data to the service thread. Storable 236 serialization rules apply. See also L<"event"> for receiving data from 237 the service thread 238 239 =cut 240 241 sub queue { 242 my $self = shift; 243 if ( exists $self->{_refid} 244 && exists $Queues{$self->{_refid}} ) { 245 return $Queues{$self->{_refid}} ; 246 } 247 elsif ( exists $Queues{"$self"} ) { 248 return $Queues{"$self"}; 249 } 250 else { croak "No such service queue "; } 251 252 } 253 254 255 sub serialize { 256 my $self = shift; 257 # croak "Serialized!!"; 258 my $service_refid = "$self"; 259 $self->{__service_refid} = $service_refid; 260 261 # Wait until the last moment before we declare 262 # the event 263 my $service_event : shared = Wx::NewEventType; 264 $ServiceEvents{$service_refid} = $service_event; 265 266 # my $wx_attach; 267 # if ( exists $self->{_main_thread_only} 268 # && 269 # _INSTANCE( $self->{_main_thread_only}, 'Wx::Object' ) 270 # ) 271 # { 272 # $wx_attach = $self->{_main_thread_only}; 273 # } 274 # else { $wx_attach = Padre->ide->wx->main }; 275 276 277 # if (!exists $self->{__events_init} 278 # and !defined $self->{__events_init} ) 279 # { 280 # $self->{__events_init} = 281 # Wx::Event::EVT_COMMAND( 282 # $wx_attach, -1, 283 # $service_event, 284 # sub{ $self->receive(@_) } , 285 # ); 286 # } 287 288 289 # FILO 290 my $payload = $self->SUPER::serialize(@_); 291 292 return $payload; 293 } 294 295 sub deserialize_hook { 296 my $self = shift; 297 # FILO 298 # Shutdown the queue and event ?; 299 } 300 301 } 302 303 sub shutdown { 304 my $self = shift; 305 Padre::Util::debug( "shutdown - $self"); 306 my $queue =$self->queue; 307 $queue->enqueue( 'HANGUP' ); 308 } 309 310 311 sub cleanup { 312 my $self = shift; 313 Padre::Util::debug( "cleanup - $self" ); 314 } 315 316 =head2 tell 317 318 Accepts a reference as it's argument, this is serialized and sent to 319 the service thread 320 321 =cut 322 323 ## MAIN 324 sub tell { 325 my ($self,$ref) = @_; 326 my $queue = $self->queue; 327 $queue->enqueue($ref); 328 } 329 151 330 152 331 =head1 COPYRIGHT -
trunk/Padre/lib/Padre/Task.pm
r5750 r5824 326 326 327 327 my $obj = bless $padretask => $userclass; 328 # Xtra evil , let a subclass ducktype a hook here 329 $obj->deserialize_hook if $obj->can('deserialize_hook'); 330 328 331 return $obj; 329 332 } … … 506 509 =cut 507 510 511 512 use Carp qw( cluck ); 513 508 514 sub post_event { 509 my @stuff= @_;515 my ($self,$eventid,$data) = @_; 510 516 @_ = (); 517 cluck 'eventid is not defined' unless defined $eventid; 518 cluck "eventid[$eventid] , no data to post" 519 unless ( defined $data and length($data) ); 520 511 521 Wx::PostEvent( 512 522 $Padre::TaskManager::_main, 513 Wx::PlThreadEvent->new( -1, $ stuff[1], $stuff[2]),523 Wx::PlThreadEvent->new( -1, $eventid, $data ), 514 524 ); 515 525 return (); -
trunk/Padre/lib/Padre/TaskManager.pm
r5750 r5824 85 85 use Class::XSAccessor getters => { 86 86 task_queue => 'task_queue', 87 service_queue => 'service_queue',88 87 reap_interval => 'reap_interval', 89 88 use_threads => 'use_threads', … … 112 111 our $SINGLETON; 113 112 114 # This is set in the worker threads only! 113 # This is set in the worker threads only! 115 114 our $_main; 116 115 … … 121 120 122 121 my $self = $SINGLETON = bless { 123 min_no_workers => 2, 124 max_no_workers => 6, 122 min_no_workers => 2, # there were config settings for 123 max_no_workers => 6, # these long ago? 125 124 use_threads => 1, # can be explicitly disabled 126 125 reap_interval => 15000, … … 140 139 141 140 $self->{task_queue} = Thread::Queue->new; 142 $self->{service_queue} = Thread::Queue->new; 141 143 142 144 143 # Set up a regular action for reaping dead workers … … 208 207 or die "Invalid task scheduled!"; # TODO: grace 209 208 209 if ( _INSTANCE( $task , 'Padre::Service' ) ) { 210 $self->{running_services}{$task} = $task; 211 } 210 212 # Cleanup old threads and refill the pool 211 213 $self->reap(); … … 217 219 } 218 220 221 219 222 my $string; 220 223 $task->serialize( \$string ); 224 221 225 if ( $self->use_threads ) { 222 226 require Time::HiRes; … … 289 293 290 294 @_ = (); # avoid "Scalars leaked" 291 my $worker = threads->create( { 'exit' => 'thread_only' }, \&worker_loop, $main, $self->task_queue ); 295 my $worker = threads->create( 296 { 'exit' => 'thread_only' }, \&worker_loop, 297 $main, $self->task_queue 298 ); 292 299 push @{ $self->{workers} }, $worker; 293 300 } … … 351 358 unless $queue->pending() 352 359 and not ref( $queue->peek(0) ); 353 354 # We don't actually need to wait for the soon-to-be-joinable threads355 # since reap should be called regularly.356 #while (threads->list(threads::running) >= $target_n_threads) {357 # $_->join for threads->list(threads::joinable);358 #}359 360 } 360 361 … … 389 390 =head2 cleanup 390 391 391 Stops all worker threads. Called on editor shutdown. 392 Shutdown all services with a HANGUP, then stop all worker threads. 393 Called on editor shutdown. 392 394 393 395 =cut … … 396 398 my $self = shift; 397 399 return if not $self->use_threads; 398 400 401 # Send all services a HANGUP , they will (hopefully) 402 # catch this and break the run loop, returning below as 403 # regular tasks. :| 404 Padre::Util::debug( 'Tell services to hangup' ); 405 $self->shutdown_services; 406 399 407 # the nice way: 408 Padre::Util::debug( 'Tell all tasks to stop' ); 400 409 my @workers = $self->workers; 401 410 $self->task_queue->insert( 0, ("STOP") x scalar(@workers) ); … … 403 412 $_->join for threads->list(threads::joinable); 404 413 } 405 $_->join for threads->list(threads::joinable); 406 414 foreach my $thread ( threads->list(threads::joinable) ) { 415 Padre::Util::debug( 'Joining thread ' . $thread->tid ); 416 $thread->join; 417 } 418 407 419 # didn't work the nice way? 408 420 while ( threads->list(threads::running) >= 1 ) { 421 Padre::Util::debug( 'Killing thread ' . $_->tid ); 409 422 $_->detach(), $_->kill() for threads->list(threads::running); 410 423 } … … 452 465 =pod 453 466 454 =head2 shutdown 455 456 Gracefully shutdown the service queueby instructing them to hangup themselves467 =head2 shutdown_services 468 469 Gracefully shutdown the services by instructing them to hangup themselves 457 470 and return via the usual Task mechanism. 458 471 459 472 =cut 460 473 461 sub shutdown { 462 my $self = shift; 463 464 while ( my ( $type, $tasks ) = each %{ $self->{running_tasks} } ) { 465 next unless Params::Util::_CLASSISA( $type, 'Padre::Service' ); 466 foreach my $threadid ( keys %$tasks ) { 467 Padre::Util::debug("Hangup $type in $threadid !"); 468 $SINGLETON->service_queue->enqueue( 469 "$threadid;HANGUP", 470 ); 471 } 472 473 } 474 474 ## ERM FIXME where are is the {running_services} populated then eh? 475 sub shutdown_services { 476 my $self = shift; 477 Padre::Util::debug( 'Shutdown services' ); 478 479 while ( my ($sid,$service) = each %{ $self->{running_services} } ) { 480 Padre::Util::debug( "Hangup service $sid!" ); 481 $service->shutdown; 482 } 475 483 } 476 484 … … 505 513 my ( $main, $event ) = @_; @_ = (); # hack to avoid "Scalars leaked" 506 514 my $frozen = $event->GetData; 515 516 # FIXME - can we know the _real_ class so the an extender 517 # may hook de/serialize 507 518 my $task = Padre::Task->deserialize( \$frozen ); 508 519 … … 636 647 637 648 # RUN 638 if ( $task->isa('Padre::Service') ) { 639 $task->run( $SINGLETON->service_queue ); 640 } else { 641 $task->run; 642 } 649 $task->run; 643 650 644 651 # FREEZE THE PROCESS AND PASS IT BACK … … 667 674 Wx MainLoop isn't reached for processing finish events. 668 675 676 Polling services 'aliveness' in a useful way , something a Wx::Taskmanager 677 might like to display. Ability to selectivly kill tasks/services 678 669 679 =head1 SEE ALSO 670 680 -
trunk/Padre/lib/Padre/Wx/Main.pm
r5807 r5824 2132 2132 Padre::Util::debug("Files saved (or not), hiding window"); 2133 2133 2134 2134 2135 # Immediately hide the window so that the user 2135 2136 # perceives the application as closing faster. … … 2138 2139 $self->Show(0); 2139 2140 2140 Padre::Util::debug('Try to shutdown services'); 2141 $self->ide->task_manager->shutdown; 2142 2143 # Stop all Task Manager's worker threads 2144 $self->ide->task_manager->cleanup; 2145 2146 Padre::Util::debug("Finished TaskManager's cleanup"); 2141 2142 2143 2147 2144 2148 2145 # Save the window geometry … … 2180 2177 $ide->save_config; 2181 2178 $event->Skip; 2179 2180 Padre::Util::debug("Tell TaskManager to cleanup"); 2181 # Stop all Task Manager's worker threads 2182 $self->ide->task_manager->cleanup; 2182 2183 2183 2184 Padre::Util::debug("Closing Padre"); -
trunk/Padre/t/85-task-manager.t
r4985 r5824 17 17 } 18 18 19 plan( tests => 1 7);19 plan( tests => 18 ); 20 20 21 21 # need to load these before padre! … … 30 30 use_ok('Padre::TaskManager'); 31 31 use_ok('Padre::Task'); 32 use_ok('Padre::Service'); 32 33 require t::lib::Padre::Task::Test; 33 34
Note: See TracChangeset
for help on using the changeset viewer.
