
Commit a857613

Merge pull request #120 from orxfun/special-termination-condition
Recursive Parallel Iteration
2 parents: 1b0bf95 + f0e861e


43 files changed (+2063, -60 lines)

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion

@@ -16,7 +16,7 @@ jobs:
     strategy:
       matrix:
         toolchain: ["stable"]
-        features: ["", "--features generic_iterator"]
+        features: ["", "--all-features", "--no-default-features"]

     steps:
       - uses: actions/checkout@v4

Cargo.toml

Lines changed: 11 additions & 10 deletions

@@ -1,6 +1,6 @@
 [package]
 name = "orx-parallel"
-version = "3.3.0"
+version = "3.4.0"
 edition = "2024"
 authors = ["orxfun <orx.ugur.arikan@gmail.com>"]
 readme = "README.md"
@@ -11,16 +11,17 @@ keywords = ["parallel", "concurrency", "performance", "thread", "iterator"]
 categories = ["concurrency", "algorithms"]

 [dependencies]
-orx-pinned-vec = { version = "3.17.0", default-features = false }
-orx-fixed-vec = { version = "3.19.0", default-features = false }
-orx-split-vec = { version = "3.19.0", default-features = false }
-orx-concurrent-iter = { version = "3.1.0", default-features = false }
-orx-concurrent-bag = { version = "3.1.0", default-features = false }
-orx-concurrent-ordered-bag = { version = "3.1.0", default-features = false }
+orx-pinned-vec = { version = "3.21.0", default-features = false }
+orx-fixed-vec = { version = "3.22.0", default-features = false }
+orx-split-vec = { version = "3.22.0", default-features = false }
+orx-concurrent-iter = { version = "3.3.0", default-features = false }
+orx-concurrent-bag = { version = "3.4.0", default-features = false }
+orx-concurrent-ordered-bag = { version = "3.4.0", default-features = false }
+orx-pinned-concurrent-col = { version = "2.18.0", default-features = false }
 orx-iterable = { version = "1.3.0", default-features = false }
-orx-pinned-concurrent-col = { version = "2.15.0", default-features = false }
 orx-priority-queue = { version = "1.7.0", default-features = false }
 orx-pseudo-default = { version = "2.1.0", default-features = false }
+orx-concurrent-recursive-iter = { version = "2.0.0", default-features = false }

 # optional: generic iterator
 rayon = { version = "1.11.0", optional = true, default-features = false }
@@ -35,10 +36,10 @@ yastl = { version = "0.1.2", optional = true, default-features = false }

 [dev-dependencies]
 chrono = "0.4.42"
-clap = { version = "4.5.47", features = ["derive"] }
+clap = { version = "4.5.50", features = ["derive"] }
 criterion = "0.7.0"
 orx-concurrent-option = { version = "1.5.0", default-features = false }
-orx-concurrent-vec = "3.8.0"
+orx-concurrent-vec = "3.10.0"
 rand = "0.9.2"
 rand_chacha = "0.9"
 rayon = "1.11.0"

README.md

Lines changed: 68 additions & 2 deletions
@@ -8,6 +8,7 @@
 
 * [Parallel Computation by Iterators](#parallel-computation-by-iterators)
 * [Parallelizable Collections](#parallelizable-collections)
+* [Parallelization over Nonlinear Data Structures](#parallelization-over-nonlinear-data-structures)
 * [Performance and Benchmarks](#performance-and-benchmarks)
 * [Fallible Parallel Iterators](#fallible-parallel-iterators)
 * [Using Mutable Variables](#using-mutable-variables)
@@ -150,8 +151,47 @@ The following table demonstrates these methods for the `HashSet`; however, they
 
 Note that each approach can be more efficient in different scenarios. For large elements, (ii) might be preferred to avoid allocation of the vector. For insignificant tasks to be performed on each element, (i) might be preferred to take full benefit of vector-specific optimizations.
 
+## Parallelization over Nonlinear Data Structures
+
+The [IntoParIterRec](https://docs.rs/orx-parallel/latest/orx_parallel/trait.IntoParIterRec.html) trait can be used to create a **parallel recursive iterator** over an initial set of elements, which is useful when working with nonlinear data structures such as **trees** and **graphs**.
+
+Consider, for instance, a tree defined by the following node struct:
+
+```rust ignore
+pub struct Node<T> {
+    pub data: T,
+    pub children: Vec<Node<T>>,
+}
+```
+
+Assume that we want to map each node's data with `map: impl Fn(T) -> u64` and compute the sum of the mapped values of all nodes descending from a `root: &Node`.
+
+We can express this computation and execute it in parallel as follows:
+
+```rust ignore
+fn extend<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
+    queue.extend(&node.children);
+}
+
+[root].into_par_rec(extend).map(map).sum()
+```
+
+Instead of `into_par`, we use `into_par_rec` and provide the `extend` function as its argument. This function defines the recursive extension of the parallel iterator: every time we process a `node`, we first add its children to the `queue`. [`Queue`](https://docs.rs/orx-concurrent-recursive-iter/latest/orx_concurrent_recursive_iter/struct.Queue.html) is the queue of elements to be processed, and it exposes two growth methods for defining the recursive extension: `push` and `extend`.
+
+Although we create the parallel iterator differently, we still get a `ParIter`; therefore, we have access to all features of a regular parallel iterator.
+
+For instance, assume we want to filter the nodes first and, instead of summing up the mapped values, collect them in a vector. We can express this computation just as we would on a linear data structure:
+
+```rust ignore
+[root].into_par_rec(extend).filter(filter).map(map).collect()
+```
+
+For more details, see the [parallelization_on_tree](https://github.com/orxfun/orx-parallel/blob/main/examples/parallelization_on_tree) example.
+
 ## Performance and Benchmarks
 
+*Please also see the [impact of ChunkSize on performance](#impact-of-chunksize-on-performance) section.*
+
 You may find some sample parallel programs in the [examples](https://github.com/orxfun/orx-parallel/blob/main/examples) directory. These examples let you express parallel computations as iterator method compositions and run quick experiments with different approaches. Examples use `GenericIterator`. As the name suggests, it is a generalization of the sequential iterator, rayon's parallel iterator and orx-parallel's parallel iterator, and hence, allows for convenient experiments. You may play with the code, update the tested computations and run these examples by enabling the **generic_iterator** feature, such as:
 
 `cargo run --release --features generic_iterator --example benchmark_collect -- --len 123456 --num-repetitions 10`
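For reference, below is a minimal self-contained sketch of the recursive iteration pattern added in the hunk above, assembled from the README snippets and the new `rec_iter_map_collect` benchmark in this commit. The non-generic `Node` type and the tiny tree built in `main` are illustrative assumptions rather than part of the commit; `into_par_rec`, `Queue::extend` and the `extend` callback signature are used exactly as they appear in the diff.

```rust
// Illustrative sketch (not part of the commit): summing a small tree via the
// recursive parallel iterator. The `Node` type and the tree are made up here.
use orx_concurrent_recursive_iter::Queue;
use orx_parallel::*;

pub struct Node {
    pub data: u64,
    pub children: Vec<Node>,
}

// Recursive extension: when a node is processed, its children are added to the
// queue of remaining elements (same signature as in the benchmark of this commit).
fn extend<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
    queue.extend(&node.children);
}

fn main() {
    // root -> { a -> { c }, b }
    let c = Node { data: 3, children: vec![] };
    let a = Node { data: 1, children: vec![c] };
    let b = Node { data: 2, children: vec![] };
    let root = Node { data: 0, children: vec![a, b] };

    // parallel recursive iteration over all nodes descending from `root`
    let sum: u64 = [&root].into_par_rec(extend).map(|node| node.data).sum();
    assert_eq!(sum, 6);

    // the result is a regular `ParIter`, so filtering etc. is available as usual
    let even_sum: u64 = [&root]
        .into_par_rec(extend)
        .filter(|node| node.data % 2 == 0)
        .map(|node| node.data)
        .sum();
    assert_eq!(even_sum, 2);
}
```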
@@ -419,6 +459,26 @@ This is guaranteed by the fact that both consuming computation calls and configu
 
 Additionally, the maximum number of threads that can be used by parallel computations can be globally bounded by the environment variable `ORX_PARALLEL_MAX_NUM_THREADS`. Please see the corresponding [example](https://github.com/orxfun/orx-parallel/blob/main/examples/max_num_threads_config.rs) for details.
 
+### Impact of `ChunkSize` on Performance
+
+The impact of the chunk size on performance might be significant.
+
+Our objective is to minimize the sum of two computational costs:
+* parallelization overhead => gets smaller as the chunk size grows
+* cost of heterogeneity => gets larger as the chunk size grows
+
+Parallelization overhead can further be divided into two parts:
+* concurrent state update: This often corresponds to one atomic update per chunk. It may be significant if our computation is very small, such as `input.par().sum()`. Otherwise, the cost of the atomic update is likely negligible.
+* false sharing: This is relevant only if we are writing results; for instance, when we are one-to-one mapping an input and collecting the results, such as `input.par().map(|x| x.to_string()).collect()`, or when we are writing through mut references, such as `input.par().for_each(|x| *x += 1)`. Here, performance might suffer from false sharing when `chunk size × size of output item` is not large enough. You may also see the [false sharing](https://docs.rs/orx-concurrent-bag/latest/orx_concurrent_bag/#false-sharing) section for `ConcurrentBag`.
+
+In either case, when the computation on each item is sufficiently long, parallelization overhead is negligible. Here, we only want to make sure that we do not pay the heterogeneity cost. Therefore, a safe chunk size choice is one: `par.chunk_size(1)`.
+
+Otherwise, our choice depends on the use case. As a rule of thumb, we want a chunk size that is **just large enough** to mitigate the parallelization overhead but not larger, so that we do not suffer from heterogeneity.
+
+The default configuration, `par.chunk_size(ChunkSize::Auto)` or `par.chunk_size(0)`, uses a heuristic to solve this tradeoff. A difficult case for the current version is when the tasks are significantly heterogeneous (see the [discussion](https://github.com/orxfun/orx-parallel/discussions/26) for future improvements).
+
+As described above, the **best way to deal with heterogeneity** is to use `par.chunk_size(1)`. You may of course test larger chunk sizes to optimize the computation for your data.
+
 
 ## Runner: Pools and Executors
 
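As a rough illustration of the chunk size guidance added above, the following sketch runs the same computation under three configurations. The input vector and the trivial doubling closure are placeholder assumptions; `chunk_size(1)`, `chunk_size(1024)` and the `ChunkSize::Auto` / `0` defaults come directly from the section.

```rust
// Illustrative sketch: identical computation, only the chunk size configuration varies.
use orx_parallel::*;

fn main() {
    let input: Vec<u64> = (0..10_000).collect();

    // default configuration: the chunk size is decided by a heuristic
    // (equivalent to `par.chunk_size(ChunkSize::Auto)` or `par.chunk_size(0)`)
    let auto: u64 = input.par().map(|x| x * 2).sum();

    // chunk size of one: the safe choice when per-item work is long and/or
    // significantly heterogeneous
    let one: u64 = input.par().chunk_size(1).map(|x| x * 2).sum();

    // a larger chunk size: reduces parallelization overhead when per-item work
    // is tiny and homogeneous
    let large: u64 = input.par().chunk_size(1024).map(|x| x * 2).sum();

    assert_eq!(auto, one);
    assert_eq!(one, large);
}
```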
@@ -459,9 +519,13 @@ let inputs: Vec<_> = (0..42).collect();
 let sum = inputs.par().sum();
 
 // equivalent to:
-let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
-assert_eq!(sum, sum2);
+#[cfg(feature = "std")]
+{
+    let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
+    assert_eq!(sum, sum2);
+}
 
+#[cfg(not(miri))]
 #[cfg(feature = "scoped_threadpool")]
 {
     let mut pool = scoped_threadpool::Pool::new(8);
@@ -470,6 +534,7 @@ assert_eq!(sum, sum2);
     assert_eq!(sum, sum2);
 }
 
+#[cfg(not(miri))]
 #[cfg(feature = "rayon-core")]
 {
     let pool = rayon_core::ThreadPoolBuilder::new()
@@ -481,6 +546,7 @@ assert_eq!(sum, sum2);
     assert_eq!(sum, sum2);
 }
 
+#[cfg(not(miri))]
 #[cfg(feature = "yastl")]
 {
     let pool = YastlPool::new(8);
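For quick reference, here is the std-pool equivalence from the hunk above as a compact, self-contained snippet. It assumes orx-parallel's default `std` feature and that `StdDefaultPool` is exported through `orx_parallel::*`, as in the README example being modified.

```rust
// Illustrative sketch of the equivalence shown in the diff above.
// Assumes the default `std` feature and that `StdDefaultPool` is in scope via `orx_parallel::*`.
use orx_parallel::*;

fn main() {
    let inputs: Vec<_> = (0..42).collect();

    // default runner
    let sum = inputs.par().sum();

    // explicitly selecting the default std-based pool is equivalent
    let sum2 = inputs.par().with_pool(StdDefaultPool::default()).sum();
    assert_eq!(sum, sum2);
}
```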

benches/rec_iter_map_collect.rs

Lines changed: 166 additions & 0 deletions (new file)

use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use orx_concurrent_recursive_iter::Queue;
use orx_parallel::*;
use orx_split_vec::SplitVec;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use std::hint::black_box;

// Artificial CPU work: performs `work` iterated fibonacci computations per value.
fn fibonacci(n: u64, work: usize) -> u64 {
    (7..(work + 7))
        .map(|j| {
            let n = black_box((n + j as u64) % 100);
            let mut a = 0;
            let mut b = 1;
            for _ in 0..n {
                let c = a + b;
                a = b;
                b = c;
            }
            a
        })
        .sum()
}

// A tree node holding a batch of values and its children.
struct Node {
    value: Vec<u64>,
    children: Vec<Node>,
}

impl Node {
    // Recursively builds a random tree; `n` is split randomly among the children.
    fn new(mut n: u32, rng: &mut impl Rng) -> Self {
        let mut children = Vec::new();
        if n < 5 {
            for _ in 0..n {
                children.push(Node::new(0, rng));
            }
        } else {
            while n > 0 {
                let n2 = rng.random_range(0..=n);
                children.push(Node::new(n2, rng));
                n -= n2;
            }
        }
        Self {
            value: (0..rng.random_range(1..500))
                .map(|_| rng.random_range(0..40))
                .collect(),
            children,
        }
    }

    // Sequentially counts the nodes of the subtree rooted at `self`.
    fn seq_num_nodes(&self) -> usize {
        1 + self
            .children
            .iter()
            .map(|node| node.seq_num_nodes())
            .sum::<usize>()
    }

    // Sequential traversal: maps each value with `fibonacci` and appends the results to `numbers`.
    fn seq(&self, work: usize, numbers: &mut Vec<u64>) {
        numbers.extend(self.value.iter().map(|x| fibonacci(*x, work)));
        for c in &self.children {
            c.seq(work, numbers);
        }
    }
}

// alternatives

// sequential baseline
fn seq(roots: &[Node], work: usize) -> Vec<u64> {
    let mut result = vec![];
    for root in roots {
        root.seq(work, &mut result);
    }
    result
}

// recursive parallel iterator with unknown length and a fixed chunk size of 1024
fn orx_lazy_unknown_chunk1024(roots: &[Node], work: usize) -> SplitVec<u64> {
    fn extend<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
        queue.extend(&node.children);
    }

    roots
        .into_par_rec(extend)
        .chunk_size(1024)
        .flat_map(|x| x.value.iter().map(|x| fibonacci(*x, work)))
        .collect()
}

// recursive parallel iterator where the exact number of elements is provided upfront
fn orx_lazy_exact(roots: &[Node], work: usize, num_nodes: usize) -> SplitVec<u64> {
    fn extend<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
        queue.extend(&node.children);
    }

    roots
        .into_par_rec_exact(extend, num_nodes)
        .flat_map(|x| x.value.iter().map(|x| fibonacci(*x, work)))
        .collect()
}

// recursive parallel iterator with the `linearize` configuration
fn orx_linearized(roots: &[Node], work: usize) -> SplitVec<u64> {
    fn extend<'a>(node: &&'a Node, queue: &Queue<&'a Node>) {
        queue.extend(&node.children);
    }

    roots
        .into_par_rec(extend)
        .linearize()
        .flat_map(|x| x.value.iter().map(|x| fibonacci(*x, work)))
        .collect()
}

// Criterion benchmark comparing the alternatives for varying amounts of per-item work.
fn run(c: &mut Criterion) {
    let treatments = [1, 10, 25];
    let mut group = c.benchmark_group("rec_iter_map_collect");
    let mut rng = ChaCha8Rng::seed_from_u64(42);
    let roots = vec![
        Node::new(5000, &mut rng),
        Node::new(2000, &mut rng),
        Node::new(4000, &mut rng),
    ];

    let num_nodes: usize = roots.iter().map(|x| x.seq_num_nodes()).sum();

    for work in &treatments {
        let mut expected = seq(&roots, *work);
        expected.sort();

        group.bench_with_input(BenchmarkId::new("seq", work), work, |b, _| {
            let mut result = seq(&roots, *work);
            result.sort();
            assert_eq!(&expected, &result);
            b.iter(|| seq(&roots, *work))
        });

        group.bench_with_input(BenchmarkId::new("orx_lazy_exact", work), work, |b, _| {
            let mut result = orx_lazy_exact(&roots, *work, num_nodes).to_vec();
            result.sort();
            assert_eq!(&expected, &result);
            b.iter(|| orx_lazy_exact(&roots, *work, num_nodes))
        });

        group.bench_with_input(
            BenchmarkId::new("orx_lazy_unknown_chunk1024", work),
            work,
            |b, _| {
                let mut result = orx_lazy_unknown_chunk1024(&roots, *work).to_vec();
                result.sort();
                assert_eq!(&expected, &result);
                b.iter(|| orx_lazy_unknown_chunk1024(&roots, *work))
            },
        );

        group.bench_with_input(BenchmarkId::new("orx_linearized", work), work, |b, _| {
            let mut result = orx_linearized(&roots, *work).to_vec();
            result.sort();
            assert_eq!(&expected, &result);
            b.iter(|| orx_linearized(&roots, *work))
        });
    }

    group.finish();
}

criterion_group!(benches, run);
criterion_main!(benches);

0 commit comments