//
// Math.NET Numerics, part of the Math.NET Project
// http://mathnet.opensourcedotnet.info
//
// Copyright (c) 2009 Math.NET
//
// Permission is hereby granted, free of charge, to any person
// obtaining a copy of this software and associated documentation
// files (the "Software"), to deal in the Software without
// restriction, including without limitation the rights to use,
// copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.
//

namespace MathNet.Numerics.Threading
{
    using System;
    using System.Collections.Generic;
    using System.Threading;

    /// <summary>
    /// Internal Parallel Thread Queue.
    /// </summary>
    /// <remarks>
    /// Jobs are stored in a shared FIFO queue and picked up by a fixed set of
    /// background worker threads. A counting semaphore is released once per
    /// enqueued job, and each worker waits on it before dequeuing.
    /// </remarks>
    internal static class ThreadQueue
    {
        /// <summary>
        /// Sync Object for the thread queue state.
        /// </summary>
        private static readonly object _stateSync = new object();

        /// <summary>
        /// Sync Object for queue access (to be sure it's used by us only).
        /// </summary>
        private static readonly object _queueSync = new object();

        /// <summary>
        /// Queue holding the pending jobs.
        /// </summary>
        private static readonly Queue<Task> _queue = new Queue<Task>();

        /// <summary>
        /// Maximum number of jobs that can be in the queue at the same time.
        /// </summary>
        /// <remarks>
        /// Used as the maximum count of the semaphore; releasing the semaphore
        /// beyond this count throws <see cref="SemaphoreFullException"/>.
        /// </remarks>
        private const int MaximumQueueLength = 1024;

        /// <summary>
        /// Counting Semaphore to make the worker thread wait for jobs
        /// </summary>
        private static Semaphore _tasksAvailableSemaphore;

        /// <summary>
        /// Running flag, used to signal worker threads to stop cleanly.
        /// </summary>
        /// <remarks>
        /// Declared volatile because it is written under <see cref="_stateSync"/>
        /// but read without any lock by the worker threads and by the Enqueue methods.
        /// </remarks>
        private static volatile bool _running = true;

        /// <summary>
        /// Worker threads
        /// </summary>
        private static Thread[] _threads;

        /// <summary>
        /// Gets the number of worker threads.
        /// </summary>
        internal static int ThreadCount { get; private set; }

        /// <summary>
        /// Initializes static members of the ThreadQueue class.
        /// </summary>
        static ThreadQueue()
        {
            // TODO: Control.ThreadCount instead of Environment.ProcessorCount
            Start(Environment.ProcessorCount);
        }

        /// <summary>
        /// Add a job to the queue.
        /// </summary>
        /// <param name="task">The job to run.</param>
        internal static void Enqueue(Task task)
        {
            if (!_running)
            {
                Start();
            }

            lock (_queueSync)
            {
                _queue.Enqueue(task);
            }

            // Signal one worker that a job is available.
            _tasksAvailableSemaphore.Release();
        }

        /// <summary>
        /// Add a set of jobs to the queue.
        /// </summary>
        /// <param name="tasks">The jobs to run.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="tasks"/> is <c>null</c>.
        /// </exception>
        internal static void Enqueue(IList<Task> tasks)
        {
            if (tasks == null)
            {
                throw new ArgumentNullException("tasks");
            }

            // Nothing to schedule. Semaphore.Release(0) would throw
            // ArgumentOutOfRangeException, so bail out before touching it.
            if (tasks.Count == 0)
            {
                return;
            }

            if (!_running)
            {
                Start();
            }

            lock (_queueSync)
            {
                foreach (var task in tasks)
                {
                    _queue.Enqueue(task);
                }
            }

            // One semaphore count per enqueued job.
            _tasksAvailableSemaphore.Release(tasks.Count);
        }

        /// <summary>
        /// Worker Thread Program
        /// </summary>
        private static void WorkerThreadStart()
        {
            while (_running)
            {
                // Wait until a job is available, or we should shut down
                _tasksAvailableSemaphore.WaitOne();

                // Check whether we should shut down
                if (!_running)
                {
                    // Shutdown releases the semaphore only once, so hand the
                    // wake-up on to the next waiting worker before leaving.
                    _tasksAvailableSemaphore.Release();
                    break;
                }

                // Get the job...
                Task task = null;
                lock (_queueSync)
                {
                    if (_queue.Count > 0)
                    {
                        task = _queue.Dequeue();
                    }
                }

                // ...and run it
                if (task != null)
                {
                    task.Compute();
                    task.Set();
                }
            }
        }

        /// <summary>
        /// Start or restart the queue with the specified number of worker threads.
        /// </summary>
        /// <param name="numberOfThreads">Number of worker threads.</param>
        internal static void Start(int numberOfThreads)
        {
            lock (_stateSync)
            {
                if (_threads != null)
                {
                    // Already running with the requested size: nothing to do.
                    if (_threads.Length == numberOfThreads)
                    {
                        return;
                    }

                    // Running with a different size: stop first, then restart below.
                    Shutdown();
                }

                ThreadCount = numberOfThreads;
                Start();
            }
        }

        /// <summary>
        /// Start the thread queue, if it is not already running.
        /// </summary>
        internal static void Start()
        {
            lock (_stateSync)
            {
                if (_threads != null)
                {
                    return;
                }

                // Seed the semaphore with the jobs already waiting in the queue.
                // The count is read under the queue lock, like every other queue access.
                int pendingJobs;
                lock (_queueSync)
                {
                    pendingJobs = _queue.Count;
                }

                _tasksAvailableSemaphore = new Semaphore(pendingJobs, MaximumQueueLength);
                _running = true;

                _threads = new Thread[ThreadCount];
                for (var i = 0; i < _threads.Length; i++)
                {
                    _threads[i] = new Thread(WorkerThreadStart) { IsBackground = true };
                    _threads[i].Start();
                }
            }
        }

        /// <summary>
        /// Stop the thread queue, if it is running.
        /// </summary>
        internal static void Shutdown()
        {
            lock (_stateSync)
            {
                if (_threads == null)
                {
                    return;
                }

                // try to stop the worker threads cleanly:
                // clear the flag, then wake one waiting worker; each worker that
                // sees the cleared flag releases the semaphore again on its way out.
                _running = false;
                _tasksAvailableSemaphore.Release();

                // wait until all threads have stopped
                foreach (var thread in _threads)
                {
                    thread.Join();
                }

                _tasksAvailableSemaphore.Close();
                _tasksAvailableSemaphore = null;
                _threads = null;
            }
        }
    }
}