Browse Source

build Parallel class for portable

pull/197/head
Thomas Ibel 12 years ago
parent
commit
8eafbcb88c
  1. 133
      src/Numerics/Compatibility.cs
  2. 121
      src/Numerics/Threading/CommonParallel.cs

133
src/Numerics/Compatibility.cs

@ -2,6 +2,10 @@
namespace MathNet.Numerics
{
using System;
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct)]
public class SerializableAttribute : Attribute
@ -12,6 +16,135 @@ namespace MathNet.Numerics
public class SpecialNameAttribute : Attribute
{
}
internal static class Partitioner
{
public static IEnumerable<Tuple<int, int>> Create(int fromInclusive, int toExclusive)
{
var rangeSize = Math.Max(1, (toExclusive - fromInclusive) / Control.NumberOfParallelWorkerThreads);
return Create(fromInclusive, toExclusive, rangeSize);
}
public static IEnumerable<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize)
{
if (toExclusive <= fromInclusive)
{
throw new ArgumentOutOfRangeException("toExclusive");
}
if (rangeSize <= 0)
{
throw new ArgumentOutOfRangeException("rangeSize");
}
return CreateRanges(fromInclusive, toExclusive, rangeSize);
}
private static IEnumerable<Tuple<int, int>> CreateRanges(int fromInclusive, int toExclusive, int rangeSize)
{
bool flag = false;
int num = fromInclusive;
while (num < toExclusive && !flag)
{
int item = num;
int num2;
try
{
num2 = checked(num + rangeSize);
}
catch (OverflowException)
{
num2 = toExclusive;
flag = true;
}
if (num2 > toExclusive)
{
num2 = toExclusive;
}
yield return new Tuple<int, int>(item, num2);
num += rangeSize;
}
}
}
internal class ParallelOptions
{
public TaskScheduler TaskScheduler { get; set; }
public int MaxDegreeOfParallelism { get; set; }
public CancellationToken CancellationToken { get; set; }
public ParallelOptions()
{
TaskScheduler = TaskScheduler.Default;
MaxDegreeOfParallelism = -1;
CancellationToken = CancellationToken.None;
}
}
internal class ParallelLoopState
{
}
internal static class Parallel
{
public static void ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body)
{
var chunks = source.ToArray();
var tasks = new Task[chunks.Length];
for (var i = 0; i < tasks.Length; i++)
{
var chunk = chunks[i];
tasks[i] = Task.Factory.StartNew(() => body(chunk), parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler);
}
Task.WaitAll(tasks, parallelOptions.CancellationToken);
}
public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
{
var tasks = new Task[actions.Length];
for (var i = 0; i < tasks.Length; i++)
{
var action = actions[i];
if (action == null)
{
throw new ArgumentException(String.Format(Properties.Resources.ArgumentItemNull, "actions"), "actions");
}
tasks[i] = Task.Factory.StartNew(action, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler);
}
Task.WaitAll(tasks);
}
public static void ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
ParallelOptions parallelOptions,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
var chunks = source.ToArray();
var tasks = new Task[chunks.Length];
var loopState = new ParallelLoopState();
for (var i = 0; i < tasks.Length; i++)
{
var chunk = chunks[i];
tasks[i] = Task.Factory.StartNew(() =>
{
var local = localInit();
local = body(chunk, loopState, local);
localFinally(local);
return local;
}, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler);
}
Task.WaitAll(tasks, parallelOptions.CancellationToken);
}
}
}
#endif

121
src/Numerics/Threading/CommonParallel.cs

@ -38,10 +38,7 @@ namespace MathNet.Numerics.Threading
using Partitioner = MathNet.Numerics.Partitioner;
#endif
#if PORTABLE
using System.Linq;
using Properties;
#else
#if !PORTABLE
using System.Collections.Concurrent;
#endif
@ -92,30 +89,11 @@ namespace MathNet.Numerics.Threading
return;
}
#if PORTABLE
var tasks = new Task[Math.Min(maxDegreeOfParallelism, length/rangeSize)];
rangeSize = (toExclusive - fromInclusive)/tasks.Length;
// partition the jobs into separate sets for each but the last worked thread
for (var i = 0; i < tasks.Length - 1; i++)
{
var start = fromInclusive + (i*rangeSize);
var stop = fromInclusive + ((i + 1)*rangeSize);
tasks[i] = Task.Factory.StartNew(() => body(start, stop));
}
// add another set for last worker thread
tasks[tasks.Length - 1] =
Task.Factory.StartNew(() => body(fromInclusive + ((tasks.Length - 1)*rangeSize), toExclusive));
Task.WaitAll(tasks);
#else
// Common case
Parallel.ForEach(
Partitioner.Create(fromInclusive, toExclusive, rangeSize),
new ParallelOptions {MaxDegreeOfParallelism = maxDegreeOfParallelism},
(range, loopState) => body(range.Item1, range.Item2));
#endif
range => body(range.Item1, range.Item2));
}
/// <summary>
@ -150,27 +128,12 @@ namespace MathNet.Numerics.Threading
}
// Common case
#if PORTABLE
var tasks = new Task[actions.Length];
for (var i = 0; i < tasks.Length; i++)
{
Action action = actions[i];
if (action == null)
{
throw new ArgumentException(String.Format(Resources.ArgumentItemNull, "actions"), "actions");
}
tasks[i] = Task.Factory.StartNew(action);
}
Task.WaitAll(tasks);
#else
Parallel.Invoke(
new ParallelOptions
{
MaxDegreeOfParallelism = Control.NumberOfParallelWorkerThreads
},
actions);
#endif
}
/// <summary>
@ -215,43 +178,7 @@ namespace MathNet.Numerics.Threading
return reduce(mapped);
}
#if PORTABLE
var tasks = new Task<T>[Control.NumberOfParallelWorkerThreads];
var size = (toExclusive - fromInclusive) / tasks.Length;
// partition the jobs into separate sets for each but the last worked thread
for (var i = 0; i < tasks.Length - 1; i++)
{
var start = fromInclusive + (i * size);
var stop = fromInclusive + ((i + 1) * size);
tasks[i] = Task.Factory.StartNew(() =>
{
var mapped = new T[stop - start];
for (int k = 0; k < mapped.Length; k++)
{
mapped[k] = select(k + start);
}
return reduce(mapped);
});
}
// add another set for last worker thread
tasks[tasks.Length - 1] = Task.Factory.StartNew(() =>
{
var start = fromInclusive + ((tasks.Length - 1) * size);
var mapped = new T[toExclusive - start];
for (int k = 0; k < mapped.Length; k++)
{
mapped[k] = select(k + start);
}
return reduce(mapped);
});
return Task.Factory
.ContinueWhenAll(tasks, tsk => reduce(tsk.Select(t => t.Result).ToArray()))
.Result;
#else
// Common case
var intermediateResults = new List<T>();
var syncLock = new object();
var maxThreads = Control.DisableParallelization ? 1 : Control.NumberOfParallelWorkerThreads;
@ -277,7 +204,6 @@ namespace MathNet.Numerics.Threading
}
});
return reduce(intermediateResults.ToArray());
#endif
}
/// <summary>
@ -321,43 +247,7 @@ namespace MathNet.Numerics.Threading
return reduce(mapped);
}
#if PORTABLE
var tasks = new Task<TOut>[Control.NumberOfParallelWorkerThreads];
var size = array.Length / tasks.Length;
// partition the jobs into separate sets for each but the last worked thread
for (var i = 0; i < tasks.Length - 1; i++)
{
var start = (i * size);
var stop = ((i + 1) * size);
tasks[i] = Task.Factory.StartNew(() =>
{
var mapped = new TOut[stop - start];
for (int k = 0; k < mapped.Length; k++)
{
mapped[k] = select(k + start, array[k + start]);
}
return reduce(mapped);
});
}
// add another set for last worker thread
tasks[tasks.Length - 1] = Task.Factory.StartNew(() =>
{
var start = ((tasks.Length - 1) * size);
var mapped = new TOut[array.Length - start];
for (int k = 0; k < mapped.Length; k++)
{
mapped[k] = select(k + start, array[k + start]);
}
return reduce(mapped);
});
return Task.Factory
.ContinueWhenAll(tasks, tsk => reduce(tsk.Select(t => t.Result).ToArray()))
.Result;
#else
// Common case
var intermediateResults = new List<TOut>();
var syncLock = new object();
var maxThreads = Control.DisableParallelization ? 1 : Control.NumberOfParallelWorkerThreads;
@ -383,7 +273,6 @@ namespace MathNet.Numerics.Threading
}
});
return reduce(intermediateResults.ToArray());
#endif
}
/// <summary>

Loading…
Cancel
Save