Chapter 22 - Parallel Programming
PLINQ
Calculating prime numbers
// Calculate prime numbers using a simple (unoptimized) algorithm.
// This calculates prime numbers between 3 and a million, using all available cores:
IEnumerable<int> numbers = Enumerable.Range (3, 1000000-3);
var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
primes.Take(100).Dump();
Calculating prime numbers - ordered
// Calculate prime numbers with ordered output.
IEnumerable<int> numbers = Enumerable.Range (3, 1000000-3);
var parallelQuery =
from n in numbers.AsParallel().AsOrdered()
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
primes.Take(100).Dump();
// In this example, we could alternatively call OrderBy at the end of the query.
Parallel spell checker
string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");
if (!File.Exists (wordLookupFile)) // Contains about 150,000 words
File.WriteAllText (wordLookupFile,
await new HttpClient().GetStringAsync (
"http://www.albahari.com/ispell/allwords.txt"));
var wordLookup = new HashSet<string> (
File.ReadAllLines (wordLookupFile),
StringComparer.InvariantCultureIgnoreCase);
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; // Introduce a couple
wordsToTest [23456] = "wubsie"; // of spelling mistakes.
var query = wordsToTest
.AsParallel()
.Select ((word, index) => new IndexedWord { Word = word, Index = index })
.Where (iword => !wordLookup.Contains (iword.Word))
.OrderBy (iword => iword.Index);
query.Dump();
struct IndexedWord { public string Word; public int Index; }
Parallel spell checker - enhanced
void Main()
{
string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");
if (!File.Exists (wordLookupFile)) // Contains about 150,000 words
new WebClient().DownloadFile (
"http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
var wordLookup = new HashSet<string> (
File.ReadAllLines (wordLookupFile),
StringComparer.InvariantCultureIgnoreCase);
string[] wordList = wordLookup.ToArray();
// Here, we're using ThreadLocal to generate a thread-safe random number generator,
// so we can parallelize the building of the wordsToTest array.
var localRandom = new ThreadLocal<Random>
( () => new Random (Guid.NewGuid().GetHashCode()) );
string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
.Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; // Introduce a couple
wordsToTest [23456] = "wubsie"; // of spelling mistakes.
var query = wordsToTest
.AsParallel()
.Select ((word, index) => new IndexedWord { Word=word, Index=index })
.Where (iword => !wordLookup.Contains (iword.Word))
.OrderBy (iword => iword.Index);
query.Dump();
}
struct IndexedWord { public string Word; public int Index; }
Functional purity
{
int i = 0;
(from n in Enumerable.Range(0,999).AsParallel() select n * i++).Dump ("unsafe");
}
{
Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i).Dump ("safe");
}
Changing degree of parallelism
"The Quick Brown Fox"
.AsParallel().WithDegreeOfParallelism (2)
.Where (c => !char.IsWhiteSpace (c))
.AsParallel().WithDegreeOfParallelism (3) // Forces Merge + Partition
.Select (c => char.ToUpper (c))
Cancellation
IEnumerable<int> million = Enumerable.Range (3, 1000000);
var cancelSource = new CancellationTokenSource();
var primeNumberQuery =
from n in million.AsParallel().WithCancellation (cancelSource.Token)
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
new Thread (() => {
Thread.Sleep (100); // Cancel query after
cancelSource.Cancel(); // 100 milliseconds.
}
).Start();
try
{
// Start query running:
int[] primes = primeNumberQuery.ToArray();
// We'll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
Console.WriteLine ("Query canceled");
}
Output-side optimization
"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);
Input-side optimization - forcing chunk partitioning
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
Partitioner.Create (numbers, true).AsParallel()
.Where (n => n % 2 == 0);
parallelQuery.Dump();
Optimizing aggregations - simple use of Aggregate
int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (0, (total, n) => total + n); // 6
sum.Dump();
Optimizing aggregations - seed factory function (contrived)
new[] { 1, 2, 3 }.AsParallel().Aggregate (
() => 0, // seedFactory
(localTotal, n) => localTotal + n, // updateAccumulatorFunc
(mainTot, localTot) => mainTot + localTot, // combineAccumulatorFunc
finalResult => finalResult) // resultSelector
Optimizing aggregations - letter frequencies imperative
string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
};
letterFrequencies.Dump();
Optimizing aggregations - letter frequencies functional
string text = "Let’s suppose this is a really long string";
int[] result =
text.Aggregate (
new int[26], // Create the "accumulator"
(letterFrequencies, c) => // Aggregate a letter into the accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
return letterFrequencies;
});
result.Dump();
Optimizing aggregations - letter frequencies parallel
string text = "Let’s suppose this is a really long string";
int[] result =
text.AsParallel().Aggregate (
() => new int[26], // Create a new local accumulator
(localFrequencies, c) => // Aggregate into the local accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) localFrequencies [index]++;
return localFrequencies;
},
// Aggregate local->main accumulator
(mainFreq, localFreq) =>
mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
finalResult => finalResult // Perform any final transformation
); // on the end result.
result.Dump();
The Parallel Class
Parallel.Invoke
Parallel.Invoke (
() => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
() => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));
Parallel.For
var keyPairs = new string[6];
Parallel.For (0, keyPairs.Length,
i => keyPairs[i] = RSA.Create().ToXmlString (true));
keyPairs.Dump();
PLINQ version
ParallelEnumerable.Range (0, 6)
.Select (i => RSA.Create().ToXmlString (true))
.ToArray()
Parallel.Foreach - indexed
Parallel.ForEach ("Hello, world", (c, state, i) =>
{
Console.WriteLine (c.ToString() + i);
});
Parallel Spellchecker with TPL
string wordLookupFile = Path.Combine (Path.GetTempPath(), "WordLookup.txt");
if (!File.Exists (wordLookupFile)) // Contains about 150,000 words
new WebClient().DownloadFile (
"http://www.albahari.com/ispell/allwords.txt", wordLookupFile);
var wordLookup = new HashSet<string> (
File.ReadAllLines (wordLookupFile),
StringComparer.InvariantCultureIgnoreCase);
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; // Introduce a couple
wordsToTest [23456] = "wubsie"; // of spelling mistakes.
var misspellings = new ConcurrentBag<Tuple<int,string>>();
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
if (!wordLookup.Contains (word))
misspellings.Add (Tuple.Create ((int) i, word));
});
misspellings.Dump();
Breaking early out of loops
Parallel.ForEach ("Hello, world", (c, loopState) =>
{
if (c == ',')
loopState.Break();
else
Console.Write (c);
});
Optimization with local values - problem
object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
i => { lock (locker) total += Math.Sqrt (i); });
total.Dump();
Optimization with local values - solution
object locker = new object();
double grandTotal = 0;
Parallel.For (1, 10000000,
() => 0.0, // Initialize the local value.
(i, state, localTotal) => // Body delegate. Notice that it
localTotal + Math.Sqrt (i), // returns the new local total.
localTotal => // Add the local value
{ lock (locker) grandTotal += localTotal; } // to the master value.
);
grandTotal.Dump();
PLINQ version sum
ParallelEnumerable.Range (1, 10000000)
.Sum (i => Math.Sqrt (i))
Task Parallelism
Creating and starting tasks
// Note: see Chapter 14 for a basic introduction to tasks.
var task = Task.Run (() => Console.WriteLine ("Hello from a task!"));
task.Wait(); // Wait for task to finish
Decoupling task creation and execution
// You can create "cold" (unstarted) tasks with Task's constructor:
var task = new Task (() => Console.Write ("Hello"));
"We can do something else here...".Dump();
task.Start();
Specifying a state object
static void Main()
{
var task = Task.Factory.StartNew (Greet, "Hello");
}
static void Greet (object state) { Console.Write (state); } // Hello
Putting the state object to better use
static void Main()
{
var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
Console.WriteLine (task.AsyncState); // Greeting
}
static void Greet (string message) { Console.WriteLine (message); }
Child tasks
Task parent = Task.Factory.StartNew (() =>
{
Console.WriteLine ("I am a parent");
Task.Factory.StartNew (() => // Detached task
{
Console.WriteLine ("I am detached");
});
Task.Factory.StartNew (() => // Child task
{
Console.WriteLine ("I am a child");
}, TaskCreationOptions.AttachedToParent);
});
parent.Wait();
Console.WriteLine ("Parent completed");
Exception-handling child tasks
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => // Child
{
Task.Factory.StartNew (() => { throw null; }, atp); // Grandchild
}, atp);
});
// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();
Canceling tasks
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter (500);
Task task = Task.Factory.StartNew (() =>
{
Thread.Sleep (1000);
token.ThrowIfCancellationRequested(); // Check for cancellation request
}, token);
try { task.Wait(); }
catch (AggregateException ex)
{
Console.WriteLine (ex.InnerException is TaskCanceledException); // True
Console.WriteLine (task.IsCanceled); // True
Console.WriteLine (task.Status); // Canceled
}
Continuations
Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));
Continuations with return values
Task.Factory.StartNew<int> (() => 8)
.ContinueWith (ant => ant.Result * 2)
.ContinueWith (ant => Math.Sqrt (ant.Result))
.ContinueWith (ant => Console.WriteLine (ant.Result)); // 4
Continuations and exceptions
Task task1 = Task.Factory.StartNew (() => { throw null; });
Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));
task2.Wait(); // throws an AggregateException
Continuations - rethrowing antecedent exceptions
Task continuation = Task.Factory.StartNew (() => { throw null; })
.ContinueWith (ant =>
{
if (ant.Exception != null) throw ant.Exception;
// Continue processing...
});
continuation.Wait(); // Exception is now thrown back to caller.
Continuations - exceptions and TaskContinuationOptions
Task task1 = Task.Factory.StartNew (() => { throw null; });
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
TaskContinuationOptions.OnlyOnFaulted);
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
TaskContinuationOptions.NotOnFaulted);
error.Wait();
Continuations - extension to swallow exceptions
void Main()
{
Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
}
static class Extensions
{
public static void IgnoreExceptions (this Task task)
{
// This could be improved by adding code to log the exception
task.ContinueWith (t => { var ignore = t.Exception; },
TaskContinuationOptions.OnlyOnFaulted);
}
}
Continuations and child tasks
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
TaskContinuationOptions.OnlyOnFaulted)
.Wait(); // throws AggregateException containing three NullReferenceExceptions
Continuations - conditional
Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
TaskContinuationOptions.OnlyOnFaulted);
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3")); // This executes
Continuations - conditional (solution)
Task t1 = Task.Factory.StartNew (() => Console.WriteLine ("nothing awry here"));
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
TaskContinuationOptions.OnlyOnFaulted);
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
TaskContinuationOptions.NotOnCanceled); // Does not execute
Continuations with multiple antecedents
var task1 = Task.Factory.StartNew (() => Console.Write ("X"));
var task2 = Task.Factory.StartNew (() => Console.Write ("Y"));
var continuation = Task.Factory.ContinueWhenAll (
new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));
Continuations with multiple antecedents - collating data
// task1 and task2 would call complex functions in real life:
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);
Task<int> task3 = Task<int>.Factory.ContinueWhenAll (
new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result));
Console.WriteLine (task3.Result); // 579
Multiple continuations on a single antecedents - collating data
var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
var c1 = t.ContinueWith (ant => Console.Write ("X"));
var c2 = t.ContinueWith (ant => Console.Write ("Y"));
Task.WaitAll (c1, c2);
Task Schedulers and UIs
void Main()
{
new MyWindow().ShowDialog();
}
public partial class MyWindow : System.Windows.Window
{
Label lblResult = new Label();
TaskScheduler _uiScheduler; // Declare this as a field so we can use
// it throughout our class.
public MyWindow()
{
InitializeComponent();
}
protected override void OnActivated (EventArgs e)
{
// Get the UI scheduler for the thread that created the form:
_uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
Task.Factory.StartNew<string> (SomeComplexWebService)
.ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
}
void InitializeComponent()
{
lblResult.FontSize = 20;
Content = lblResult;
}
string SomeComplexWebService() { Thread.Sleep (1000); return "Foo"; }
}
Creating your own Task Factories
var factory = new TaskFactory (
TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
TaskContinuationOptions.None);
Task task1 = factory.StartNew (() => "foo".Dump());
Task task2 = factory.StartNew (() => "far".Dump());
Working with AggregateException
AggregateException
try
{
var query = from i in ParallelEnumerable.Range (0, 1000000)
select 100 / i;
// Enumerate query
query.Dump();
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.InnerExceptions)
Console.WriteLine (ex.Message);
}
Flatten
try
{
var query = from i in ParallelEnumerable.Range (0, 1000000)
select 100 / i;
// Enumerate query
query.Dump();
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.Flatten().InnerExceptions)
ex.Dump();
}
Handle
var parent = Task.Factory.StartNew (() =>
{
// We’ll throw 3 exceptions at once using 3 child tasks:
int[] numbers = { 0 };
var childFactory = new TaskFactory
(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
childFactory.StartNew (() => 5 / numbers[0]); // Division by zero
childFactory.StartNew (() => numbers [1]); // Index out of range
childFactory.StartNew (() => { throw null; }); // Null reference
});
try { parent.Wait(); }
catch (AggregateException aex)
{
aex.Flatten().Handle (ex => // Note that we still need to call Flatten
{
if (ex is DivideByZeroException)
{
Console.WriteLine ("Divide by zero");
return true; // This exception is "handled"
}
if (ex is IndexOutOfRangeException)
{
Console.WriteLine ("Index out of range");
return true; // This exception is "handled"
}
return false; // All other exceptions will get rethrown
});
}
Concurrent Collections
Producer-Consumer Queue
void Main()
{
using (var q = new PCQueue(1))
{
q.EnqueueTask (() => "Foo".Dump());
q.EnqueueTask (() => "Far".Dump());
}
}
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public void Dispose() { _taskQ.CompleteAdding(); }
public void EnqueueTask (Action action) { _taskQ.Add (action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called.
foreach (Action action in _taskQ.GetConsumingEnumerable())
action(); // Perform task.
}
}
Producer-Consumer Queue - with Tasks
void Main()
{
using (var pcQ = new PCQueue(1))
{
Task task1 = pcQ.Enqueue (() => Console.WriteLine ("Too"));
Task task2 = pcQ.Enqueue (() => Console.WriteLine ("Easy!"));
task1.ContinueWith (_ => "Task 1 complete".Dump());
task2.ContinueWith (_ => "Task 2 complete".Dump());
}
}
public class PCQueue : IDisposable
{
BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public Task Enqueue (Action action, CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task (action, cancelToken);
_taskQ.Add (task);
return task;
}
public Task<TResult> Enqueue<TResult> (Func<TResult> func,
CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task<TResult> (func, cancelToken);
_taskQ.Add (task);
return task;
}
void Consume()
{
foreach (var task in _taskQ.GetConsumingEnumerable())
try
{
if (!task.IsCanceled) task.RunSynchronously();
}
catch (InvalidOperationException) { } // Race condition
}
public void Dispose() { _taskQ.CompleteAdding(); }
}
EXTRA - Channels
Single Producer - Multiple Consumers
// The consumer is half as fast as the producer. We compensate by starting two consumers.
Channel<string> channel =
Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
{
// Specifying SingleReader and/or SingleWriter
// allows the Channel to make optimizing assumptions
SingleReader = false,
SingleWriter = true,
});
var producer = Produce().ContinueWith (_ => channel.Writer.Complete());
var consumer1 = Consume(1);
var consumer2 = Consume(2);
await Task.WhenAll(consumer1, consumer2);
async Task Produce()
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync ($"Msg {i}");
await Task.Delay (1000);
}
Console.WriteLine ("Producer done.");
}
async Task Consume(int id) // We add an ID just to visualize which one processed a given message
{
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead (out string data))
{
Console.WriteLine ($"Processed on {id}: {data}");
// Simulate processing takes twice as long as producing
await Task.Delay (2000);
}
}
Console.WriteLine ($"Consumer {id} done.");
}
Single Producer - Single Consumer
// The consumer is half as fast as the producer. The producer will finish first.
Channel<string> channel =
Channel.CreateBounded<string> (new BoundedChannelOptions (1000)
{
// Specifying SingleReader and/or SingleWriter
// allows the Channel to make optimizing assumptions
SingleReader = true,
SingleWriter = true,
});
var producer = Produce().ContinueWith (_ => channel.Writer.Complete());
var consumer = Consume();
async Task Produce()
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync ($"Msg {i}");
await Task.Delay(1000);
}
Console.WriteLine("Producer done.");
}
async Task Consume()
{
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out string data))
{
Console.WriteLine($"Processed: {data}");
// Simulate processing takes twice as long as producing
await Task.Delay(2000);
}
}
Console.WriteLine("Consumer done.");
}
EXTRA - SpinLock and SpinWait
SpinLock
// See http://www.albahari.com/threading/part5.aspx for the accompanying text on SpinLock and SpinWait
var spinLock = new SpinLock (true); // Enable owner tracking
bool lockTaken = false;
try
{
spinLock.Enter (ref lockTaken);
// Do stuff...
}
finally
{
if (lockTaken) spinLock.Exit();
}
SpinWait - SpinUntil
bool _proceed;
void Main()
{
var task = Task.Factory.StartNew (Test);
Thread.Sleep(1000);
_proceed = true;
task.Wait();
}
void Test()
{
SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
"Done!".Dump();
}
SpinWait - SpinOnce
bool _proceed;
void Main()
{
var task = Task.Run (Test);
Thread.Sleep(1000);
_proceed = true;
task.Wait();
}
void Test()
{
var spinWait = new SpinWait();
while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
"Done!".Dump();
}
SpinWait - Lock-free updates with CompareExchange
int x = 2;
void Main()
{
// We can perform three multiplications on the same variable using 3 concurrent threads
// safely without locks by using SpinWait with Interlocked.CompareExchange.
var task1 = Task.Factory.StartNew (() => MultiplyXBy (3));
var task2 = Task.Factory.StartNew (() => MultiplyXBy (4));
var task3 = Task.Factory.StartNew (() => MultiplyXBy (5));
Task.WaitAll (task1, task2, task3);
x.Dump();
}
void MultiplyXBy (int factor)
{
var spinWait = new SpinWait();
while (true)
{
int snapshot1 = x;
Thread.MemoryBarrier();
int calc = snapshot1 * factor;
int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
if (snapshot1 == snapshot2) return; // No one preempted us.
spinWait.SpinOnce();
}
}