Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pin-utils = "0.1.0"
once_cell = "1.10.0"
indoc = "2.0.1"
criterion = { version = "0.4.0", features = [] }
tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] }

[[bench]]
name = "channel"
Expand Down
109 changes: 108 additions & 1 deletion src/signal_map.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::signal::Signal;
use std::cmp::Ord;
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::pin::Pin;
use std::marker::Unpin;
use std::future::Future;
use std::hash::Hash;
use std::task::{Poll, Context};
use futures_core::Stream;
use futures_util::stream;
Expand Down Expand Up @@ -98,6 +99,16 @@ impl<A> SignalMap for Pin<A>

// TODO Seal this
pub trait SignalMapExt: SignalMap {
#[inline]
fn filter<F>(self, callback: F) -> Filter<Self, F>
where F: FnMut(&Self::Key, &Self::Value) -> bool, Self: Sized, {
Filter {
signal: self,
callback,
forwarded_keys: Default::default(),
}
}

#[inline]
fn map_value<A, F>(self, callback: F) -> MapValue<Self, F>
where F: FnMut(Self::Value) -> A,
Expand Down Expand Up @@ -241,6 +252,102 @@ impl<A, B, F> SignalMap for MapValue<A, F>
}
}

#[pin_project(project = FilterKeyProj)]
#[derive(Debug)]
#[must_use = "SignalMaps do nothing unless polled"]
pub struct Filter<A: SignalMap, B> {
#[pin]
signal: A,
callback: B,
forwarded_keys: HashSet<A::Key>,
}

impl<A, F> SignalMap for crate::signal_map::Filter<A, F>
where A: SignalMap,
A::Key: Clone + Eq + Hash,
F: FnMut(&A::Key, &A::Value) -> bool {
type Key = A::Key;
type Value = A::Value;

// TODO should this inline ?
#[inline]
fn poll_map_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<MapDiff<Self::Key, Self::Value>>> {
let crate::signal_map::FilterKeyProj { signal, callback, forwarded_keys } = self.project();

let polled = signal.poll_map_change(cx);

match polled {
Poll::Pending => Poll::Pending,
Poll::Ready(polled_ready) => {
if let Some(polled_map_diff) = polled_ready {
let maybe_out_diff = match polled_map_diff {
MapDiff::Replace { entries } => {
forwarded_keys.clear();

let entries = entries.into_iter().filter(|entry| callback(&entry.0, &entry.1)).collect::<Vec<_>>();

entries.iter().for_each(|(k, _v)| {
forwarded_keys.insert((*k).clone());
});

Some(MapDiff::Replace { entries })
}
MapDiff::Insert { key, value } => {
if callback(&key, &value) {
forwarded_keys.insert(key.clone());

Some(MapDiff::Insert { key, value })
} else {
if forwarded_keys.remove(&key) {
Some(MapDiff::Remove { key })
} else {
None
}
}
}
MapDiff::Update { key, value } => {
if callback(&key, &value) {
forwarded_keys.insert(key.clone());

Some(MapDiff::Update { key, value })
} else {
if forwarded_keys.remove(&key) {
Some(MapDiff::Remove { key })
} else {
None
}
}
}
MapDiff::Remove { key } => {
if forwarded_keys.remove(&key) {
println!("removing key");
Some(MapDiff::Remove { key })
} else {
println!("not removing key");
None
}
}
MapDiff::Clear {} => {
forwarded_keys.clear();

Some(MapDiff::Clear {})
}
};

if maybe_out_diff.is_some() {
Poll::Ready(maybe_out_diff)
} else {
Poll::Pending
}
} else {
Poll::Ready(None)
}
}
}
}
}


// This is an optimization to allow a SignalMap to efficiently "return" multiple MapDiff
// TODO can this be made more efficient ?
// TODO refactor `signal_map`'s and `signal_vec`'s `PendingBuilder` & `unwrap` into common helpers?
Expand Down
52 changes: 51 additions & 1 deletion tests/signal_map.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::BTreeMap;
use std::iter::FromIterator;
use std::task::Poll;
use futures_channel::mpsc::channel;
use futures_signals::signal::{self, SignalExt};
use futures_signals::signal_map::{self, MapDiff, SignalMapExt};
use futures_signals::signal_map::{self, MapDiff, MutableBTreeMap, SignalMapExt};

mod util;

Expand Down Expand Up @@ -235,3 +237,51 @@ fn key_cloned_empty() {
Poll::Ready(None),
]);
}

#[tokio::test]
async fn test_filter_map() {
use futures_util::StreamExt;

let input = MutableBTreeMap::from([(1, "1".to_string()), (2, "2".to_string()), (3, "3".to_string())]);
let output_signal = input.signal_map_cloned().filter(|key, _value| key % 2 == 0);

let output: MutableBTreeMap<i32, String> = MutableBTreeMap::new();
let output_cloned = output.clone();

let (mut proceed_tx, mut proceed_rx) = channel(100);

tokio::spawn(
output_signal.for_each(move |change| {
let mut locked = output_cloned.lock_mut();

match change {
MapDiff::Replace { entries } => locked.replace_cloned(BTreeMap::from_iter(entries)),
MapDiff::Remove { key } => { locked.remove(&key); }
MapDiff::Insert { key, value } => { locked.insert_cloned(key, value); }
MapDiff::Clear {} => locked.clear(),
MapDiff::Update { key, value } => { locked.insert_cloned(key, value); }
}

proceed_tx.try_send(()).unwrap();

async {}
}));

proceed_rx.next().await.unwrap();

assert_eq!(output.lock_ref().len(), 1);
assert_eq!(output.lock_ref().get_key_value(&2), Some((&2, &"2".to_string())));

input.lock_mut().insert_cloned(42, "test".to_string());

proceed_rx.next().await.unwrap();

assert_eq!(output.lock_ref().len(), 2);
assert_eq!(output.lock_ref().get_key_value(&42), Some((&42, &"test".to_string())));

input.lock_mut().remove(&42);
proceed_rx.next().await.unwrap();

assert_eq!(output.lock_ref().len(), 1);
assert_eq!(output.lock_ref().get_key_value(&42), None);
}