Skip to content

Commit 22523fc

Browse files
committed
Remove ISequential*Registry.
1 parent 31bed66 commit 22523fc

File tree

8 files changed

+58
-120
lines changed

8 files changed

+58
-120
lines changed

src/AsyncFiberWorks/Procedures/FiberAndHandlerPairList.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using AsyncFiberWorks.Core;
22
using System;
33
using System.Collections.Generic;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace AsyncFiberWorks.Procedures
@@ -13,7 +14,7 @@ namespace AsyncFiberWorks.Procedures
1314
/// If it has not yet been processed, it proceeds to the next step. If already processed, exit at that point.
1415
/// </summary>
1516
/// <typeparam name="TMessage">Message type.</typeparam>
16-
public class FiberAndHandlerPairList<TMessage> : ISequentialHandlerListRegistry<TMessage>, ISequentialPublisher<TMessage>
17+
public class FiberAndHandlerPairList<TMessage> : ISequentialPublisher<TMessage>
1718
{
1819
private readonly object _lock = new object();
1920
private readonly LinkedList<RegisteredHandler> _actions = new LinkedList<RegisteredHandler>();
@@ -125,6 +126,20 @@ public IDisposable Add(Action<IFiberExecutionEventArgs, ProcessedFlagEventArgs<T
125126
return unsubscriber;
126127
}
127128

129+
/// <summary>
130+
/// Add a handler to the tail.
131+
/// </summary>
132+
/// <param name="handler">Message handler. The return value is the processed flag. If this flag is true, subsequent handlers are not called.</param>
133+
/// <param name="context">The context in which the handler will execute. if null, the default is used.</param>
134+
/// <returns>Handle for canceling registration.</returns>
135+
public IDisposable Add(Func<TMessage, Task<bool>> handler, IFiber context = null)
136+
{
137+
return this.Add((IFiberExecutionEventArgs e, ProcessedFlagEventArgs<TMessage> message) => e.PauseWhileRunning(async () =>
138+
{
139+
message.Processed = await handler(message.Arg).ConfigureAwait(false);
140+
}), context);
141+
}
142+
128143
/// <summary>
129144
/// Distribute one message.
130145
/// </summary>

src/AsyncFiberWorks/Procedures/FiberAndTaskPairList.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using AsyncFiberWorks.Core;
22
using System;
33
using System.Collections.Generic;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace AsyncFiberWorks.Procedures
@@ -11,7 +12,7 @@ namespace AsyncFiberWorks.Procedures
1112
/// Can specify the fiber to be executed.
1213
/// Wait for the calls to complete one by one before proceeding.
1314
/// </summary>
14-
public class FiberAndTaskPairList : ISequentialTaskInvoker, ISequentialTaskListRegistry
15+
public class FiberAndTaskPairList : ISequentialTaskInvoker
1516
{
1617
private readonly object _lock = new object();
1718
private readonly LinkedList<RegisteredAction> _actions = new LinkedList<RegisteredAction>();
@@ -121,6 +122,17 @@ public IDisposable Add(Action<IFiberExecutionEventArgs> task, IFiber context = n
121122
return unsubscriber;
122123
}
123124

125+
/// <summary>
126+
/// Add a task to the tail.
127+
/// </summary>
128+
/// <param name="task">Task to be performed.</param>
129+
/// <param name="context">The context in which the task will execute. if null, the default is used.</param>
130+
/// <returns>Handle for canceling registration.</returns>
131+
public IDisposable Add(Func<Task> task, IFiber context = null)
132+
{
133+
return this.Add((IFiberExecutionEventArgs e) => e.PauseWhileRunning(task), context);
134+
}
135+
124136
/// <summary>
125137
/// Invoke all tasks sequentially.
126138
/// </summary>

src/AsyncFiberWorks/Procedures/ISequentialHandlerListRegistry.cs

Lines changed: 0 additions & 28 deletions
This file was deleted.

src/AsyncFiberWorks/Procedures/ISequentialTaskListRegistry.cs

Lines changed: 0 additions & 27 deletions
This file was deleted.

src/AsyncFiberWorks/Procedures/SequentialHandlerWaiter.cs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace AsyncFiberWorks.Procedures
1212
public class SequentialHandlerWaiter<T> : IDisposable
1313
{
1414
private readonly object _lockObj = new object();
15-
private IDisposable _unsubscriber;
1615
private bool _executionRequested;
1716
private bool _isDisposed;
1817
private readonly ManualResetEventSlim _notifierExecutionRequested = new ManualResetEventSlim();
@@ -24,22 +23,26 @@ public class SequentialHandlerWaiter<T> : IDisposable
2423
private readonly ProcessedFlagEventArgs<T> _currentValue = new ProcessedFlagEventArgs<T>();
2524

2625
/// <summary>
27-
/// Register a handler to the sequential handler list.
26+
/// Constructor.
2827
/// </summary>
29-
/// <param name="handlerList"></param>
3028
/// <param name="cancellationToken"></param>
31-
public SequentialHandlerWaiter(ISequentialHandlerListRegistry<T> handlerList, CancellationToken cancellationToken = default)
29+
public SequentialHandlerWaiter(CancellationToken cancellationToken = default)
3230
{
3331
_thread = UserThreadPool.StartNew(1);
3432
_cancellationTokenExternal = cancellationToken;
35-
_unsubscriber = handlerList.Add(async (arg) =>
36-
{
37-
_currentValue.Processed = false;
38-
_currentValue.Arg = arg;
39-
await ExecuteAsync().ConfigureAwait(false);
40-
return _currentValue.Processed;
41-
});
33+
}
4234

35+
/// <summary>
36+
/// Execute a handler.
37+
/// </summary>
38+
/// <param name="arg">An argument.</param>
39+
/// <returns>Indicates whether it has been processed.</returns>
40+
public async Task<bool> Handler(T arg)
41+
{
42+
_currentValue.Processed = false;
43+
_currentValue.Arg = arg;
44+
await ExecuteAsync().ConfigureAwait(false);
45+
return _currentValue.Processed;
4346
}
4447

4548
/// <summary>
@@ -160,7 +163,6 @@ public void Dispose()
160163

161164
_executionRequested = false;
162165
_inExecuting = false;
163-
_unsubscriber.Dispose();
164166
_onDispose.Cancel();
165167
_onDispose.Dispose();
166168
_notifierExecutionRequested.Dispose();

src/AsyncFiberWorks/Procedures/SequentialTaskListRegistryExtensions.cs

Lines changed: 0 additions & 37 deletions
This file was deleted.

src/AsyncFiberWorks/Procedures/SequentialTaskWaiter.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ namespace AsyncFiberWorks.Procedures
1111
public class SequentialTaskWaiter : IDisposable
1212
{
1313
private readonly object _lockObj = new object();
14-
private IDisposable _unsubscriber;
1514
private bool _executionRequested;
1615
private bool _isDisposed;
1716
private readonly ManualResetEventSlim _notifierExecutionRequested = new ManualResetEventSlim();
@@ -22,23 +21,21 @@ public class SequentialTaskWaiter : IDisposable
2221
private readonly CancellationTokenSource _onDispose = new CancellationTokenSource();
2322

2423
/// <summary>
25-
/// Register a task to the sequential task list.
24+
/// Constructor.
2625
/// </summary>
27-
/// <param name="taskList"></param>
2826
/// <param name="cancellationToken"></param>
29-
public SequentialTaskWaiter(ISequentialTaskListRegistry taskList, CancellationToken cancellationToken = default)
27+
public SequentialTaskWaiter(CancellationToken cancellationToken = default)
3028
{
3129
_thread = UserThreadPool.StartNew(1);
3230
_cancellationTokenExternal = cancellationToken;
33-
_unsubscriber = taskList.Add(ExecuteAsync);
3431
}
3532

3633
/// <summary>
3734
/// Execute a task.
3835
/// </summary>
3936
/// <returns></returns>
4037
/// <exception cref="InvalidOperationException"></exception>
41-
async Task ExecuteAsync()
38+
public async Task ExecuteTask()
4239
{
4340
lock (_lockObj)
4441
{
@@ -150,7 +147,6 @@ public void Dispose()
150147

151148
_executionRequested = false;
152149
_inExecuting = false;
153-
_unsubscriber.Dispose();
154150
_onDispose.Cancel();
155151
_onDispose.Dispose();
156152
_notifierExecutionRequested.Dispose();

src/AsyncFiberWorksTests/SequentialTaskWaiterTests.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ public async Task TestWait()
1919

2020
Func<int, Task> func = async (maxCount) =>
2121
{
22-
using (var activator = new SequentialTaskWaiter(taskList))
22+
using (var activator = new SequentialTaskWaiter())
23+
using (var unsubscriber = taskList.Add(activator.ExecuteTask))
2324
{
2425
int counter = 0;
2526
while (counter < maxCount)
@@ -59,7 +60,8 @@ public async Task TestCancellation()
5960

6061
var func = new Func<Task>(async () =>
6162
{
62-
using (var activator = new SequentialTaskWaiter(taskList, cancellationToken))
63+
using (var activator = new SequentialTaskWaiter(cancellationToken))
64+
using (var unsubscriber = taskList.Add(activator.ExecuteTask))
6365
{
6466
try
6567
{
@@ -101,7 +103,8 @@ public async Task TestWaitingOfT()
101103

102104
Func<int, Task> func = async (maxCount) =>
103105
{
104-
using (var reg = new SequentialHandlerWaiter<int>(handlerList))
106+
using (var reg = new SequentialHandlerWaiter<int>())
107+
using (var unsubscriber = handlerList.Add(reg.Handler))
105108
{
106109
int counter = 0;
107110
while (counter < maxCount)
@@ -147,7 +150,8 @@ public async Task TestCancellationOfT()
147150

148151
var func = new Func<Task>(async () =>
149152
{
150-
using (var reg = new SequentialHandlerWaiter<int>(handlerList, cancellationToken))
153+
using (var reg = new SequentialHandlerWaiter<int>(cancellationToken))
154+
using (var unsubscriber = handlerList.Add(reg.Handler))
151155
{
152156
try
153157
{
@@ -185,14 +189,15 @@ public async Task TestCancellationOfT()
185189
[Test]
186190
public async Task TestWaitingOfProcessedFlagEventArgs()
187191
{
188-
var driver = new FiberAndHandlerPairList<int>();
192+
var handlerList = new FiberAndHandlerPairList<int>();
189193
int resultCounter = 0;
190194
var lockObj = new object();
191195

192196
var cts = new CancellationTokenSource();
193197
Func<int, Task> func = async (threshold) =>
194198
{
195-
using (var reg = new SequentialHandlerWaiter<int>(driver, cts.Token))
199+
using (var reg = new SequentialHandlerWaiter<int>(cts.Token))
200+
using (var unsubscriber = handlerList.Add(reg.Handler))
196201
{
197202
while (true)
198203
{
@@ -222,7 +227,7 @@ public async Task TestWaitingOfProcessedFlagEventArgs()
222227
for (int i = 0; i < 10; i++)
223228
{
224229
int eventArg = i + 1;
225-
await driver.PublishSequentialAsync(eventArg, defaultContext);
230+
await handlerList.PublishSequentialAsync(eventArg, defaultContext);
226231
}
227232

228233
cts.Cancel();

0 commit comments

Comments
 (0)