Skip to content

Commit 6c6f2db

Browse files
committed
Changed so that cancellation tokens can be specified in AsyncRegister.
1 parent 4a9478f commit 6c6f2db

File tree

3 files changed

+141
-29
lines changed

3 files changed

+141
-29
lines changed

src/AsyncFiberWorks/Procedures/AsyncRegister.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@ public class AsyncRegister : IDisposable
1717
private readonly SemaphoreSlim _notifierSet = new SemaphoreSlim(0);
1818
private readonly SemaphoreSlim _notifierClear = new SemaphoreSlim(0);
1919
private bool _reading;
20+
private CancellationToken _cancellationToken;
2021

2122
/// <summary>
2223
/// Subscribe a task list.
2324
/// </summary>
2425
/// <param name="taskList"></param>
25-
public AsyncRegister(ISequentialTaskListRegistry taskList)
26+
/// <param name="cancellationToken"></param>
27+
public AsyncRegister(ISequentialTaskListRegistry taskList, CancellationToken cancellationToken = default)
2628
{
29+
_cancellationToken = cancellationToken;
2730
_subscription = taskList.Add(async () =>
2831
{
2932
await SetFlagAndWaitClearing().ConfigureAwait(false);
@@ -43,6 +46,10 @@ public async Task SetFlagAndWaitClearing()
4346
{
4447
return;
4548
}
49+
if (_cancellationToken.IsCancellationRequested)
50+
{
51+
return;
52+
}
4653
if (_hasValue)
4754
{
4855
throw new InvalidOperationException();
@@ -60,24 +67,34 @@ public async Task SetFlagAndWaitClearing()
6067
_notifierSet.Release(1);
6168
}
6269

63-
await _notifierClear.WaitAsync().ConfigureAwait(false);
70+
try
71+
{
72+
await _notifierClear.WaitAsync(_cancellationToken).ConfigureAwait(false);
73+
}
74+
catch (OperationCanceledException)
75+
{
76+
return;
77+
}
6478
}
6579

6680
/// <summary>
6781
/// Wait for the flag to be set.
6882
/// </summary>
69-
/// <param name="token"></param>
7083
/// <returns></returns>
7184
/// <exception cref="ObjectDisposedException"></exception>
7285
/// <exception cref="OperationCanceledException"></exception>
73-
public async Task WaitSetting(CancellationToken token = default)
86+
public async Task WaitSetting()
7487
{
7588
lock (_lockObj)
7689
{
7790
if (_isDisposed)
7891
{
7992
throw new ObjectDisposedException(GetType().FullName);
8093
}
94+
if (_cancellationToken.IsCancellationRequested)
95+
{
96+
throw new OperationCanceledException();
97+
}
8198
if (_hasValue && _reading)
8299
{
83100
_hasValue = false;
@@ -86,7 +103,7 @@ public async Task WaitSetting(CancellationToken token = default)
86103
}
87104
}
88105

89-
await _notifierSet.WaitAsync(token).ConfigureAwait(false);
106+
await _notifierSet.WaitAsync(_cancellationToken).ConfigureAwait(false);
90107

91108
lock (_lockObj)
92109
{

src/AsyncFiberWorks/Procedures/AsyncRegisterOfT.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ public class AsyncRegister<T> : IDisposable
1919
private readonly SemaphoreSlim _notifierSet = new SemaphoreSlim(0);
2020
private readonly SemaphoreSlim _notifierClear = new SemaphoreSlim(0);
2121
private bool _reading;
22+
private CancellationToken _cancellationToken;
2223

2324
/// <summary>
2425
/// Subscribe a handler list.
2526
/// </summary>
2627
/// <param name="handlerList"></param>
27-
public AsyncRegister(ISequentialHandlerListRegistry<T> handlerList)
28+
/// <param name="cancellationToken"></param>
29+
public AsyncRegister(ISequentialHandlerListRegistry<T> handlerList, CancellationToken cancellationToken = default)
2830
{
31+
_cancellationToken = cancellationToken;
2932
_subscription = handlerList.Add((arg) => SetValueAndWaitClearing(arg));
3033
}
3134

@@ -43,6 +46,10 @@ async Task<bool> SetValueAndWaitClearing(T newValue)
4346
{
4447
return false;
4548
}
49+
if (_cancellationToken.IsCancellationRequested)
50+
{
51+
return false;
52+
}
4653
if (_hasValue)
4754
{
4855
throw new InvalidOperationException();
@@ -62,25 +69,35 @@ async Task<bool> SetValueAndWaitClearing(T newValue)
6269
_notifierSet.Release(1);
6370
}
6471

65-
await _notifierClear.WaitAsync().ConfigureAwait(false);
72+
try
73+
{
74+
await _notifierClear.WaitAsync(_cancellationToken).ConfigureAwait(false);
75+
}
76+
catch (OperationCanceledException)
77+
{
78+
return false;
79+
}
6680
return _currentValue.Processed;
6781
}
6882

6983
/// <summary>
7084
/// Wait for the value to be set.
7185
/// </summary>
72-
/// <param name="token"></param>
7386
/// <returns></returns>
7487
/// <exception cref="ObjectDisposedException"></exception>
7588
/// <exception cref="OperationCanceledException"></exception>
76-
public async Task<ProcessedFlagEventArgs<T>> WaitSetting(CancellationToken token = default)
89+
public async Task<ProcessedFlagEventArgs<T>> WaitSetting()
7790
{
7891
lock (_lockObj)
7992
{
8093
if (_isDisposed)
8194
{
8295
throw new ObjectDisposedException(GetType().FullName);
8396
}
97+
if (_cancellationToken.IsCancellationRequested)
98+
{
99+
throw new OperationCanceledException();
100+
}
84101
if (_hasValue && _reading)
85102
{
86103
_currentValue.Arg = default;
@@ -90,7 +107,7 @@ public async Task<ProcessedFlagEventArgs<T>> WaitSetting(CancellationToken token
90107
}
91108
}
92109

93-
await _notifierSet.WaitAsync(token).ConfigureAwait(false);
110+
await _notifierSet.WaitAsync(_cancellationToken).ConfigureAwait(false);
94111

95112
lock (_lockObj)
96113
{

src/AsyncFiberWorksTests/AsyncRegisterTests.cs

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ public async Task WaitingOfAsyncRegister()
1919

2020
Func<int, Task> func = async (maxCount) =>
2121
{
22-
var reg = new AsyncRegister(taskList);
23-
try
22+
using (var reg = new AsyncRegister(taskList))
2423
{
2524
int counter = 0;
2625
while (counter < maxCount)
@@ -33,10 +32,6 @@ public async Task WaitingOfAsyncRegister()
3332
counter += 1;
3433
}
3534
}
36-
finally
37-
{
38-
reg.Dispose();
39-
}
4035
};
4136

4237
var task1 = func(3);
@@ -52,6 +47,51 @@ public async Task WaitingOfAsyncRegister()
5247
Assert.AreEqual(3 + 6, resultCounter);
5348
}
5449

50+
[Test]
51+
public async Task CancellationOfAsyncRegister()
52+
{
53+
var taskList = new FiberAndTaskPairList();
54+
int resultCounter = 0;
55+
int exceptionCounter = 0;
56+
var lockObj = new object();
57+
var cts = new CancellationTokenSource();
58+
var cancellationToken = cts.Token;
59+
60+
var func = new Func<Task>(async () =>
61+
{
62+
using (var reg = new AsyncRegister(taskList, cancellationToken))
63+
{
64+
try
65+
{
66+
while (true)
67+
{
68+
await reg.WaitSetting();
69+
lock (lockObj)
70+
{
71+
resultCounter += 1;
72+
}
73+
}
74+
}
75+
catch (OperationCanceledException)
76+
{
77+
exceptionCounter += 1;
78+
}
79+
}
80+
});
81+
82+
var task1 = func();
83+
84+
var fiber = new PoolFiber();
85+
await taskList.InvokeSequentialAsync(fiber);
86+
await taskList.InvokeSequentialAsync(fiber);
87+
await taskList.InvokeSequentialAsync(fiber);
88+
cts.Cancel();
89+
90+
await task1;
91+
Assert.AreEqual(3, resultCounter);
92+
Assert.AreEqual(1, exceptionCounter);
93+
}
94+
5595
[Test]
5696
public async Task WaitingOfAsyncRegisterOfT()
5797
{
@@ -61,8 +101,7 @@ public async Task WaitingOfAsyncRegisterOfT()
61101

62102
Func<int, Task> func = async (maxCount) =>
63103
{
64-
var reg = new AsyncRegister<int>(handlerList);
65-
try
104+
using (var reg = new AsyncRegister<int>(handlerList))
66105
{
67106
int counter = 0;
68107
while (counter < maxCount)
@@ -75,10 +114,6 @@ public async Task WaitingOfAsyncRegisterOfT()
75114
counter += 1;
76115
}
77116
}
78-
finally
79-
{
80-
reg.Dispose();
81-
}
82117
};
83118

84119
var task1 = func(3);
@@ -99,6 +134,54 @@ int Sigma(int n)
99134
return n * (n + 1) / 2;
100135
}
101136

137+
[Test]
138+
public async Task CancellationOfAsyncRegisterOfT()
139+
{
140+
var handlerList = new FiberAndHandlerPairList<int>();
141+
int resultCounter = 0;
142+
int exceptionCounter = 0;
143+
int totalCounter = 0;
144+
var lockObj = new object();
145+
var cts = new CancellationTokenSource();
146+
var cancellationToken = cts.Token;
147+
148+
var func = new Func<Task>(async () =>
149+
{
150+
using (var reg = new AsyncRegister<int>(handlerList, cancellationToken))
151+
{
152+
try
153+
{
154+
while (true)
155+
{
156+
var ret = await reg.WaitSetting();
157+
lock (lockObj)
158+
{
159+
totalCounter += ret.Arg;
160+
resultCounter += 1;
161+
}
162+
}
163+
}
164+
catch (OperationCanceledException)
165+
{
166+
exceptionCounter += 1;
167+
}
168+
}
169+
});
170+
171+
var task1 = func();
172+
173+
var fiber = new PoolFiber();
174+
await handlerList.PublishSequentialAsync(1, fiber);
175+
await handlerList.PublishSequentialAsync(2, fiber);
176+
await handlerList.PublishSequentialAsync(3, fiber);
177+
cts.Cancel();
178+
179+
await task1;
180+
Assert.AreEqual(3, resultCounter);
181+
Assert.AreEqual(6, totalCounter);
182+
Assert.AreEqual(1, exceptionCounter);
183+
}
184+
102185
[Test]
103186
public async Task WaitingOfProcessedFlagEventArgs()
104187
{
@@ -109,12 +192,11 @@ public async Task WaitingOfProcessedFlagEventArgs()
109192
var cts = new CancellationTokenSource();
110193
Func<int, Task> func = async (threshold) =>
111194
{
112-
var reg = new AsyncRegister<int>(driver);
113-
try
195+
using (var reg = new AsyncRegister<int>(driver, cts.Token))
114196
{
115197
while (true)
116198
{
117-
var e = await reg.WaitSetting(cts.Token);
199+
var e = await reg.WaitSetting();
118200
if (e.Arg < threshold)
119201
{
120202
e.Processed = false;
@@ -129,10 +211,6 @@ public async Task WaitingOfProcessedFlagEventArgs()
129211
}
130212
}
131213
}
132-
finally
133-
{
134-
reg.Dispose();
135-
}
136214
};
137215

138216
var task1 = func(7);

0 commit comments

Comments
 (0)