Skip to content

Commit 5833984

Browse files
committed
dist: install while downloading
1 parent 9051b19 commit 5833984

File tree

2 files changed

+32
-34
lines changed

2 files changed

+32
-34
lines changed

src/dist/download.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl<'a> DownloadCfg<'a> {
117117
}
118118
}
119119

120-
pub(crate) fn clean(&self, hashes: &[String]) -> Result<()> {
120+
pub(crate) fn clean(&self, hashes: &[impl AsRef<Path>]) -> Result<()> {
121121
for hash in hashes.iter() {
122122
let used_file = self.download_dir.join(hash);
123123
if self.download_dir.join(&used_file).exists() {

src/dist/manifestation.rs

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ mod tests;
77
use std::path::Path;
88

99
use anyhow::{Context, Result, anyhow, bail};
10-
use futures_util::stream::StreamExt;
11-
use std::sync::Arc;
12-
use tokio::sync::Semaphore;
10+
use futures_util::stream::{FuturesUnordered, StreamExt};
1311
use tracing::{info, warn};
1412

1513
use crate::dist::component::{
@@ -153,8 +151,6 @@ impl Manifestation {
153151
}
154152

155153
// Download component packages and validate hashes
156-
let mut things_to_install = Vec::new();
157-
let mut things_downloaded = Vec::new();
158154
let components = update
159155
.components_urls_and_hashes(new_manifest)
160156
.map(|res| {
@@ -168,7 +164,6 @@ impl Manifestation {
168164
})
169165
.collect::<Result<Vec<_>>>()?;
170166

171-
let components_len = components.len();
172167
const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2;
173168
let concurrent_downloads = download_cfg
174169
.process
@@ -183,27 +178,6 @@ impl Manifestation {
183178
.and_then(|s| s.parse().ok())
184179
.unwrap_or(DEFAULT_MAX_RETRIES);
185180

186-
info!("downloading component(s)");
187-
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
188-
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
189-
let sem = semaphore.clone();
190-
async move {
191-
let _permit = sem.acquire().await.unwrap();
192-
bin.download(max_retries).await
193-
}
194-
});
195-
if components_len > 0 {
196-
let results = component_stream
197-
.buffered(components_len)
198-
.collect::<Vec<_>>()
199-
.await;
200-
for result in results {
201-
let (bin, downloaded_file) = result?;
202-
things_downloaded.push(bin.binary.hash.clone());
203-
things_to_install.push((bin, downloaded_file));
204-
}
205-
}
206-
207181
// Begin transaction
208182
let mut tx = Transaction::new(prefix.clone(), tmp_cx, download_cfg.process);
209183

@@ -242,15 +216,41 @@ impl Manifestation {
242216
tx = self.uninstall_component(component, new_manifest, tx, download_cfg.process)?;
243217
}
244218

245-
// Install components
246-
for (component_bin, installer_file) in things_to_install {
247-
tx = component_bin.install(installer_file, tx, self)?;
219+
let mut downloads = FuturesUnordered::new();
220+
let mut component_iter = components.iter();
221+
let mut cleanup_downloads = vec![];
222+
loop {
223+
if downloads.is_empty() && component_iter.len() == 0 {
224+
break;
225+
}
226+
227+
let (bin, downloaded) = match downloads.next().await {
228+
Some(Ok((bin, downloaded))) => (bin, downloaded),
229+
Some(Err(e)) => return Err(e),
230+
None => {
231+
if let Some(bin) = component_iter.next() {
232+
downloads.push(bin.download(max_retries));
233+
}
234+
continue;
235+
}
236+
};
237+
238+
while component_iter.len() > 0 && downloads.len() < concurrent_downloads {
239+
if let Some(bin) = component_iter.next() {
240+
downloads.push(bin.download(max_retries));
241+
}
242+
}
243+
244+
cleanup_downloads.push(&bin.binary.hash);
245+
tx = bin.install(downloaded, tx, self)?;
248246
}
249247

250248
// Install new distribution manifest
251249
let new_manifest_str = new_manifest.clone().stringify()?;
252250
tx.modify_file(rel_installed_manifest_path)?;
253251
utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?;
252+
download_cfg.clean(&cleanup_downloads)?;
253+
drop(downloads);
254254

255255
// Write configuration.
256256
//
@@ -271,8 +271,6 @@ impl Manifestation {
271271
// End transaction
272272
tx.commit();
273273

274-
download_cfg.clean(&things_downloaded)?;
275-
276274
Ok(UpdateStatus::Changed)
277275
}
278276

@@ -686,7 +684,7 @@ struct ComponentBinary<'a> {
686684
}
687685

688686
impl<'a> ComponentBinary<'a> {
689-
async fn download(self, max_retries: usize) -> Result<(Self, File)> {
687+
async fn download(&self, max_retries: usize) -> Result<(&Self, File)> {
690688
use tokio_retry::{RetryIf, strategy::FixedInterval};
691689

692690
let url = self.download_cfg.url(&self.binary.url)?;

0 commit comments

Comments
 (0)