//
// Copyright (c) James Jackson-South and contributors.
// Licensed under the Apache License, Version 2.0.
//
namespace ImageProcessorCore
{
using System;
using System.Threading;
using System.Threading.Tasks;
///
/// Allows the application of processors using parallel processing.
///
public abstract class ParallelImageProcessor : IImageProcessor
{
///
public event ProgressEventHandler OnProgress;
///
/// Gets or sets the count of workers to run the process in parallel.
///
public virtual int Parallelism { get; set; } = Environment.ProcessorCount * 2;
///
/// The number of rows processed by a derived class.
///
private int numRowsProcessed;
///
/// The total number of rows that will be processed by a derived class.
///
private int totalRows;
///
public void Apply(ImageBase target, ImageBase source, Rectangle sourceRectangle)
{
try
{
this.OnApply(source, target, target.Bounds, sourceRectangle);
this.numRowsProcessed = 0;
this.totalRows = sourceRectangle.Height;
if (this.Parallelism > 1)
{
int partitionCount = this.Parallelism;
Task[] tasks = new Task[partitionCount];
for (int p = 0; p < partitionCount; p++)
{
int current = p;
tasks[p] = Task.Run(
() =>
{
int batchSize = sourceRectangle.Height / partitionCount;
int yStart = sourceRectangle.Y + (current * batchSize);
int yEnd = current == partitionCount - 1
? sourceRectangle.Bottom
: yStart + batchSize;
this.Apply(target, source, target.Bounds, sourceRectangle, yStart, yEnd);
});
}
Task.WaitAll(tasks);
}
else
{
this.Apply(target, source, target.Bounds, sourceRectangle, sourceRectangle.Y, sourceRectangle.Bottom);
}
this.AfterApply(source, target, target.Bounds, sourceRectangle);
}
catch (Exception ex)
{
throw new ImageProcessingException($"An error occured when processing the image using {this.GetType().Name}. See the inner exception for more detail.", ex);
}
}
///
public void Apply(ImageBase target, ImageBase source, int width, int height, Rectangle targetRectangle = default(Rectangle), Rectangle sourceRectangle = default(Rectangle))
{
try
{
float[] pixels = new float[width * height * 4];
target.SetPixels(width, height, pixels);
if (sourceRectangle == Rectangle.Empty)
{
sourceRectangle = source.Bounds;
}
this.OnApply(source, target, target.Bounds, sourceRectangle);
targetRectangle = target.Bounds;
this.numRowsProcessed = 0;
this.totalRows = targetRectangle.Bottom;
if (this.Parallelism > 1)
{
int partitionCount = this.Parallelism;
Task[] tasks = new Task[partitionCount];
for (int p = 0; p < partitionCount; p++)
{
int current = p;
tasks[p] = Task.Run(() =>
{
int batchSize = targetRectangle.Bottom / partitionCount;
int yStart = current * batchSize;
int yEnd = current == partitionCount - 1 ? targetRectangle.Bottom : yStart + batchSize;
this.Apply(target, source, targetRectangle, sourceRectangle, yStart, yEnd);
});
}
Task.WaitAll(tasks);
}
else
{
this.Apply(target, source, targetRectangle, sourceRectangle, targetRectangle.Y, targetRectangle.Bottom);
}
this.AfterApply(source, target, target.Bounds, sourceRectangle);
}
catch (Exception ex)
{
throw new ImageProcessingException($"An error occured when processing the image using {this.GetType().Name}. See the inner exception for more detail.", ex);
}
}
///
/// This method is called before the process is applied to prepare the processor.
///
/// The source image. Cannot be null.
/// Target image to apply the process to.
///
/// The structure that specifies the location and size of the drawn image.
/// The image is scaled to fit the rectangle.
///
///
/// The structure that specifies the portion of the image object to draw.
///
protected virtual void OnApply(ImageBase source, ImageBase target, Rectangle targetRectangle, Rectangle sourceRectangle)
{
}
///
/// Applies the process to the specified portion of the specified at the specified location
/// and with the specified size.
///
/// Target image to apply the process to.
/// The source image. Cannot be null.
///
/// The structure that specifies the location and size of the drawn image.
/// The image is scaled to fit the rectangle.
///
///
/// The structure that specifies the portion of the image object to draw.
///
/// The index of the row within the source image to start processing.
/// The index of the row within the source image to end processing.
///
/// The method keeps the source image unchanged and returns the
/// the result of image process as new image.
///
protected abstract void Apply(ImageBase target, ImageBase source, Rectangle targetRectangle, Rectangle sourceRectangle, int startY, int endY);
///
/// This method is called after the process is applied to prepare the processor.
///
/// The source image. Cannot be null.
/// Target image to apply the process to.
///
/// The structure that specifies the location and size of the drawn image.
/// The image is scaled to fit the rectangle.
///
///
/// The structure that specifies the portion of the image object to draw.
///
protected virtual void AfterApply(ImageBase source, ImageBase target, Rectangle targetRectangle, Rectangle sourceRectangle)
{
}
///
/// Must be called by derived classes after processing a single row.
///
protected void OnRowProcessed()
{
if (this.OnProgress != null)
{
int currThreadNumRows = Interlocked.Add(ref this.numRowsProcessed, 1);
// Multi-pass filters process multiple times more rows than totalRows, so update totalRows on the fly
if (currThreadNumRows > this.totalRows)
{
this.totalRows = currThreadNumRows;
}
// Report progress. This may be on the client's thread, or on a Task library thread.
this.OnProgress(this, new ProgressEventArgs { RowsProcessed = currThreadNumRows, TotalRows = this.totalRows });
}
}
}
}