Improve file read performance using multi-threading #1

@gregeva

Description

File read performance can be quite sluggish when files are large and they sit on remote storage.

Considering that normal use of log timeline requires multiple executions to home in on specific aspects of the logs, it is important that each execution is fast. Presently it takes about 45 s to read, parse, and process 2 million lines (900 MB) from a built-in SSD. I expect this performance to be much, much worse over shared cloud storage where logs are being analysed in place.
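For reference, a minimal sketch of how the single-threaded baseline could be timed (the log path is hypothetical, and the loop body stands in for the real parsing logic):

use strict;
use warnings;
use Time::HiRes qw(gettimeofday tv_interval);

my $file  = 'app.log';   # hypothetical path
my $t0    = [gettimeofday];
my $lines = 0;

open my $fh, '<', $file or die "Cannot open file: $!";
while (my $line = <$fh>) {
    $lines++;            # parse/process each line here
}
close $fh;

printf "Read %d lines in %.1fs\n", $lines, tv_interval($t0);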

Here is some Copilot-generated example code illustrating what I am thinking:

use strict;
use warnings;
use Parallel::ForkManager;
use Fcntl qw(SEEK_SET);
use IO::Select;
use IO::Handle;

my $num_workers = 4;
my @files = @ARGV;
my $update_interval = 100_000; # Print update every 100,000 lines
my $worker_report_interval = int($update_interval / $num_workers);

STDOUT->autoflush(1); # Needed so the "\r" progress line updates in place

foreach my $file (@files) {
    process_file($file);
}

sub process_file {
    my ($file) = @_;

    return unless -f $file;

    my $filesize   = -s $file;
    my $chunk_size = int($filesize / $num_workers);

    print "Processing $file ($filesize bytes) with $num_workers workers...\n";

    my $pm = Parallel::ForkManager->new($num_workers);
    my @readers;
    my $total_lines_read = 0;

    # Fork one worker per chunk; each worker reports progress to the
    # parent over its own pipe.
    for my $i (0 .. $num_workers - 1) {
        pipe(my $reader, my $writer) or die "Cannot create pipe: $!";
        push @readers, $reader;

        if ($pm->start) {       # Parent
            close $writer;      # Parent keeps only the reader end
            next;
        }

        # --- Worker process ---
        close $reader;          # Worker keeps only the writer end
        $writer->autoflush(1);  # Progress lines must not sit in a buffer

        open my $fh, '<', $file or die "Cannot open file: $!";
        seek($fh, $i * $chunk_size, SEEK_SET) or die "Cannot seek: $!";

        # Skip the partial line at the chunk boundary; the previous
        # worker reads the line that crosses into this chunk.
        <$fh> if $i > 0;

        # The last worker reads to EOF so the int() remainder is not lost.
        my $chunk_end = ($i == $num_workers - 1) ? $filesize : ($i + 1) * $chunk_size;

        my $lines_read = 0;
        while (tell($fh) < $chunk_end && defined(my $line = <$fh>)) {
            process_line($line, $file);
            $lines_read++;

            # Report the lines processed since the last report (a delta),
            # so the parent can simply sum everything it receives.
            print {$writer} "$worker_report_interval\n"
                if $lines_read % $worker_report_interval == 0;
        }

        close $fh;
        close $writer;
        $pm->finish;
    }

    # Parent process: monitor and display aggregated progress
    my $selector = IO::Select->new(@readers);

    while ($selector->count > 0) {
        foreach my $fh ($selector->can_read(0.1)) {
            my $update = <$fh>;
            if (defined $update) {
                chomp $update;
                $total_lines_read += $update;

                # Print a single updating progress line
                print "\rProcessed $total_lines_read lines in $file..."
                    if $total_lines_read >= $update_interval;
            }
            else {
                $selector->remove($fh); # Worker exited; pipe hit EOF
                close $fh;
            }
        }
    }

    # Clear the progress message before printing completion
    print "\r" . (' ' x 60) . "\r";

    $pm->wait_all_children;
    print "Finished processing $file\n";
}

sub process_line {
    my ($line, $file) = @_;
    # Your regex processing logic here
}
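One caveat with the sketch above: Parallel::ForkManager forks separate processes rather than threads, so anything process_line accumulates stays inside the worker. A minimal sketch of one way to merge results back into the parent, using the data-passing Parallel::ForkManager provides through finish() and run_on_finish() (merge_results and %per_worker_counts are hypothetical names):

# In the parent, before forking (register the callback once):
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $signal, $core_dump, $data) = @_;
    merge_results($data) if defined $data;   # merge_results is hypothetical
});

# In the worker, instead of a bare finish():
$pm->finish(0, \%per_worker_counts);         # serialised back to the parent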

Metadata

Labels: enhancement (New feature or request)