From 8eafbcb88c47bbf01a63b7bbf49fc85bee5e4de7 Mon Sep 17 00:00:00 2001 From: Thomas Ibel Date: Mon, 24 Feb 2014 21:40:40 +0100 Subject: [PATCH] build Parallel class for portable --- src/Numerics/Compatibility.cs | 133 +++++++++++++++++++++++ src/Numerics/Threading/CommonParallel.cs | 121 +-------------------- 2 files changed, 138 insertions(+), 116 deletions(-) diff --git a/src/Numerics/Compatibility.cs b/src/Numerics/Compatibility.cs index 5a8ef9e1..c18e5b0b 100644 --- a/src/Numerics/Compatibility.cs +++ b/src/Numerics/Compatibility.cs @@ -2,6 +2,10 @@ namespace MathNet.Numerics { using System; + using System.Threading; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; [AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct)] public class SerializableAttribute : Attribute @@ -12,6 +16,135 @@ namespace MathNet.Numerics public class SpecialNameAttribute : Attribute { } + + internal static class Partitioner + { + public static IEnumerable> Create(int fromInclusive, int toExclusive) + { + var rangeSize = Math.Max(1, (toExclusive - fromInclusive) / Control.NumberOfParallelWorkerThreads); + return Create(fromInclusive, toExclusive, rangeSize); + } + + public static IEnumerable> Create(int fromInclusive, int toExclusive, int rangeSize) + { + if (toExclusive <= fromInclusive) + { + throw new ArgumentOutOfRangeException("toExclusive"); + } + if (rangeSize <= 0) + { + throw new ArgumentOutOfRangeException("rangeSize"); + } + + return CreateRanges(fromInclusive, toExclusive, rangeSize); + } + + private static IEnumerable> CreateRanges(int fromInclusive, int toExclusive, int rangeSize) + { + bool flag = false; + int num = fromInclusive; + while (num < toExclusive && !flag) + { + int item = num; + int num2; + try + { + num2 = checked(num + rangeSize); + } + catch (OverflowException) + { + num2 = toExclusive; + flag = true; + } + if (num2 > toExclusive) + { + num2 = toExclusive; + } + yield return new Tuple(item, num2); + num += rangeSize; + } + } + } + + internal class ParallelOptions + { + public TaskScheduler TaskScheduler { get; set; } + public int MaxDegreeOfParallelism { get; set; } + public CancellationToken CancellationToken { get; set; } + + public ParallelOptions() + { + TaskScheduler = TaskScheduler.Default; + MaxDegreeOfParallelism = -1; + CancellationToken = CancellationToken.None; + } + } + + internal class ParallelLoopState + { + } + + internal static class Parallel + { + public static void ForEach(IEnumerable source, ParallelOptions parallelOptions, Action body) + { + var chunks = source.ToArray(); + var tasks = new Task[chunks.Length]; + + for (var i = 0; i < tasks.Length; i++) + { + var chunk = chunks[i]; + tasks[i] = Task.Factory.StartNew(() => body(chunk), parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler); + } + + Task.WaitAll(tasks, parallelOptions.CancellationToken); + } + + public static void Invoke(ParallelOptions parallelOptions, params Action[] actions) + { + var tasks = new Task[actions.Length]; + + for (var i = 0; i < tasks.Length; i++) + { + var action = actions[i]; + if (action == null) + { + throw new ArgumentException(String.Format(Properties.Resources.ArgumentItemNull, "actions"), "actions"); + } + + tasks[i] = Task.Factory.StartNew(action, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler); + } + + Task.WaitAll(tasks); + } + + public static void ForEach( + IEnumerable source, + ParallelOptions parallelOptions, + Func localInit, + Func body, + Action localFinally) + { + var chunks = source.ToArray(); + var tasks = new Task[chunks.Length]; + var loopState = new ParallelLoopState(); + + for (var i = 0; i < tasks.Length; i++) + { + var chunk = chunks[i]; + + tasks[i] = Task.Factory.StartNew(() => + { + var local = localInit(); + local = body(chunk, loopState, local); + localFinally(local); + return local; + }, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler); + } + + Task.WaitAll(tasks, parallelOptions.CancellationToken); + } + } } #endif diff --git a/src/Numerics/Threading/CommonParallel.cs b/src/Numerics/Threading/CommonParallel.cs index b7ccd152..50cf31aa 100644 --- a/src/Numerics/Threading/CommonParallel.cs +++ b/src/Numerics/Threading/CommonParallel.cs @@ -38,10 +38,7 @@ namespace MathNet.Numerics.Threading using Partitioner = MathNet.Numerics.Partitioner; #endif -#if PORTABLE - using System.Linq; - using Properties; -#else +#if !PORTABLE using System.Collections.Concurrent; #endif @@ -92,30 +89,11 @@ namespace MathNet.Numerics.Threading return; } -#if PORTABLE - var tasks = new Task[Math.Min(maxDegreeOfParallelism, length/rangeSize)]; - rangeSize = (toExclusive - fromInclusive)/tasks.Length; - - // partition the jobs into separate sets for each but the last worked thread - for (var i = 0; i < tasks.Length - 1; i++) - { - var start = fromInclusive + (i*rangeSize); - var stop = fromInclusive + ((i + 1)*rangeSize); - - tasks[i] = Task.Factory.StartNew(() => body(start, stop)); - } - - // add another set for last worker thread - tasks[tasks.Length - 1] = - Task.Factory.StartNew(() => body(fromInclusive + ((tasks.Length - 1)*rangeSize), toExclusive)); - - Task.WaitAll(tasks); -#else + // Common case Parallel.ForEach( Partitioner.Create(fromInclusive, toExclusive, rangeSize), new ParallelOptions {MaxDegreeOfParallelism = maxDegreeOfParallelism}, - (range, loopState) => body(range.Item1, range.Item2)); -#endif + range => body(range.Item1, range.Item2)); } /// @@ -150,27 +128,12 @@ namespace MathNet.Numerics.Threading } // Common case -#if PORTABLE - var tasks = new Task[actions.Length]; - for (var i = 0; i < tasks.Length; i++) - { - Action action = actions[i]; - if (action == null) - { - throw new ArgumentException(String.Format(Resources.ArgumentItemNull, "actions"), "actions"); - } - - tasks[i] = Task.Factory.StartNew(action); - } - Task.WaitAll(tasks); -#else Parallel.Invoke( new ParallelOptions { MaxDegreeOfParallelism = Control.NumberOfParallelWorkerThreads }, actions); -#endif } /// @@ -215,43 +178,7 @@ namespace MathNet.Numerics.Threading return reduce(mapped); } -#if PORTABLE - var tasks = new Task[Control.NumberOfParallelWorkerThreads]; - var size = (toExclusive - fromInclusive) / tasks.Length; - - // partition the jobs into separate sets for each but the last worked thread - for (var i = 0; i < tasks.Length - 1; i++) - { - var start = fromInclusive + (i * size); - var stop = fromInclusive + ((i + 1) * size); - - tasks[i] = Task.Factory.StartNew(() => - { - var mapped = new T[stop - start]; - for (int k = 0; k < mapped.Length; k++) - { - mapped[k] = select(k + start); - } - return reduce(mapped); - }); - } - - // add another set for last worker thread - tasks[tasks.Length - 1] = Task.Factory.StartNew(() => - { - var start = fromInclusive + ((tasks.Length - 1) * size); - var mapped = new T[toExclusive - start]; - for (int k = 0; k < mapped.Length; k++) - { - mapped[k] = select(k + start); - } - return reduce(mapped); - }); - - return Task.Factory - .ContinueWhenAll(tasks, tsk => reduce(tsk.Select(t => t.Result).ToArray())) - .Result; -#else + // Common case var intermediateResults = new List(); var syncLock = new object(); var maxThreads = Control.DisableParallelization ? 1 : Control.NumberOfParallelWorkerThreads; @@ -277,7 +204,6 @@ namespace MathNet.Numerics.Threading } }); return reduce(intermediateResults.ToArray()); -#endif } /// @@ -321,43 +247,7 @@ namespace MathNet.Numerics.Threading return reduce(mapped); } -#if PORTABLE - var tasks = new Task[Control.NumberOfParallelWorkerThreads]; - var size = array.Length / tasks.Length; - - // partition the jobs into separate sets for each but the last worked thread - for (var i = 0; i < tasks.Length - 1; i++) - { - var start = (i * size); - var stop = ((i + 1) * size); - - tasks[i] = Task.Factory.StartNew(() => - { - var mapped = new TOut[stop - start]; - for (int k = 0; k < mapped.Length; k++) - { - mapped[k] = select(k + start, array[k + start]); - } - return reduce(mapped); - }); - } - - // add another set for last worker thread - tasks[tasks.Length - 1] = Task.Factory.StartNew(() => - { - var start = ((tasks.Length - 1) * size); - var mapped = new TOut[array.Length - start]; - for (int k = 0; k < mapped.Length; k++) - { - mapped[k] = select(k + start, array[k + start]); - } - return reduce(mapped); - }); - - return Task.Factory - .ContinueWhenAll(tasks, tsk => reduce(tsk.Select(t => t.Result).ToArray())) - .Result; -#else + // Common case var intermediateResults = new List(); var syncLock = new object(); var maxThreads = Control.DisableParallelization ? 1 : Control.NumberOfParallelWorkerThreads; @@ -383,7 +273,6 @@ namespace MathNet.Numerics.Threading } }); return reduce(intermediateResults.ToArray()); -#endif } ///