Skip to content

Commit 335e83c

Browse files
author
J-Loudet
authored
refactor(storage-manager): move prefix related functions in crate (#1325)
This change is motivated by the refactor of the Replication feature. In order to exchange metadata that can be processed by all Replicas, the key expressions associated with the data stored must be prefixed (when sent) and stripped (when received). This commit exposes two functions, at the `zenoh-plugin-storage-manager` crate, that perform these operations. The objective is to reuse these functions in the Replication refactor and, as we intend to move the Replication in its own crate, exposing them at the crate level makes it easier to then import them. * plugins/zenoh-plugin-storage-manager/src/lib.rs: - moved there the `strip_prefix` function, - moved there the `get_prefixed` function and renamed it to `prefix`. * plugins/zenoh-plugin-storage-manager/src/replica/mod.rs: updated the call to the previously named `get_prefixed` function. * plugins/zenoh-plugin-storage-manager/src/replica/storage.rs: - removed the `strip_prefix` method, - removed the `prefix` function, - updated the call to `strip_prefix` and `get_prefixed`.
1 parent 7f7d648 commit 335e83c

File tree

3 files changed

+89
-61
lines changed

3 files changed

+89
-61
lines changed

plugins/zenoh-plugin-storage-manager/src/lib.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use std::{
2323
collections::HashMap,
2424
convert::TryFrom,
25+
str::FromStr,
2526
sync::{Arc, Mutex},
2627
};
2728

@@ -35,7 +36,7 @@ use zenoh::{
3536
runtime::Runtime,
3637
zlock, LibLoader,
3738
},
38-
key_expr::{keyexpr, KeyExpr},
39+
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
3940
prelude::Wait,
4041
session::Session,
4142
Result as ZResult,
@@ -408,3 +409,65 @@ fn with_extended_string<R, F: FnMut(&mut String) -> R>(
408409
prefix.truncate(prefix_len);
409410
result
410411
}
412+
413+
/// Returns the key expression stripped of the provided prefix.
414+
///
415+
/// If no prefix is provided this function returns the key expression untouched.
416+
///
417+
/// If `None` is returned, it indicates that the key expression is equal to the prefix.
418+
///
419+
/// This function will internally call [strip_prefix], see its documentation for possible outcomes.
420+
///
421+
/// # Errors
422+
///
423+
/// This function will return an error if:
424+
/// - The provided prefix contains a wildcard.
425+
/// NOTE: The configuration of a Storage is checked and will reject any prefix that contains a
426+
/// wildcard. In theory, this error should never occur.
427+
/// - The key expression is not prefixed by the provided prefix.
428+
/// - The resulting stripped key is not a valid key expression (this should, in theory, never
429+
/// happen).
430+
///
431+
/// [strip_prefix]: zenoh::key_expr::keyexpr::strip_prefix()
432+
pub fn strip_prefix(
433+
maybe_prefix: Option<&OwnedKeyExpr>,
434+
key_expr: &KeyExpr<'_>,
435+
) -> ZResult<Option<OwnedKeyExpr>> {
436+
match maybe_prefix {
437+
None => Ok(Some(key_expr.clone().into())),
438+
Some(prefix) => {
439+
if prefix.is_wild() {
440+
bail!(
441+
"Prefix < {} > contains a wild character (\"**\" or \"*\")",
442+
prefix
443+
);
444+
}
445+
446+
match key_expr.strip_prefix(prefix).as_slice() {
447+
[stripped_key_expr] => {
448+
if stripped_key_expr.is_empty() {
449+
return Ok(None);
450+
}
451+
452+
OwnedKeyExpr::from_str(stripped_key_expr).map(Some)
453+
}
454+
_ => bail!("Failed to strip prefix < {} > from: {}", prefix, key_expr),
455+
}
456+
}
457+
}
458+
}
459+
460+
/// Returns the key with an additional prefix, if one was provided.
461+
///
462+
/// If no prefix is provided, this function returns `maybe_stripped_key`.
463+
///
464+
/// If a prefix is provided, this function returns the concatenation of both.
465+
pub fn prefix(
466+
maybe_prefix: Option<&OwnedKeyExpr>,
467+
maybe_stripped_key: &OwnedKeyExpr,
468+
) -> OwnedKeyExpr {
469+
match maybe_prefix {
470+
Some(prefix) => prefix / maybe_stripped_key,
471+
None => maybe_stripped_key.clone(),
472+
}
473+
}

plugins/zenoh-plugin-storage-manager/src/replica/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,7 @@ impl Replica {
9494
}
9595
} else {
9696
result.push((
97-
StorageService::get_prefixed(
98-
&storage_config.strip_prefix,
99-
&entry.0.unwrap().into(),
100-
),
97+
crate::prefix(storage_config.strip_prefix.as_ref(), &entry.0.unwrap()),
10198
entry.1,
10299
));
103100
}

plugins/zenoh-plugin-storage-manager/src/replica/storage.rs

Lines changed: 24 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use tokio::sync::{Mutex, RwLock};
2525
use zenoh::{
2626
bytes::EncodingBuilderTrait,
2727
internal::{
28-
bail,
2928
buffers::{SplitBuffer, ZBuf},
3029
zenoh_home, Timed, TimedEvent, Timer, Value,
3130
},
@@ -39,7 +38,6 @@ use zenoh::{
3938
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait},
4039
session::{Session, SessionDeclarations},
4140
time::{Timestamp, NTP64},
42-
Result as ZResult,
4341
};
4442
use zenoh_backend_traits::{
4543
config::{GarbageCollectionConfig, StorageConfig},
@@ -342,7 +340,10 @@ impl StorageService {
342340
}
343341
};
344342

345-
let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) {
343+
let stripped_key = match crate::strip_prefix(
344+
self.strip_prefix.as_ref(),
345+
sample_to_store.key_expr(),
346+
) {
346347
Ok(stripped) => stripped,
347348
Err(e) => {
348349
tracing::error!("{}", e);
@@ -463,13 +464,14 @@ impl StorageService {
463464
if weight.is_some() && weight.unwrap().data.timestamp > *ts {
464465
// if the key matches a wild card update, check whether it was saved in storage
465466
// remember that wild card updates change only existing keys
466-
let stripped_key = match self.strip_prefix(&key_expr.into()) {
467-
Ok(stripped) => stripped,
468-
Err(e) => {
469-
tracing::error!("{}", e);
470-
break;
471-
}
472-
};
467+
let stripped_key =
468+
match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
469+
Ok(stripped) => stripped,
470+
Err(e) => {
471+
tracing::error!("{}", e);
472+
break;
473+
}
474+
};
473475
let mut storage = self.storage.lock().await;
474476
match storage.get(stripped_key, "").await {
475477
Ok(stored_data) => {
@@ -498,7 +500,7 @@ impl StorageService {
498500
async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool {
499501
// @TODO: if cache exists, read from there
500502
let mut storage = self.storage.lock().await;
501-
let stripped_key = match self.strip_prefix(&key_expr.into()) {
503+
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
502504
Ok(stripped) => stripped,
503505
Err(e) => {
504506
tracing::error!("{}", e);
@@ -529,14 +531,15 @@ impl StorageService {
529531
let matching_keys = self.get_matching_keys(q.key_expr()).await;
530532
let mut storage = self.storage.lock().await;
531533
for key in matching_keys {
532-
let stripped_key = match self.strip_prefix(&key.clone().into()) {
533-
Ok(k) => k,
534-
Err(e) => {
535-
tracing::error!("{}", e);
536-
// @TODO: return error when it is supported
537-
return;
538-
}
539-
};
534+
let stripped_key =
535+
match crate::strip_prefix(self.strip_prefix.as_ref(), &key.clone().into()) {
536+
Ok(k) => k,
537+
Err(e) => {
538+
tracing::error!("{}", e);
539+
// @TODO: return error when it is supported
540+
return;
541+
}
542+
};
540543
match storage.get(stripped_key, q.parameters().as_str()).await {
541544
Ok(stored_data) => {
542545
for entry in stored_data {
@@ -561,7 +564,7 @@ impl StorageService {
561564
}
562565
drop(storage);
563566
} else {
564-
let stripped_key = match self.strip_prefix(q.key_expr()) {
567+
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), q.key_expr()) {
565568
Ok(k) => k,
566569
Err(e) => {
567570
tracing::error!("{}", e);
@@ -603,7 +606,7 @@ impl StorageService {
603606
for (k, _ts) in entries {
604607
// @TODO: optimize adding back the prefix (possible inspiration from https://github.com/eclipse-zenoh/zenoh/blob/0.5.0-beta.9/backends/traits/src/utils.rs#L79)
605608
let full_key = match k {
606-
Some(key) => StorageService::get_prefixed(&self.strip_prefix, &key.into()),
609+
Some(key) => crate::prefix(self.strip_prefix.as_ref(), &key),
607610
None => self.strip_prefix.clone().unwrap(),
608611
};
609612
if key_expr.intersects(&full_key.clone()) {
@@ -620,41 +623,6 @@ impl StorageService {
620623
result
621624
}
622625

623-
fn strip_prefix(&self, key_expr: &KeyExpr<'_>) -> ZResult<Option<OwnedKeyExpr>> {
624-
let key = match &self.strip_prefix {
625-
Some(prefix) => {
626-
if key_expr.as_str().eq(prefix.as_str()) {
627-
""
628-
} else {
629-
match key_expr.strip_prefix(prefix).as_slice() {
630-
[ke] => ke.as_str(),
631-
_ => bail!(
632-
"Keyexpr doesn't start with prefix '{}': '{}'",
633-
prefix,
634-
key_expr
635-
),
636-
}
637-
}
638-
}
639-
None => key_expr.as_str(),
640-
};
641-
if key.is_empty() {
642-
Ok(None)
643-
} else {
644-
Ok(Some(OwnedKeyExpr::new(key.to_string()).unwrap()))
645-
}
646-
}
647-
648-
pub fn get_prefixed(
649-
strip_prefix: &Option<OwnedKeyExpr>,
650-
key_expr: &KeyExpr<'_>,
651-
) -> OwnedKeyExpr {
652-
match strip_prefix {
653-
Some(prefix) => prefix.join(key_expr.as_keyexpr()).unwrap(),
654-
None => OwnedKeyExpr::from(key_expr.as_keyexpr()),
655-
}
656-
}
657-
658626
async fn initialize_if_empty(&mut self) {
659627
if self.replication.is_some() && self.replication.as_ref().unwrap().empty_start {
660628
// align with other storages, querying them on key_expr,

0 commit comments

Comments
 (0)