Skip to content

[Feature]: MaximumBy #954

@fubar-coder

Description

@fubar-coder

Describe the functionality desired 🐞

The library has a `Maximum` operator, but no `MaximumBy` operator that would emit the `TObject` whose key (obtained through a key selector) is the largest.

The steps the functionality will provide

Example

Usage

// Source list that feeds the MaximumBy pipeline below.
var data = new SourceList<TestItem>();

// Records every value emitted by MaximumBy, in emission order, so the asserts below can inspect it.
var output = new List<string>();
using var _ = data.Connect()
    // AutoRefresh is what lets the later `item1.Name = "d"` assignment reach MaximumBy
    // (see the final asserts) — NOTE(review): exact refresh semantics are defined by DynamicData.
    .AutoRefresh()
    .MaximumBy(x => x.Name)
    .Subscribe(x =>
    {
        output.Add(x.Name);
        Console.WriteLine(x.Name);
    });

// Kept in a local so its Name can be changed after it was added to the list.
var item1 = new TestItem("b");

data.Add(item1);
data.Add(new TestItem("a"));

// One emission per Add is expected, even though the maximum ("b") did not change on the second Add.
Debug.Assert(output.Count == 2);
Debug.Assert(output[0] == "b");
Debug.Assert(output[1] == "b");

// Adding a larger key replaces the maximum.
data.Add(new TestItem("c"));
Debug.Assert(output.Count == 3);
Debug.Assert(output[2] == "c");

// Changing a property of an existing item is expected to produce a new emission with the new maximum.
item1.Name = "d";
Debug.Assert(output.Count == 4);
Debug.Assert(output[3] == "d");

Test object

/// <summary>
/// Minimal observable test object: exposes a single <see cref="Name"/> property and raises
/// <see cref="PropertyChanged"/> whenever that property is assigned a different value.
/// </summary>
/// <param name="name">The initial value of <see cref="Name"/>.</param>
public class TestItem(string name) : INotifyPropertyChanged
{
    private string _name = name;

    /// <inheritdoc />
    public event PropertyChangedEventHandler? PropertyChanged;

    /// <summary>
    /// Gets or sets the name. Assigning a value equal to the current one raises no notification.
    /// </summary>
    public string Name
    {
        get => _name;
        set => SetField(ref _name, value);
    }

    /// <summary>
    /// Raises <see cref="PropertyChanged"/> for the given property.
    /// </summary>
    /// <param name="propertyName">The property name; defaults to the calling member.</param>
    protected virtual void OnPropertyChanged([CallerMemberName] string? propertyName = null)
        => PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName));

    /// <summary>
    /// Stores <paramref name="value"/> in <paramref name="field"/> and notifies listeners,
    /// but only when the value differs from the stored one.
    /// </summary>
    /// <typeparam name="T">The type of the backing field.</typeparam>
    /// <param name="field">The backing field to update.</param>
    /// <param name="value">The candidate value.</param>
    /// <param name="propertyName">The property name; defaults to the calling member.</param>
    /// <returns><see langword="true"/> when the field was changed; otherwise <see langword="false"/>.</returns>
    protected bool SetField<T>(ref T field, T value, [CallerMemberName] string? propertyName = null)
    {
        var changed = !EqualityComparer<T>.Default.Equals(field, value);
        if (changed)
        {
            field = value;
            OnPropertyChanged(propertyName);
        }

        return changed;
    }
}

Extension class

/// <summary>
/// Methods to extend the DynamicData library with maximum aggregations that work on the
/// source object itself, using a key selector and/or a comparer.
/// </summary>
public static class DynamicDataExtensions
{
    /// <summary>
    /// Emits the item whose selected key is the largest, comparing keys with
    /// <see cref="Comparer{T}.Default"/>.
    /// </summary>
    /// <param name="source">The source.</param>
    /// <param name="keySelector">The key selector for the object to get the maximum for.</param>
    /// <typeparam name="TObject">The object type to get the maximum for.</typeparam>
    /// <typeparam name="TKey">The type of the key used to determine the maximum.</typeparam>
    /// <returns>The observable returning the maximum object.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static IObservable<TObject> MaximumBy<TObject, TKey>(
        this IObservable<IChangeSet<TObject>> source,
        Func<TObject, TKey> keySelector)
        where TObject : notnull
    {
        // A null comparer is replaced by the default comparer in the three-argument overload.
        return source.MaximumBy(keySelector, null);
    }

    /// <summary>
    /// Alternative implementation of the <see cref="MaxEx.Maximum{TObject,TResult}"/> method, which
    /// uses a comparer for the object itself.
    /// </summary>
    /// <param name="source">The source.</param>
    /// <param name="comparer">
    /// The comparer for the source object, or <see langword="null"/> to use the default comparer.
    /// </param>
    /// <typeparam name="TObject">The object type to get the maximum for.</typeparam>
    /// <returns>The observable returning the maximum object.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static IObservable<TObject> Maximum<TObject>(
        this IObservable<IChangeSet<TObject>> source,
        IComparer<TObject>? comparer)
        where TObject : notnull
    {
        // The object is its own key, so the object comparer doubles as the key comparer.
        return source.MaximumBy(static x => x, comparer);
    }

    /// <summary>
    /// Performs a maximum operation on the source using both a key selector and comparer.
    /// </summary>
    /// <remarks>
    /// A value is produced for every change set for which a maximum exists; change sets that
    /// leave the collection empty produce no value.
    /// </remarks>
    /// <param name="source">The source.</param>
    /// <param name="keySelector">The key selector for the object to get the maximum for.</param>
    /// <param name="comparer">
    /// The comparer for the selected keys, or <see langword="null"/> to use
    /// <see cref="Comparer{T}.Default"/>.
    /// </param>
    /// <typeparam name="TObject">The object type to get the maximum for.</typeparam>
    /// <typeparam name="TKey">The type of the key used to determine the maximum.</typeparam>
    /// <returns>The observable returning the maximum object.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.
    /// </exception>
    public static IObservable<TObject> MaximumBy<TObject, TKey>(
        this IObservable<IChangeSet<TObject>> source,
        Func<TObject, TKey> keySelector,
        IComparer<TKey>? comparer)
        where TObject : notnull
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(keySelector);

        comparer ??= Comparer<TKey>.Default;
        return source
            .ToChangesAndCollection()
            .Scan(
                AggregationState<TObject, TKey>.Empty,
                (state, latest) =>
                {
                    var current = state;
                    var hasChanges = false;
                    var requiresReset = false;

                    foreach (var change in latest.Changes)
                    {
                        hasChanges = true;
                        var value = keySelector(change.Item);

                        if (change.Type == AggregateType.Add)
                        {
                            // Set a new current item, when the current item was not set,
                            // or if the comparer indicates a new maximum value.
                            if (!current.IsSet || comparer.Compare(value, current.Key) > 0)
                            {
                                current = new AggregationState<TObject, TKey>(change.Item, value);
                            }

                            continue;
                        }

                        // Removal: only relevant when a maximum is currently tracked and the removed
                        // key equals it. A removal without a tracked maximum is unexpected and ignored.
                        if (current.IsSet && comparer.Compare(value, current.Key) == 0)
                        {
                            // The maximum may have been removed; the remaining changes need not be
                            // inspected because the whole collection is rescanned below.
                            requiresReset = true;
                            break;
                        }
                    }

                    // Do we need to process the whole collection?
                    // This happens when the max item may have been removed, or when the change set
                    // carried no aggregate changes (which occurs when AutoRefresh is used and an
                    // underlying item has been changed).
                    if (requiresReset || !hasChanges)
                    {
                        var collection = latest.Collection;
                        if (collection.Count == 0)
                        {
                            current = AggregationState<TObject, TKey>.Empty;
                        }
                        else
                        {
                            // Pass the comparer so the rescan ranks keys exactly like the
                            // incremental path above does.
                            var found = collection.MaxBy(keySelector, comparer);
                            current = found is null
                                ? AggregationState<TObject, TKey>.Empty
                                : new AggregationState<TObject, TKey>(found, keySelector(found));
                        }
                    }

                    return current;
                })
            .Where(x => x.IsSet)
            .Select(x => x.Current!);
    }

    /// <summary>
    /// Pairs every change set (converted with <c>ForAggregation</c>) with the collection
    /// produced by <c>ToCollection</c> for the same shared source.
    /// Copied from the original DynamicData source code.
    /// </summary>
    /// <param name="source">The source change set stream.</param>
    /// <typeparam name="TObject">The type of the items in the change sets.</typeparam>
    /// <returns>An observable of paired aggregate changes and collection contents.</returns>
    private static IObservable<ChangesAndCollection<TObject>> ToChangesAndCollection<TObject>(
        this IObservable<IChangeSet<TObject>> source)
        where TObject : notnull
    {
        // Publish shares a single subscription to the source between both derived streams.
        return source.Publish(
            shared =>
            {
                var changes = shared.ForAggregation();
                var data = shared.ToCollection();
                return data.Zip(changes, (d, c) => new ChangesAndCollection<TObject>(c, d));
            });
    }

    /// <summary>
    /// Contains the current maximum value and its key.
    /// </summary>
    /// <typeparam name="TObject">The type of the tracked item.</typeparam>
    /// <typeparam name="TKey">The type of the key the item is ranked by.</typeparam>
    private sealed class AggregationState<TObject, TKey>
        where TObject : notnull
    {
        /// <summary>
        /// Gets an empty instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        public static AggregationState<TObject, TKey> Empty { get; } = new();

        /// <summary>
        /// Initializes a new, unset instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        private AggregationState()
            : this(false, default, default)
        {
        }

        /// <summary>
        /// Initializes a new, set instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        /// <param name="current">The current value.</param>
        /// <param name="key">The key of the current value.</param>
        public AggregationState(TObject current, TKey key)
            : this(true, current, key)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        /// <param name="isSet">Determines whether the current value is set.</param>
        /// <param name="current">The current value.</param>
        /// <param name="key">The key of the current value.</param>
        private AggregationState(bool isSet, TObject? current, TKey? key)
        {
            IsSet = isSet;
            Current = current;
            Key = key;
        }

        /// <summary>
        /// Gets a value indicating whether this instance is set to a non-null value and key.
        /// </summary>
        [MemberNotNullWhen(true, nameof(Current), nameof(Key))]
        public bool IsSet { get; }

        /// <summary>
        /// Gets the value of the current maximum item.
        /// </summary>
        public TObject? Current { get; }

        /// <summary>
        /// Gets the key of the current maximum item.
        /// </summary>
        public TKey? Key { get; }
    }

    /// <summary>
    /// Holds one set of aggregate changes together with the collection it is paired with.
    /// Copied from the original DynamicData source code.
    /// </summary>
    /// <param name="changes">The aggregate changes.</param>
    /// <param name="collection">The collection paired with <paramref name="changes"/>.</param>
    /// <typeparam name="T">The type of the items.</typeparam>
    private sealed class ChangesAndCollection<T>(IAggregateChangeSet<T> changes, IReadOnlyCollection<T> collection)
    {
        /// <summary>
        /// Gets the aggregate changes.
        /// </summary>
        public IAggregateChangeSet<T> Changes => changes;

        /// <summary>
        /// Gets the collection paired with <see cref="Changes"/>.
        /// </summary>
        public IReadOnlyCollection<T> Collection => collection;
    }
}

Considerations

I was looking at the Maximum implementation, but ran into two problems:

  1. I was unable to specify a comparer
  2. I was unable to get the maximum based on a property of the source object

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions