diff --git a/Changes b/Changes index 3d1edfe93..601fa9f20 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,10 @@ {{$NEXT}} + - Make collector options configurable + - Add collector option for max poll events + - Add collector option for max open jobs + - Add option to allow multiple collectors + 1.000038 2020-11-02 20:49:12-08:00 America/Los_Angeles - Add shellcall and aux output capture for plugins diff --git a/lib/App/Yath/Command/collector.pm b/lib/App/Yath/Command/collector.pm index 3d44c42d8..e025ae816 100644 --- a/lib/App/Yath/Command/collector.pm +++ b/lib/App/Yath/Command/collector.pm @@ -34,22 +34,27 @@ sub run { my $run = Test2::Harness::Run->new(%{decode_json()}); - my $collector = $collector_class->new( + my $collector; + $collector = $collector_class->new( %args, settings => $settings, workdir => $dir, run_id => $run_id, runner_pid => $runner_pid, run => $run, + output_fh => $fh, # as_json may already have the json form of the event cached, if so # we can avoid doing an extra call to encode_json - action => sub { print $fh defined($_[0]) ? $_[0]->as_json . "\n" : "null\n"; }, + action => sub { print {$collector->output_fh} defined($_[0]) ? ref($_[0]) ? $_[0]->as_json . "\n" : $_[0] : "null\n"; }, ); local $SIG{PIPE} = 'IGNORE'; my $ok = eval { $collector->process(); 1 }; my $err = $@; + # Avoid refcycle + delete $collector->{action}; + eval { print $fh "null\n"; 1 } or warn $@; die $err unless $ok; diff --git a/lib/App/Yath/Command/start.pm b/lib/App/Yath/Command/start.pm index 9d7370cbc..58d78034b 100644 --- a/lib/App/Yath/Command/start.pm +++ b/lib/App/Yath/Command/start.pm @@ -35,6 +35,7 @@ include_options( 'App::Yath::Options::Runner', 'App::Yath::Options::Workspace', 'App::Yath::Options::Persist', + 'App::Yath::Options::Collector', ); option_group {prefix => 'runner', category => "Persistent Runner Options"} => sub { diff --git a/lib/App/Yath/Command/test.pm b/lib/App/Yath/Command/test.pm index 2437e617f..5c1161c77 100644 --- a/lib/App/Yath/Command/test.pm +++ b/lib/App/Yath/Command/test.pm @@ -61,6 +61,7 @@ include_options( 'App::Yath::Options::Run', 'App::Yath::Options::Runner', 'App::Yath::Options::Workspace', + 'App::Yath::Options::Collector', ); sub MAX_ATTACH() { 1_048_576 } diff --git a/lib/App/Yath/Options/Collector.pm b/lib/App/Yath/Options/Collector.pm new file mode 100644 index 000000000..6efb06e64 --- /dev/null +++ b/lib/App/Yath/Options/Collector.pm @@ -0,0 +1,82 @@ +package App::Yath::Options::Collector; +use strict; +use warnings; + +our $VERSION = '1.000039'; + +use App::Yath::Options; + +option_group {prefix => 'collector', category => "Collector Options"} => sub { + option max_open_jobs => ( + type => 's', + description => 'Maximum number of jobs a collector can process at a time, if more jobs are pending their output will be delayed until the earlier jobs have been processed. (Default: 300)', + default => 300, + long_examples => [' 300'], + short_examples => [' 300'], + ); + + option spawn_worker_at_max => ( + type => 'b', + description => 'Spawn an extra collector whenever we hit max_open_jobs. (Default: off)', + default => 0, + ); + + option max_poll_events => ( + type => 's', + description => 'Maximum number of events to poll from a job before jumping to the next job. (Default: 1000)', + default => 1000, + long_examples => [' 1000'], + short_examples => [' 1000'], + ); +}; + +1; + +__END__ + + +=pod + +=encoding UTF-8 + +=head1 NAME + +App::Yath::Options::Collector - collector options for Yath. + +=head1 DESCRIPTION + +This is where the command line options for the collector are defined. + +=head1 PROVIDED OPTIONS POD IS AUTO-GENERATED + +=head1 SOURCE + +The source code repository for Test2-Harness can be found at +F. + +=head1 MAINTAINERS + +=over 4 + +=item Chad Granum Eexodist@cpan.orgE + +=back + +=head1 AUTHORS + +=over 4 + +=item Chad Granum Eexodist@cpan.orgE + +=back + +=head1 COPYRIGHT + +Copyright 2020 Chad Granum Eexodist7@gmail.comE. + +This program is free software; you can redistribute it and/or +modify it under the same terms as Perl itself. + +See F + +=cut diff --git a/lib/Test2/Harness/Collector.pm b/lib/Test2/Harness/Collector.pm index 0b7b65b5a..429f633b5 100644 --- a/lib/Test2/Harness/Collector.pm +++ b/lib/Test2/Harness/Collector.pm @@ -12,6 +12,7 @@ use Test2::Harness::Util::UUID qw/gen_uuid/; use Test2::Harness::Util::Queue; use Time::HiRes qw/sleep time/; use File::Spec; +use POSIX qw/:sys_wait_h/; use File::Path qw/remove_tree/; @@ -24,6 +25,9 @@ use Test2::Harness::Util::HashBase qw{ {+WAIT_TIME} //= 0.02; - $self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $self->settings, about => {no_display => 1})); + $self->{+WORKERS} = []; +} + +sub spawn_worker { + my $self = shift; + + my $workers = $self->{+WORKERS} //= []; + + my ($rh, $wh); + pipe($rh, $wh) or die "Could not open pipe: $!"; + + my $pid = fork // die "Could not fork: $!"; + if ($pid) { + close($wh); + $rh->blocking(0); + my $buffer = ""; + push @$workers => [$pid, $rh, \$buffer]; + + # Child will process the jobs we had, we start from scratch; + $self->{+JOBS} = {}; + + return $pid; + } + + # State Management + $self->{+WORKER} = 1; + $self->{+JOBS_DONE} = 1; + $self->{+TASKS_DONE} = 1; + $self->{+WORKERS} = []; + + close($rh); + + $wh->autoflush(1); + $self->{+OUTPUT_FH} = $wh; + + return; +} + +sub process_workers { + my $self = shift; + + my $workers = $self->{+WORKERS} or return 0; + my $count = 0; + + my $max_poll_events = $self->settings->collector->max_poll_events; + + for my $worker (@$workers) { + my ($pid, $rh, $buffer) = @$worker; + $count++; + + my $check = waitpid($pid, WNOHANG); + my $exit = $?; + my $done = $check < 0 || $check == $pid; + + if ($done) { + my $e = $self->_harness_event(0, undef, time, info => [{details => "Collector $pid complete ($exit)", tag => "INTERNAL", debug => 0, important => 0}]); + $self->{+ACTION}->($e); + + + @$workers = grep { "$worker" ne "$_" } @$workers; + + $rh->blocking(1); + for my $line (<$rh>) { + $$buffer .= $line; + } + } + else { + my $lcount = 0; + for my $line (<$rh>) { + $$buffer .= $line; + last if $lcount++ >= $max_poll_events; + } + } + + my $oldbuf = $$buffer; + $$buffer = ""; + + for my $line (split /^/, $oldbuf) { + unless ($done || substr($line, -1, 1) eq "\n") { + $$buffer = $line; + last; + } + + $count++; + $self->{+ACTION}->($line); + } + + # Do this here to make sure all events from the collector are received + die "Collector worker ($pid) did not exit properly (check: $check, exit: $exit)" if $done && $exit || $check < 0; + } + + return $count; } sub process { my $self = shift; + my $settings = $self->settings; + + $self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $settings, about => {no_display => 1})); + while (1) { my $count = 0; + $count += $self->process_workers if $self->{+WORKERS}; $count += $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT}; $count += $self->process_tasks(); my $jobs = $self->jobs; - unless (keys %$jobs) { + unless (keys %$jobs || @{$self->{+WORKERS}}) { last if $self->{+JOBS_DONE}; last if $self->runner_done; } while(my ($job_try, $jdir) = each %$jobs) { + $count++; my $e_count = 0; - for my $event ($jdir->poll(1000)) { + for my $event ($jdir->poll($settings->collector->max_poll_events)) { $self->{+ACTION}->($event); - $count++; + $e_count++; } $count += $e_count; @@ -76,7 +177,7 @@ sub process { my $done = $jdir->done or next; delete $jobs->{$job_try}; - unless ($self->settings->debug->keep_dirs) { + unless ($settings->debug->keep_dirs) { my $job_path = $jdir->job_root; # Needed because we set the perms so that a tmpdir under it can be used. # This is the only remove_tree that needs it because it is the @@ -88,16 +189,18 @@ sub process { delete $self->{+PENDING}->{$jdir->job_id} unless $done->{retry}; } - last if !$count && $self->runner_exited; + last if !$count && !@{$self->{+WORKERS}} && ($self->runner_exited || $self->{+WORKER}); sleep $self->{+WAIT_TIME} unless $count; } + exit(0) if $self->{+WORKER}; + # One last slurp $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT}; $self->{+ACTION}->(undef) if $self->{+JOBS_DONE} && $self->{+TASKS_DONE}; - remove_tree($self->{+RUN_DIR}, {safe => 1, keep_root => 0}) unless $self->settings->debug->keep_dirs; + remove_tree($self->{+RUN_DIR}, {safe => 1, keep_root => 0}) unless $settings->debug->keep_dirs; return; } @@ -123,6 +226,8 @@ sub runner_exited { sub process_runner_output { my $self = shift; + return 0 if $self->{+WORKER}; + my $out = 0; return $out unless $self->{+SHOW_RUNNER_OUTPUT}; @@ -189,6 +294,7 @@ sub process_runner_output { sub process_tasks { my $self = shift; + return 0 if $self->{+WORKER}; return 0 if $self->{+TASKS_DONE}; my $queue = $self->tasks_queue or return 0; @@ -214,16 +320,66 @@ sub process_tasks { return $count; } +sub handle_backed_up { + my $self = shift; + + if ($self->settings->collector->spawn_worker_at_max) { + my $pid = $self->spawn_worker() or return; + my $e = $self->_harness_event(0, undef, time, info => [{details => "Spawned an extra collector: $pid", tag => "INTERNAL", debug => 0, important => 0}]); + $self->{+ACTION}->($e); + return; + } + + return if $self->{+BACKED_UP}++; + + # This is an unlikely code path. If we're here, it means the last loop couldn't process any results. + my $e = $self->_harness_event(0, undef, time, info => [{details => <<" EOT", tag => "INTERNAL", debug => 1, important => 1}]); +*** THIS IS NOT FATAL *** + + * The collector has reached the maximum number of concurrent jobs to process. + * Testing will continue, but some tests may be running or even complete before they are rendered. + * All tests and events will eventually be displayed, and your final results will not be effected. + +Set a higher --max-open-jobs collector setting to prevent this problem in the +future, but be advised that could result in too many open filehandles on some +systems. + +This message will only be shown once. + EOT + + $self->{+ACTION}->($e); + return; +} + sub jobs { my $self = shift; my $jobs = $self->{+JOBS} //= {}; + my $buffer = $self->{+JOBS_BUFFER} //= []; - return $jobs if $self->{+JOBS_DONE}; + return $jobs if $self->{+WORKER}; + return $jobs if $self->{+JOBS_DONE} && !@{$self->{+JOBS_BUFFER}}; my $queue = $self->jobs_queue or return $jobs; - for my $item ($queue->poll) { + my $max_open_jobs = $self->settings->collector->max_open_jobs // 1024; + push @$buffer => $queue->poll(); + + while (my $item = shift @$buffer) { + if (keys(%$jobs) >= $max_open_jobs) { + $self->handle_backed_up; + + # This may have changed + $jobs = $self->{+JOBS}; + + return $jobs if $self->{+WORKER}; + + if (keys %$jobs) { + unshift @$buffer => $item; + return $jobs; + } + } + my ($spos, $epos, $job) = @$item; unless ($job) { diff --git a/lib/Test2/Harness/Util/Queue.pm b/lib/Test2/Harness/Util/Queue.pm index 1bdc3fba2..8a716cb40 100644 --- a/lib/Test2/Harness/Util/Queue.pm +++ b/lib/Test2/Harness/Util/Queue.pm @@ -43,11 +43,12 @@ sub reset { sub poll { my $self = shift; + my $max = shift; return $self->{+ENDED} if $self->{+ENDED}; $self->{+QH} ||= Test2::Harness::Util::File::JSONL->new(name => $self->{+FILE}); - my @out = $self->{+QH}->poll_with_index(); + my @out = $self->{+QH}->poll_with_index( $max ? (max => $max) : () ); $self->{+ENDED} = $out[-1] if @out && !defined($out[-1]->[-1]);