root/trunk/Padre/lib/Padre/TaskManager.pm @ 11102

Revision 11102, 16.5 KB (checked in by adamk, 6 months ago)

Removing an import

Line 
1package Padre::TaskManager;
2
3=pod
4
5=head1 NAME
6
7Padre::TaskManager - Padre Background Task Scheduler
8
9=head1 SYNOPSIS
10
11  require Padre::Task::Foo;
12  my $task = Padre::Task::Foo->new(some => 'data');
13  $task->schedule; # handed off to the task manager
14
15=head1 DESCRIPTION
16
17Padre uses threads for asynchronous background operations
18which may take so long that they would make the GUI unresponsive
19if run in the main (GUI) thread.
20
21This class implements a pool of a configurable number of
22re-usable worker threads. Re-using threads is necessary as
23the overhead of spawning threads is high. Additional threads
24are spawned if many background tasks are scheduled for execution.
25When the load goes down, the number of extra threads is (slowly!)
26reduced down to the default.
27
28=head1 INTERFACE
29
30=head2 Class Methods
31
32=head3 C<new>
33
34The constructor returns a C<Padre::TaskManager> object.
35At the moment, C<Padre::TaskManager> is a singleton.
36An object is instantiated when the editor object is created.
37
38Optional parameters:
39
40=over 2
41
42=item min_no_workers / max_no_workers
43
44Set the minimum and maximum number of worker threads
to spawn. Default: 2 to 6
46
47The first workers are spawned lazily: I.e. only when
48the first task is being scheduled.
49
50=item use_threads
51
52Disable for profiling runs. In the degraded, thread-less mode,
53all tasks are run in the main thread. Default: 1 (use threads)
54
55=item reap_interval
56
57The number of milliseconds to wait before checking for dead
58worker threads. Default: 15000ms
59
60=back
61
62=cut
63
64use 5.008;
65use strict;
66use warnings;
67use Params::Util ();
68
69our $VERSION = '0.58';
70
71# According to Wx docs,
72# this MUST be loaded before Wx,
73# so this also happens in the script.
74use threads;
75use threads::shared;
76use Thread::Queue 2.11;
77use Time::HiRes qw(gettimeofday tv_interval);
78
79require Padre;
80use Padre::Task    ();
81use Padre::Service ();
82use Padre::Wx      ();
83use Padre::Logger;
84require Padre::SlaveDriver;
85
86use Class::XSAccessor {
87        getters => {
88                task_queue     => 'task_queue',
89                reap_interval  => 'reap_interval',
90                use_threads    => 'use_threads',
91                max_no_workers => 'max_no_workers',
92        }
93};
94
# Event type used by a worker thread DURING ->run to talk to the main
# thread incrementally over the life of a service. It is declared shared
# and allocated at compile time.
our $SERVICE_POLL_EVENT : shared;

BEGIN {
	$SERVICE_POLL_EVENT = Wx::NewEventType();
}

# Set to true by _init_events once the Wx event handlers are hooked up.
our $EVENTS_INITIALIZED = 0;

# $REAP_TIMER: Wx timer that fires reap() every reap_interval milliseconds.
# $SINGLETON:  the one and only instance of this class (see new()).
our ( $REAP_TIMER, $SINGLETON );
111
# Constructor. The class is a singleton: once an instance exists it is
# returned as-is and any arguments are ignored.
#
# Arguments: optional key/value pairs overriding the defaults
#   (min_no_workers, max_no_workers, use_threads, reap_interval).
# Returns:   the Padre::TaskManager instance.
sub new {
	my $class = shift;

	return $SINGLETON if defined $SINGLETON;

	my $driver = Padre::SlaveDriver->new;

	# Defaults first, then caller overrides, then the internal slots
	# that callers are not allowed to override.
	my %attributes = (
		min_no_workers => 2,
		max_no_workers => 6,
		use_threads    => 1,
		reap_interval  => 15000,
		@_,
		workers => [],

		# The task queue is owned by the slave driver; we keep a handle to it
		task_queue    => $driver->task_queue,
		running_tasks => {},
	);
	my $self = $SINGLETON = bless \%attributes, $class;

	# Threads are switched off when Devel::NYTProf has been loaded
	$self->{use_threads} = 0 if defined $INC{"Devel/NYTProf.pm"};

	my $main = Padre->ide->wx;
	_init_events($main);

	# Install the periodic reaper (which also refills the worker pool),
	# but only once and only when threads are in use.
	if ( $self->use_threads and not defined $REAP_TIMER ) {

		# An explicit id keeps this timer apart from the start-up timer
		# of the main window
		my $timer_id = Wx::NewId();
		$REAP_TIMER = Wx::Timer->new( $main, $timer_id );
		Wx::Event::EVT_TIMER( $main, $timer_id, sub { $SINGLETON->reap } );
		$REAP_TIMER->Start( $self->reap_interval, Wx::wxTIMER_CONTINUOUS );
	}

	return $self;
}
168
# Hook the task/service event handlers up to the given Wx object.
# Does nothing after the first successful call.
#
# This lives in its own routine in an attempt to squash (at least one of)
# the "Scalars Leaked" warnings, which used to point at the
# "my $main = ..." line of the constructor. Moving the event setup into a
# separate scope was a wild guess. --Steffen
sub _init_events {
	my $main = shift;
	@_ = ();
	return 1 if $EVENTS_INITIALIZED;

	no warnings 'once';

	# Event type => handler, bound in this order
	my @bindings = (
		[ $Padre::SlaveDriver::TASK_DONE_EVENT,  \&on_task_done_event ],
		[ $Padre::SlaveDriver::TASK_START_EVENT, \&on_task_start_event ],
		[ $SERVICE_POLL_EVENT,                   \&on_service_poll_event ],
	);
	foreach my $binding (@bindings) {
		my ( $event_type, $handler ) = @$binding;
		Wx::Event::EVT_COMMAND( $main, -1, $event_type, $handler );
	}

	$EVENTS_INITIALIZED = 1;
}
197
198=pod
199
200=head2 Instance Methods
201
202=head3 C<schedule>
203
204Given a C<Padre::Task> instance (or rather an instance of a subclass),
205schedule that task for execution in a worker thread.
206If you call the C<schedule> method of the task object, it will
207proxy to this method for convenience.
208
209=cut
210
# Queue a task for execution.
#
# Arguments: a Padre::Task instance (dies with "Invalid task scheduled!"
#            for anything else).
# Returns:   1 once the task has been queued (or, without threads, run);
#            an empty return when the task's prepare() vetoed with "break".
sub schedule {
	my $self      = shift;
	my $candidate = shift;
	my $task      = Params::Util::_INSTANCE( $candidate, 'Padre::Task' );
	die "Invalid task scheduled!" unless $task; # TO DO: grace

	# Services are remembered so that shutdown_services can reach them
	$self->{running_services}{$task} = $task
		if Params::Util::_INSTANCE( $task, 'Padre::Service' );

	# Cleanup old threads and refill the pool
	$self->reap();

	# Let the task prepare itself; it may veto its own execution
	my $verdict = $task->prepare();
	return if $verdict and $verdict =~ /^break$/i;

	my $frozen;
	$task->serialize( \$frozen );

	my $queue = $self->task_queue;

	unless ( $self->use_threads ) {

		# Degraded mode: push the task plus a STOP marker and run the
		# worker loop right here.
		# TO DO: Instead of this hack, consider "reimplementing" the
		# worker loop as a non-threading, non-queued, fake worker loop
		$queue->enqueue($frozen);
		$queue->enqueue("STOP");
		require Padre::SlaveDriver;
		no warnings 'once';
		Padre::SlaveDriver->_init_events()
			unless defined $Padre::SlaveDriver::TASK_DONE_EVENT;
		Padre::SlaveDriver::_worker_loop($queue);
		return 1;
	}

	require Time::HiRes;

	# Don't fill the queue indefinitely if the CPU can't keep up. If it
	# REALLY can't keep up, we *want* to block eventually. The limit is
	# 5*NWORKERTHREADS for now, which should be a lot.
	my $limit = 5 * $self->{max_no_workers};
	Time::HiRes::usleep(10000) # 10msec
		while $queue->pending > $limit;
	$queue->enqueue($frozen);

	return 1;
}
264
265=pod
266
267=head3 C<setup_workers>
268
269Create more workers if necessary. Called by C<reap> which
270is called regularly by the reap timer, so users don't
271typically need to call this.
272
273=cut
274
# Top the worker pool up: first to the configured minimum, then (up to
# the configured maximum) to roughly one worker per two pending jobs.
# Returns 1, or an empty return when threads are disabled.
sub setup_workers {
	my $self = shift;
	@_ = (); # Avoid "Scalars leaked"

	return unless $self->use_threads;

	my $main = Padre->ide->wx->main;
	my $pool = $self->{workers};
	my $max  = $self->{max_no_workers};

	# Ensure minimum no. workers
	$self->_make_worker_thread($main) until @$pool >= $self->{min_no_workers};

	# Add workers to satisfy demand
	my $backlog = $self->task_queue->pending();
	return 1 if @$pool >= $max;
	return 1 if $backlog <= 2 * @$pool;

	my $wanted = int( $backlog / 2 );
	$wanted = $max if $wanted > $max;
	my $missing = $wanted - @$pool;
	$self->_make_worker_thread($main) for 1 .. $missing;

	return 1;
}
299
# Spawn one new worker via Padre::SlaveDriver and add it to the pool.
#
# Arguments: $main is accepted because callers pass it, but it is not
#            used here (the old in-process thread creation that needed
#            it now lives in Padre::SlaveDriver).
# Returns:   the result of the push (new pool size), or an empty return
#            when threads are disabled.
# Dies:      when the slave driver does not hand back a reference.
sub _make_worker_thread {
	my ( $self, $main ) = @_;
	return unless $self->use_threads;

	my $worker = Padre::SlaveDriver->new->spawn($self);
	die "Padre::SlaveDriver failed to spawn a worker thread"
		if not ref $worker;
	push @{ $self->{workers} }, $worker;
}
317
318=pod
319
320=head3 C<reap>
321
322Check for worker threads that have exited and can be joined.
323If there are more worker threads than the normal number and
324they are idle, one worker thread (per C<reap> call) is
325stopped.
326
327This method is called regularly by the reap timer (see
328the C<reap_interval> option to the constructor) and it's not
329typically called by users.
330
331=cut
332
# Join finished worker threads, ask surplus workers to stop and then
# refill the pool through setup_workers.
# Returns 1, or an empty return when threads are disabled.
sub reap {
	my $self = shift;
	return unless $self->use_threads;

	@_ = (); # avoid "Scalars leaked"

	# Split the pool: joinable threads are cleaned up and joined,
	# everything else stays in the pool.
	my @survivors;
	for my $worker ( @{ $self->{workers} } ) {
		unless ( $worker->is_joinable() ) {
			push @survivors, $worker;
			next;
		}

		# clean up the running task if necessary (case of crashed thread)
		my $tid = $worker->tid();
		$self->_stop_task($tid);
		$worker->join();
	}
	$self->{workers} = \@survivors;

	# Anything above the maximum has to go ...
	my $alive   = scalar @survivors;
	my $surplus = $alive - $self->{max_no_workers};
	$surplus = 0 if $surplus < 0;

	# ... and while the queue is idle, shrink towards the minimum by
	# one worker per call.
	my $queue   = $self->task_queue;
	my $backlog = $queue->pending();
	if ( $backlog == 0 and $alive - $surplus > $self->{min_no_workers} ) {
		$surplus++;
	}

	if ($surplus) {

		# STOP markers are only inserted when the queue is empty or its
		# head item is a reference.
		# NOTE(review): schedule() enqueues the output of serialize();
		# if that is a plain string, this guard suppresses the insert
		# whenever anything is pending - confirm that is intended.
		my $head_is_plain = $queue->pending() && !ref( $queue->peek(0) );
		$queue->insert( 0, ("STOP") x $surplus ) unless $head_is_plain;
	}

	$self->setup_workers();

	return 1;
}
382
# Remove a thread id from the running-task bookkeeping and refresh the
# status bar.
#
# Arguments: $tid       - thread id to forget
#            $task_type - optional; when omitted (cleanup after a crashed
#                         thread) the id is removed from every task type
# Returns:   1
sub _stop_task {
	my ( $self, $tid, $task_type ) = @_;

	my $running = $self->{running_tasks};
	my @types = defined $task_type ? ($task_type) : keys %$running;

	for my $type (@types) {
		delete $running->{$type}{$tid};

		# Drop the per-type hash once its last thread is gone
		delete $running->{$type} unless keys %{ $running->{$type} };
	}

	Padre->ide->wx->main->GetStatusBar->refresh;
	return (1);
}
403
404=pod
405
406=head3 C<cleanup>
407
408Shutdown all services with a HANGUP, then stop all worker threads.
409Called on editor shutdown.
410
411=cut
412
# Shut everything down: hang up the services, ask every worker to stop,
# join what can be joined, clean up the slave driver and finally detach
# and signal whatever is still running.
# Returns 1, or an empty return when threads are disabled.
sub cleanup {
	my $self = shift;
	return if not $self->use_threads;

	# Send all services a HANGUP , they will (hopefully)
	# catch this and break the run loop, returning below as
	# regular tasks. :|
	TRACE('Tell services to hangup') if DEBUG;
	$self->shutdown_services;

	# the nice way: one STOP marker per worker in the pool. The pool is
	# read straight from the object so that the count is the number of
	# workers, not the number of values the accessor happens to return.
	TRACE('Tell all tasks to stop') if DEBUG;
	my $n_workers = scalar @{ $self->{workers} || [] };
	$self->task_queue->insert( 0, ("STOP") x $n_workers ) if $n_workers;

	my $waitstart = [ gettimeofday() ];

	# Changing the selection seems to solve the endless-loop problem
	#       while ( threads->list(threads::running) >= 2 ) {
	while ( threads->list(threads::joinable) > 0 ) {
		foreach my $thread ( threads->list(threads::joinable) ) {
			$thread->join;
		}

		# Wait no more than two minutes
		last if tv_interval($waitstart) >= ( 2 * 60 );

		# Pass time slices to the threads for finishing
		threads->yield();
	}

	foreach my $thread ( threads->list(threads::joinable) ) {
		TRACE( 'Joining thread ' . $thread->tid ) if DEBUG;
		$thread->join;
	}

	# cleanup master thread, too
	Padre::SlaveDriver->new->cleanup;

	# didn't work the nice way?
	# NOTE(review): the threads documentation shows kill() followed by
	# detach(); the order used here is kept as it was - confirm.
	while ( threads->list(threads::running) >= 1 ) {
		foreach my $thread ( threads->list(threads::running) ) {

			# Trace per thread, inside the loop, where a thread object
			# is actually available
			TRACE( 'Killing thread ' . $thread->tid ) if DEBUG;
			$thread->detach;
			$thread->kill('TERM');
		}
	}

	return 1;
}
463
464=pod
465
466=head2 Accessors
467
468=head3 C<task_queue>
469
470Returns the queue of tasks to be processed as a
471L<Thread::Queue> object. The tasks in the
472queue have been serialized for passing between threads,
473so this is mostly useful internally or
474for checking the number of outstanding jobs.
475
476=head3 C<reap_interval>
477
478Returns the number of milliseconds between the
479regular cleanup runs.
480
481=head3 C<use_threads>
482
483Returns whether running in degraded mode (no threads, false)
484or normal operation (threads, true).
485
486=head3 C<running_tasks>
487
488Returns the number of tasks that are currently being executed.
489
490=cut
491
# Count the running tasks: the {running_tasks} slot maps task type to a
# hash keyed by thread id, and every such key counts as one task.
sub running_tasks {
	my ($self) = @_;

	my $total = 0;
	for my $type ( keys %{ $self->{running_tasks} } ) {
		my $threads_of_type = $self->{running_tasks}{$type};
		$total += scalar keys %$threads_of_type;
	}

	return $total;
}
500
501=pod
502
503=head3 C<shutdown_services>
504
505Gracefully shutdown the services by instructing them to hangup themselves
506and return via the usual Task mechanism.
507
508=cut
509
## FIXME: {running_services} is populated in schedule() (for Padre::Service tasks); nothing in this file removes entries again.
# Call ->shutdown on every entry of the {running_services} slot.
sub shutdown_services {
	my ($self) = @_;
	TRACE('Shutdown services') if DEBUG;

	# each() is kept on purpose so that the iteration semantics stay
	# exactly as they were
	while ( my @entry = each %{ $self->{running_services} } ) {
		my ( $sid, $service ) = @entry;
		TRACE("Hangup service $sid!") if DEBUG;
		$service->shutdown;
	}
}
520
521=pod
522
523=head3 C<workers>
524
525Returns B<a list> of the worker threads.
526
527=cut
528
# Returns the worker threads as a list, as documented. In scalar context
# this yields the number of workers (an array returned in scalar context
# evaluates to its element count).
sub workers {
	my $self = shift;
	return @{ $self->{workers} || [] };
}
532
533=pod
534
535=head2 Event Handlers
536
537=head3 C<on_task_done_event>
538
539This event handler is called when a background task has
540finished execution. It deserializes the background task
541object and calls its C<finish> method with the
542Padre main window object as first argument. (This is done
543because C<finish> most likely updates the GUI.)
544
545=cut
546
# Wx handler for the "task done" event: thaw the task carried by the
# event, run its finish() with the receiving Wx object and remove it
# from the running-task bookkeeping. Returns the empty list.
sub on_task_done_event {
	my ( $main, $event ) = @_;
	@_ = (); # hack to avoid "Scalars leaked"
	my $frozen = $event->GetData;

	# FIXME - can we know the _real_ class so that an extender
	#  may hook de/serialize
	my $task = Padre::Task->deserialize( \$frozen );

	$task->finish($main);

	# TO DO/FIXME:
	# This should somehow get at the specific TaskManager object
	# instead of going through the Padre globals!
	my $manager = Padre->ide->task_manager;

	# The bookkeeping is keyed by the task's class name and thread id
	$manager->_stop_task( $task->{__thread_id}, ref($task) );

	return ();
}
569
570=pod
571
572=head3 C<on_task_start_event>
573
574This event handler is called when a background task is about to start
575execution.
576It simply increments the running task counter.
577
578=cut
579
# Wx handler for the "task start" event. The event data has the form
# "<thread id>;<task type>"; the pair is recorded as running and the
# status bar is refreshed. Returns the empty list.
sub on_task_start_event {
	my $wx    = shift;
	my $event = shift;
	@_ = (); # hack to avoid "Scalars leaked"

	my $main = $wx->main;

	# TO DO/FIXME:
	# This should somehow get at the specific TaskManager object
	# instead of going through the Padre globals!
	my $manager = Padre->ide->task_manager;

	# Only the first ";" separates, the task type keeps any later ones
	my ( $tid, $task_type ) = split /;/, $event->GetData(), 2;
	$manager->{running_tasks}{$task_type}{$tid} = 1;

	$main->GetStatusBar->refresh;

	return ();
}
594
595=pod
596
597=head3 C<on_service_poll_event>
598
599=cut
600
# Wx handler for the service poll event. The event data has the form
# "<thread id>;<type>"; for now the poll is merely reported via warn.
# Returns the empty list.
sub on_service_poll_event {
	my $main  = shift;
	my $event = shift;
	@_ = ();

	# Only the first ";" separates, the type keeps any later ones
	my ( $tid, $type ) = split /;/, $event->GetData(), 2;
	warn "Polled by service [$tid] as [$type]";

	return ();
}
608
609=pod
610
611=head3 C<on_dump_running_tasks>
612
613Called by the toolbar task-status button.
614Dumps the list of running tasks to the output panel.
615
616=cut
617
# Write a report to the output panel: number of worker threads, the
# running tasks grouped by type (with their thread ids) and the number
# of tasks still pending in the queue. Returns the empty list.
sub on_dump_running_tasks {
	my $ide      = Padre->ide;
	my $manager  = $ide->task_manager;
	my $nrunning = $manager->running_tasks();

	my $main   = $ide->wx->main;
	my $output = $main->output;
	$main->show_output(1);
	$output->style_neutral;

	# Count the pool itself; passing the accessor's array reference
	# through scalar() would print "ARRAY(0x...)" instead of a number.
	my $n_workers = scalar @{ $manager->{workers} || [] };

	$output->AppendText( "\n-----------------------------------------\n["
			. localtime() . "] "
			. sprintf( Wx::gettext("%s worker threads are running.\n"), $n_workers ) );
	if ( $nrunning == 0 ) {
		$output->AppendText( Wx::gettext("Currently, no background tasks are being executed.\n") );
		return ();
	}

	my $running = $manager->{running_tasks};
	my $text    = Wx::gettext("The following tasks are currently executing in the background:\n");

	foreach my $type ( keys %$running ) {
		my $threads = $running->{$type};
		my $n       = keys %$threads;
		$text .= sprintf(
			Wx::gettext("- %s of type '%s':\n  (in thread(s) %s)\n"),
			$n, $type, join( ", ", sort { $a <=> $b } keys %$threads )
		);
	}

	$output->AppendText($text);

	my $pending = $manager->task_queue->pending;
	if ($pending) {
		$output->AppendText(
			sprintf( Wx::gettext("\nAdditionally, there are %s tasks pending execution.\n"), $pending ) );
	}

	# Same return value as the early exit above
	return ();
}
659
660
6611;
662
663=pod
664
665=head1 TO DO
666
667What if the computer can't keep up with the queued jobs? This needs
668some consideration and probably, the C<schedule()> call needs to block once
669the queue is I<"full">. However, it's not clear how this can work if the
670Wx C<MainLoop> isn't reached for processing finish events.
671
672Polling services I<aliveness> in a useful way, something a C<Wx::Taskmanager>
673might like to display. Ability to selectively kill tasks/services
674
675=head1 SEE ALSO
676
677The base class of all I<"work units"> is L<Padre::Task>.
678
679=head1 AUTHOR
680
681Steffen Mueller C<smueller@cpan.org>
682
683=head1 COPYRIGHT AND LICENSE
684
685Copyright 2008-2010 The Padre development team as listed in Padre.pm.
686
687This program is free software; you can redistribute it and/or
688modify it under the same terms as Perl 5 itself.
689
690=cut
691
692# Copyright 2008-2010 The Padre development team as listed in Padre.pm.
693# LICENSE
694# This program is free software; you can redistribute it and/or
695# modify it under the same terms as Perl 5 itself.
Note: See TracBrowser for help on using the browser.