From 83c92da5d3428185c226288b32a6e09fab306b1b Mon Sep 17 00:00:00 2001 From: Todd Rinaldo Date: Mon, 9 Nov 2020 13:03:53 -0600 Subject: [PATCH 1/4] Avoid opening too many jobs in the collector. Only parse so many at one time. Most unix systems limit the number of file handles any one process can have open at a time to 1024. When the collector gets behind, it may try to open too many files for completed jobs at once. Limit this with a new collector setting which defaults to 300. They'll be picked up once the ones polled are processed. This commit adds App::Yath::Options::Collector --- lib/App/Yath/Command/collector.pm | 9 ++++- lib/App/Yath/Options/Collector.pm | 65 +++++++++++++++++++++++++++++++ lib/Test2/Harness/Collector.pm | 17 +++++++- lib/Test2/Harness/Util/Queue.pm | 3 +- 4 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 lib/App/Yath/Options/Collector.pm diff --git a/lib/App/Yath/Command/collector.pm b/lib/App/Yath/Command/collector.pm index 3d44c42d8..89d2d039a 100644 --- a/lib/App/Yath/Command/collector.pm +++ b/lib/App/Yath/Command/collector.pm @@ -6,6 +6,7 @@ our $VERSION = '1.000039'; use File::Spec; +use App::Yath::Options; use App::Yath::Util qw/isolate_stdout/; use Test2::Harness::Util::JSON qw/decode_json/; @@ -16,6 +17,12 @@ use Test2::Harness::Run; use parent 'App::Yath::Command'; use Test2::Harness::Util::HashBase; +include_options( + 'App::Yath::Options::Debug', + 'App::Yath::Options::PreCommand', + 'App::Yath::Options::Collector', +); + sub internal_only { 1 } sub summary { "For internal use only" } sub name { 'collector' } @@ -28,7 +35,7 @@ sub run { my $fh = isolate_stdout(); - my $settings = Test2::Harness::Settings->new(File::Spec->catfile($dir, 'settings.json')); + my $settings = $self->settings; require(mod2file($collector_class)); diff --git a/lib/App/Yath/Options/Collector.pm b/lib/App/Yath/Options/Collector.pm new file mode 100644 index 000000000..bedb536c0 --- /dev/null +++ b/lib/App/Yath/Options/Collector.pm @@ 
-0,0 +1,65 @@ +package App::Yath::Options::Collector; +use strict; +use warnings; + +our $VERSION = '1.000039'; + +use App::Yath::Options; + +option_group {prefix => 'collector', category => "Collector Options"} => sub { + option max_jobs_to_process => ( + description => 'The maximum number of jobs that the collector can process each loop (Default: 300)', + default => 300, + ); +}; + +1; + +__END__ + + +=pod + +=encoding UTF-8 + +=head1 NAME + +App::Yath::Options::Collector - collector options for Yath. + +=head1 DESCRIPTION + +This is where the command line options for the collector are defined. + +=head1 PROVIDED OPTIONS POD IS AUTO-GENERATED + +=head1 SOURCE + +The source code repository for Test2-Harness can be found at +F. + +=head1 MAINTAINERS + +=over 4 + +=item Chad Granum Eexodist@cpan.orgE + +=back + +=head1 AUTHORS + +=over 4 + +=item Chad Granum Eexodist@cpan.orgE + +=back + +=head1 COPYRIGHT + +Copyright 2020 Chad Granum Eexodist7@gmail.comE. + +This program is free software; you can redistribute it and/or +modify it under the same terms as Perl itself. + +See F + +=cut diff --git a/lib/Test2/Harness/Collector.pm b/lib/Test2/Harness/Collector.pm index 0b7b65b5a..7c9a37400 100644 --- a/lib/Test2/Harness/Collector.pm +++ b/lib/Test2/Harness/Collector.pm @@ -221,9 +221,19 @@ sub jobs { return $jobs if $self->{+JOBS_DONE}; + # Don't monitor more than 'max_jobs_to_process' or we might have too many open file handles and crash + # Max open files handles on a process applies. Usually this is 1024 so we can't have everything open at once when we're behind. + my $max_jobs_to_process = $self->settings->collector->max_jobs_to_process // 1024; + my $additional_jobs_to_parse = $max_jobs_to_process - scalar keys %$jobs; + if($additional_jobs_to_parse <= 0) { + # This is an unlikely code path. If we're here, it means the last loop couldn't process any results. + print "WARN: max jobs still exceded. 
Continuing to process jobs\n"; + return $jobs; + } + my $queue = $self->jobs_queue or return $jobs; - for my $item ($queue->poll) { + for my $item ($queue->poll($additional_jobs_to_parse)) { my ($spos, $epos, $job) = @$item; unless ($job) { @@ -271,6 +281,11 @@ sub jobs { ); } + # The collector didn't read in all the jobs because it'd run out of file handles. We need to let the stream know we're behind. + if( scalar keys %$jobs >= $max_jobs_to_process ) { + print STDERR "WARN: The Yath Collector is running behind. More than $max_jobs_to_process test results have not been processed.\n"; + } + return $jobs; } diff --git a/lib/Test2/Harness/Util/Queue.pm b/lib/Test2/Harness/Util/Queue.pm index 1bdc3fba2..8a716cb40 100644 --- a/lib/Test2/Harness/Util/Queue.pm +++ b/lib/Test2/Harness/Util/Queue.pm @@ -43,11 +43,12 @@ sub reset { sub poll { my $self = shift; + my $max = shift; return $self->{+ENDED} if $self->{+ENDED}; $self->{+QH} ||= Test2::Harness::Util::File::JSONL->new(name => $self->{+FILE}); - my @out = $self->{+QH}->poll_with_index(); + my @out = $self->{+QH}->poll_with_index( $max ? 
(max => $max) : () ); $self->{+ENDED} = $out[-1] if @out && !defined($out[-1]->[-1]); From e81da2d2828f9b740310ae1b8f5c65e0348e205b Mon Sep 17 00:00:00 2001 From: Chad Granum Date: Mon, 9 Nov 2020 19:59:28 -0800 Subject: [PATCH 2/4] Clean up and re-work PR for too many open jobs * Added another collector option * Renamed collector option * Put option in test and start commands * Make warning when job limit is hit a proper event * Only issue the warning once per test run * Use the correct settings file in collector --- Changes | 4 +++ lib/App/Yath/Command/collector.pm | 9 +----- lib/App/Yath/Command/start.pm | 1 + lib/App/Yath/Command/test.pm | 1 + lib/App/Yath/Options/Collector.pm | 15 ++++++++-- lib/Test2/Harness/Collector.pm | 49 +++++++++++++++++++++++-------- 6 files changed, 57 insertions(+), 22 deletions(-) diff --git a/Changes b/Changes index 3d1edfe93..2fe43c3b2 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,9 @@ {{$NEXT}} + - Make collector options configurable + - Add collector option for max poll events + - Add collector option for max open jobs + 1.000038 2020-11-02 20:49:12-08:00 America/Los_Angeles - Add shellcall and aux output capture for plugins diff --git a/lib/App/Yath/Command/collector.pm b/lib/App/Yath/Command/collector.pm index 89d2d039a..3d44c42d8 100644 --- a/lib/App/Yath/Command/collector.pm +++ b/lib/App/Yath/Command/collector.pm @@ -6,7 +6,6 @@ our $VERSION = '1.000039'; use File::Spec; -use App::Yath::Options; use App::Yath::Util qw/isolate_stdout/; use Test2::Harness::Util::JSON qw/decode_json/; @@ -17,12 +16,6 @@ use Test2::Harness::Run; use parent 'App::Yath::Command'; use Test2::Harness::Util::HashBase; -include_options( - 'App::Yath::Options::Debug', - 'App::Yath::Options::PreCommand', - 'App::Yath::Options::Collector', -); - sub internal_only { 1 } sub summary { "For internal use only" } sub name { 'collector' } @@ -35,7 +28,7 @@ sub run { my $fh = isolate_stdout(); - my $settings = $self->settings; + my $settings = 
Test2::Harness::Settings->new(File::Spec->catfile($dir, 'settings.json')); require(mod2file($collector_class)); diff --git a/lib/App/Yath/Command/start.pm b/lib/App/Yath/Command/start.pm index 9d7370cbc..58d78034b 100644 --- a/lib/App/Yath/Command/start.pm +++ b/lib/App/Yath/Command/start.pm @@ -35,6 +35,7 @@ include_options( 'App::Yath::Options::Runner', 'App::Yath::Options::Workspace', 'App::Yath::Options::Persist', + 'App::Yath::Options::Collector', ); option_group {prefix => 'runner', category => "Persistent Runner Options"} => sub { diff --git a/lib/App/Yath/Command/test.pm b/lib/App/Yath/Command/test.pm index 2437e617f..5c1161c77 100644 --- a/lib/App/Yath/Command/test.pm +++ b/lib/App/Yath/Command/test.pm @@ -61,6 +61,7 @@ include_options( 'App::Yath::Options::Run', 'App::Yath::Options::Runner', 'App::Yath::Options::Workspace', + 'App::Yath::Options::Collector', ); sub MAX_ATTACH() { 1_048_576 } diff --git a/lib/App/Yath/Options/Collector.pm b/lib/App/Yath/Options/Collector.pm index bedb536c0..c18b91a4f 100644 --- a/lib/App/Yath/Options/Collector.pm +++ b/lib/App/Yath/Options/Collector.pm @@ -7,9 +7,20 @@ our $VERSION = '1.000039'; use App::Yath::Options; option_group {prefix => 'collector', category => "Collector Options"} => sub { - option max_jobs_to_process => ( - description => 'The maximum number of jobs that the collector can process each loop (Default: 300)', + option max_open_jobs => ( + type => 's', + description => 'Maximum number of jobs a collector can process at a time, if more jobs are pending their output will be delayed until the earlier jobs have been processed. (Default: 300)', default => 300, + long_examples => [' 300'], + short_examples => [' 300'], + ); + + option max_poll_events => ( + type => 's', + description => 'Maximum number of events to poll from a job before jumping to the next job. 
(Default: 1000)', + default => 1000, + long_examples => [' 1000'], + short_examples => [' 1000'], ); }; diff --git a/lib/Test2/Harness/Collector.pm b/lib/Test2/Harness/Collector.pm index 7c9a37400..266ec7ca8 100644 --- a/lib/Test2/Harness/Collector.pm +++ b/lib/Test2/Harness/Collector.pm @@ -24,6 +24,8 @@ use Test2::Harness::Util::HashBase qw{ settings; + while (1) { my $count = 0; $count += $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT}; @@ -66,7 +70,7 @@ sub process { while(my ($job_try, $jdir) = each %$jobs) { my $e_count = 0; - for my $event ($jdir->poll(1000)) { + for my $event ($jdir->poll($self->settings->collector->max_poll_events // 1000)) { $self->{+ACTION}->($event); $count++; } @@ -76,7 +80,7 @@ sub process { my $done = $jdir->done or next; delete $jobs->{$job_try}; - unless ($self->settings->debug->keep_dirs) { + unless ($settings->debug->keep_dirs) { my $job_path = $jdir->job_root; # Needed because we set the perms so that a tmpdir under it can be used. # This is the only remove_tree that needs it because it is the @@ -97,7 +101,7 @@ sub process { $self->{+ACTION}->(undef) if $self->{+JOBS_DONE} && $self->{+TASKS_DONE}; - remove_tree($self->{+RUN_DIR}, {safe => 1, keep_root => 0}) unless $self->settings->debug->keep_dirs; + remove_tree($self->{+RUN_DIR}, {safe => 1, keep_root => 0}) unless $settings->debug->keep_dirs; return; } @@ -214,6 +218,29 @@ sub process_tasks { return $count; } +sub send_backed_up { + my $self = shift; + return if $self->{+BACKED_UP}++; + + # This is an unlikely code path. If we're here, it means the last loop couldn't process any results. + my $e = $self->_harness_event(0, undef, time, info => [{details => <<" EOT", tag => "INTERNAL", debug => 1, important => 1}]); +*** THIS IS NOT FATAL *** + + * The collector has reached the maximum number of concurrent jobs to process. + * Testing will continue, but some tests may be running or even complete before they are rendered. 
+ * All tests and events will eventually be displayed, and your final results will not be affected. +Set a higher --max-open-jobs collector setting to prevent this problem in the +future, but be advised that could result in too many open filehandles on some +systems. + +This message will only be shown once. + EOT + + $self->{+ACTION}->($e); + return; +} + sub jobs { my $self = shift; @@ -221,13 +248,13 @@ sub jobs { return $jobs if $self->{+JOBS_DONE}; - # Don't monitor more than 'max_jobs_to_process' or we might have too many open file handles and crash - # Max open files handles on a process applies. Usually this is 1024 so we can't have everything open at once when we're behind. - my $max_jobs_to_process = $self->settings->collector->max_jobs_to_process // 1024; - my $additional_jobs_to_parse = $max_jobs_to_process - scalar keys %$jobs; + # Don't monitor more than 'max_open_jobs' or we might have too many open file handles and crash + # Max open files handles on a process applies. Usually this is 1024 so we + # can't have everything open at once when we're behind. + my $max_open_jobs = $self->settings->collector->max_open_jobs // 1024; + my $additional_jobs_to_parse = $max_open_jobs - keys %$jobs; if($additional_jobs_to_parse <= 0) { - # This is an unlikely code path. If we're here, it means the last loop couldn't process any results. - print "WARN: max jobs still exceded. Continuing to process jobs\n"; + $self->send_backed_up; return $jobs; } @@ -282,9 +309,7 @@ sub jobs { } # The collector didn't read in all the jobs because it'd run out of file handles. We need to let the stream know we're behind. - if( scalar keys %$jobs >= $max_jobs_to_process ) { - print STDERR "WARN: The Yath Collector is running behind.
More than $max_jobs_to_process test results have not been processed.\n"; - } + $self->send_backed_up if $max_open_jobs <= keys %$jobs; return $jobs; } From 923dfd211e74b5fd518ab195ef015854b7ab472a Mon Sep 17 00:00:00 2001 From: Chad Granum Date: Mon, 9 Nov 2020 20:25:51 -0800 Subject: [PATCH 3/4] Fix minor bug in collector Primarily a typo, but also a race condition that could have collector exit with jobs still waiting to send events. --- lib/Test2/Harness/Collector.pm | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/Test2/Harness/Collector.pm b/lib/Test2/Harness/Collector.pm index 266ec7ca8..2917a9f9d 100644 --- a/lib/Test2/Harness/Collector.pm +++ b/lib/Test2/Harness/Collector.pm @@ -69,10 +69,11 @@ sub process { } while(my ($job_try, $jdir) = each %$jobs) { + $count++; my $e_count = 0; for my $event ($jdir->poll($self->settings->collector->max_poll_events // 1000)) { $self->{+ACTION}->($event); - $count++; + $e_count++; } $count += $e_count; From 4734dd21fbf5d86701802f074d956f817667c359 Mon Sep 17 00:00:00 2001 From: Chad Granum Date: Mon, 9 Nov 2020 22:43:14 -0800 Subject: [PATCH 4/4] Add option to allow multiple collectors I am too tired right now after finally getting this working. It needs review, and I will probably want to clean it up myself after some sleep. 
--- Changes | 1 + lib/App/Yath/Command/collector.pm | 9 +- lib/App/Yath/Options/Collector.pm | 6 ++ lib/Test2/Harness/Collector.pm | 155 ++++++++++++++++++++++++++---- 4 files changed, 149 insertions(+), 22 deletions(-) diff --git a/Changes b/Changes index 2fe43c3b2..601fa9f20 100644 --- a/Changes +++ b/Changes @@ -3,6 +3,7 @@ - Make collector options configurable - Add collector option for max poll events - Add collector option for max open jobs + - Add option to allow multiple collectors 1.000038 2020-11-02 20:49:12-08:00 America/Los_Angeles diff --git a/lib/App/Yath/Command/collector.pm b/lib/App/Yath/Command/collector.pm index 3d44c42d8..e025ae816 100644 --- a/lib/App/Yath/Command/collector.pm +++ b/lib/App/Yath/Command/collector.pm @@ -34,22 +34,27 @@ sub run { my $run = Test2::Harness::Run->new(%{decode_json()}); - my $collector = $collector_class->new( + my $collector; + $collector = $collector_class->new( %args, settings => $settings, workdir => $dir, run_id => $run_id, runner_pid => $runner_pid, run => $run, + output_fh => $fh, # as_json may already have the json form of the event cached, if so # we can avoid doing an extra call to encode_json - action => sub { print $fh defined($_[0]) ? $_[0]->as_json . "\n" : "null\n"; }, + action => sub { print {$collector->output_fh} defined($_[0]) ? ref($_[0]) ? $_[0]->as_json . 
"\n" : $_[0] : "null\n"; }, ); local $SIG{PIPE} = 'IGNORE'; my $ok = eval { $collector->process(); 1 }; my $err = $@; + # Avoid refcycle + delete $collector->{action}; + eval { print $fh "null\n"; 1 } or warn $@; die $err unless $ok; diff --git a/lib/App/Yath/Options/Collector.pm b/lib/App/Yath/Options/Collector.pm index c18b91a4f..6efb06e64 100644 --- a/lib/App/Yath/Options/Collector.pm +++ b/lib/App/Yath/Options/Collector.pm @@ -15,6 +15,12 @@ option_group {prefix => 'collector', category => "Collector Options"} => sub { short_examples => [' 300'], ); + option spawn_worker_at_max => ( + type => 'b', + description => 'Spawn an extra collector whenever we hit max_open_jobs. (Default: off)', + default => 0, + ); + option max_poll_events => ( type => 's', description => 'Maximum number of events to poll from a job before jumping to the next job. (Default: 1000)', diff --git a/lib/Test2/Harness/Collector.pm b/lib/Test2/Harness/Collector.pm index 2917a9f9d..429f633b5 100644 --- a/lib/Test2/Harness/Collector.pm +++ b/lib/Test2/Harness/Collector.pm @@ -12,6 +12,7 @@ use Test2::Harness::Util::UUID qw/gen_uuid/; use Test2::Harness::Util::Queue; use Time::HiRes qw/sleep time/; use File::Spec; +use POSIX qw/:sys_wait_h/; use File::Path qw/remove_tree/; @@ -24,7 +25,8 @@ use Test2::Harness::Util::HashBase qw{ {+WAIT_TIME} //= 0.02; - $self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $self->settings, about => {no_display => 1})); + $self->{+WORKERS} = []; +} + +sub spawn_worker { + my $self = shift; + + my $workers = $self->{+WORKERS} //= []; + + my ($rh, $wh); + pipe($rh, $wh) or die "Could not open pipe: $!"; + + my $pid = fork // die "Could not fork: $!"; + if ($pid) { + close($wh); + $rh->blocking(0); + my $buffer = ""; + push @$workers => [$pid, $rh, \$buffer]; + + # Child will process the jobs we had, we start from scratch; + $self->{+JOBS} = {}; + + return $pid; + } + + # State Management + $self->{+WORKER} = 1; + 
$self->{+JOBS_DONE} = 1; + $self->{+TASKS_DONE} = 1; + $self->{+WORKERS} = []; + + close($rh); + + $wh->autoflush(1); + $self->{+OUTPUT_FH} = $wh; + + return; +} + +sub process_workers { + my $self = shift; + + my $workers = $self->{+WORKERS} or return 0; + my $count = 0; + + my $max_poll_events = $self->settings->collector->max_poll_events; + + for my $worker (@$workers) { + my ($pid, $rh, $buffer) = @$worker; + $count++; + + my $check = waitpid($pid, WNOHANG); + my $exit = $?; + my $done = $check < 0 || $check == $pid; + + if ($done) { + my $e = $self->_harness_event(0, undef, time, info => [{details => "Collector $pid complete ($exit)", tag => "INTERNAL", debug => 0, important => 0}]); + $self->{+ACTION}->($e); + + + @$workers = grep { "$worker" ne "$_" } @$workers; + + $rh->blocking(1); + for my $line (<$rh>) { + $$buffer .= $line; + } + } + else { + my $lcount = 0; + for my $line (<$rh>) { + $$buffer .= $line; + last if $lcount++ >= $max_poll_events; + } + } + + my $oldbuf = $$buffer; + $$buffer = ""; + + for my $line (split /^/, $oldbuf) { + unless ($done || substr($line, -1, 1) eq "\n") { + $$buffer = $line; + last; + } + + $count++; + $self->{+ACTION}->($line); + } + + # Do this here to make sure all events from the collector are received + die "Collector worker ($pid) did not exit properly (check: $check, exit: $exit)" if $done && $exit || $check < 0; + } + + return $count; } sub process { @@ -56,14 +149,17 @@ sub process { my $settings = $self->settings; + $self->{+ACTION}->($self->_harness_event(0, undef, time, harness_run => $self->{+RUN}, harness_settings => $settings, about => {no_display => 1})); + while (1) { my $count = 0; + $count += $self->process_workers if $self->{+WORKERS}; $count += $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT}; $count += $self->process_tasks(); my $jobs = $self->jobs; - unless (keys %$jobs) { + unless (keys %$jobs || @{$self->{+WORKERS}}) { last if $self->{+JOBS_DONE}; last if $self->runner_done; } @@ -71,7 
+167,7 @@ sub process { while(my ($job_try, $jdir) = each %$jobs) { $count++; my $e_count = 0; - for my $event ($jdir->poll($self->settings->collector->max_poll_events // 1000)) { + for my $event ($jdir->poll($settings->collector->max_poll_events)) { $self->{+ACTION}->($event); $e_count++; } @@ -93,10 +189,12 @@ sub process { delete $self->{+PENDING}->{$jdir->job_id} unless $done->{retry}; } - last if !$count && $self->runner_exited; + last if !$count && !@{$self->{+WORKERS}} && ($self->runner_exited || $self->{+WORKER}); sleep $self->{+WAIT_TIME} unless $count; } + exit(0) if $self->{+WORKER}; + # One last slurp $self->process_runner_output if $self->{+SHOW_RUNNER_OUTPUT}; @@ -128,6 +226,8 @@ sub runner_exited { sub process_runner_output { my $self = shift; + return 0 if $self->{+WORKER}; + my $out = 0; return $out unless $self->{+SHOW_RUNNER_OUTPUT}; @@ -194,6 +294,7 @@ sub process_runner_output { sub process_tasks { my $self = shift; + return 0 if $self->{+WORKER}; return 0 if $self->{+TASKS_DONE}; my $queue = $self->tasks_queue or return 0; @@ -219,8 +320,16 @@ sub process_tasks { return $count; } -sub send_backed_up { +sub handle_backed_up { my $self = shift; + + if ($self->settings->collector->spawn_worker_at_max) { + my $pid = $self->spawn_worker() or return; + my $e = $self->_harness_event(0, undef, time, info => [{details => "Spawned an extra collector: $pid", tag => "INTERNAL", debug => 0, important => 0}]); + $self->{+ACTION}->($e); + return; + } + return if $self->{+BACKED_UP}++; # This is an unlikely code path. If we're here, it means the last loop couldn't process any results. 
@@ -246,22 +355,31 @@ sub jobs { my $self = shift; my $jobs = $self->{+JOBS} //= {}; + my $buffer = $self->{+JOBS_BUFFER} //= []; - return $jobs if $self->{+JOBS_DONE}; + return $jobs if $self->{+WORKER}; + return $jobs if $self->{+JOBS_DONE} && !@{$self->{+JOBS_BUFFER}}; + + my $queue = $self->jobs_queue or return $jobs; - # Don't monitor more than 'max_open_jobs' or we might have too many open file handles and crash - # Max open files handles on a process applies. Usually this is 1024 so we - # can't have everything open at once when we're behind. my $max_open_jobs = $self->settings->collector->max_open_jobs // 1024; - my $additional_jobs_to_parse = $max_open_jobs - keys %$jobs; - if($additional_jobs_to_parse <= 0) { - $self->send_backed_up; - return $jobs; - } + push @$buffer => $queue->poll(); - my $queue = $self->jobs_queue or return $jobs; + while (my $item = shift @$buffer) { + if (keys(%$jobs) >= $max_open_jobs) { + $self->handle_backed_up; + + # This may have changed + $jobs = $self->{+JOBS}; + + return $jobs if $self->{+WORKER}; + + if (keys %$jobs) { + unshift @$buffer => $item; + return $jobs; + } + } - for my $item ($queue->poll($additional_jobs_to_parse)) { my ($spos, $epos, $job) = @$item; unless ($job) { @@ -309,9 +427,6 @@ sub jobs { ); } - # The collector didn't read in all the jobs because it'd run out of file handles. We need to let the stream know we're behind. - $self->send_backed_up if $max_open_jobs <= keys %$jobs; - return $jobs; }