Skip to content

Commit c9aa66b

Browse files
committed
Add QueueAsync.
1 parent f1e6b46 commit c9aa66b

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using AsyncFiberWorks.Core;
2+
using System;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace AsyncFiberWorks.Threading
7+
{
8+
/// <summary>
9+
/// Extension of thread pool.
10+
/// </summary>
11+
public static class ThreadPoolExtensions
12+
{
13+
/// <summary>
14+
/// Enqueue an action. Then returns a task to wait for the completion of the action.
15+
/// </summary>
16+
/// <param name="threadPool">A thread pool.</param>
17+
/// <param name="callback"></param>
18+
/// <returns>A task that waits until a given action is finished.</returns>
19+
public static async Task QueueAsync(this IThreadPool threadPool, WaitCallback callback)
20+
{
21+
var tcs = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously);
22+
threadPool.Queue((_) =>
23+
{
24+
try
25+
{
26+
callback(null);
27+
}
28+
catch (Exception ex)
29+
{
30+
tcs.SetException(ex);
31+
return;
32+
}
33+
tcs.SetResult(0);
34+
});
35+
await tcs.Task.ConfigureAwait(false);
36+
}
37+
38+
/// <summary>
39+
/// Enqueue an action. Then returns a task to wait for the completion of the action.
40+
/// </summary>
41+
/// <param name="threadPool">A thread pool.</param>
42+
/// <param name="action">An action.</param>
43+
/// <returns>A task that waits until a given action is finished.</returns>
44+
public static async Task QueueAsync(this IThreadPool threadPool, Action action)
45+
{
46+
await threadPool.QueueAsync((_) => action()).ConfigureAwait(false);
47+
}
48+
}
49+
}

src/AsyncFiberWorksTests/ThreadPoolTests.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using AsyncFiberWorks.Core;
88
using AsyncFiberWorks.Fibers;
99
using AsyncFiberWorks.Threading;
10+
using System;
1011

1112
namespace AsyncFiberWorksTests
1213
{
@@ -198,5 +199,39 @@ public void PoolFiberWithUserThreadPool()
198199
}
199200
}
200201
}
202+
203+
[Test, TestCaseSource(nameof(ThreadPoolCreators))]
204+
public async Task ThreadPoolQueueAsync(Func<IThreadPool> poolCreator)
205+
{
206+
long value = 0;
207+
var threadPool = poolCreator();
208+
var t1 = threadPool.QueueAsync(() =>
209+
{
210+
value = 1;
211+
Thread.Sleep(150);
212+
value += 1;
213+
});
214+
var t2 = threadPool.QueueAsync(() =>
215+
{
216+
Thread.Sleep(50);
217+
value *= 2;
218+
});
219+
var t3 = threadPool.QueueAsync(() =>
220+
{
221+
Thread.Sleep(100);
222+
value *= 100;
223+
});
224+
225+
await Task.WhenAll(t1, t2, t3).ConfigureAwait(false);
226+
Assert.AreEqual(201, value);
227+
}
228+
229+
230+
static object[] ThreadPoolCreators =
231+
{
232+
new object[] { (Func<IThreadPool>)(() => DefaultThreadPool.Instance) },
233+
new object[] { (Func<IThreadPool>)(() => UserThreadPool.StartNew(3)) },
234+
};
235+
201236
}
202237
}

0 commit comments

Comments
 (0)