Changeset 10566


Ignore:
Timestamp:
02/07/10 03:24:00 (2 years ago)
Author:
tsee
Message:

got experimental slave driving to work

Location:
trunk/Padre/lib/Padre
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/Padre/lib/Padre/SlaveDriver.pm

    r10564 r10566  
    4747use Thread::Queue 2.11; 
    4848 
     49# This event is triggered by the worker thread main loop after 
     50# finishing a task. 
     51our $TASK_DONE_EVENT : shared; 
     52BEGIN { $TASK_DONE_EVENT = Wx::NewEventType; } 
     53 
     54# This event is triggered by the worker thread main loop before 
     55# running a task. 
     56our $TASK_START_EVENT : shared; 
     57BEGIN { $TASK_START_EVENT = Wx::NewEventType; } 
     58 
    4959=head3 C<new> 
    5060 
     
    5565=cut 
    5666 
    57 { 
     67SCOPE: { 
    5868    my $SlaveDriver; 
    5969    sub new { 
    6070        my $class = shift; 
    6171        return $SlaveDriver if defined $SlaveDriver; 
    62         $SlaveDriver = bless {} => $class; 
    6372        @_ = (); 
    64         $SlaveDriver->{cmd_queue}  = Thread::Queue->new(); 
    65         $SlaveDriver->{tid_queue}  = Thread::Queue->new(); 
     73        $SlaveDriver = bless { 
     74            cmd_queue  => Thread::Queue->new(), 
     75            tid_queue  => Thread::Queue->new(), 
     76            task_queue => Thread::Queue->new(), 
     77        } => $class; 
    6678        $SlaveDriver->{master} = threads->create( 
    6779            \&_slave_driver_loop, 
     
    7385 
    7486    END { 
    75         if (defined $SlaveDriver and defined $SlaveDriver->{master} and defined $SlaveDriver->{cmd_queue}) { 
    76             $SlaveDriver->{cmd_queue}->enqueue('STOP'); 
    77             $SlaveDriver->{master}->join(); 
    78         } 
    79     } 
    80  
     87        $SlaveDriver->cleanup(), undef $SlaveDriver if defined $SlaveDriver; 
     88    } 
    8189} 
    8290 
     
    95103    my $task_manager = shift; 
    96104    require Storable; 
    97     $self->{cmd_queue}->enqueue(Storable::freeze([$main, $task_manager->task_queue])); 
     105    $self->{cmd_queue}->enqueue(Storable::freeze([$task_manager->task_queue])); 
    98106    my $tid = $self->{tid_queue}->dequeue(); 
    99107    return threads->object($tid); 
     108} 
     109 
     110 
     111=head3 task_queue 
     112 
     113Returns the task queue (C<Thread::Queue> object) for use by the 
     114L<Padre::TaskManager> for passing processing tasks to the worker 
     115threads. 
     116 
     117This queue is instantiated by the slave driver because it needs to be available 
     118early for passing to the master thread. 
     119 
     120=cut 
     121 
     122sub task_queue { 
     123    my $self = shift; 
     124    return $self->{task_queue}; 
     125} 
     126 
     127=head3 cleanup 
     128 
     129Reaps the master thread. Will be called by the TaskManager on shutdown and 
     130on global destruction. 
     131 
     132=cut 
     133 
     134sub cleanup { 
     135    my $self = shift; 
     136    if (defined $self->{master} and defined $self->{cmd_queue}) { 
     137        $self->{cmd_queue}->enqueue('STOP'); 
     138        require Time::HiRes; 
     139        Time::HiRes::usleep(5000); # 5 milli-sec 
     140        if ($self->{master}->is_joinable) { 
     141            $self->{master}->join; 
     142        } 
     143    } 
     144    # TaskManager does handle thread *killing* 
     145} 
     146 
     147sub DESTROY { 
     148    my $self = shift; 
     149    $self->cleanup(); 
    100150} 
    101151 
     
    103153# Worker thread main loop 
    104154sub _worker_loop { 
    105     my ( $main, $queue ) = @_; @_ = (); # hack to avoid "Scalars leaked" 
     155    my ( $queue ) = @_; @_ = (); # hack to avoid "Scalars leaked" 
    106156    require Storable; 
    107157    require Padre::TaskManager; 
    108158 
    109159    # Set the thread-specific main-window pointer 
     160    my $main = Wx::wxTheApp->GetTopWindow(); 
    110161    $Padre::TaskManager::_main = $main; 
    111162 
     
    123174 
    124175        my $thread_start_event = 
    125             Wx::PlThreadEvent->new( -1, $Padre::TaskManager::TASK_START_EVENT, $task->{__thread_id} . ";" . ref($task) ); 
     176            Wx::PlThreadEvent->new( -1, $TASK_START_EVENT, $task->{__thread_id} . ";" . ref($task) ); 
    126177        Wx::PostEvent( $main, $thread_start_event ); 
    127178 
     
    133184        $task->serialize( \$frozen_task ); 
    134185 
    135         my $thread_done_event = Wx::PlThreadEvent->new( -1, $Padre::TaskManager::TASK_DONE_EVENT, $frozen_task ); 
     186        my $thread_done_event = Wx::PlThreadEvent->new( -1, $TASK_DONE_EVENT, $frozen_task ); 
    136187        Wx::PostEvent( $main, $thread_done_event ); 
    137188 
     
    150201        last if $args eq 'STOP'; 
    151202         
    152         my $worker_thread = threads->create(\&_worker_loop, @{Storable::thaw($args)}); 
     203        my $task_queue = Padre::SlaveDriver->new->task_queue; 
     204        my $worker_thread = threads->create(\&_worker_loop, $task_queue); 
    153205        my $tid = $worker_thread->tid(); 
    154206        $outqueue->enqueue($tid); 
  • trunk/Padre/lib/Padre/Task.pm

    r10436 r10566  
    133133 
    134134# set up the stdout/stderr printing events 
    135 our $STDOUT_EVENT : shared = Wx::NewEventType(); 
    136 our $STDERR_EVENT : shared = Wx::NewEventType(); 
     135our $STDOUT_EVENT : shared; 
     136BEGIN { $STDOUT_EVENT = Wx::NewEventType(); } 
     137our $STDERR_EVENT : shared; 
     138BEGIN { $STDERR_EVENT = Wx::NewEventType(); } 
    137139 
    138140=pod 
     
    486488like this: 
    487489 
    488   our $FUN_EVENT_TYPE =  : shared = Wx::NewEventType(); 
     490  our $FUN_EVENT_TYPE : shared; 
     491  BEGIN { $FUN_EVENT_TYPE = Wx::NewEventType(); } 
    489492 
    490493Then you have to setup the event handler (for example in the 
  • trunk/Padre/lib/Padre/TaskManager.pm

    r10564 r10566  
    9393}; 
    9494 
    95 # This event is triggered by the worker thread main loop after 
    96 # finishing a task. 
    97 our $TASK_DONE_EVENT : shared = Wx::NewEventType; 
    98  
    99 # This event is triggered by the worker thread main loop before 
    100 # running a task. 
    101 our $TASK_START_EVENT : shared = Wx::NewEventType; 
    102  
    10395# This event is triggered by a worker thread DURING ->run to incrementally 
    10496# communicate to the main thread over the life of a service. 
    105 our $SERVICE_POLL_EVENT : shared = Wx::NewEventType; 
     97our $SERVICE_POLL_EVENT : shared; 
     98BEGIN { $SERVICE_POLL_EVENT = Wx::NewEventType; } 
    10699 
    107100# remember whether the event handlers were initialized... 
     
    129122        @_, 
    130123        workers       => [], 
    131         task_queue    => undef, 
     124                # grab a copy of the task_queue that's now handled by the slave driver 
     125        task_queue    => Padre::SlaveDriver->new()->task_queue, 
    132126        running_tasks => {}, 
    133127    }, $class; 
     
    141135    _init_events($main); 
    142136 
    143     $self->{task_queue} = Thread::Queue->new; 
     137        # To be removed: Old task queue instantiation => Padre::SlaveDriver 
     138    #$self->{task_queue} = Thread::Queue->new; 
    144139 
    145140    # Set up a regular action for reaping dead workers 
     
    174169        Wx::Event::EVT_COMMAND( 
    175170            $main, -1, 
    176             $TASK_DONE_EVENT, 
     171            $Padre::SlaveDriver::TASK_DONE_EVENT, 
    177172            \&on_task_done_event, 
    178173        ); 
    179174        Wx::Event::EVT_COMMAND( 
    180175            $main, -1, 
    181             $TASK_START_EVENT, 
     176            $Padre::SlaveDriver::TASK_START_EVENT, 
    182177            \&on_task_start_event, 
    183178        ); 
     
    294289    return unless $self->use_threads; 
    295290 
    296     @_ = (); # avoid "Scalars leaked" 
     291 
     292        # To be removed: Old worker thread cration. => Padre::SlaveDriver 
     293#   @_ = (); # avoid "Scalars leaked" 
    297294#   my $worker = threads->create( 
    298295#       { 'exit' => 'thread_only' }, \&worker_loop, 
     
    413410    my @workers = $self->workers; 
    414411    $self->task_queue->insert( 0, ("STOP") x scalar(@workers) ); 
    415     while ( threads->list(threads::running) >= 1 ) { 
     412    while ( threads->list(threads::running) >= 2 ) { 
    416413        $_->join for threads->list(threads::joinable); 
    417414    } 
     
    420417        $thread->join; 
    421418    } 
     419 
     420        # cleanup master thread, too 
     421        Padre::SlaveDriver->new->cleanup(); 
    422422 
    423423    # didn't work the nice way? 
Note: See TracChangeset for help on using the changeset viewer.