From 165230fe4c67d0f6e93b5668f9e1d634f0ceef5b Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Sat, 12 Mar 2022 23:48:13 +0000 Subject: [PATCH 1/6] add experimental mars io task scheduler --- .../src/Microsoft.Data.SqlClient.csproj | 3 + .../Data/SqlClient/SNI/SNIMarsConnection.cs | 49 +- .../Microsoft/Data/SqlClient/SNI/SNIPacket.cs | 2 +- .../Data/SqlClient/SNI/SNITaskScheduler.cs | 606 ++++++++++++++++++ .../Data/SqlClient/LocalAppContextSwitches.cs | 21 +- 5 files changed, 666 insertions(+), 15 deletions(-) create mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index da5a2d1062..814ec8165b 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -931,6 +931,9 @@ + + + Microsoft.Data.SqlClient.SqlMetaData.xml diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs index 395cfed4be..29d653a4e3 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Threading; +using System.Threading.Tasks; namespace Microsoft.Data.SqlClient.SNI { @@ -16,16 +17,21 @@ internal class SNIMarsConnection { private const string s_className = nameof(SNIMarsConnection); + private static QueuedTaskScheduler s_scheduler; + private TaskFactory s_factory; + private readonly Guid _connectionId = Guid.NewGuid(); private readonly Dictionary _sessions = new Dictionary(); private readonly byte[] _headerBytes = new byte[SNISMUXHeader.HEADER_LENGTH]; private readonly SNISMUXHeader _currentHeader = new SNISMUXHeader(); private SNIHandle _lowerHandle; - private ushort _nextSessionId = 0; + private int _nextSessionId; private int _currentHeaderByteCount = 0; private int _dataBytesLeft = 0; private SNIPacket _currentPacket; + + /// /// Connection ID /// @@ -45,6 +51,8 @@ public Guid ConnectionId /// Lower handle public SNIMarsConnection(SNIHandle lowerHandle) { + + _nextSessionId = -1; _lowerHandle = lowerHandle; SqlClientEventSource.Log.TrySNITraceEvent(s_className, EventType.INFO, "Created MARS Session Id {0}", args0: ConnectionId); _lowerHandle.SetAsyncCallbacks(HandleReceiveComplete, HandleSendComplete); @@ -54,7 +62,8 @@ public SNIMarsHandle CreateMarsSession(object callbackObject, bool async) { lock (this) { - ushort sessionId = _nextSessionId++; + ushort sessionId = unchecked((ushort)(Interlocked.Increment(ref _nextSessionId) % ushort.MaxValue)); + SNIMarsHandle handle = new SNIMarsHandle(this, sessionId, callbackObject, async); _sessions.Add(sessionId, handle); SqlClientEventSource.Log.TrySNITraceEvent(s_className, EventType.INFO, "MARS Session Id {0}, SNI MARS Handle Id {1}, created new MARS Session {2}", args0: ConnectionId, args1: handle?.ConnectionId, args2: sessionId); @@ -68,23 +77,39 @@ public SNIMarsHandle CreateMarsSession(object callbackObject, bool async) /// public uint StartReceive() { - long scopeID = SqlClientEventSource.Log.TrySNIScopeEnterEvent(s_className); - try + using (TrySNIEventScope.Create(nameof(SNIMarsConnection))) + { + if (LocalAppContextSwitches.UseExperimentalMARSThreading +#if NETCOREAPP31_AND_ABOVE + && ThreadPool.PendingWorkItemCount>0 +#endif + ) + { + LazyInitializer.EnsureInitialized(ref s_scheduler, () => new QueuedTaskScheduler(3, "MARSIOScheduler", false, ThreadPriority.AboveNormal)); + LazyInitializer.EnsureInitialized(ref s_factory, () => new TaskFactory(s_scheduler)); + + // will start an async task on the scheduler and immediatley return so this await is safe + return s_factory.StartNew(StartAsyncReceiveLoopForConnection, this).GetAwaiter().GetResult(); + } + else + { + return StartAsyncReceiveLoopForConnection(this); + } + } + + static uint StartAsyncReceiveLoopForConnection(object state) { + SNIMarsConnection connection = (SNIMarsConnection)state; SNIPacket packet = null; - if (ReceiveAsync(ref packet) == TdsEnums.SNI_SUCCESS_IO_PENDING) + if (connection.ReceiveAsync(ref packet) == TdsEnums.SNI_SUCCESS_IO_PENDING) { - SqlClientEventSource.Log.TrySNITraceEvent(s_className, EventType.INFO, "MARS Session Id {0}, Success IO pending.", args0: ConnectionId); + SqlClientEventSource.Log.TrySNITraceEvent(nameof(SNIMarsConnection), EventType.INFO, "MARS Session Id {0}, Success IO pending.", args0: connection.ConnectionId); return TdsEnums.SNI_SUCCESS_IO_PENDING; } - SqlClientEventSource.Log.TrySNITraceEvent(s_className, EventType.ERR, "MARS Session Id {0}, Connection not usable.", args0: ConnectionId); + SqlClientEventSource.Log.TrySNITraceEvent(nameof(SNIMarsConnection), EventType.ERR, "MARS Session Id {0}, Connection not usable.", args0: connection.ConnectionId); return SNICommon.ReportSNIError(SNIProviders.SMUX_PROV, 0, SNICommon.ConnNotUsableError, Strings.SNI_ERROR_19); - } - finally - { - SqlClientEventSource.Log.TrySNIScopeLeaveEvent(scopeID); - } + }; } /// diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIPacket.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIPacket.cs index 58ac68c7c4..8120e346b9 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIPacket.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIPacket.cs @@ -282,7 +282,7 @@ public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback) state: callback, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, - TaskScheduler.Default + TaskScheduler.Current // specifically continue on the current scheduler because we may override it for mars ); } diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs new file mode 100644 index 0000000000..c844f9050c --- /dev/null +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs @@ -0,0 +1,606 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Data.SqlClient.SNI +{ + /// + /// Provides a TaskScheduler that provides control over priorities, fairness, and the underlying threads utilized. + /// + [DebuggerTypeProxy(typeof(QueuedTaskSchedulerDebugView))] + [DebuggerDisplay("Id={Id}, Queues={DebugQueueCount}, ScheduledTasks = {DebugTaskCount}")] + public sealed class QueuedTaskScheduler : TaskScheduler, IDisposable + { + /// Debug view for the QueuedTaskScheduler. + private class QueuedTaskSchedulerDebugView + { + /// The scheduler. + private readonly QueuedTaskScheduler _scheduler; + + /// Initializes the debug view. + /// The scheduler. + public QueuedTaskSchedulerDebugView(QueuedTaskScheduler scheduler) => + _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); + + /// Gets all of the Tasks queued to the scheduler directly. + public IEnumerable ScheduledTasks + { + get + { + var tasks = (_scheduler._targetScheduler != null) ? + (IEnumerable)_scheduler._nonthreadsafeTaskQueue : + _scheduler._blockingTaskQueue; + return tasks.Where(t => t != null).ToList(); + } + } + + /// Gets the prioritized and fair queues. + public IEnumerable Queues + { + get + { + List queues = new List(); + foreach (var group in _scheduler._queueGroups) + queues.AddRange(group.Value.Cast()); + return queues; + } + } + } + + /// + /// A sorted list of round-robin queue lists. Tasks with the smallest priority value + /// are preferred. Priority groups are round-robin'd through in order of priority. + /// + private readonly SortedList _queueGroups = new SortedList(); + /// Cancellation token used for disposal. + private readonly CancellationTokenSource _disposeCancellation = new CancellationTokenSource(); + /// + /// The maximum allowed concurrency level of this scheduler. If custom threads are + /// used, this represents the number of created threads. + /// + private readonly int _concurrencyLevel; + /// Whether we're processing tasks on the current thread. + private static readonly ThreadLocal s_taskProcessingThread = new ThreadLocal(); + + // *** + // *** For when using a target scheduler + // *** + + /// The scheduler onto which actual work is scheduled. + private readonly TaskScheduler _targetScheduler; + /// The queue of tasks to process when using an underlying target scheduler. + private readonly Queue _nonthreadsafeTaskQueue; + /// The number of Tasks that have been queued or that are running whiel using an underlying scheduler. + private int _delegatesQueuedOrRunning = 0; + + // *** + // *** For when using our own threads + // *** + + /// The threads used by the scheduler to process work. + private readonly Thread[] _threads; + /// The collection of tasks to be executed on our custom threads. + private readonly BlockingCollection _blockingTaskQueue; + + // *** + + /// Initializes the scheduler. + public QueuedTaskScheduler() : this(Default, 0) { } + + /// Initializes the scheduler. + /// The target underlying scheduler onto which this sceduler's work is queued. + public QueuedTaskScheduler(TaskScheduler targetScheduler) : this(targetScheduler, 0) { } + + /// Initializes the scheduler. + /// The target underlying scheduler onto which this sceduler's work is queued. + /// The maximum degree of concurrency allowed for this scheduler's work. + public QueuedTaskScheduler( + TaskScheduler targetScheduler, + int maxConcurrencyLevel) + { + if (maxConcurrencyLevel < 0) + throw new ArgumentOutOfRangeException(nameof(maxConcurrencyLevel)); + + // Initialize only those fields relevant to use an underlying scheduler. We don't + // initialize the fields relevant to using our own custom threads. + _targetScheduler = targetScheduler ?? throw new ArgumentNullException("underlyingScheduler"); + _nonthreadsafeTaskQueue = new Queue(); + + // If 0, use the number of logical processors. But make sure whatever value we pick + // is not greater than the degree of parallelism allowed by the underlying scheduler. + _concurrencyLevel = maxConcurrencyLevel != 0 ? maxConcurrencyLevel : Environment.ProcessorCount; + if (targetScheduler.MaximumConcurrencyLevel > 0 && + targetScheduler.MaximumConcurrencyLevel < _concurrencyLevel) + { + _concurrencyLevel = targetScheduler.MaximumConcurrencyLevel; + } + } + + /// Initializes the scheduler. + /// The number of threads to create and use for processing work items. + public QueuedTaskScheduler(int threadCount) : this(threadCount, string.Empty, false, ThreadPriority.Normal, ApartmentState.MTA, 0, null, null) { } + + /// Initializes the scheduler. + /// The number of threads to create and use for processing work items. + /// The name to use for each of the created threads. + /// A Boolean value that indicates whether to use foreground threads instead of background. + /// The priority to assign to each thread. + /// The apartment state to use for each thread. + /// The stack size to use for each thread. + /// An initialization routine to run on each thread. + /// A finalization routine to run on each thread. + public QueuedTaskScheduler( + int threadCount, + string threadName = "", + bool useForegroundThreads = false, + ThreadPriority threadPriority = ThreadPriority.Normal, + ApartmentState threadApartmentState = ApartmentState.MTA, + int threadMaxStackSize = 0, + Action threadInit = null, + Action threadFinally = null) + { + // Validates arguments (some validation is left up to the Thread type itself). + // If the thread count is 0, default to the number of logical processors. + if (threadCount < 0) + throw new ArgumentOutOfRangeException(nameof(threadCount)); + else if (threadCount == 0) + _concurrencyLevel = Environment.ProcessorCount; + else + _concurrencyLevel = threadCount; + + // Initialize the queue used for storing tasks + _blockingTaskQueue = new BlockingCollection(); + + // Create all of the threads + _threads = new Thread[threadCount]; + for (int i = 0; i < threadCount; i++) + { + _threads[i] = new Thread(() => ThreadBasedDispatchLoop(threadInit, threadFinally), threadMaxStackSize) + { + Priority = threadPriority, + IsBackground = !useForegroundThreads, + }; + if (threadName != null) + _threads[i].Name = threadName + " (" + i + ")"; + _threads[i].SetApartmentState(threadApartmentState); + } + + // Start all of the threads + foreach (var thread in _threads) + thread.Start(); + } + + /// The dispatch loop run by all threads in this scheduler. + /// An initialization routine to run when the thread begins. + /// A finalization routine to run before the thread ends. + private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) + { + s_taskProcessingThread.Value = true; + threadInit?.Invoke(); + try + { + // If the scheduler is disposed, the cancellation token will be set and + // we'll receive an OperationCanceledException. That OCE should not crash the process. + try + { + // If a thread abort occurs, we'll try to reset it and continue running. + while (true) + { + try + { + // For each task queued to the scheduler, try to execute it. + foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)) + { + // If the task is not null, that means it was queued to this scheduler directly. + // Run it. + if (task != null) + { + TryExecuteTask(task); + } + // If the task is null, that means it's just a placeholder for a task + // queued to one of the subschedulers. Find the next task based on + // priority and fairness and run it. + else + { + // Find the next task based on our ordering rules... + Task targetTask; + QueuedTaskSchedulerQueue queueForTargetTask; + lock (_queueGroups) + FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); + + // ... and if we found one, run it + if (targetTask != null) + queueForTargetTask.ExecuteTask(targetTask); + } + } + } + catch (ThreadAbortException) + { + // If we received a thread abort, and that thread abort was due to shutting down + // or unloading, let it pass through. Otherwise, reset the abort so we can + // continue processing work items. + if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload()) + { + Thread.ResetAbort(); + } + } + } + } + catch (OperationCanceledException) { } + } + finally + { + // Run a cleanup routine if there was one + threadFinally?.Invoke(); + s_taskProcessingThread.Value = false; + } + } + + /// Gets the number of queues currently activated. + private int DebugQueueCount + { + get + { + int count = 0; + foreach (var group in _queueGroups) + count += group.Value.Count; + return count; + } + } + + /// Gets the number of tasks currently scheduled. + private int DebugTaskCount => (_targetScheduler != null ? + (IEnumerable)_nonthreadsafeTaskQueue : (IEnumerable)_blockingTaskQueue) + .Where(t => t != null).Count(); + + /// Find the next task that should be executed, based on priorities and fairness and the like. + /// The found task, or null if none was found. + /// + /// The scheduler associated with the found task. Due to security checks inside of TPL, + /// this scheduler needs to be used to execute that task. + /// + private void FindNextTask_NeedsLock(out Task targetTask, out QueuedTaskSchedulerQueue queueForTargetTask) + { + targetTask = null; + queueForTargetTask = null; + + // Look through each of our queue groups in sorted order. + // This ordering is based on the priority of the queues. + foreach (var queueGroup in _queueGroups) + { + var queues = queueGroup.Value; + + // Within each group, iterate through the queues in a round-robin + // fashion. Every time we iterate again and successfully find a task, + // we'll start in the next location in the group. + foreach (int i in queues.CreateSearchOrder()) + { + queueForTargetTask = queues[i]; + var items = queueForTargetTask._workItems; + if (items.Count > 0) + { + targetTask = items.Dequeue(); + if (queueForTargetTask._disposed && items.Count == 0) + { + RemoveQueue_NeedsLock(queueForTargetTask); + } + queues.NextQueueIndex = (queues.NextQueueIndex + 1) % queueGroup.Value.Count; + return; + } + } + } + } + + /// Queues a task to the scheduler. + /// The task to be queued. + protected override void QueueTask(Task task) + { + // If we've been disposed, no one should be queueing + if (_disposeCancellation.IsCancellationRequested) + throw new ObjectDisposedException(GetType().Name); + + // If the target scheduler is null (meaning we're using our own threads), + // add the task to the blocking queue + if (_targetScheduler == null) + { + _blockingTaskQueue.Add(task); + } + // Otherwise, add the task to the non-blocking queue, + // and if there isn't already an executing processing task, + // start one up + else + { + // Queue the task and check whether we should launch a processing + // task (noting it if we do, so that other threads don't result + // in queueing up too many). + bool launchTask = false; + lock (_nonthreadsafeTaskQueue) + { + _nonthreadsafeTaskQueue.Enqueue(task); + if (_delegatesQueuedOrRunning < _concurrencyLevel) + { + ++_delegatesQueuedOrRunning; + launchTask = true; + } + } + + // If necessary, start processing asynchronously + if (launchTask) + { + Task.Factory.StartNew(ProcessPrioritizedAndBatchedTasks, + CancellationToken.None, TaskCreationOptions.None, _targetScheduler); + } + } + } + + /// + /// Process tasks one at a time in the best order. + /// This should be run in a Task generated by QueueTask. + /// It's been separated out into its own method to show up better in Parallel Tasks. + /// + private void ProcessPrioritizedAndBatchedTasks() + { + bool continueProcessing = true; + while (!_disposeCancellation.IsCancellationRequested && continueProcessing) + { + try + { + // Note that we're processing tasks on this thread + s_taskProcessingThread.Value = true; + + // Until there are no more tasks to process + while (!_disposeCancellation.IsCancellationRequested) + { + // Try to get the next task. If there aren't any more, we're done. + Task targetTask; + lock (_nonthreadsafeTaskQueue) + { + if (_nonthreadsafeTaskQueue.Count == 0) + break; + targetTask = _nonthreadsafeTaskQueue.Dequeue(); + } + + // If the task is null, it's a placeholder for a task in the round-robin queues. + // Find the next one that should be processed. + QueuedTaskSchedulerQueue queueForTargetTask = null; + if (targetTask == null) + { + lock (_queueGroups) + FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); + } + + // Now if we finally have a task, run it. If the task + // was associated with one of the round-robin schedulers, we need to use it + // as a thunk to execute its task. + if (targetTask != null) + { + if (queueForTargetTask != null) + queueForTargetTask.ExecuteTask(targetTask); + else + TryExecuteTask(targetTask); + } + } + } + finally + { + // Now that we think we're done, verify that there really is + // no more work to do. If there's not, highlight + // that we're now less parallel than we were a moment ago. + lock (_nonthreadsafeTaskQueue) + { + if (_nonthreadsafeTaskQueue.Count == 0) + { + _delegatesQueuedOrRunning--; + continueProcessing = false; + s_taskProcessingThread.Value = false; + } + } + } + } + } + + /// Notifies the pool that there's a new item to be executed in one of the round-robin queues. + private void NotifyNewWorkItem() => QueueTask(null); + + /// Tries to execute a task synchronously on the current thread. + /// The task to execute. + /// Whether the task was previously queued. + /// true if the task was executed; otherwise, false. + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => + // If we're already running tasks on this threads, enable inlining + s_taskProcessingThread.Value && TryExecuteTask(task); + + /// Gets the tasks scheduled to this scheduler. + /// An enumerable of all tasks queued to this scheduler. + /// This does not include the tasks on sub-schedulers. Those will be retrieved by the debugger separately. + protected override IEnumerable GetScheduledTasks() + { + // If we're running on our own threads, get the tasks from the blocking queue... + if (_targetScheduler == null) + { + // Get all of the tasks, filtering out nulls, which are just placeholders + // for tasks in other sub-schedulers + return _blockingTaskQueue.Where(t => t != null).ToList(); + } + // otherwise get them from the non-blocking queue... + else + { + return _nonthreadsafeTaskQueue.Where(t => t != null).ToList(); + } + } + + /// Gets the maximum concurrency level to use when processing tasks. + public override int MaximumConcurrencyLevel => _concurrencyLevel; + + /// Initiates shutdown of the scheduler. + public void Dispose() => _disposeCancellation.Cancel(); + + /// Creates and activates a new scheduling queue for this scheduler. + /// The newly created and activated queue at priority 0. + public TaskScheduler ActivateNewQueue() => ActivateNewQueue(0); + + /// Creates and activates a new scheduling queue for this scheduler. + /// The priority level for the new queue. + /// The newly created and activated queue at the specified priority. + public TaskScheduler ActivateNewQueue(int priority) + { + // Create the queue + var createdQueue = new QueuedTaskSchedulerQueue(priority, this); + + // Add the queue to the appropriate queue group based on priority + lock (_queueGroups) + { + if (!_queueGroups.TryGetValue(priority, out QueueGroup list)) + { + list = new QueueGroup(); + _queueGroups.Add(priority, list); + } + list.Add(createdQueue); + } + + // Hand the new queue back + return createdQueue; + } + + /// Removes a scheduler from the group. + /// The scheduler to be removed. + private void RemoveQueue_NeedsLock(QueuedTaskSchedulerQueue queue) + { + // Find the group that contains the queue and the queue's index within the group + var queueGroup = _queueGroups[queue._priority]; + int index = queueGroup.IndexOf(queue); + + // We're about to remove the queue, so adjust the index of the next + // round-robin starting location if it'll be affected by the removal + if (queueGroup.NextQueueIndex >= index) + queueGroup.NextQueueIndex--; + + // Remove it + queueGroup.RemoveAt(index); + } + + /// A group of queues a the same priority level. + private class QueueGroup : List + { + /// The starting index for the next round-robin traversal. + public int NextQueueIndex = 0; + + /// Creates a search order through this group. + /// An enumerable of indices for this group. + public IEnumerable CreateSearchOrder() + { + for (int i = NextQueueIndex; i < Count; i++) + yield return i; + for (int i = 0; i < NextQueueIndex; i++) + yield return i; + } + } + + /// Provides a scheduling queue associatd with a QueuedTaskScheduler. + [DebuggerDisplay("QueuePriority = {_priority}, WaitingTasks = {WaitingTasks}")] + [DebuggerTypeProxy(typeof(QueuedTaskSchedulerQueueDebugView))] + private sealed class QueuedTaskSchedulerQueue : TaskScheduler, IDisposable + { + /// A debug view for the queue. + private sealed class QueuedTaskSchedulerQueueDebugView + { + /// The queue. + private readonly QueuedTaskSchedulerQueue _queue; + + /// Initializes the debug view. + /// The queue to be debugged. + public QueuedTaskSchedulerQueueDebugView(QueuedTaskSchedulerQueue queue) => + _queue = queue ?? throw new ArgumentNullException(nameof(queue)); + + /// Gets the priority of this queue in its associated scheduler. + public int Priority => _queue._priority; + /// Gets the ID of this scheduler. + public int Id => _queue.Id; + /// Gets all of the tasks scheduled to this queue. + public IEnumerable ScheduledTasks => _queue.GetScheduledTasks(); + /// Gets the QueuedTaskScheduler with which this queue is associated. + public QueuedTaskScheduler AssociatedScheduler => _queue._pool; + } + + /// The scheduler with which this pool is associated. + private readonly QueuedTaskScheduler _pool; + /// The work items stored in this queue. + internal readonly Queue _workItems; + /// Whether this queue has been disposed. + internal bool _disposed; + /// Gets the priority for this queue. + internal int _priority; + + /// Initializes the queue. + /// The priority associated with this queue. + /// The scheduler with which this queue is associated. + internal QueuedTaskSchedulerQueue(int priority, QueuedTaskScheduler pool) + { + _priority = priority; + _pool = pool; + _workItems = new Queue(); + } + + /// Gets the number of tasks waiting in this scheduler. + internal int WaitingTasks => _workItems.Count; + + /// Gets the tasks scheduled to this scheduler. + /// An enumerable of all tasks queued to this scheduler. + protected override IEnumerable GetScheduledTasks() => _workItems.ToList(); + + /// Queues a task to the scheduler. + /// The task to be queued. + protected override void QueueTask(Task task) + { + if (_disposed) + throw new ObjectDisposedException(GetType().Name); + + // Queue up the task locally to this queue, and then notify + // the parent scheduler that there's work available + lock (_pool._queueGroups) + _workItems.Enqueue(task); + _pool.NotifyNewWorkItem(); + } + + /// Tries to execute a task synchronously on the current thread. + /// The task to execute. + /// Whether the task was previously queued. + /// true if the task was executed; otherwise, false. + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => + // If we're using our own threads and if this is being called from one of them, + // or if we're currently processing another task on this thread, try running it inline. + s_taskProcessingThread.Value && TryExecuteTask(task); + + /// Runs the specified ask. + /// The task to execute. + internal void ExecuteTask(Task task) => TryExecuteTask(task); + + /// Gets the maximum concurrency level to use when processing tasks. + public override int MaximumConcurrencyLevel => _pool.MaximumConcurrencyLevel; + + /// Signals that the queue should be removed from the scheduler as soon as the queue is empty. + public void Dispose() + { + if (!_disposed) + { + lock (_pool._queueGroups) + { + // We only remove the queue if it's empty. If it's not empty, + // we still mark it as disposed, and the associated QueuedTaskScheduler + // will remove the queue when its count hits 0 and its _disposed is true. + if (_workItems.Count == 0) + { + _pool.RemoveQueue_NeedsLock(this); + } + } + _disposed = true; + } + } + } + } +} diff --git a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs index 8e390b21d6..b366359bdd 100644 --- a/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/LocalAppContextSwitches.cs @@ -15,13 +15,16 @@ internal static partial class LocalAppContextSwitches internal const string LegacyRowVersionNullString = @"Switch.Microsoft.Data.SqlClient.LegacyRowVersionNullBehavior"; internal const string UseSystemDefaultSecureProtocolsString = @"Switch.Microsoft.Data.SqlClient.UseSystemDefaultSecureProtocols"; internal const string SuppressInsecureTLSWarningString = @"Switch.Microsoft.Data.SqlClient.SuppressInsecureTLSWarning"; + internal const string UseExperimentalMARSThreadingString = @"Switch.Microsoft.Data.SqlClient.UseExperimentalMARSThreading"; private static bool s_makeReadAsyncBlocking; private static bool? s_LegacyRowVersionNullBehavior; private static bool? s_UseSystemDefaultSecureProtocols; - private static bool? s_SuppressInsecureTLSWarning; + private static bool? s_SuppressInsecureTLSWarning; + private static bool? s_useExperimentalMARSThreading; -#if !NETFRAMEWORK + +#if NETCOREAPP31_AND_ABOVE static LocalAppContextSwitches() { IAppContextSwitchOverridesSection appContextSwitch = AppConfigManager.FetchConfigurationSection(AppContextSwitchOverridesSection.Name); @@ -95,5 +98,19 @@ public static bool UseSystemDefaultSecureProtocols return s_UseSystemDefaultSecureProtocols.Value; } } + + public static bool UseExperimentalMARSThreading + { + get + { + if (s_useExperimentalMARSThreading is null) + { + bool result; + result = AppContext.TryGetSwitch(UseExperimentalMARSThreadingString, out result) ? result : false; + s_useExperimentalMARSThreading = result; + } + return s_useExperimentalMARSThreading.Value; + } + } } } From 3607a4067bb039b168e986841aea4f435d1fd514 Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Sat, 26 Mar 2022 00:11:06 +0000 Subject: [PATCH 2/6] address feedback --- .../src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs index 29d653a4e3..9da19db790 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs @@ -81,7 +81,7 @@ public uint StartReceive() { if (LocalAppContextSwitches.UseExperimentalMARSThreading #if NETCOREAPP31_AND_ABOVE - && ThreadPool.PendingWorkItemCount>0 + && ThreadPool.PendingWorkItemCount > 0 #endif ) { From b693179104f63be9a25fd661dff74ea446779614 Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Sat, 2 Apr 2022 01:20:13 +0100 Subject: [PATCH 3/6] add extra debugging --- .../Data/SqlClient/SNI/SNIMarsConnection.cs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs index 9da19db790..10f3476a43 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs @@ -85,6 +85,18 @@ public uint StartReceive() #endif ) { + // for debugging only, remove this + if (s_scheduler is null) + { + AppDomain.CurrentDomain.FirstChanceException += static (object sender, System.Runtime.ExceptionServices.FirstChanceExceptionEventArgs e) => + { + if (e is not null && e.Exception is PlatformNotSupportedException pnse) + { + SqlClientEventSource.Log.SNITrace("PNSE in SNI: " + Environment.NewLine + pnse.StackTrace); + Console.WriteLine("PNSE in SNI: " + Environment.NewLine + pnse.StackTrace); + } + }; + } LazyInitializer.EnsureInitialized(ref s_scheduler, () => new QueuedTaskScheduler(3, "MARSIOScheduler", false, ThreadPriority.AboveNormal)); LazyInitializer.EnsureInitialized(ref s_factory, () => new TaskFactory(s_scheduler)); @@ -112,6 +124,8 @@ static uint StartAsyncReceiveLoopForConnection(object state) }; } + + /// /// Send a packet synchronously /// From c3825ba69c4f0061d83cc6d025cac7b121efa8f5 Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Mon, 4 Apr 2022 21:37:21 +0100 Subject: [PATCH 4/6] remove apartment state from scheduler --- .../src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs index c844f9050c..423b07e2d0 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs @@ -123,14 +123,13 @@ public QueuedTaskScheduler( /// Initializes the scheduler. /// The number of threads to create and use for processing work items. - public QueuedTaskScheduler(int threadCount) : this(threadCount, string.Empty, false, ThreadPriority.Normal, ApartmentState.MTA, 0, null, null) { } + public QueuedTaskScheduler(int threadCount) : this(threadCount, string.Empty, false, ThreadPriority.Normal, 0, null, null) { } /// Initializes the scheduler. /// The number of threads to create and use for processing work items. /// The name to use for each of the created threads. /// A Boolean value that indicates whether to use foreground threads instead of background. /// The priority to assign to each thread. - /// The apartment state to use for each thread. /// The stack size to use for each thread. /// An initialization routine to run on each thread. /// A finalization routine to run on each thread. @@ -139,7 +138,6 @@ public QueuedTaskScheduler( string threadName = "", bool useForegroundThreads = false, ThreadPriority threadPriority = ThreadPriority.Normal, - ApartmentState threadApartmentState = ApartmentState.MTA, int threadMaxStackSize = 0, Action threadInit = null, Action threadFinally = null) @@ -167,7 +165,6 @@ public QueuedTaskScheduler( }; if (threadName != null) _threads[i].Name = threadName + " (" + i + ")"; - _threads[i].SetApartmentState(threadApartmentState); } // Start all of the threads From 898c5379abe9558e8ab745b7f9da326d99b8cf02 Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Thu, 7 Apr 2022 01:29:40 +0100 Subject: [PATCH 5/6] make scheduler internal and force use of scheduler --- .../Data/SqlClient/SNI/SNIMarsConnection.cs | 18 +++--------------- .../Data/SqlClient/SNI/SNITaskScheduler.cs | 2 +- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs index 10f3476a43..f944255348 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs @@ -80,23 +80,11 @@ public uint StartReceive() using (TrySNIEventScope.Create(nameof(SNIMarsConnection))) { if (LocalAppContextSwitches.UseExperimentalMARSThreading -#if NETCOREAPP31_AND_ABOVE - && ThreadPool.PendingWorkItemCount > 0 -#endif +//#if NETCOREAPP31_AND_ABOVE +// && ThreadPool.PendingWorkItemCount > 0 +//#endif ) { - // for debugging only, remove this - if (s_scheduler is null) - { - AppDomain.CurrentDomain.FirstChanceException += static (object sender, System.Runtime.ExceptionServices.FirstChanceExceptionEventArgs e) => - { - if (e is not null && e.Exception is PlatformNotSupportedException pnse) - { - SqlClientEventSource.Log.SNITrace("PNSE in SNI: " + Environment.NewLine + pnse.StackTrace); - Console.WriteLine("PNSE in SNI: " + Environment.NewLine + pnse.StackTrace); - } - }; - } LazyInitializer.EnsureInitialized(ref s_scheduler, () => new QueuedTaskScheduler(3, "MARSIOScheduler", false, ThreadPriority.AboveNormal)); LazyInitializer.EnsureInitialized(ref s_factory, () => new TaskFactory(s_scheduler)); diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs index 423b07e2d0..9b03eb6648 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs @@ -14,7 +14,7 @@ namespace Microsoft.Data.SqlClient.SNI /// [DebuggerTypeProxy(typeof(QueuedTaskSchedulerDebugView))] [DebuggerDisplay("Id={Id}, Queues={DebugQueueCount}, ScheduledTasks = {DebugTaskCount}")] - public sealed class QueuedTaskScheduler : TaskScheduler, IDisposable + internal sealed class QueuedTaskScheduler : TaskScheduler, IDisposable { /// Debug view for the QueuedTaskScheduler. private class QueuedTaskSchedulerDebugView From 7dd268a484d903891d323445c0117a798baeb3df Mon Sep 17 00:00:00 2001 From: Wraith2 Date: Tue, 17 May 2022 18:35:29 +0100 Subject: [PATCH 6/6] simplify scheduler --- .../Data/SqlClient/SNI/SNIMarsConnection.cs | 6 +- .../Data/SqlClient/SNI/SNITaskScheduler.cs | 454 +----------------- 2 files changed, 16 insertions(+), 444 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs index f944255348..4523908979 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNIMarsConnection.cs @@ -18,7 +18,6 @@ internal class SNIMarsConnection private const string s_className = nameof(SNIMarsConnection); private static QueuedTaskScheduler s_scheduler; - private TaskFactory s_factory; private readonly Guid _connectionId = Guid.NewGuid(); private readonly Dictionary _sessions = new Dictionary(); @@ -85,11 +84,10 @@ public uint StartReceive() //#endif ) { - LazyInitializer.EnsureInitialized(ref s_scheduler, () => new QueuedTaskScheduler(3, "MARSIOScheduler", false, ThreadPriority.AboveNormal)); - LazyInitializer.EnsureInitialized(ref s_factory, () => new TaskFactory(s_scheduler)); + LazyInitializer.EnsureInitialized(ref s_scheduler, () => new QueuedTaskScheduler(10, "MARSIOScheduler", useForegroundThreads: false, ThreadPriority.Normal)); // will start an async task on the scheduler and immediatley return so this await is safe - return s_factory.StartNew(StartAsyncReceiveLoopForConnection, this).GetAwaiter().GetResult(); + return s_scheduler.Factory.StartNew(StartAsyncReceiveLoopForConnection, this).GetAwaiter().GetResult(); } else { diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs index 9b03eb6648..7a56cbc88f 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SNI/SNITaskScheduler.cs @@ -12,51 +12,9 @@ namespace Microsoft.Data.SqlClient.SNI /// /// Provides a TaskScheduler that provides control over priorities, fairness, and the underlying threads utilized. /// - [DebuggerTypeProxy(typeof(QueuedTaskSchedulerDebugView))] [DebuggerDisplay("Id={Id}, Queues={DebugQueueCount}, ScheduledTasks = {DebugTaskCount}")] internal sealed class QueuedTaskScheduler : TaskScheduler, IDisposable { - /// Debug view for the QueuedTaskScheduler. - private class QueuedTaskSchedulerDebugView - { - /// The scheduler. - private readonly QueuedTaskScheduler _scheduler; - - /// Initializes the debug view. - /// The scheduler. - public QueuedTaskSchedulerDebugView(QueuedTaskScheduler scheduler) => - _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); - - /// Gets all of the Tasks queued to the scheduler directly. - public IEnumerable ScheduledTasks - { - get - { - var tasks = (_scheduler._targetScheduler != null) ? - (IEnumerable)_scheduler._nonthreadsafeTaskQueue : - _scheduler._blockingTaskQueue; - return tasks.Where(t => t != null).ToList(); - } - } - - /// Gets the prioritized and fair queues. - public IEnumerable Queues - { - get - { - List queues = new List(); - foreach (var group in _scheduler._queueGroups) - queues.AddRange(group.Value.Cast()); - return queues; - } - } - } - - /// - /// A sorted list of round-robin queue lists. Tasks with the smallest priority value - /// are preferred. Priority groups are round-robin'd through in order of priority. - /// - private readonly SortedList _queueGroups = new SortedList(); /// Cancellation token used for disposal. private readonly CancellationTokenSource _disposeCancellation = new CancellationTokenSource(); /// @@ -67,59 +25,12 @@ public IEnumerable Queues /// Whether we're processing tasks on the current thread. private static readonly ThreadLocal s_taskProcessingThread = new ThreadLocal(); - // *** - // *** For when using a target scheduler - // *** - - /// The scheduler onto which actual work is scheduled. - private readonly TaskScheduler _targetScheduler; - /// The queue of tasks to process when using an underlying target scheduler. - private readonly Queue _nonthreadsafeTaskQueue; - /// The number of Tasks that have been queued or that are running whiel using an underlying scheduler. - private int _delegatesQueuedOrRunning = 0; - - // *** - // *** For when using our own threads - // *** - /// The threads used by the scheduler to process work. private readonly Thread[] _threads; /// The collection of tasks to be executed on our custom threads. private readonly BlockingCollection _blockingTaskQueue; - // *** - - /// Initializes the scheduler. - public QueuedTaskScheduler() : this(Default, 0) { } - - /// Initializes the scheduler. - /// The target underlying scheduler onto which this sceduler's work is queued. - public QueuedTaskScheduler(TaskScheduler targetScheduler) : this(targetScheduler, 0) { } - - /// Initializes the scheduler. - /// The target underlying scheduler onto which this sceduler's work is queued. - /// The maximum degree of concurrency allowed for this scheduler's work. - public QueuedTaskScheduler( - TaskScheduler targetScheduler, - int maxConcurrencyLevel) - { - if (maxConcurrencyLevel < 0) - throw new ArgumentOutOfRangeException(nameof(maxConcurrencyLevel)); - - // Initialize only those fields relevant to use an underlying scheduler. We don't - // initialize the fields relevant to using our own custom threads. - _targetScheduler = targetScheduler ?? throw new ArgumentNullException("underlyingScheduler"); - _nonthreadsafeTaskQueue = new Queue(); - - // If 0, use the number of logical processors. But make sure whatever value we pick - // is not greater than the degree of parallelism allowed by the underlying scheduler. - _concurrencyLevel = maxConcurrencyLevel != 0 ? maxConcurrencyLevel : Environment.ProcessorCount; - if (targetScheduler.MaximumConcurrencyLevel > 0 && - targetScheduler.MaximumConcurrencyLevel < _concurrencyLevel) - { - _concurrencyLevel = targetScheduler.MaximumConcurrencyLevel; - } - } + private readonly TaskFactory _factory; /// Initializes the scheduler. /// The number of threads to create and use for processing work items. @@ -158,7 +69,7 @@ public QueuedTaskScheduler( _threads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { - _threads[i] = new Thread(() => ThreadBasedDispatchLoop(threadInit, threadFinally), threadMaxStackSize) + _threads[i] = new Thread(() => DispatchLoop(threadInit, threadFinally), threadMaxStackSize) { Priority = threadPriority, IsBackground = !useForegroundThreads, @@ -167,15 +78,19 @@ public QueuedTaskScheduler( _threads[i].Name = threadName + " (" + i + ")"; } + _factory = new TaskFactory(this); + // Start all of the threads foreach (var thread in _threads) thread.Start(); } + public TaskFactory Factory => _factory; + /// The dispatch loop run by all threads in this scheduler. /// An initialization routine to run when the thread begins. /// A finalization routine to run before the thread ends. - private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) + private void DispatchLoop(Action threadInit, Action threadFinally) { s_taskProcessingThread.Value = true; threadInit?.Invoke(); @@ -197,22 +112,7 @@ private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) // Run it. if (task != null) { - TryExecuteTask(task); - } - // If the task is null, that means it's just a placeholder for a task - // queued to one of the subschedulers. Find the next task based on - // priority and fairness and run it. - else - { - // Find the next task based on our ordering rules... - Task targetTask; - QueuedTaskSchedulerQueue queueForTargetTask; - lock (_queueGroups) - FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); - - // ... and if we found one, run it - if (targetTask != null) - queueForTargetTask.ExecuteTask(targetTask); + bool tried = TryExecuteTask(task); } } } @@ -238,197 +138,34 @@ private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) } } - /// Gets the number of queues currently activated. - private int DebugQueueCount - { - get - { - int count = 0; - foreach (var group in _queueGroups) - count += group.Value.Count; - return count; - } - } - - /// Gets the number of tasks currently scheduled. - private int DebugTaskCount => (_targetScheduler != null ? - (IEnumerable)_nonthreadsafeTaskQueue : (IEnumerable)_blockingTaskQueue) - .Where(t => t != null).Count(); - - /// Find the next task that should be executed, based on priorities and fairness and the like. - /// The found task, or null if none was found. - /// - /// The scheduler associated with the found task. Due to security checks inside of TPL, - /// this scheduler needs to be used to execute that task. - /// - private void FindNextTask_NeedsLock(out Task targetTask, out QueuedTaskSchedulerQueue queueForTargetTask) - { - targetTask = null; - queueForTargetTask = null; - - // Look through each of our queue groups in sorted order. - // This ordering is based on the priority of the queues. - foreach (var queueGroup in _queueGroups) - { - var queues = queueGroup.Value; - - // Within each group, iterate through the queues in a round-robin - // fashion. Every time we iterate again and successfully find a task, - // we'll start in the next location in the group. - foreach (int i in queues.CreateSearchOrder()) - { - queueForTargetTask = queues[i]; - var items = queueForTargetTask._workItems; - if (items.Count > 0) - { - targetTask = items.Dequeue(); - if (queueForTargetTask._disposed && items.Count == 0) - { - RemoveQueue_NeedsLock(queueForTargetTask); - } - queues.NextQueueIndex = (queues.NextQueueIndex + 1) % queueGroup.Value.Count; - return; - } - } - } - } - /// Queues a task to the scheduler. /// The task to be queued. protected override void QueueTask(Task task) { // If we've been disposed, no one should be queueing if (_disposeCancellation.IsCancellationRequested) - throw new ObjectDisposedException(GetType().Name); - - // If the target scheduler is null (meaning we're using our own threads), - // add the task to the blocking queue - if (_targetScheduler == null) { - _blockingTaskQueue.Add(task); - } - // Otherwise, add the task to the non-blocking queue, - // and if there isn't already an executing processing task, - // start one up - else - { - // Queue the task and check whether we should launch a processing - // task (noting it if we do, so that other threads don't result - // in queueing up too many). - bool launchTask = false; - lock (_nonthreadsafeTaskQueue) - { - _nonthreadsafeTaskQueue.Enqueue(task); - if (_delegatesQueuedOrRunning < _concurrencyLevel) - { - ++_delegatesQueuedOrRunning; - launchTask = true; - } - } - - // If necessary, start processing asynchronously - if (launchTask) - { - Task.Factory.StartNew(ProcessPrioritizedAndBatchedTasks, - CancellationToken.None, TaskCreationOptions.None, _targetScheduler); - } - } - } - - /// - /// Process tasks one at a time in the best order. - /// This should be run in a Task generated by QueueTask. - /// It's been separated out into its own method to show up better in Parallel Tasks. - /// - private void ProcessPrioritizedAndBatchedTasks() - { - bool continueProcessing = true; - while (!_disposeCancellation.IsCancellationRequested && continueProcessing) - { - try - { - // Note that we're processing tasks on this thread - s_taskProcessingThread.Value = true; - - // Until there are no more tasks to process - while (!_disposeCancellation.IsCancellationRequested) - { - // Try to get the next task. If there aren't any more, we're done. - Task targetTask; - lock (_nonthreadsafeTaskQueue) - { - if (_nonthreadsafeTaskQueue.Count == 0) - break; - targetTask = _nonthreadsafeTaskQueue.Dequeue(); - } - - // If the task is null, it's a placeholder for a task in the round-robin queues. - // Find the next one that should be processed. - QueuedTaskSchedulerQueue queueForTargetTask = null; - if (targetTask == null) - { - lock (_queueGroups) - FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); - } - - // Now if we finally have a task, run it. If the task - // was associated with one of the round-robin schedulers, we need to use it - // as a thunk to execute its task. - if (targetTask != null) - { - if (queueForTargetTask != null) - queueForTargetTask.ExecuteTask(targetTask); - else - TryExecuteTask(targetTask); - } - } - } - finally - { - // Now that we think we're done, verify that there really is - // no more work to do. If there's not, highlight - // that we're now less parallel than we were a moment ago. - lock (_nonthreadsafeTaskQueue) - { - if (_nonthreadsafeTaskQueue.Count == 0) - { - _delegatesQueuedOrRunning--; - continueProcessing = false; - s_taskProcessingThread.Value = false; - } - } - } + throw new ObjectDisposedException(GetType().Name); } + _blockingTaskQueue.Add(task); } - /// Notifies the pool that there's a new item to be executed in one of the round-robin queues. - private void NotifyNewWorkItem() => QueueTask(null); - /// Tries to execute a task synchronously on the current thread. /// The task to execute. /// Whether the task was previously queued. /// true if the task was executed; otherwise, false. protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => // If we're already running tasks on this threads, enable inlining - s_taskProcessingThread.Value && TryExecuteTask(task); + false; // s_taskProcessingThread.Value && TryExecuteTask(task); /// Gets the tasks scheduled to this scheduler. /// An enumerable of all tasks queued to this scheduler. /// This does not include the tasks on sub-schedulers. Those will be retrieved by the debugger separately. protected override IEnumerable GetScheduledTasks() { - // If we're running on our own threads, get the tasks from the blocking queue... - if (_targetScheduler == null) - { - // Get all of the tasks, filtering out nulls, which are just placeholders - // for tasks in other sub-schedulers - return _blockingTaskQueue.Where(t => t != null).ToList(); - } - // otherwise get them from the non-blocking queue... - else - { - return _nonthreadsafeTaskQueue.Where(t => t != null).ToList(); - } + // Get all of the tasks, filtering out nulls, which are just placeholders + // for tasks in other sub-schedulers + return _blockingTaskQueue.Where(t => t != null).ToList(); } /// Gets the maximum concurrency level to use when processing tasks. @@ -436,168 +173,5 @@ protected override IEnumerable GetScheduledTasks() /// Initiates shutdown of the scheduler. public void Dispose() => _disposeCancellation.Cancel(); - - /// Creates and activates a new scheduling queue for this scheduler. - /// The newly created and activated queue at priority 0. - public TaskScheduler ActivateNewQueue() => ActivateNewQueue(0); - - /// Creates and activates a new scheduling queue for this scheduler. - /// The priority level for the new queue. - /// The newly created and activated queue at the specified priority. - public TaskScheduler ActivateNewQueue(int priority) - { - // Create the queue - var createdQueue = new QueuedTaskSchedulerQueue(priority, this); - - // Add the queue to the appropriate queue group based on priority - lock (_queueGroups) - { - if (!_queueGroups.TryGetValue(priority, out QueueGroup list)) - { - list = new QueueGroup(); - _queueGroups.Add(priority, list); - } - list.Add(createdQueue); - } - - // Hand the new queue back - return createdQueue; - } - - /// Removes a scheduler from the group. - /// The scheduler to be removed. - private void RemoveQueue_NeedsLock(QueuedTaskSchedulerQueue queue) - { - // Find the group that contains the queue and the queue's index within the group - var queueGroup = _queueGroups[queue._priority]; - int index = queueGroup.IndexOf(queue); - - // We're about to remove the queue, so adjust the index of the next - // round-robin starting location if it'll be affected by the removal - if (queueGroup.NextQueueIndex >= index) - queueGroup.NextQueueIndex--; - - // Remove it - queueGroup.RemoveAt(index); - } - - /// A group of queues a the same priority level. - private class QueueGroup : List - { - /// The starting index for the next round-robin traversal. - public int NextQueueIndex = 0; - - /// Creates a search order through this group. - /// An enumerable of indices for this group. - public IEnumerable CreateSearchOrder() - { - for (int i = NextQueueIndex; i < Count; i++) - yield return i; - for (int i = 0; i < NextQueueIndex; i++) - yield return i; - } - } - - /// Provides a scheduling queue associatd with a QueuedTaskScheduler. - [DebuggerDisplay("QueuePriority = {_priority}, WaitingTasks = {WaitingTasks}")] - [DebuggerTypeProxy(typeof(QueuedTaskSchedulerQueueDebugView))] - private sealed class QueuedTaskSchedulerQueue : TaskScheduler, IDisposable - { - /// A debug view for the queue. - private sealed class QueuedTaskSchedulerQueueDebugView - { - /// The queue. - private readonly QueuedTaskSchedulerQueue _queue; - - /// Initializes the debug view. - /// The queue to be debugged. - public QueuedTaskSchedulerQueueDebugView(QueuedTaskSchedulerQueue queue) => - _queue = queue ?? throw new ArgumentNullException(nameof(queue)); - - /// Gets the priority of this queue in its associated scheduler. - public int Priority => _queue._priority; - /// Gets the ID of this scheduler. - public int Id => _queue.Id; - /// Gets all of the tasks scheduled to this queue. - public IEnumerable ScheduledTasks => _queue.GetScheduledTasks(); - /// Gets the QueuedTaskScheduler with which this queue is associated. - public QueuedTaskScheduler AssociatedScheduler => _queue._pool; - } - - /// The scheduler with which this pool is associated. - private readonly QueuedTaskScheduler _pool; - /// The work items stored in this queue. - internal readonly Queue _workItems; - /// Whether this queue has been disposed. - internal bool _disposed; - /// Gets the priority for this queue. - internal int _priority; - - /// Initializes the queue. - /// The priority associated with this queue. - /// The scheduler with which this queue is associated. - internal QueuedTaskSchedulerQueue(int priority, QueuedTaskScheduler pool) - { - _priority = priority; - _pool = pool; - _workItems = new Queue(); - } - - /// Gets the number of tasks waiting in this scheduler. - internal int WaitingTasks => _workItems.Count; - - /// Gets the tasks scheduled to this scheduler. - /// An enumerable of all tasks queued to this scheduler. - protected override IEnumerable GetScheduledTasks() => _workItems.ToList(); - - /// Queues a task to the scheduler. - /// The task to be queued. - protected override void QueueTask(Task task) - { - if (_disposed) - throw new ObjectDisposedException(GetType().Name); - - // Queue up the task locally to this queue, and then notify - // the parent scheduler that there's work available - lock (_pool._queueGroups) - _workItems.Enqueue(task); - _pool.NotifyNewWorkItem(); - } - - /// Tries to execute a task synchronously on the current thread. - /// The task to execute. - /// Whether the task was previously queued. - /// true if the task was executed; otherwise, false. - protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => - // If we're using our own threads and if this is being called from one of them, - // or if we're currently processing another task on this thread, try running it inline. - s_taskProcessingThread.Value && TryExecuteTask(task); - - /// Runs the specified ask. - /// The task to execute. - internal void ExecuteTask(Task task) => TryExecuteTask(task); - - /// Gets the maximum concurrency level to use when processing tasks. - public override int MaximumConcurrencyLevel => _pool.MaximumConcurrencyLevel; - - /// Signals that the queue should be removed from the scheduler as soon as the queue is empty. - public void Dispose() - { - if (!_disposed) - { - lock (_pool._queueGroups) - { - // We only remove the queue if it's empty. If it's not empty, - // we still mark it as disposed, and the associated QueuedTaskScheduler - // will remove the queue when its count hits 0 and its _disposed is true. - if (_workItems.Count == 0) - { - _pool.RemoveQueue_NeedsLock(this); - } - } - _disposed = true; - } - } - } } }