From f11682429f6f29303da552ba194ae51397ec6ad1 Mon Sep 17 00:00:00 2001 From: Mathias Myrland Date: Thu, 21 Dec 2023 18:34:00 +0100 Subject: [PATCH 1/2] (add-signal-map-filter) Added filter for SignalMap --- Cargo.toml | 1 + src/signal_map.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++ tests/signal_map.rs | 52 ++++++++++++++++++++++++++++++- 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1dcea17..ddcda20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ pin-utils = "0.1.0" once_cell = "1.10.0" indoc = "2.0.1" criterion = { version = "0.4.0", features = [] } +tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] } [[bench]] name = "channel" diff --git a/src/signal_map.rs b/src/signal_map.rs index edc48d7..d144286 100644 --- a/src/signal_map.rs +++ b/src/signal_map.rs @@ -98,6 +98,15 @@ impl SignalMap for Pin // TODO Seal this pub trait SignalMapExt: SignalMap { + #[inline] + fn filter(self, callback: F) -> FilterKey + where F: FnMut(&Self::Key) -> bool, Self: Sized, { + FilterKey { + signal: self, + callback, + } + } + #[inline] fn map_value(self, callback: F) -> MapValue where F: FnMut(Self::Value) -> A, @@ -241,6 +250,71 @@ impl SignalMap for MapValue } } +#[pin_project(project = FilterKeyProj)] +#[derive(Debug)] +#[must_use = "SignalMaps do nothing unless polled"] +pub struct FilterKey { + #[pin] + signal: A, + callback: B, +} + +impl SignalMap for crate::signal_map::FilterKey + where A: SignalMap, + F: FnMut(&A::Key) -> bool { + type Key = A::Key; + type Value = A::Value; + + // TODO should this inline ? + #[inline] + fn poll_map_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { + let crate::signal_map::FilterKeyProj { signal, callback } = self.project(); + + let polled = signal.poll_map_change(cx); + + match polled { + Poll::Pending => Poll::Pending, + Poll::Ready(polled_ready) => { + if let Some(polled_map_diff) = polled_ready { + let maybe_out_diff = match polled_map_diff { + MapDiff::Replace { entries } => { + let entries = entries.into_iter().filter(|entry| callback(&entry.0)).collect::>(); + Some(MapDiff::Replace { entries }) + } + MapDiff::Insert { key, value } => { + if callback(&key) { + Some(MapDiff::Insert { key, value }) + } else { None } + } + MapDiff::Update { key, value } => { + if callback(&key) { + Some(MapDiff::Update { key, value }) + } else { + None + } + } + MapDiff::Remove { key } => { + Some(MapDiff::Remove { key }) + } + MapDiff::Clear {} => { + Some(MapDiff::Clear {}) + } + }; + + if maybe_out_diff.is_some() { + Poll::Ready(maybe_out_diff) + } else { + Poll::Pending + } + } else { + Poll::Ready(None) + } + } + } + } +} + + // This is an optimization to allow a SignalMap to efficiently "return" multiple MapDiff // TODO can this be made more efficient ? // TODO refactor `signal_map`'s and `signal_vec`'s `PendingBuilder` & `unwrap` into common helpers? diff --git a/tests/signal_map.rs b/tests/signal_map.rs index 0f1cd34..11ca848 100644 --- a/tests/signal_map.rs +++ b/tests/signal_map.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; +use std::iter::FromIterator; use std::task::Poll; +use futures_channel::mpsc::channel; use futures_signals::signal::{self, SignalExt}; -use futures_signals::signal_map::{self, MapDiff, SignalMapExt}; +use futures_signals::signal_map::{self, MapDiff, MutableBTreeMap, SignalMapExt}; mod util; @@ -235,3 +237,51 @@ fn key_cloned_empty() { Poll::Ready(None), ]); } + +#[tokio::test] +async fn test_filter_map() { + use futures_util::StreamExt; + + let input = MutableBTreeMap::from([(1, "1".to_string()), (2, "2".to_string()), (3, "3".to_string())]); + let output_signal = input.signal_map_cloned().filter(|v| v % 2 == 0); + + let output: MutableBTreeMap = MutableBTreeMap::new(); + let output_cloned = output.clone(); + + let (mut proceed_tx, mut proceed_rx) = channel(100); + + tokio::spawn( + output_signal.for_each(move |change| { + let mut locked = output_cloned.lock_mut(); + + match change { + MapDiff::Replace { entries } => locked.replace_cloned(BTreeMap::from_iter(entries)), + MapDiff::Remove { key } => { locked.remove(&key); } + MapDiff::Insert { key, value } => { locked.insert_cloned(key, value); } + MapDiff::Clear {} => locked.clear(), + MapDiff::Update { key, value } => { locked.insert_cloned(key, value); } + } + + proceed_tx.try_send(()).unwrap(); + + async {} + })); + + proceed_rx.next().await.unwrap(); + + assert_eq!(output.lock_ref().len(), 1); + assert_eq!(output.lock_ref().get_key_value(&2), Some((&2, &"2".to_string()))); + + input.lock_mut().insert_cloned(42, "test".to_string()); + + proceed_rx.next().await.unwrap(); + + assert_eq!(output.lock_ref().len(), 2); + assert_eq!(output.lock_ref().get_key_value(&42), Some((&42, &"test".to_string()))); + + input.lock_mut().remove(&42); + proceed_rx.next().await.unwrap(); + + assert_eq!(output.lock_ref().len(), 1); + assert_eq!(output.lock_ref().get_key_value(&42), None); +} \ No newline at end of file From e60ffd500ec9ab42079f86b69ec80c6bcf8b41a0 Mon Sep 17 00:00:00 2001 From: Mathias Myrland Date: Sat, 23 Dec 2023 23:57:54 +0100 Subject: [PATCH 2/2] Updated Filter for map, added tracking of accepted keys and extended filter function to take &key, &value --- src/signal_map.rs | 61 ++++++++++++++++++++++++++++++++++----------- tests/signal_map.rs | 2 +- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/signal_map.rs b/src/signal_map.rs index d144286..d52376b 100644 --- a/src/signal_map.rs +++ b/src/signal_map.rs @@ -1,9 +1,10 @@ use crate::signal::Signal; use std::cmp::Ord; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{BTreeMap, HashSet, VecDeque}; use std::pin::Pin; use std::marker::Unpin; use std::future::Future; +use std::hash::Hash; use std::task::{Poll, Context}; use futures_core::Stream; use futures_util::stream; @@ -99,11 +100,12 @@ impl SignalMap for Pin // TODO Seal this pub trait SignalMapExt: SignalMap { #[inline] - fn filter(self, callback: F) -> FilterKey - where F: FnMut(&Self::Key) -> bool, Self: Sized, { - FilterKey { + fn filter(self, callback: F) -> Filter + where F: FnMut(&Self::Key, &Self::Value) -> bool, Self: Sized, { + Filter { signal: self, callback, + forwarded_keys: Default::default(), } } @@ -253,22 +255,24 @@ impl SignalMap for MapValue #[pin_project(project = FilterKeyProj)] #[derive(Debug)] #[must_use = "SignalMaps do nothing unless polled"] -pub struct FilterKey { +pub struct Filter { #[pin] signal: A, callback: B, + forwarded_keys: HashSet, } -impl SignalMap for crate::signal_map::FilterKey +impl SignalMap for crate::signal_map::Filter where A: SignalMap, - F: FnMut(&A::Key) -> bool { + A::Key: Clone + Eq + Hash, + F: FnMut(&A::Key, &A::Value) -> bool { type Key = A::Key; type Value = A::Value; // TODO should this inline ? #[inline] fn poll_map_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { - let crate::signal_map::FilterKeyProj { signal, callback } = self.project(); + let crate::signal_map::FilterKeyProj { signal, callback, forwarded_keys } = self.project(); let polled = signal.poll_map_change(cx); @@ -278,25 +282,54 @@ impl SignalMap for crate::signal_map::FilterKey if let Some(polled_map_diff) = polled_ready { let maybe_out_diff = match polled_map_diff { MapDiff::Replace { entries } => { - let entries = entries.into_iter().filter(|entry| callback(&entry.0)).collect::>(); + forwarded_keys.clear(); + + let entries = entries.into_iter().filter(|entry| callback(&entry.0, &entry.1)).collect::>(); + + entries.iter().for_each(|(k, _v)| { + forwarded_keys.insert((*k).clone()); + }); + Some(MapDiff::Replace { entries }) } MapDiff::Insert { key, value } => { - if callback(&key) { + if callback(&key, &value) { + forwarded_keys.insert(key.clone()); + Some(MapDiff::Insert { key, value }) - } else { None } + } else { + if forwarded_keys.remove(&key) { + Some(MapDiff::Remove { key }) + } else { + None + } + } } MapDiff::Update { key, value } => { - if callback(&key) { + if callback(&key, &value) { + forwarded_keys.insert(key.clone()); + Some(MapDiff::Update { key, value }) } else { - None + if forwarded_keys.remove(&key) { + Some(MapDiff::Remove { key }) + } else { + None + } } } MapDiff::Remove { key } => { - Some(MapDiff::Remove { key }) + if forwarded_keys.remove(&key) { + println!("removing key"); + Some(MapDiff::Remove { key }) + } else { + println!("not removing key"); + None + } } MapDiff::Clear {} => { + forwarded_keys.clear(); + Some(MapDiff::Clear {}) } }; diff --git a/tests/signal_map.rs b/tests/signal_map.rs index 11ca848..5bcef5e 100644 --- a/tests/signal_map.rs +++ b/tests/signal_map.rs @@ -243,7 +243,7 @@ async fn test_filter_map() { use futures_util::StreamExt; let input = MutableBTreeMap::from([(1, "1".to_string()), (2, "2".to_string()), (3, "3".to_string())]); - let output_signal = input.signal_map_cloned().filter(|v| v % 2 == 0); + let output_signal = input.signal_map_cloned().filter(|key, _value| key % 2 == 0); let output: MutableBTreeMap = MutableBTreeMap::new(); let output_cloned = output.clone();