Changeset 10566

Show
Ignore:
Timestamp:
02/07/10 03:24:00 (7 months ago)
Author:
tsee
Message:

got experimental slave driving to work

Location:
trunk/Padre/lib/Padre
Files:
3 modified

Legend:

Unmodified
Added
Removed
  • trunk/Padre/lib/Padre/SlaveDriver.pm

    r10564 r10566  
    4747use Thread::Queue 2.11; 
    4848 
     49# This event is triggered by the worker thread main loop after 
     50# finishing a task. 
     51our $TASK_DONE_EVENT : shared; 
     52BEGIN { $TASK_DONE_EVENT = Wx::NewEventType; } 
     53 
     54# This event is triggered by the worker thread main loop before 
     55# running a task. 
     56our $TASK_START_EVENT : shared; 
     57BEGIN { $TASK_START_EVENT = Wx::NewEventType; } 
     58 
    4959=head3 C<new> 
    5060 
     
    5565=cut 
    5666 
    57 { 
     67SCOPE: { 
    5868        my $SlaveDriver; 
    5969        sub new { 
    6070                my $class = shift; 
    6171                return $SlaveDriver if defined $SlaveDriver; 
    62                 $SlaveDriver = bless {} => $class; 
    6372                @_ = (); 
    64                 $SlaveDriver->{cmd_queue}  = Thread::Queue->new(); 
    65                 $SlaveDriver->{tid_queue}  = Thread::Queue->new(); 
     73                $SlaveDriver = bless { 
     74                        cmd_queue  => Thread::Queue->new(), 
     75                        tid_queue  => Thread::Queue->new(), 
     76                        task_queue => Thread::Queue->new(), 
     77                } => $class; 
    6678                $SlaveDriver->{master} = threads->create( 
    6779                        \&_slave_driver_loop, 
     
    7385 
    7486        END { 
    75                 if (defined $SlaveDriver and defined $SlaveDriver->{master} and defined $SlaveDriver->{cmd_queue}) { 
    76                         $SlaveDriver->{cmd_queue}->enqueue('STOP'); 
    77                         $SlaveDriver->{master}->join(); 
    78                 } 
    79         } 
    80  
     87                $SlaveDriver->cleanup(), undef $SlaveDriver if defined $SlaveDriver; 
     88        } 
    8189} 
    8290 
     
    95103        my $task_manager = shift; 
    96104        require Storable; 
    97         $self->{cmd_queue}->enqueue(Storable::freeze([$main, $task_manager->task_queue])); 
     105        $self->{cmd_queue}->enqueue(Storable::freeze([$task_manager->task_queue])); 
    98106        my $tid = $self->{tid_queue}->dequeue(); 
    99107        return threads->object($tid); 
     108} 
     109 
     110 
     111=head3 task_queue 
     112 
     113Returns the task queue (C<Thread::Queue> object) for use by the 
     114L<Padre::TaskManager> for passing processing tasks to the worker 
     115threads. 
     116 
     117This queue is instantiated by the slave driver because it needs to be available 
     118early for passing to the master thread. 
     119 
     120=cut 
     121 
     122sub task_queue { 
     123        my $self = shift; 
     124        return $self->{task_queue}; 
     125} 
     126 
     127=head3 cleanup 
     128 
     129Reaps the master thread. Will be called by the TaskManager on shutdown and 
     130on global destruction. 
     131 
     132=cut 
     133 
     134sub cleanup { 
     135        my $self = shift; 
     136        if (defined $self->{master} and defined $self->{cmd_queue}) { 
     137                $self->{cmd_queue}->enqueue('STOP'); 
     138                require Time::HiRes; 
     139                Time::HiRes::usleep(5000); # 5 milli-sec 
     140                if ($self->{master}->is_joinable) { 
     141                        $self->{master}->join; 
     142                } 
     143        } 
     144        # TaskManager does handle thread *killing* 
     145} 
     146 
     147sub DESTROY { 
     148        my $self = shift; 
     149        $self->cleanup(); 
    100150} 
    101151 
     
    103153# Worker thread main loop 
    104154sub _worker_loop { 
    105         my ( $main, $queue ) = @_; @_ = (); # hack to avoid "Scalars leaked" 
     155        my ( $queue ) = @_; @_ = (); # hack to avoid "Scalars leaked" 
    106156        require Storable; 
    107157        require Padre::TaskManager; 
    108158 
    109159        # Set the thread-specific main-window pointer 
     160        my $main = Wx::wxTheApp->GetTopWindow(); 
    110161        $Padre::TaskManager::_main = $main; 
    111162 
     
    123174 
    124175                my $thread_start_event = 
    125                         Wx::PlThreadEvent->new( -1, $Padre::TaskManager::TASK_START_EVENT, $task->{__thread_id} . ";" . ref($task) ); 
     176                        Wx::PlThreadEvent->new( -1, $TASK_START_EVENT, $task->{__thread_id} . ";" . ref($task) ); 
    126177                Wx::PostEvent( $main, $thread_start_event ); 
    127178 
     
    133184                $task->serialize( \$frozen_task ); 
    134185 
    135                 my $thread_done_event = Wx::PlThreadEvent->new( -1, $Padre::TaskManager::TASK_DONE_EVENT, $frozen_task ); 
     186                my $thread_done_event = Wx::PlThreadEvent->new( -1, $TASK_DONE_EVENT, $frozen_task ); 
    136187                Wx::PostEvent( $main, $thread_done_event ); 
    137188 
     
    150201                last if $args eq 'STOP'; 
    151202                 
    152                 my $worker_thread = threads->create(\&_worker_loop, @{Storable::thaw($args)}); 
     203                my $task_queue = Padre::SlaveDriver->new->task_queue; 
     204                my $worker_thread = threads->create(\&_worker_loop, $task_queue); 
    153205                my $tid = $worker_thread->tid(); 
    154206                $outqueue->enqueue($tid); 
  • trunk/Padre/lib/Padre/Task.pm

    r10436 r10566  
    133133 
    134134# set up the stdout/stderr printing events 
    135 our $STDOUT_EVENT : shared = Wx::NewEventType(); 
    136 our $STDERR_EVENT : shared = Wx::NewEventType(); 
     135our $STDOUT_EVENT : shared; 
     136BEGIN { $STDOUT_EVENT = Wx::NewEventType(); } 
     137our $STDERR_EVENT : shared; 
     138BEGIN { $STDERR_EVENT = Wx::NewEventType(); } 
    137139 
    138140=pod 
     
    486488like this: 
    487489 
    488   our $FUN_EVENT_TYPE =  : shared = Wx::NewEventType(); 
     490  our $FUN_EVENT_TYPE : shared; 
     491  BEGIN { $FUN_EVENT_TYPE = Wx::NewEventType(); } 
    489492 
    490493Then you have to setup the event handler (for example in the 
  • trunk/Padre/lib/Padre/TaskManager.pm

    r10564 r10566  
    9393}; 
    9494 
    95 # This event is triggered by the worker thread main loop after 
    96 # finishing a task. 
    97 our $TASK_DONE_EVENT : shared = Wx::NewEventType; 
    98  
    99 # This event is triggered by the worker thread main loop before 
    100 # running a task. 
    101 our $TASK_START_EVENT : shared = Wx::NewEventType; 
    102  
    10395# This event is triggered by a worker thread DURING ->run to incrementally 
    10496# communicate to the main thread over the life of a service. 
    105 our $SERVICE_POLL_EVENT : shared = Wx::NewEventType; 
     97our $SERVICE_POLL_EVENT : shared; 
     98BEGIN { $SERVICE_POLL_EVENT = Wx::NewEventType; } 
    10699 
    107100# remember whether the event handlers were initialized... 
     
    129122                @_, 
    130123                workers       => [], 
    131                 task_queue    => undef, 
     124                # grab a copy of the task_queue that's now handled by the slave driver 
     125                task_queue    => Padre::SlaveDriver->new()->task_queue, 
    132126                running_tasks => {}, 
    133127        }, $class; 
     
    141135        _init_events($main); 
    142136 
    143         $self->{task_queue} = Thread::Queue->new; 
     137        # To be removed: Old task queue instantiation => Padre::SlaveDriver 
     138        #$self->{task_queue} = Thread::Queue->new; 
    144139 
    145140        # Set up a regular action for reaping dead workers 
     
    174169                Wx::Event::EVT_COMMAND( 
    175170                        $main, -1, 
    176                         $TASK_DONE_EVENT, 
     171                        $Padre::SlaveDriver::TASK_DONE_EVENT, 
    177172                        \&on_task_done_event, 
    178173                ); 
    179174                Wx::Event::EVT_COMMAND( 
    180175                        $main, -1, 
    181                         $TASK_START_EVENT, 
     176                        $Padre::SlaveDriver::TASK_START_EVENT, 
    182177                        \&on_task_start_event, 
    183178                ); 
     
    294289        return unless $self->use_threads; 
    295290 
    296         @_ = (); # avoid "Scalars leaked" 
     291 
     292        # To be removed: Old worker thread cration. => Padre::SlaveDriver 
     293#       @_ = (); # avoid "Scalars leaked" 
    297294#       my $worker = threads->create( 
    298295#               { 'exit' => 'thread_only' }, \&worker_loop, 
     
    413410        my @workers = $self->workers; 
    414411        $self->task_queue->insert( 0, ("STOP") x scalar(@workers) ); 
    415         while ( threads->list(threads::running) >= 1 ) { 
     412        while ( threads->list(threads::running) >= 2 ) { 
    416413                $_->join for threads->list(threads::joinable); 
    417414        } 
     
    420417                $thread->join; 
    421418        } 
     419 
     420        # cleanup master thread, too 
     421        Padre::SlaveDriver->new->cleanup(); 
    422422 
    423423        # didn't work the nice way?