Skip to content

Commit d07e03a

Browse files
authored
Remove extra data allocation used for MT generation (#837)
Make (a small) part of the replication the replication process sequential to avoid creating an extra `mmap` the size of a sector size. Decrease memory usage by 1x. Increase replication time (for parallel setups which will waste part of that parallelism) by a small fraction (encoding time still dominates).
1 parent 037d5fb commit d07e03a

File tree

8 files changed

+120
-170
lines changed

8 files changed

+120
-170
lines changed

README.md

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,15 +176,7 @@ cargo build \
176176
--size 1048576
177177
```
178178

179-
To optimize even more for memory there's another option (used in addition to the `disk-trees` feature) to generate all MTs in sequential order, to make sure we can offload them to disk before we start buildding the next one,
180-
181-
```
182-
FIL_PROOFS_GENERATE_MERKLE_TREES_IN_PARALLEL=0
183-
```
184-
185-
You can inspect that it's working also in the replication log, where you'll see that the MTs are all generated in order without any layer index out of place.
186-
187-
All these optimizations (`disk-trees` with a directory to offload MTs plus sequential generation) should reduce the maximum RSS, in the `zigzag` example, to 3 times the sector size used (so in the above command that tested ZigZag with a 1 GiB sector the maximum RSS reported by commands like `/usr/bin/time -v` should not exceed 3 GiB, please submit an issue if you observe otherwise).
179+
This optimization should reduce the maximum RSS, in the `zigzag` example, to 1-2 times the sector size used (so in the above command that tested ZigZag with a 1 GiB sector the maximum RSS reported by commands like `/usr/bin/time -v` should not exceed 2 GiB, please submit an issue if you observe otherwise).
188180

189181
## Generate Documentation
190182

storage-proofs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ bench = false
1515
bitvec = "0.5"
1616
rand = "0.4"
1717
libc = "0.2"
18-
merkletree = "=0.9"
18+
merkletree = "=0.10"
1919
failure = "0.1"
2020
byteorder = "1"
2121
config = "0.9.3"

storage-proofs/src/drgraph.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use rayon::prelude::*;
77
use crate::error::*;
88
use crate::hasher::pedersen::PedersenHasher;
99
use crate::hasher::{Domain, Hasher};
10-
use crate::merkle::MerkleTree;
10+
use crate::merkle::{MerkleStore, MerkleTree};
1111
use crate::parameter_cache::ParameterSetMetadata;
1212
use crate::util::{data_at_node, NODE_SIZE};
1313
use merkletree::merkle::FromIndexedParallelIterator;
@@ -62,6 +62,16 @@ pub trait Graph<H: Hasher>: ::std::fmt::Debug + Clone + PartialEq + Eq {
6262
}
6363
}
6464

65+
/// Builds a merkle tree based on the given leaves store.
66+
// FIXME: Add the parallel case (if it's worth it).
67+
fn merkle_tree_from_leaves(
68+
&self,
69+
leaves: MerkleStore<H::Domain>,
70+
leafs_number: usize,
71+
) -> Result<MerkleTree<H::Domain, H::Function>> {
72+
Ok(MerkleTree::from_leaves_store(leaves, leafs_number))
73+
}
74+
6575
/// Returns the merkle tree depth.
6676
fn merkle_tree_depth(&self) -> u64 {
6777
graph_height(self.size()) as u64

storage-proofs/src/hasher/pedersen.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ mod tests {
319319

320320
use merkletree::hash::Hashable;
321321

322-
use crate::merkle::{MerkleTree, VecMerkleTree};
322+
use crate::merkle::MerkleTree;
323323

324324
#[test]
325325
fn test_path() {
@@ -335,8 +335,7 @@ mod tests {
335335
fn test_pedersen_hasher() {
336336
let values = ["hello", "world", "you", "two"];
337337

338-
let t = VecMerkleTree::<PedersenDomain, PedersenFunction>::from_data(values.iter());
339-
// Using `VecMerkleTree` since the `MmapStore` of `MerkleTree` doesn't support `Deref` (`as_slice`).
338+
let t = MerkleTree::<PedersenDomain, PedersenFunction>::from_data(values.iter());
340339

341340
assert_eq!(t.leafs(), 4);
342341

storage-proofs/src/hasher/sha256.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ mod tests {
1717
use std::fmt;
1818
use std::iter::FromIterator;
1919

20-
use crate::merkle::VecMerkleTree;
20+
use crate::merkle::MerkleTree;
2121
use merkletree::hash::{Algorithm, Hashable};
2222

2323
use super::super::{DigestDomain, Hasher};
@@ -109,8 +109,8 @@ mod tests {
109109

110110
let v: Vec<DigestDomain> = vec![h1.into(), h2.into(), h3.into()];
111111
let v2: Vec<DigestDomain> = vec![h1.into(), h2.into()];
112-
let t = VecMerkleTree::<<Sha256Hasher as Hasher>::Domain, <Sha256Hasher as Hasher>::Function>::from_iter(v);
113-
let t2 = VecMerkleTree::<
112+
let t = MerkleTree::<<Sha256Hasher as Hasher>::Domain, <Sha256Hasher as Hasher>::Function>::from_iter(v);
113+
let t2 = MerkleTree::<
114114
<Sha256Hasher as Hasher>::Domain,
115115
<Sha256Hasher as Hasher>::Function,
116116
>::from_iter(v2);

storage-proofs/src/layered_drgporep.rs

Lines changed: 95 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ use std::marker::PhantomData;
33
use std::sync::mpsc::channel;
44

55
use crossbeam_utils::thread;
6-
use memmap::MmapMut;
7-
use memmap::MmapOptions;
86
use rayon::prelude::*;
97
use serde::de::Deserialize;
108
use serde::ser::Serialize;
@@ -14,22 +12,15 @@ use crate::drgporep::{self, DrgPoRep};
1412
use crate::drgraph::Graph;
1513
use crate::error::{Error, Result};
1614
use crate::hasher::{Domain, HashFunction, Hasher};
17-
use crate::merkle::MerkleTree;
15+
use crate::merkle::{next_pow2, populate_leaves, MerkleStore, MerkleTree, Store};
1816
use crate::parameter_cache::ParameterSetMetadata;
1917
use crate::porep::{self, PoRep};
2018
use crate::proof::ProofScheme;
21-
use crate::settings;
19+
use crate::util::{data_at_node, NODE_SIZE};
2220
use crate::vde;
2321

2422
type Tree<H> = MerkleTree<<H as Hasher>::Domain, <H as Hasher>::Function>;
2523

26-
fn anonymous_mmap(len: usize) -> MmapMut {
27-
MmapOptions::new()
28-
.len(len)
29-
.map_anon()
30-
.expect("Failed to create memory map")
31-
}
32-
3324
#[derive(Debug, Clone, Serialize)]
3425
pub enum LayerChallenges {
3526
Fixed {
@@ -372,140 +363,110 @@ pub trait Layers {
372363
let mut taus = Vec::with_capacity(layers);
373364
let mut auxs: Vec<Tree<Self::Hasher>> = Vec::with_capacity(layers);
374365

375-
if !&settings::SETTINGS
376-
.lock()
377-
.unwrap()
378-
.generate_merkle_trees_in_parallel
379-
{
380-
let mut sorted_trees: Vec<_> = Vec::new();
381-
382-
(0..=layers).fold(graph.clone(), |current_graph, layer| {
383-
let tree_d = current_graph.merkle_tree(&data).unwrap();
384-
385-
info!("returning tree (layer: {})", layer);
386-
387-
sorted_trees.push(tree_d);
388-
389-
if layer < layers {
390-
info!("encoding (layer: {})", layer);
391-
vde::encode(&current_graph, replica_id, data)
392-
.expect("encoding failed in thread");
393-
}
394-
395-
Self::transform(&current_graph)
396-
});
397-
398-
sorted_trees
399-
.into_iter()
400-
.fold(None, |previous_comm_r: Option<_>, replica_tree| {
401-
let comm_r = replica_tree.root();
402-
// Each iteration's replica_tree becomes the next iteration's previous_tree (data_tree).
403-
// The first iteration has no previous_tree.
404-
if let Some(comm_d) = previous_comm_r {
405-
let tau = porep::Tau { comm_r, comm_d };
406-
// info!("setting tau/aux (layer: {})", i - 1);
407-
// FIXME: Use `enumerate` if this log is worth it.
408-
taus.push(tau);
409-
};
410-
411-
auxs.push(replica_tree);
412-
413-
Some(comm_r)
414-
});
415-
} else {
416-
// The parallel case is more complicated but should produce the same results as the
417-
// serial case. Note that to make lifetimes work out, we have to inline and tease apart
418-
// the definition of DrgPoRep::replicate. This is because as implemented, it entangles
419-
// encoding and merkle tree generation too tightly to be used as a subcomponent.
420-
// Instead, we need to create a scope which encloses all the work, spawning threads
421-
// for merkle tree generation and sending the results back to a channel.
422-
// The received results need to be sorted by layer because ordering of the completed results
423-
// is not guaranteed. Misordered results will be seen in practice when trees are small.
424-
425-
// The outer scope ensure that `tx` is dropped and closed before we read from `outer_rx`.
426-
// Otherwise, the read loop will block forever waiting for more input.
427-
let outer_rx = {
428-
let (tx, rx) = channel();
429-
430-
let errf = |e| {
431-
let err_string = format!("{:?}", e);
432-
error!(
433-
"MerkleTreeGenerationError: {} - {:?}",
434-
&err_string,
435-
failure::Backtrace::new()
436-
);
437-
Error::MerkleTreeGenerationError(err_string)
438-
};
439-
440-
let _ = thread::scope(|scope| -> Result<()> {
441-
let mut threads = Vec::with_capacity(layers + 1);
442-
(0..=layers).fold(graph.clone(), |current_graph, layer| {
443-
let mut data_copy = anonymous_mmap(data.len());
444-
data_copy[0..data.len()].clone_from_slice(data);
445-
446-
let return_channel = tx.clone();
447-
let (transfer_tx, transfer_rx) = channel::<Self::Graph>();
366+
// We need to create a scope which encloses all the work, spawning threads
367+
// for merkle tree generation and sending the results back to a channel.
368+
// The received results need to be sorted by layer because ordering of the completed results
369+
// is not guaranteed. Misordered results will be seen in practice when trees are small.
370+
371+
// The outer scope ensure that `tx` is dropped and closed before we read from `outer_rx`.
372+
// Otherwise, the read loop will block forever waiting for more input.
373+
let outer_rx = {
374+
let (tx, rx) = channel();
375+
376+
let errf = |e| {
377+
let err_string = format!("{:?}", e);
378+
error!(
379+
"MerkleTreeGenerationError: {} - {:?}",
380+
&err_string,
381+
failure::Backtrace::new()
382+
);
383+
Error::MerkleTreeGenerationError(err_string)
384+
};
448385

449-
transfer_tx
450-
.send(current_graph.clone())
386+
let _ = thread::scope(|scope| -> Result<()> {
387+
let mut threads = Vec::with_capacity(layers + 1);
388+
(0..=layers).fold(graph.clone(), |current_graph, layer| {
389+
let leafs = data.len() / NODE_SIZE;
390+
assert_eq!(data.len() % NODE_SIZE, 0);
391+
let pow = next_pow2(leafs);
392+
// FIXME: Who's actually responsible for ensuring power of 2
393+
// sector sizes?
394+
395+
let mut leaves_store = MerkleStore::new(pow);
396+
397+
populate_leaves::<
398+
_,
399+
<Self::Hasher as Hasher>::Function,
400+
_,
401+
std::iter::Map<_, _>,
402+
>(&mut leaves_store, (0..leafs).map(|i| {
403+
let d = data_at_node(&data, i).expect("data_at_node math failed");
404+
<Self::Hasher as Hasher>::Domain::try_from_bytes(d)
405+
.expect("failed to convert node data to domain element")
406+
}));
407+
let return_channel = tx.clone();
408+
let (transfer_tx, transfer_rx) = channel::<Self::Graph>();
409+
410+
transfer_tx
411+
.send(current_graph.clone())
412+
.expect("Failed to send value through channel");
413+
414+
let thread = scope.spawn(move |_| {
415+
// If we panic anywhere in this closure, thread.join() below will receive an error —
416+
// so it is safe to unwrap.
417+
let graph = transfer_rx
418+
.recv()
419+
.expect("Failed to receive value through channel");
420+
421+
let tree_d =
422+
graph.merkle_tree_from_leaves(leaves_store, leafs).unwrap();
423+
424+
info!("returning tree (layer: {})", layer);
425+
return_channel
426+
.send((layer, tree_d))
451427
.expect("Failed to send value through channel");
452-
453-
let thread = scope.spawn(move |_| {
454-
// If we panic anywhere in this closure, thread.join() below will receive an error —
455-
// so it is safe to unwrap.
456-
let graph = transfer_rx
457-
.recv()
458-
.expect("Failed to receive value through channel");
459-
460-
let tree_d = graph.merkle_tree(&data_copy).unwrap();
461-
462-
info!("returning tree (layer: {})", layer);
463-
return_channel
464-
.send((layer, tree_d))
465-
.expect("Failed to send value through channel");
466-
});
467-
468-
threads.push(thread);
469-
470-
if layer < layers {
471-
info!("encoding (layer: {})", layer);
472-
vde::encode(&current_graph, replica_id, data)
473-
.expect("encoding failed in thread");
474-
}
475-
Self::transform(&current_graph)
476428
});
477429

478-
for thread in threads {
479-
thread.join().map_err(errf)?;
430+
threads.push(thread);
431+
432+
if layer < layers {
433+
info!("encoding (layer: {})", layer);
434+
vde::encode(&current_graph, replica_id, data)
435+
.expect("encoding failed in thread");
480436
}
437+
Self::transform(&current_graph)
438+
});
481439

482-
Ok(())
483-
})
484-
.map_err(errf)?;
440+
for thread in threads {
441+
thread.join().map_err(errf)?;
442+
}
485443

486-
rx
487-
};
444+
Ok(())
445+
})
446+
.map_err(errf)?;
488447

489-
let mut sorted_trees = outer_rx.iter().collect::<Vec<_>>();
490-
sorted_trees.sort_unstable_by_key(|k| k.0);
448+
rx
449+
};
491450

492-
sorted_trees
493-
.into_iter()
494-
.fold(None, |previous_comm_r: Option<_>, (i, replica_tree)| {
495-
let comm_r = replica_tree.root();
496-
// Each iteration's replica_tree becomes the next iteration's previous_tree (data_tree).
497-
// The first iteration has no previous_tree.
498-
if let Some(comm_d) = previous_comm_r {
499-
let tau = porep::Tau { comm_r, comm_d };
500-
info!("setting tau/aux (layer: {})", i - 1);
501-
taus.push(tau);
502-
};
451+
let mut sorted_trees = outer_rx.iter().collect::<Vec<_>>();
452+
sorted_trees.sort_unstable_by_key(|k| k.0);
503453

504-
auxs.push(replica_tree);
454+
sorted_trees
455+
.into_iter()
456+
.fold(None, |previous_comm_r: Option<_>, (i, replica_tree)| {
457+
let comm_r = replica_tree.root();
458+
// Each iteration's replica_tree becomes the next iteration's previous_tree (data_tree).
459+
// The first iteration has no previous_tree.
460+
if let Some(comm_d) = previous_comm_r {
461+
let tau = porep::Tau { comm_r, comm_d };
462+
info!("setting tau/aux (layer: {})", i - 1);
463+
taus.push(tau);
464+
};
505465

506-
Some(comm_r)
507-
});
508-
}
466+
auxs.push(replica_tree);
467+
468+
Some(comm_r)
469+
});
509470

510471
Ok((taus, auxs))
511472
}

storage-proofs/src/merkle.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,22 @@ use std::marker::PhantomData;
55
// Reexport here, so we don't depend on merkletree directly in other places.
66
use merkletree::hash::Algorithm;
77
use merkletree::merkle;
8-
#[cfg(not(feature = "disk-trees"))]
9-
use merkletree::merkle::MmapStore;
10-
use merkletree::merkle::VecStore;
118
use merkletree::proof;
129
use paired::bls12_381::Fr;
1310

1411
use crate::hasher::{Domain, Hasher};
1512

16-
// `mmap`ed `MerkleTree` (replacing the previously `Vec`-backed
17-
// `MerkleTree`, now encapsulated in `merkle::VecStore` and exposed
18-
// as `VecMerkleTree`).
13+
pub use merkletree::merkle::{next_pow2, populate_leaves, Store};
14+
pub type MmapStore<E> = merkletree::merkle::MmapStore<E>;
1915
pub type DiskStore<E> = merkletree::merkle::DiskStore<E>;
20-
pub type VecMerkleTree<T, A> = merkle::MerkleTree<T, A, VecStore<T>>;
2116
#[cfg(feature = "disk-trees")]
2217
pub type MerkleTree<T, A> = merkle::MerkleTree<T, A, DiskStore<T>>;
2318
#[cfg(not(feature = "disk-trees"))]
2419
pub type MerkleTree<T, A> = merkle::MerkleTree<T, A, MmapStore<T>>;
20+
#[cfg(feature = "disk-trees")]
21+
pub type MerkleStore<T> = DiskStore<T>;
22+
#[cfg(not(feature = "disk-trees"))]
23+
pub type MerkleStore<T> = MmapStore<T>;
2524

2625
/// Representation of a merkle proof.
2726
/// Each element in the `path` vector consists of a tuple `(hash, is_right)`, with `hash` being the the hash of the node at the current level and `is_right` a boolean indicating if the path is taking the right path.

0 commit comments

Comments
 (0)