Skip to content

Commit d44b6e6

Browse files
committed
introduce RwLock to control access to local archive index files
1 parent 7dfb7f4 commit d44b6e6

File tree

3 files changed

+79
-56
lines changed

3 files changed

+79
-56
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
6464
derive_builder = "0.20.2"
6565

6666
# Async
67-
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
67+
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process", "sync"] }
6868
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
6969
tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
7070
futures-util = "0.3.5"

src/config.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{cdn::CdnKind, storage::StorageKind};
22
use anyhow::{Context, Result, anyhow, bail};
3-
use std::{env::VarError, error::Error, path::PathBuf, str::FromStr, time::Duration};
3+
use std::{env::VarError, error::Error, io, path, path::PathBuf, str::FromStr, time::Duration};
44
use tracing::trace;
55
use url::Url;
66

@@ -209,10 +209,10 @@ impl Config {
209209
.cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?))
210210
.cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?)
211211
.cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?)
212-
.local_archive_cache_path(env(
212+
.local_archive_cache_path(ensure_absolute_path(env(
213213
"DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
214214
prefix.join("archive_cache"),
215-
)?)
215+
)?)?)
216216
.compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
217217
.temp_dir(temp_dir)
218218
.rustwide_workspace(env(
@@ -235,6 +235,14 @@ impl Config {
235235
}
236236
}
237237

238+
fn ensure_absolute_path(path: PathBuf) -> io::Result<PathBuf> {
239+
if path.is_absolute() {
240+
Ok(path)
241+
} else {
242+
Ok(path::absolute(&path)?)
243+
}
244+
}
245+
238246
fn env<T>(var: &str, default: T) -> Result<T>
239247
where
240248
T: FromStr,

src/storage/mod.rs

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::{
2222
};
2323
use anyhow::anyhow;
2424
use chrono::{DateTime, Utc};
25+
use dashmap::DashMap;
2526
use fn_error_context::context;
2627
use futures_util::stream::BoxStream;
2728
use mime::Mime;
@@ -30,19 +31,23 @@ use std::{
3031
fmt,
3132
fs::{self, File},
3233
io::{self, BufReader},
34+
iter,
3335
num::ParseIntError,
3436
ops::RangeInclusive,
3537
path::{Path, PathBuf},
38+
str::FromStr,
3639
sync::Arc,
3740
};
38-
use std::{iter, str::FromStr};
3941
use tokio::{
4042
io::{AsyncRead, AsyncWriteExt},
4143
runtime,
44+
sync::RwLock,
4245
};
4346
use tracing::{error, info_span, instrument, trace};
4447
use walkdir::WalkDir;
4548

49+
static ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
50+
4651
type FileRange = RangeInclusive<u64>;
4752

4853
#[derive(Debug, thiserror::Error)]
@@ -181,6 +186,8 @@ enum StorageBackend {
181186
pub struct AsyncStorage {
182187
backend: StorageBackend,
183188
config: Arc<Config>,
189+
/// Locks to synchronize access to the locally cached archive index files.
190+
locks: DashMap<PathBuf, Arc<RwLock<()>>>,
184191
}
185192

186193
impl AsyncStorage {
@@ -199,6 +206,7 @@ impl AsyncStorage {
199206
}
200207
},
201208
config,
209+
locks: DashMap::new(),
202210
})
203211
}
204212

@@ -313,12 +321,10 @@ impl AsyncStorage {
313321
path: &str,
314322
) -> Result<bool> {
315323
match self
316-
.download_archive_index(archive_path, latest_build_id)
324+
.find_in_archive_index(archive_path, latest_build_id, path)
317325
.await
318326
{
319-
Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)
320-
.await?
321-
.is_some()),
327+
Ok(file_info) => Ok(file_info.is_some()),
322328
Err(err) => {
323329
if err.downcast_ref::<PathNotFoundError>().is_some() {
324330
Ok(false)
@@ -375,41 +381,67 @@ impl AsyncStorage {
375381
Ok(blob.decompress())
376382
}
377383

384+
fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
385+
let local_index_path = local_index_path.as_ref().to_path_buf();
386+
387+
self.locks
388+
.entry(local_index_path)
389+
.or_insert_with(|| Arc::new(RwLock::new(())))
390+
.downgrade()
391+
.clone()
392+
}
393+
378394
#[instrument]
379-
pub(super) async fn download_archive_index(
395+
async fn find_in_archive_index(
380396
&self,
381397
archive_path: &str,
382398
latest_build_id: Option<BuildId>,
383-
) -> Result<PathBuf> {
384-
// remote/folder/and/x.zip.index
385-
let remote_index_path = format!("{archive_path}.index");
399+
path_in_archive: &str,
400+
) -> Result<Option<archive_index::FileInfo>> {
401+
// we know that config.local_archive_cache_path is an absolute path, not relative.
402+
// So it will be usable as key in the DashMap.
386403
let local_index_path = self.config.local_archive_cache_path.join(format!(
387-
"{archive_path}.{}.index",
404+
"{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}",
388405
latest_build_id.map(|id| id.0).unwrap_or(0)
389406
));
390407

391-
if !local_index_path.exists() {
392-
let index_content = self.get(&remote_index_path, usize::MAX).await?.content;
408+
let rwlock = self.local_index_cache_lock(&local_index_path);
393409

394-
tokio::fs::create_dir_all(
395-
local_index_path
396-
.parent()
397-
.ok_or_else(|| anyhow!("index path without parent"))?,
398-
)
399-
.await?;
410+
// directly acquire the read-lock, so the syscall (`path.exists()`) below is already
411+
// protected.
412+
let mut _read_guard = rwlock.read().await;
413+
414+
if !tokio::fs::try_exists(&local_index_path).await? {
415+
// upgrade the lock to a write-lock for downloading & storing the index.
416+
drop(_read_guard);
417+
let _write_guard = rwlock.write().await;
418+
419+
// check existance again in case of Race Condition (TOCTOU)
420+
if !tokio::fs::try_exists(&local_index_path).await? {
421+
// remote/folder/and/x.zip.index
422+
let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");
423+
424+
tokio::fs::create_dir_all(
425+
local_index_path
426+
.parent()
427+
.ok_or_else(|| anyhow!("index path without parent"))?,
428+
)
429+
.await?;
430+
431+
{
432+
let mut file = tokio::fs::File::create(&local_index_path).await?;
433+
let mut stream = self.get_stream(&remote_index_path).await?.content;
434+
435+
tokio::io::copy(&mut stream, &mut file).await?;
436+
437+
file.flush().await?;
438+
}
439+
}
400440

401-
// when we don't have a locally cached index and many parallel request
402-
// we might download the same archive index multiple times here.
403-
// So we're storing the content into a temporary file before renaming it
404-
// into the final location.
405-
let temp_path = tempfile::NamedTempFile::new_in(&self.config.local_archive_cache_path)?
406-
.into_temp_path();
407-
let mut file = tokio::fs::File::create(&temp_path).await?;
408-
file.write_all(&index_content).await?;
409-
tokio::fs::rename(temp_path, &local_index_path).await?;
441+
_read_guard = _write_guard.downgrade();
410442
}
411443

412-
Ok(local_index_path)
444+
archive_index::find_in_file(local_index_path, path_in_archive).await
413445
}
414446

415447
#[instrument]
@@ -420,11 +452,8 @@ impl AsyncStorage {
420452
path: &str,
421453
max_size: usize,
422454
) -> Result<Blob> {
423-
let index_filename = self
424-
.download_archive_index(archive_path, latest_build_id)
425-
.await?;
426-
427-
let info = archive_index::find_in_file(index_filename, path)
455+
let info = self
456+
.find_in_archive_index(archive_path, latest_build_id, path)
428457
.await?
429458
.ok_or(PathNotFoundError)?;
430459

@@ -454,11 +483,8 @@ impl AsyncStorage {
454483
latest_build_id: Option<BuildId>,
455484
path: &str,
456485
) -> Result<StreamingBlob> {
457-
let index_filename = self
458-
.download_archive_index(archive_path, latest_build_id)
459-
.await?;
460-
461-
let info = archive_index::find_in_file(index_filename, path)
486+
let info = self
487+
.find_in_archive_index(archive_path, latest_build_id, path)
462488
.await?
463489
.ok_or(PathNotFoundError)?;
464490

@@ -531,7 +557,7 @@ impl AsyncStorage {
531557
.await?;
532558

533559
let alg = CompressionAlgorithm::default();
534-
let remote_index_path = format!("{}.index", &archive_path);
560+
let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path);
535561
let compressed_index_content = {
536562
let _span = info_span!("create_archive_index", %remote_index_path).entered();
537563

@@ -843,17 +869,6 @@ impl Storage {
843869
.block_on(self.inner.get_range(path, max_size, range, compression))
844870
}
845871

846-
pub(super) fn download_index(
847-
&self,
848-
archive_path: &str,
849-
latest_build_id: Option<BuildId>,
850-
) -> Result<PathBuf> {
851-
self.runtime.block_on(
852-
self.inner
853-
.download_archive_index(archive_path, latest_build_id),
854-
)
855-
}
856-
857872
pub(crate) fn get_from_archive(
858873
&self,
859874
archive_path: &str,
@@ -1391,12 +1406,12 @@ mod backend_tests {
13911406
.inner
13921407
.config
13931408
.local_archive_cache_path
1394-
.join("folder/test.zip.0.index");
1409+
.join(format!("folder/test.zip.0.{ARCHIVE_INDEX_FILE_EXTENSION}"));
13951410

13961411
let (stored_files, compression_alg) =
13971412
storage.store_all_in_archive("folder/test.zip", dir.path())?;
13981413

1399-
assert!(storage.exists("folder/test.zip.index")?);
1414+
assert!(storage.exists(&format!("folder/test.zip.{ARCHIVE_INDEX_FILE_EXTENSION}"))?);
14001415

14011416
assert_eq!(compression_alg, CompressionAlgorithm::Bzip2);
14021417
assert_eq!(stored_files.len(), files.len());

0 commit comments

Comments
 (0)