diff --git a/Collections/BaseOrderedChannel.cs b/Collections/BaseOrderedChannel.cs index 900f1f1b..0c71dd98 100644 --- a/Collections/BaseOrderedChannel.cs +++ b/Collections/BaseOrderedChannel.cs @@ -24,7 +24,14 @@ public abstract class BaseOrderedChannel private Channel<(TSort sort, TValue value)> _channel; private bool _isClosed; - /// + /// + /// Gets the number of items in the sorted collection. + /// + /// + /// Note: This property only returns the count of items in the internal sorted collection. + /// It does not include items that are currently in the channel but have not yet been read into the collection. + /// The actual total number of items may be higher during concurrent operations. + /// public int Count { get @@ -130,7 +137,9 @@ public void Close() /// /// The sort with which to associate the new value. /// The value to add to the queue. - protected void Enqueue(TSort sort, TValue value) + /// + /// + protected async ValueTask Enqueue(TSort sort, TValue value, CancellationToken cancellationToken = default) { if (value is null) throw new ArgumentNullException(nameof(value)); @@ -150,12 +159,19 @@ protected void Enqueue(TSort sort, TValue value) try { if (!channel.Writer.TryWrite((sort, value))) - AsyncHelper.Run(() => channel.Writer.WriteAsync((sort, value))); + await channel.Writer.WriteAsync((sort, value), cancellationToken); } catch (ChannelClosedException) { // Queue was closed concurrently; value is dropped by design. } + catch + { + if (cancellationToken.IsCancellationRequested) + return; + + throw; + } } /// diff --git a/Tests/Collections/BaseOrderedChannelTests.cs b/Tests/Collections/BaseOrderedChannelTests.cs index 89de112f..f6420967 100644 --- a/Tests/Collections/BaseOrderedChannelTests.cs +++ b/Tests/Collections/BaseOrderedChannelTests.cs @@ -10,9 +10,15 @@ public class BaseOrderedChannelTests : BaseTestClass /// Test implementation of BaseOrderedChannel for testing purposes. /// Uses PriorityQueue for sorting by integer priority. /// - private class TestOrderedChannel(int maxSize) : BaseOrderedChannel>(new Ecng.Collections.PriorityQueue((a, b) => Math.Abs(a - b), Comparer.Default), maxSize) + private class TestOrderedChannel : BaseOrderedChannel> { - public void Add(int priority, string value) => Enqueue(priority, value); + public TestOrderedChannel(int maxSize = -1) + : base(new Ecng.Collections.PriorityQueue((a, b) => Math.Abs(a - b), Comparer.Default), maxSize) + { + } + + public async ValueTask Add(int priority, string value, CancellationToken cancellationToken = default) + => await Enqueue(priority, value, cancellationToken); } [TestMethod] @@ -64,13 +70,15 @@ public void MaxSize_SetValidValue_Succeeds() } [TestMethod] - public void Enqueue_WhenClosed_DropsValue() + public async Task Enqueue_WhenClosed_DropsValue() { + var token = CancellationToken; + // Arrange var queue = CreateQueue(); // Act - enqueue without opening - queue.Add(1, "test"); + await queue.Add(1, "test", token); // Assert - value should be dropped queue.Count.AssertEqual(0); @@ -86,7 +94,7 @@ public async Task EnqueueDequeue_SingleItem_WorksCorrectly() queue.Open(); // Act - queue.Add(1, "first"); + await queue.Add(1, "first", token); var result = await queue.DequeueAsync(token); // Assert @@ -104,9 +112,9 @@ public async Task EnqueueDequeue_MultipleItems_MaintainsOrder() queue.Open(); // Act - enqueue in non-sorted order - queue.Add(3, "third"); - queue.Add(1, "first"); - queue.Add(2, "second"); + await queue.Add(3, "third", token); + await queue.Add(1, "first", token); + await queue.Add(2, "second", token); await Task.Delay(100, token); // Allow time for sorting @@ -129,9 +137,9 @@ public async Task ReadAllAsync_ReturnsAllItems() var queue = CreateQueue(); queue.Open(); - queue.Add(1, "first"); - queue.Add(2, "second"); - queue.Add(3, "third"); + await queue.Add(1, "first", token); + await queue.Add(2, "second", token); + await queue.Add(3, "third", token); await Task.Delay(100, token); // Allow time for items to be queued @@ -167,9 +175,9 @@ public async Task Clear_RemovesAllItems() var queue = CreateQueue(); queue.Open(); - queue.Add(1, "first"); - queue.Add(2, "second"); - queue.Add(3, "third"); + await queue.Add(1, "first", token); + await queue.Add(2, "second", token); + await queue.Add(3, "third", token); await Task.Delay(100, token); // Allow time for items to be queued @@ -188,12 +196,12 @@ public async Task Reopen_AfterClose_WorksCorrectly() // Arrange var queue = CreateQueue(); queue.Open(); - queue.Add(1, "first"); + await queue.Add(1, "first", token); queue.Close(); // Act queue.Open(); - queue.Add(2, "second"); + await queue.Add(2, "second", token); var result = await queue.DequeueAsync(token); // Assert @@ -211,10 +219,10 @@ public async Task BoundedQueue_RespectsMaxSize() queue.Open(); // Act - try to add 3 items to queue with max size 2 - queue.Add(1, "first"); - queue.Add(2, "second"); + await queue.Add(1, "first", token); + await queue.Add(2, "second", token); - var writeTask = Task.Run(() => queue.Add(3, "third"), token); // This should block + var writeTask = Task.Run(async () => await queue.Add(3, "third", token), token); // This should block // Wait a bit to see if it completes (it shouldn't) var completed = await Task.WhenAny(writeTask, Task.Delay(500, token)) == writeTask; @@ -241,11 +249,11 @@ public async Task Count_ReflectsQueueSize() // Act & Assert queue.Count.AssertEqual(0); - queue.Add(1, "first"); + await queue.Add(1, "first", token); await Task.Delay(50, token); - queue.Add(2, "second"); - queue.Add(3, "third"); + await queue.Add(2, "second", token); + await queue.Add(3, "third", token); await Task.Delay(100, token); // Dequeue all