Skip to content

Commit e0beb7a

Browse files
committed
Align the description of SequentialHandlerWaiter with SequentialTaskWaiter.
1 parent c90eba7 commit e0beb7a

File tree

1 file changed

+72
-42
lines changed

1 file changed

+72
-42
lines changed
Lines changed: 72 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using AsyncFiberWorks.Threading;
2+
using System;
23
using System.Threading;
34
using System.Threading.Tasks;
45

@@ -11,14 +12,16 @@ namespace AsyncFiberWorks.Procedures
1112
public class SequentialHandlerWaiter<T> : IDisposable
1213
{
1314
private readonly object _lockObj = new object();
14-
private readonly IDisposable _subscription;
15-
private readonly ProcessedFlagEventArgs<T> _currentValue = new ProcessedFlagEventArgs<T>();
16-
private bool _hasValue;
15+
private IDisposable _unsubscriber;
16+
private bool _executionRequested;
1717
private bool _isDisposed;
18-
private readonly SemaphoreSlim _notifierSet = new SemaphoreSlim(0);
19-
private readonly SemaphoreSlim _notifierClear = new SemaphoreSlim(0);
20-
private bool _reading;
21-
private CancellationToken _cancellationToken;
18+
private readonly ManualResetEventSlim _notifierExecutionRequested = new ManualResetEventSlim();
19+
private readonly ManualResetEventSlim _notifierExecutionFinished = new ManualResetEventSlim();
20+
private readonly UserThreadPool _thread;
21+
private bool _inExecuting;
22+
private CancellationToken _cancellationTokenExternal;
23+
private readonly CancellationTokenSource _onDispose = new CancellationTokenSource();
24+
private readonly ProcessedFlagEventArgs<T> _currentValue = new ProcessedFlagEventArgs<T>();
2225

2326
/// <summary>
2427
/// Register a handler to the sequential handler list.
@@ -27,56 +30,66 @@ public class SequentialHandlerWaiter<T> : IDisposable
2730
/// <param name="cancellationToken"></param>
2831
public SequentialHandlerWaiter(ISequentialHandlerListRegistry<T> handlerList, CancellationToken cancellationToken = default)
2932
{
30-
_cancellationToken = cancellationToken;
31-
_subscription = handlerList.Add((arg) => ExecuteAsync(arg));
33+
_thread = UserThreadPool.StartNew(1);
34+
_cancellationTokenExternal = cancellationToken;
35+
_unsubscriber = handlerList.Add(async (arg) =>
36+
{
37+
_currentValue.Processed = false;
38+
_currentValue.Arg = arg;
39+
await ExecuteAsync().ConfigureAwait(false);
40+
return _currentValue.Processed;
41+
});
42+
3243
}
3344

3445
/// <summary>
3546
/// Execute a handler.
3647
/// </summary>
37-
/// <param name="newValue"></param>
3848
/// <returns></returns>
3949
/// <exception cref="InvalidOperationException"></exception>
40-
async Task<bool> ExecuteAsync(T newValue)
50+
async Task ExecuteAsync()
4151
{
4252
lock (_lockObj)
4353
{
4454
if (_isDisposed)
4555
{
46-
return false;
56+
return;
4757
}
48-
if (_cancellationToken.IsCancellationRequested)
58+
if (_cancellationTokenExternal.IsCancellationRequested)
4959
{
50-
return false;
60+
return;
5161
}
52-
if (_hasValue)
62+
if (_executionRequested)
5363
{
5464
throw new InvalidOperationException();
5565
}
56-
if (_reading)
66+
if (_inExecuting)
5767
{
5868
throw new InvalidOperationException();
5969
}
60-
if (_notifierClear.CurrentCount != 0)
70+
if (_notifierExecutionRequested.IsSet)
6171
{
6272
throw new InvalidOperationException();
6373
}
6474

65-
_currentValue.Processed = false;
66-
_currentValue.Arg = newValue;
67-
_hasValue = true;
68-
_notifierSet.Release(1);
75+
_executionRequested = true;
76+
_notifierExecutionFinished.Reset();
77+
_notifierExecutionRequested.Set();
6978
}
7079

7180
try
7281
{
73-
await _notifierClear.WaitAsync(_cancellationToken).ConfigureAwait(false);
82+
var cancellation = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenExternal, _onDispose.Token);
83+
await _thread.RegisterWaitForSingleObjectAsync(_notifierExecutionFinished.WaitHandle, cancellation.Token).ConfigureAwait(false);
7484
}
7585
catch (OperationCanceledException)
7686
{
77-
return false;
87+
return;
88+
}
89+
catch (ObjectDisposedException)
90+
{
91+
return;
7892
}
79-
return _currentValue.Processed;
8093
}
8194

8295
/// <summary>
@@ -93,26 +106,43 @@ public async Task<ProcessedFlagEventArgs<T>> ExecutionStarted()
93106
{
94107
throw new ObjectDisposedException(GetType().FullName);
95108
}
96-
if (_cancellationToken.IsCancellationRequested)
109+
if (_cancellationTokenExternal.IsCancellationRequested)
97110
{
98111
throw new OperationCanceledException();
99112
}
100-
if (_hasValue && _reading)
113+
if (_executionRequested && _inExecuting)
101114
{
102-
_currentValue.Arg = default;
103-
_hasValue = false;
104-
_reading = false;
105-
_notifierClear.Release(1);
115+
_executionRequested = false;
116+
_inExecuting = false;
117+
_notifierExecutionRequested.Reset();
118+
_notifierExecutionFinished.Set();
106119
}
107120
}
108121

109-
await _notifierSet.WaitAsync(_cancellationToken).ConfigureAwait(false);
122+
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenExternal, _onDispose.Token))
123+
{
124+
try
125+
{
126+
await _thread.RegisterWaitForSingleObjectAsync(_notifierExecutionRequested.WaitHandle, linkedCts.Token).ConfigureAwait(false);
127+
}
128+
catch (OperationCanceledException)
129+
{
130+
if (_cancellationTokenExternal.IsCancellationRequested)
131+
{
132+
_cancellationTokenExternal.ThrowIfCancellationRequested();
133+
}
134+
else
135+
{
136+
throw new ObjectDisposedException(nameof(SequentialTaskWaiter));
137+
}
138+
}
139+
}
110140

111141
lock (_lockObj)
112142
{
113-
_reading = true;
114-
return _currentValue;
143+
_inExecuting = true;
115144
}
145+
return _currentValue;
116146
}
117147

118148
/// <summary>
@@ -127,15 +157,15 @@ public void Dispose()
127157
return;
128158
}
129159
_isDisposed = true;
130-
_currentValue.Arg = default;
131-
_hasValue = false;
132-
_reading = false;
133-
_notifierClear.Release(1);
134-
_notifierClear.Dispose();
135-
_notifierSet.Dispose();
136-
}
137160

138-
_subscription.Dispose();
161+
_executionRequested = false;
162+
_inExecuting = false;
163+
_unsubscriber.Dispose();
164+
_onDispose.Cancel();
165+
_onDispose.Dispose();
166+
_notifierExecutionRequested.Dispose();
167+
_notifierExecutionFinished.Dispose();
168+
}
139169
}
140170
}
141171
}

0 commit comments

Comments
 (0)