@orxfun orxfun commented Oct 20, 2025

Changes

Recursive Parallel Iterators

The IntoParIterRec trait can be used to create a recursive parallel iterator over an initial set of elements, which is useful when working with non-linear data structures such as trees and graphs.

Consider, for instance, a tree which is defined by the following node struct:

pub struct Node<T> {
    pub data: T,
    pub children: Vec<Node<T>>,
}

Assume that we want to map every node to a u64 with a function map: impl Fn(&Node<T>) -> u64 and compute the sum of the mapped values of all nodes descending from a root: &Node<T>.

We can express this computation and execute it in parallel as follows:

fn extend<'a, T>(node: &&'a Node<T>, queue: &Queue<&'a Node<T>>) {
    queue.extend(&node.children);
}

[root].into_par_rec(extend).map(map).sum()

Instead of into_par, we use into_par_rec and provide the extend function as its argument. This function defines the recursive extension of the parallel iterator: every time we process a node, we first add its children to the queue. Queue is the queue of elements yet to be processed, and it exposes two growth methods for defining the recursive extension: push and extend.
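
For illustration, the same extension could also be written one child at a time using push. The following is a minimal sketch under that assumption; the function name extend_one_by_one is hypothetical:

fn extend_one_by_one<'a, T>(node: &&'a Node<T>, queue: &Queue<&'a Node<T>>) {
    // push children individually instead of extending with the whole slice
    for child in &node.children {
        queue.push(child);
    }
}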

Although we create the parallel iterator differently, we get a ParIter. Therefore, we have access to all features of a regular parallel iterator.

For instance, assume we want to filter nodes first and, instead of summing up the mapped values, collect them into a vector. We can express this computation just as we would on a linear data structure:

[root].into_par_rec(extend).filter(filter).map(map).collect()

For more details, you may see the parallelization_on_tree example.
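
Below is a self-contained sketch putting the pieces together, using a concrete, non-generic Node for brevity. It assumes the IntoParIterRec and Queue items described above are available from the crate root; the tree values are chosen only for illustration:

use orx_parallel::*;

pub struct Node {
    pub data: u64,
    pub children: Vec<Node>,
}

// recursive extension: enqueue the children of every processed node
fn extend<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
    queue.extend(&node.children);
}

fn main() {
    // a small tree: 1 -> (2, 3 -> 4)
    let root = Node {
        data: 1,
        children: vec![
            Node { data: 2, children: vec![] },
            Node { data: 3, children: vec![Node { data: 4, children: vec![] }] },
        ],
    };

    // sum of the data of the root and all of its descendants
    let sum: u64 = [&root].into_par_rec(extend).map(|node| node.data).sum();
    assert_eq!(sum, 10);

    // collect the data of nodes with even values
    let evens: Vec<u64> = [&root]
        .into_par_rec(extend)
        .filter(|node| node.data % 2 == 0)
        .map(|node| node.data)
        .collect();
    assert_eq!(evens.len(), 2);
}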

Diagnostics

A new ParallelExecutorWithDiagnostics executor is added. Any parallel executor can be converted into one with diagnostics. This executor is meant for testing parallel computations and understanding how the workload is distributed across threads. During the parallel computation, it collects diagnostics about:

  • how many threads are used for the parallel computation
  • how many times each thread received tasks
  • the average chunk size, i.e., the average number of tasks each thread received per assignment
  • and finally, the explicit chunk sizes of the first task assignments.

These metrics are printed to stdout once the parallel computation is completed. Therefore, the executor is not meant to be used in production.

Running a parallel computation with diagnostics requires only a single extra call:

let sum = range
    .par()
    .with_runner(DefaultRunner::default().with_diagnostics()) // this line enables diagnostics
    .map(|x| x + 1)
    .filter(|x| x.is_multiple_of(2))
    .sum();
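
Since the recursive parallel iterator above is also a ParIter, the same diagnostics runner should plug into it in the same way. The following sketch reuses the extend function and root from the earlier example and assumes with_runner is available on the recursive iterator as well:

let sum: u64 = [&root]
    .into_par_rec(extend)
    .with_runner(DefaultRunner::default().with_diagnostics()) // assumed available, since the recursive iterator is a ParIter
    .map(|node| node.data)
    .sum();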

Related Issues

This does not exactly fix #104, but it provides a solution to it, probably with a different approach than intended. Please also see the related computation experiments.

Edit after the second iteration:

Fixes #104

Thanks to @davidlattimore for suggestions and feedback on the API.

orxfun added 30 commits October 7, 2025 15:36
@orxfun orxfun marked this pull request as ready for review October 26, 2025 19:48
@orxfun orxfun merged commit a857613 into main Nov 2, 2025
3 checks passed
@orxfun orxfun deleted the special-termination-condition branch November 2, 2025 20:30