Changeset 5824


Ignore:
Timestamp:
07/04/09 07:51:16 (3 years ago)
Author:
submersible_toaster
Message:

merge branches/Padre-Taskmanager , introducing Padre::Service for persistent Padre::Tasks

Location:
trunk/Padre
Files:
5 edited
1 copied

Legend:

Unmodified
Added
Removed
  • trunk/Padre/lib/Padre/Service.pm

    r5750 r5824  
    22use strict; 
    33use warnings; 
    4  
     4use Carp qw( croak ); 
     5 
     6use threads; 
     7use threads::shared; 
     8 
     9use Padre::Wx     (); 
     10use Padre::Task   (); 
     11use Thread::Queue (); 
    512our @ISA = 'Padre::Task'; 
    613 
     
    1118=head1 NAME 
    1219 
    13 Padre::Service - API for non trivial Padre::Task 
     20Padre::Service - persistent Padre::Task API 
    1421 
    1522=head2 SYNOPSIS 
     
    1724  # Create your service, default implementation warns to output 
    1825  #  sleeps 1 second and loops over. 
    19   my $service = Padre::Service->new(  
    20     main_thread_only => \&show_my_dialog, 
     26  my $service = Padre::Service->new(); 
     27  Wx::Event::EVT_COMMAND( 
     28    $main , -1 , $service->event , 
     29    \&receive_data_from_service 
    2130  ); 
    2231  $service->schedule; 
     32  $service-> 
     33   
    2334   
    2435  # Later 
     
    3647 
    3748C<service_loop> should not block forever. If there is no work for the service to do 
    38 then return immediately, allowing the C<<Task->run>> loop to 
     49then return immediately, allowing the C<<Task->run>> loop to continue. 
    3950 
    4051  package Padre::Service::HTTPD 
     
    5263   
    5364  sub terminate { # Stop everything, brutally } 
    54    
    55   sub service_results { # Returned as the task return to Padre, } 
    5665   
    5766=head1 METHODS 
     
    6675 
    6776=cut 
     77{ 
     78my $running = 0; 
     79sub running { $running }; 
     80 
     81sub stop { $running = 0 }; 
     82sub start{ $running = 1 }; #?? 
    6883 
    6984sub run { 
    70     my ( $self, $queue ) = @_; 
    71  
    72     my $tid   = threads->tid; 
    73     my $event = $self->{service_event}; 
    74     $self->post_event( $event, "$tid;ALIVE" ); 
    75     my $running = 1; 
    76  
     85    croak "Already running!" if $running; 
     86     
     87    my ($self) = @_; 
     88    my $queue = $self->queue; 
     89    Padre::Util::debug( "Running queue $queue" ); 
     90    my $tid = threads->tid; 
     91    my $event  = $self->event; 
     92     
     93     
     94    # Now we're in the worker thread, start our service 
     95    # and begin the select orbit around the manager's queue 
     96    #  , the service_loop and throwing ->event back at the main thread 
     97    $self->start; 
     98    $running = 1; 
     99    $self->post_event(  $event , "ALIVE" ); 
    77100    while ($running) { 
     101        # Let the service provider have first chance. 
     102        #   and if nothing is waiting in the queue - tight loop. 
    78103        $self->service_loop; 
    79104        next unless $queue->pending; 
    80105 
    81         my $incoming = $queue->peek(0); 
    82  
    83         # Peek at the queue - for something addressed to us 
    84         # YUK - how about a dedicated queue per service 
    85         # peek and poke went out in the dark ages didn't they 
    86         if ( $incoming =~ m{$tid;} ) { 
    87             my $instruction = $queue->dequeue; 
    88             my ($command) = $instruction =~ m{^$tid;(HANGUP|TERMINATE)}; 
     106        my $command = $queue->dequeue; 
     107        Padre::Util::debug( "Service dequeued input" ); 
     108         
     109        # Respond to HANGUP TERMINATE and PING -  
     110        if ( ref($command) ) { 
     111            $self->service_loop($command); 
     112        } 
     113         
     114        # Or possibly a signal from the main thread 
     115        else { 
     116            Padre::Util::debug( "Caught command signal '$command'" ); 
    89117            if ( $command eq 'HANGUP' ) { 
    90                 $self->hangup; 
    91                 $running = 0; 
     118                $self->hangup( \$running ); 
    92119            } elsif ( $command eq 'TERMINATE' ) { 
    93                 $self->terminate; 
    94                 $running = 0; 
     120                $self->terminate( \$running ); 
    95121            } elsif ( $command eq 'PING' ) { 
    96                 $self->post_event( $event, "$tid;ALIVE" ); 
     122                $self->post_event( $event, "ALIVE" ); 
    97123            } else { 
    98                 $self->task_warn( "$self : Unrecognised command event $command" ); 
     124                Padre::Util::debug("Service does not recognise '$command' signal"); 
    99125            } 
    100  
    101126        } 
    102  
    103127    } 
     128     
     129    # Loop broken - cleanup 
     130    #$self->shutdown; 
    104131    return; 
    105132} 
     133 
     134} 
     135 
     136=head2 start 
     137 
     138consider start the background_thread analog of C<prepare> and will be called 
     139in the service thread immediatly prior to the service loop starting. 
     140 
     141 
     142=cut 
     143 
    106144 
    107145=head2 hangup 
     
    113151 
    114152sub hangup { 
    115     my ($self) = @_; 
    116  
     153    my ($self,$running) = @_; 
     154    $$running = 0; 
    117155} 
    118156 
     
    126164 
    127165sub terminate { 
    128     my ($self) = @_; 
    129  
     166    my ($self,$running) = @_; 
     167    $$running = 0; 
    130168} 
    131169 
     
    139177 
    140178{ 
    141     my $i = 0; 
     179     
    142180 
    143181    sub service_loop { 
    144         my ($self) = @_; 
     182        my ($self,$incoming) = @_; 
     183        $self->{iterator} = 0 
     184            unless exists $self->{iterator}; 
    145185        my $tid = threads->tid; 
    146         $self->task_print("Service ($tid) Looped [$i]\n"); 
    147         $i++; 
     186        $self->task_print('ok - entered service loop') 
     187            || print "ok - entered service loop\n"; 
     188         
     189        $self->task_print("# Service ($tid) Looped $self->{iterator}\n"); 
     190        if (defined $incoming) { 
     191            $self->task_print("ok - got incoming service data '$incoming'"); 
     192        } 
     193        # Tell the main thread some progress. 
     194        $self->post_event( $self->event , "$self->{iterator}" ); 
     195         
     196        $self->{iterator}++; 
     197        $self->tell('HANGUP') if $self->{iterator} > 10; 
    148198        sleep 1; 
    149199    } 
    150200} 
     201 
     202=head2 event 
     203 
     204Accessor for this service's instance event, in the running service 
     205data may be posted to this event and the Wx subscribers will be notified 
     206  
     207=cut 
     208 
     209{ 
     210our %ServiceEvents : shared = (); 
     211  sub event { 
     212    my $self = shift; 
     213    if ( exists $ServiceEvents{$self->{__service_refid}} ) { 
     214        return $ServiceEvents{ $self->{__service_refid} } ; 
     215    } 
     216    else { 
     217        croak "Cannot lookup shared event for $self"; 
     218    } 
     219  } 
     220  
     221 
     222my %Queues : shared; 
     223 sub prepare { 
     224    my $self = shift; 
     225    my $queue : shared; 
     226    $queue = new Thread::Queue; 
     227    $Queues{"$self"} = $queue; 
     228    $self->{_refid} = "$self"; 
     229    $self->SUPER::prepare(@_); 
     230  } 
     231  
     232=head2 queue 
     233 
     234accessor for the shared queue the service thread is polling for input. 
     235Calling C<enqueue> on reference sends data to the service thread. Storable 
     236serialization rules apply. See also L<"event"> for receiving data from  
     237the service thread 
     238  
     239=cut 
     240  
     241 sub queue {  
     242    my $self = shift; 
     243    if ( exists $self->{_refid}  
     244        && exists $Queues{$self->{_refid}} ) { 
     245        return $Queues{$self->{_refid}} ; 
     246    } 
     247    elsif  ( exists $Queues{"$self"} ) { 
     248        return $Queues{"$self"}; 
     249    } 
     250    else { croak "No such service queue "; } 
     251  
     252 } 
     253  
     254 
     255  sub serialize { 
     256    my $self = shift; 
     257  # croak "Serialized!!"; 
     258    my $service_refid = "$self"; 
     259    $self->{__service_refid} = $service_refid; 
     260     
     261    # Wait until the last moment before we declare  
     262    # the event 
     263    my $service_event : shared = Wx::NewEventType; 
     264    $ServiceEvents{$service_refid} = $service_event; 
     265     
     266#   my $wx_attach; 
     267#   if ( exists $self->{_main_thread_only} 
     268#        &&  
     269#        _INSTANCE( $self->{_main_thread_only}, 'Wx::Object' ) 
     270#       ) 
     271#   { 
     272#       $wx_attach = $self->{_main_thread_only}; 
     273#   } 
     274#   else {  $wx_attach = Padre->ide->wx->main }; 
     275 
     276 
     277#   if (!exists $self->{__events_init} 
     278#       and !defined $self->{__events_init} )  
     279#   { 
     280#       $self->{__events_init} = 
     281#           Wx::Event::EVT_COMMAND( 
     282#           $wx_attach, -1, 
     283#           $service_event, 
     284#           sub{ $self->receive(@_) } , 
     285#       ); 
     286#   } 
     287 
     288 
     289    # FILO 
     290    my $payload = $self->SUPER::serialize(@_); 
     291     
     292    return $payload; 
     293  } 
     294   
     295  sub deserialize_hook { 
     296    my $self = shift; 
     297    # FILO 
     298    # Shutdown the queue and event ?; 
     299  } 
     300 
     301} 
     302 
     303  sub shutdown { 
     304    my $self = shift; 
     305    Padre::Util::debug( "shutdown - $self"); 
     306    my $queue =$self->queue; 
     307    $queue->enqueue( 'HANGUP' ); 
     308  } 
     309   
     310   
     311  sub cleanup { 
     312    my $self = shift; 
     313    Padre::Util::debug( "cleanup - $self" ); 
     314  } 
     315 
     316=head2 tell 
     317 
     318Accepts a reference as it's argument, this is serialized and sent to 
     319the service thread 
     320 
     321=cut   
     322   
     323  ## MAIN 
     324  sub tell { 
     325    my ($self,$ref) = @_; 
     326    my $queue = $self->queue; 
     327    $queue->enqueue($ref); 
     328  } 
     329 
    151330 
    152331=head1 COPYRIGHT 
  • trunk/Padre/lib/Padre/Task.pm

    r5750 r5824  
    326326 
    327327        my $obj = bless $padretask => $userclass; 
     328        # Xtra evil , let a subclass ducktype a hook here 
     329        $obj->deserialize_hook if $obj->can('deserialize_hook'); 
     330         
    328331        return $obj; 
    329332    } 
     
    506509=cut 
    507510 
     511 
     512use Carp qw( cluck ); 
     513 
    508514sub post_event { 
    509     my @stuff = @_; 
     515    my ($self,$eventid,$data) = @_; 
    510516    @_ = (); 
     517    cluck 'eventid is not defined' unless defined $eventid; 
     518    cluck "eventid[$eventid] , no data to post"  
     519        unless ( defined $data and length($data) ); 
     520         
    511521    Wx::PostEvent( 
    512522        $Padre::TaskManager::_main, 
    513         Wx::PlThreadEvent->new( -1, $stuff[1], $stuff[2] ), 
     523        Wx::PlThreadEvent->new( -1, $eventid, $data ), 
    514524    ); 
    515525    return (); 
  • trunk/Padre/lib/Padre/TaskManager.pm

    r5750 r5824  
    8585use Class::XSAccessor getters => { 
    8686    task_queue     => 'task_queue', 
    87     service_queue  => 'service_queue', 
    8887    reap_interval  => 'reap_interval', 
    8988    use_threads    => 'use_threads', 
     
    112111our $SINGLETON; 
    113112 
    114 # This is set in the worker threads only! 
     113# This is set in the worker threads only!  
    115114our $_main; 
    116115 
     
    121120 
    122121    my $self = $SINGLETON = bless { 
    123         min_no_workers => 2, 
    124         max_no_workers => 6, 
     122        min_no_workers => 2, # there were config settings for  
     123        max_no_workers => 6, #  these long ago? 
    125124        use_threads    => 1,       # can be explicitly disabled 
    126125        reap_interval  => 15000, 
     
    140139 
    141140    $self->{task_queue}    = Thread::Queue->new; 
    142     $self->{service_queue} = Thread::Queue->new; 
     141 
    143142 
    144143    # Set up a regular action for reaping dead workers 
     
    208207        or die "Invalid task scheduled!";    # TODO: grace 
    209208 
     209    if ( _INSTANCE( $task , 'Padre::Service' ) ) { 
     210        $self->{running_services}{$task} = $task; 
     211    } 
    210212    # Cleanup old threads and refill the pool 
    211213    $self->reap(); 
     
    217219    } 
    218220 
     221 
    219222    my $string; 
    220223    $task->serialize( \$string ); 
     224     
    221225    if ( $self->use_threads ) { 
    222226        require Time::HiRes; 
     
    289293 
    290294    @_ = ();    # avoid "Scalars leaked" 
    291     my $worker = threads->create( { 'exit' => 'thread_only' }, \&worker_loop, $main, $self->task_queue ); 
     295    my $worker = threads->create(  
     296        { 'exit' => 'thread_only' }, \&worker_loop, 
     297         $main, $self->task_queue  
     298    ); 
    292299    push @{ $self->{workers} }, $worker; 
    293300} 
     
    351358            unless $queue->pending() 
    352359                and not ref( $queue->peek(0) ); 
    353  
    354         # We don't actually need to wait for the soon-to-be-joinable threads 
    355         # since reap should be called regularly. 
    356         #while (threads->list(threads::running) >= $target_n_threads) { 
    357         #  $_->join for threads->list(threads::joinable); 
    358         #} 
    359360    } 
    360361 
     
    389390=head2 cleanup 
    390391 
    391 Stops all worker threads. Called on editor shutdown. 
     392Shutdown all services with a HANGUP, then stop all worker threads. 
     393Called on editor shutdown. 
    392394 
    393395=cut 
     
    396398    my $self = shift; 
    397399    return if not $self->use_threads; 
    398  
     400     
     401    # Send all services a HANGUP , they will (hopefully) 
     402    # catch this and break the run loop, returning below as 
     403    # regular tasks. :| 
     404    Padre::Util::debug( 'Tell services to hangup' ); 
     405    $self->shutdown_services; 
     406     
    399407    # the nice way: 
     408    Padre::Util::debug( 'Tell all tasks to stop' ); 
    400409    my @workers = $self->workers; 
    401410    $self->task_queue->insert( 0, ("STOP") x scalar(@workers) ); 
     
    403412        $_->join for threads->list(threads::joinable); 
    404413    } 
    405     $_->join for threads->list(threads::joinable); 
    406  
     414    foreach my $thread ( threads->list(threads::joinable) ) { 
     415        Padre::Util::debug( 'Joining thread ' . $thread->tid ); 
     416        $thread->join; 
     417    } 
     418     
    407419    # didn't work the nice way? 
    408420    while ( threads->list(threads::running) >= 1 ) { 
     421        Padre::Util::debug( 'Killing thread ' . $_->tid ); 
    409422        $_->detach(), $_->kill() for threads->list(threads::running); 
    410423    } 
     
    452465=pod 
    453466 
    454 =head2 shutdown 
    455  
    456 Gracefully shutdown the service queue by instructing them to hangup themselves 
     467=head2 shutdown_services 
     468 
     469Gracefully shutdown the services by instructing them to hangup themselves 
    457470and return via the usual Task mechanism. 
    458471 
    459472=cut 
    460473 
    461 sub shutdown { 
    462     my $self = shift; 
    463  
    464     while ( my ( $type, $tasks ) = each %{ $self->{running_tasks} } ) { 
    465         next unless Params::Util::_CLASSISA( $type, 'Padre::Service' ); 
    466         foreach my $threadid ( keys %$tasks ) { 
    467             Padre::Util::debug("Hangup $type in $threadid !"); 
    468             $SINGLETON->service_queue->enqueue( 
    469                 "$threadid;HANGUP", 
    470             ); 
    471         } 
    472  
    473     } 
    474  
     474## ERM FIXME where are is the {running_services} populated then eh? 
     475sub shutdown_services { 
     476    my $self = shift; 
     477    Padre::Util::debug( 'Shutdown services' ); 
     478     
     479    while ( my  ($sid,$service) = each %{ $self->{running_services} } ) { 
     480        Padre::Util::debug( "Hangup service $sid!" ); 
     481        $service->shutdown; 
     482    } 
    475483} 
    476484 
     
    505513    my ( $main, $event ) = @_; @_ = ();    # hack to avoid "Scalars leaked" 
    506514    my $frozen = $event->GetData; 
     515     
     516    # FIXME - can we know the _real_ class so the an extender  
     517    #  may hook de/serialize 
    507518    my $task   = Padre::Task->deserialize( \$frozen ); 
    508519 
     
    636647 
    637648        # RUN 
    638         if ( $task->isa('Padre::Service') ) { 
    639             $task->run( $SINGLETON->service_queue ); 
    640         } else { 
    641             $task->run; 
    642         } 
     649        $task->run; 
    643650 
    644651        # FREEZE THE PROCESS AND PASS IT BACK 
     
    667674Wx MainLoop isn't reached for processing finish events. 
    668675 
     676Polling services 'aliveness' in a useful way , something a Wx::Taskmanager 
     677might like to display. Ability to selectivly kill tasks/services 
     678 
    669679=head1 SEE ALSO 
    670680 
  • trunk/Padre/lib/Padre/Wx/Main.pm

    r5807 r5824  
    21322132    Padre::Util::debug("Files saved (or not), hiding window"); 
    21332133 
     2134 
    21342135    # Immediately hide the window so that the user 
    21352136    # perceives the application as closing faster. 
     
    21382139    $self->Show(0); 
    21392140 
    2140     Padre::Util::debug('Try to shutdown services'); 
    2141     $self->ide->task_manager->shutdown; 
    2142  
    2143     # Stop all Task Manager's worker threads 
    2144     $self->ide->task_manager->cleanup; 
    2145  
    2146     Padre::Util::debug("Finished TaskManager's cleanup"); 
     2141 
     2142 
     2143 
    21472144 
    21482145    # Save the window geometry 
     
    21802177    $ide->save_config; 
    21812178    $event->Skip; 
     2179 
     2180    Padre::Util::debug("Tell TaskManager to cleanup"); 
     2181    # Stop all Task Manager's worker threads 
     2182    $self->ide->task_manager->cleanup; 
    21822183 
    21832184    Padre::Util::debug("Closing Padre"); 
  • trunk/Padre/t/85-task-manager.t

    r4985 r5824  
    1717} 
    1818 
    19 plan( tests => 17 ); 
     19plan( tests => 18 ); 
    2020 
    2121# need to load these before padre! 
     
    3030use_ok('Padre::TaskManager'); 
    3131use_ok('Padre::Task'); 
     32use_ok('Padre::Service'); 
    3233require t::lib::Padre::Task::Test; 
    3334 
Note: See TracChangeset for help on using the changeset viewer.