Chapter 14 - Concurrency and Asynchrony
Threading Basics
Creating a thread
// Run WriteY() on a second thread while the main thread prints 'x';
// the two streams of characters interleave in an unpredictable order.
var worker = new Thread(WriteY);
worker.Start();

// Meanwhile, keep the main thread busy.
for (int n = 1000; n > 0; n--)
{
    Console.Write("x");
}

// Thread entry point: writes 1000 'y' characters.
void WriteY()
{
    int written = 0;
    while (written < 1000)
    {
        Console.Write("y");
        written++;
    }
}
Join
// Start a worker, then block the calling thread until the worker has finished.
var worker = new Thread(Go);
worker.Start();
worker.Join();   // returns only once Go() has run to completion
Console.WriteLine("Thread t has ended!");

// Worker body: writes 1000 'y' characters.
void Go()
{
    for (int n = 1000; n > 0; n--)
    {
        Console.Write("y");
    }
}
Sleep
// Pause the current thread for half a second.
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Local state
// Go() runs twice: once on a new thread and once on the main thread.
// Its loop counter is a local variable, so each call has its own copy.
var worker = new Thread(Go);
worker.Start();
Go();

void Go()
{
    // 'cycles' is declared inside the method, so it is not shared between the two calls.
    int cycles = 0;
    while (cycles < 5)
    {
        Console.Write('?');
        cycles++;
    }
}
Shared state - unsafe
// Deliberately unsafe: Go() runs once on a new thread and once on the main thread,
// and both calls read and write the same captured variable _done.
bool _done = false;
new Thread (Go).Start();
Go();
void Go()
{
// The test and the assignment are two separate steps with no lock around them,
// so in principle both threads can pass the test before either sets the flag.
if (!_done) { _done = true; Console.WriteLine ("Done"); }
}
Shared state with fields - unsafe
// Deliberately unsafe: both threads call Go() on the SAME ThreadTest instance,
// so they share its _done field.
var tt = new ThreadTest();
new Thread (tt.Go).Start();
tt.Go();
class ThreadTest
{
// Instance field read and written by every thread that calls Go() on this object.
bool _done;
public void Go()
{
// Unsynchronized check-then-set: nothing stops two threads from both passing the test.
if (!_done) { _done = true; Console.WriteLine ("Done"); }
}
}
Shared state with closure - unsafe
// Deliberately unsafe: the lambda captures the local variable 'done', and the same
// delegate is then run on a new thread and on the main thread, so both share it.
bool done = false;
ThreadStart action = () =>
{
// Unsynchronized check-then-set on the captured variable.
if (!done) { done = true; Console.WriteLine ("Done"); }
};
new Thread (action).Start();
action();
Shared state with statics - unsafe
// Entry point: delegates to the demo class below.
void Main()
{
ThreadTest.Main();
}
// Deliberately unsafe: Go() runs on two threads and both use the one static _done field.
class ThreadTest
{
static bool _done; // Static fields are shared between all threads
// in the same application domain.
// Runs Go() once on a new thread and once on the calling thread.
public static void Main()
{
new Thread (Go).Start();
Go();
}
static void Go()
{
// Unsynchronized check-then-set on the shared static field.
if (!_done) { _done = true; Console.WriteLine ("Done"); }
}
}
Shared state - safe
void Main()
{
    ThreadSafe.Main();
}

// Runs Go() on two threads; the check-and-set on _done is guarded by a lock,
// so only one thread at a time can execute it.
class ThreadSafe
{
    static bool _done;
    static readonly object _locker = new object();

    public static void Main()
    {
        var worker = new Thread(Go);
        worker.Start();
        Go();
    }

    static void Go()
    {
        lock (_locker)
        {
            // Whoever arrives second finds the flag already set and leaves.
            if (_done)
            {
                return;
            }

            Console.WriteLine("Done");
            _done = true;
        }
    }
}
Passing in data with a lambda expression
// Wrapping the call in a lambda lets us pass an argument to the method run by the thread.
var worker = new Thread(() => { Print("Hello from t!"); });
worker.Start();

// Writes the message to the console.
void Print(string message)
{
    Console.WriteLine(message);
}
Multi-statement lambda
// The whole thread body can be written as a statement lambda.
ThreadStart body = () =>
{
    Console.WriteLine("I'm running on another thread!");
    Console.WriteLine("This is so easy!");
};
new Thread(body).Start();
Lambdas and captured variables - unsafe
// Deliberately unsafe: every lambda captures the single loop variable i itself, not a
// copy of its value. A thread may read i after the loop has already advanced it, so the
// digits written are nondeterministic.
for (int i = 0; i < 10; i++)
new Thread (() => Console.Write (i)).Start();
Lambdas and captured variables - safe
// Copy the loop counter into a fresh local on every iteration so that each lambda
// captures its own variable rather than the shared loop variable.
for (int i = 0; i < 10; i++)
{
    int snapshot = i;
    var worker = new Thread(() => Console.Write(snapshot));
    worker.Start();
}
Exception handling - wrong place
// A try/catch around Thread.Start() does not see exceptions thrown by the new thread:
// the exception is raised on the worker's own execution path, not here.
try
{
    var worker = new Thread(Go);
    worker.Start();
}
catch (Exception error)
{
    // Never reached.
    Console.WriteLine("Exception!");
}

// 'throw null' raises a NullReferenceException.
static void Go()
{
    throw null;
}
Exception handling - right place
// The exception handler lives inside the thread's entry method, where the exception is raised.
var worker = new Thread(Go);
worker.Start();

void Go()
{
    try
    {
        throw null;   // raises a NullReferenceException, handled just below
    }
    catch (Exception error)
    {
        // Typically you would log the exception and/or signal another thread
        // that this one has come unstuck.
        error.Dump("Caught!");
    }
}
Basic signaling
// One thread blocks on a ManualResetEvent until the main thread sets it two seconds later.
var gate = new ManualResetEvent(false);

var waiter = new Thread(() =>
{
    Console.WriteLine("Waiting for signal...");
    gate.WaitOne();    // blocks until Set() is called
    gate.Dispose();
    Console.WriteLine("Got signal!");
});
waiter.Start();

Thread.Sleep(2000);
gate.Set();            // release the waiting thread
Threading in rich-client apps
new MyWindow().ShowDialog();

// Window that runs a slow job on a worker thread and hands the result back to the
// UI through the Dispatcher.
partial class MyWindow : Window
{
    TextBox txtMessage;

    public MyWindow()
    {
        InitializeComponent();
        var worker = new Thread(Work);
        worker.Start();
    }

    // Runs on the worker thread.
    void Work()
    {
        Thread.Sleep(5000);   // stand-in for a time-consuming task
        UpdateMessage("The answer");
    }

    // Queues the text-box update on the Dispatcher instead of touching the control directly.
    void UpdateMessage(string message)
    {
        Dispatcher.BeginInvoke(new Action(() => txtMessage.Text = message));
    }

    // Builds the window content: a single text box showing "Ready".
    void InitializeComponent()
    {
        SizeToContent = SizeToContent.WidthAndHeight;
        WindowStartupLocation = WindowStartupLocation.CenterScreen;

        txtMessage = new TextBox
        {
            Width = 250,
            Margin = new Thickness(10),
            Text = "Ready"
        };
        Content = txtMessage;
    }
}
Synchronization contexts
Util.CreateSynchronizationContext();
new MyWindow().ShowDialog();

// Window that runs a slow job on a worker thread and hands the result back to the
// UI through the SynchronizationContext captured in the constructor.
partial class MyWindow : Window
{
    TextBox txtMessage;
    SynchronizationContext _uiSyncContext;

    public MyWindow()
    {
        InitializeComponent();

        // Remember the context of the thread constructing the window (the UI thread).
        _uiSyncContext = SynchronizationContext.Current;

        var worker = new Thread(Work);
        worker.Start();
    }

    // Runs on the worker thread.
    void Work()
    {
        Thread.Sleep(5000);   // stand-in for a time-consuming task
        UpdateMessage("The answer");
    }

    // Posts the text-box update to the captured context.
    void UpdateMessage(string message)
    {
        SendOrPostCallback update = state => txtMessage.Text = message;
        _uiSyncContext.Post(update, null);
    }

    // Builds the window content: a single text box showing "Ready".
    void InitializeComponent()
    {
        SizeToContent = SizeToContent.WidthAndHeight;
        WindowStartupLocation = WindowStartupLocation.CenterScreen;

        txtMessage = new TextBox
        {
            Width = 250,
            Margin = new Thickness(10),
            Text = "Ready"
        };
        Content = txtMessage;
    }
}
Entering the ThreadPool
// Task lives in System.Threading.Tasks; Task.Run queues the delegate on the thread pool.
Task.Run(() => { Console.WriteLine("Hello from the thread pool"); });

// The older API for queuing thread-pool work:
ThreadPool.QueueUserWorkItem(state => Console.WriteLine("Hello, old-school"));
Tasks
Starting a Task
// Start a task that writes "Foo".
Action printFoo = () => Console.WriteLine("Foo");
Task.Run(printFoo);
Wait
// Start a two-second job, show that it has not finished yet, then block until it has.
Action work = () =>
{
    Console.WriteLine("Task started");
    Thread.Sleep(2000);
    Console.WriteLine("Foo");
};
Task task = Task.Run(work);

bool finished = task.IsCompleted;
Console.WriteLine(finished);   // False
task.Wait();                   // blocks until the task is complete
Long-running task
// Same job as before, but started through Task.Factory with the LongRunning option.
Action work = () =>
{
    Console.WriteLine("Task started");
    Thread.Sleep(2000);
    Console.WriteLine("Foo");
};
Task task = Task.Factory.StartNew(work, TaskCreationOptions.LongRunning);
task.Wait();   // blocks until the task is complete
Returning a value
// A Task<int> carries a return value back to the caller.
Func<int> compute = () =>
{
    Console.WriteLine("Foo");
    return 3;
};
Task<int> task = Task.Run(compute);

int result = task.Result;    // blocks if the task has not finished yet
Console.WriteLine(result);   // 3
Count prime numbers
// Count the primes among the 3,000,000 integers starting at 2, on a pooled thread.
Task<int> primeNumberTask = Task.Run(() =>
    Enumerable.Range(2, 3000000).Count(candidate =>
    {
        // Trial division by every integer from 2 up to the square root.
        int divisorCount = (int)Math.Sqrt(candidate) - 1;
        return Enumerable.Range(2, divisorCount).All(divisor => candidate % divisor > 0);
    }));

Console.WriteLine("Task running...");
Console.WriteLine("The answer is " + primeNumberTask.Result);
Exceptions
// The task throws a NullReferenceException; Wait() rethrows it wrapped in an AggregateException.
Task task = Task.Run(() => { throw null; });
try
{
    task.Wait();
}
catch (AggregateException wrapper)
{
    bool isNullReference = wrapper.InnerException is NullReferenceException;
    if (!isNullReference)
    {
        throw;
    }

    Console.WriteLine("Null!");
}
Continuations - GetAwaiter
// Count primes on a pooled thread and attach a continuation through the task's awaiter.
Task<int> primeNumberTask = Task.Run(() =>
    Enumerable.Range(2, 3000000).Count(candidate =>
    {
        int divisorCount = (int)Math.Sqrt(candidate) - 1;
        return Enumerable.Range(2, divisorCount).All(divisor => candidate % divisor > 0);
    }));

var awaiter = primeNumberTask.GetAwaiter();

// Runs once the task has finished; GetResult() then yields the count.
awaiter.OnCompleted(() => Console.WriteLine(awaiter.GetResult()));
Continuations - ContinueWith
// (See Chapter 22 for more on using ContinueWith.)
// Count primes on a pooled thread and attach a continuation with ContinueWith.
Task<int> primeNumberTask = Task.Run(() =>
    Enumerable.Range(2, 3000000).Count(candidate =>
    {
        int divisorCount = (int)Math.Sqrt(candidate) - 1;
        return Enumerable.Range(2, divisorCount).All(divisor => candidate % divisor > 0);
    }));

// The continuation receives the finished task and writes the prime count.
primeNumberTask.ContinueWith(finished =>
{
    Console.WriteLine(finished.Result);
});
TaskCompletionSource - Print 42 after 5 seconds
// A TaskCompletionSource hands out a task that we complete ourselves:
// here a separate thread sets the result after five seconds.
var completion = new TaskCompletionSource<int>();

var worker = new Thread(() =>
{
    Thread.Sleep(5000);
    completion.SetResult(42);
});
worker.Start();

Task<int> task = completion.Task;   // the task controlled by 'completion'
Console.WriteLine(task.Result);     // 42
TaskCompletionSource - Our own Run method
Task<int> task = Run(() => { Thread.Sleep(5000); return 42; });
task.Result.Dump();

// Runs 'function' on a dedicated thread and exposes its outcome as a task:
// the return value becomes the result, and any exception faults the task.
Task<TResult> Run<TResult>(Func<TResult> function)
{
    var completion = new TaskCompletionSource<TResult>();

    var worker = new Thread(() =>
    {
        try
        {
            TResult value = function();
            completion.SetResult(value);
        }
        catch (Exception error)
        {
            completion.SetException(error);
        }
    });
    worker.Start();

    return completion.Task;
}
TaskCompletionSource - GetAnswerToLife
var awaiter = GetAnswerToLife().GetAwaiter();
awaiter.OnCompleted(() =>
{
    int answer = awaiter.GetResult();
    Console.WriteLine(answer);
});

// Returns a task that completes with 42 when a one-shot 5000 ms timer elapses.
Task<int> GetAnswerToLife()
{
    var completion = new TaskCompletionSource<int>();

    var timer = new System.Timers.Timer(5000);
    timer.AutoReset = false;   // elapse once only
    timer.Elapsed += (sender, e) =>
    {
        timer.Dispose();
        completion.SetResult(42);
    };
    timer.Start();

    return completion.Task;
}
Writing Delay method
var awaiter = Delay(5000).GetAwaiter();
awaiter.OnCompleted(() => Console.WriteLine(42));

// Returns a task that completes when a one-shot timer of the given length elapses.
Task Delay(int milliseconds)
{
    var completion = new TaskCompletionSource<object>();

    var timer = new System.Timers.Timer(milliseconds);
    timer.AutoReset = false;   // elapse once only
    timer.Elapsed += (sender, e) =>
    {
        timer.Dispose();
        completion.SetResult(null);
    };
    timer.Start();

    return completion.Task;
}
Delay times 10000
// Start 10,000 five-second delays; each one writes 42 when it completes.
for (int n = 0; n < 10000; n++)
{
    var awaiter = Delay(5000).GetAwaiter();
    awaiter.OnCompleted(() => Console.WriteLine(42));
}

// Returns a task that completes when a one-shot timer of the given length elapses.
Task Delay(int milliseconds)
{
    var completion = new TaskCompletionSource<object>();

    var timer = new System.Timers.Timer(milliseconds);
    timer.AutoReset = false;   // elapse once only
    timer.Elapsed += (sender, e) =>
    {
        timer.Dispose();
        completion.SetResult(null);
    };
    timer.Start();

    return completion.Task;
}
Task.Delay
// The built-in Task.Delay, with a continuation attached through the awaiter...
var awaiter = Task.Delay(5000).GetAwaiter();
awaiter.OnCompleted(() => Console.WriteLine(42));

// ...and the same thing attached with ContinueWith:
Task.Delay(5000).ContinueWith(finished => { Console.WriteLine(42); });
ValueTask
var vt1 = AnswerQuestionAsync("What's the answer to life?");
var vt2 = AnswerQuestionAsync("Is the sun shining?");

Console.WriteLine($"vt1.IsCompleted: {vt1.IsCompleted}");   // True
Console.WriteLine($"vt2.IsCompleted: {vt2.IsCompleted}");   // False

var a1 = await vt1;
Console.WriteLine($"a1: {a1}");   // appears immediately

var a2 = await vt2;
Console.WriteLine($"a2: {a2}");   // appears after about 5 seconds

// Answers the one known question without awaiting anything; every other
// question is forwarded to AskCortanaAsync and awaited.
async ValueTask<string> AnswerQuestionAsync(string question)
{
    return question == "What's the answer to life?"
        ? "42"
        : await AskCortanaAsync(question);
}

// Simulates a slow lookup: waits five seconds, then gives a fixed reply.
async Task<string> AskCortanaAsync(string question)
{
    await Task.Delay(5000);
    return "I don't know.";
}
Principles of Asynchrony
GetPrimesCount
void Main()
{
    DisplayPrimeCounts();
}

// Writes the prime count for ten consecutive blocks of one million numbers, synchronously.
void DisplayPrimeCounts()
{
    const int BlockSize = 1000000;
    for (int block = 0; block < 10; block++)
    {
        int lower = block * BlockSize;
        int upper = (block + 1) * BlockSize - 1;
        int primes = GetPrimesCount(lower + 2, BlockSize);
        Console.WriteLine(primes + " primes between " + lower + " and " + upper);
    }
    Console.WriteLine("Done!");
}

// Counts the primes among 'count' consecutive integers starting at 'start', using PLINQ.
int GetPrimesCount(int start, int count)
{
    // Trial division by every integer from 2 up to the square root.
    bool IsPrime(int n)
    {
        int divisorCount = (int)Math.Sqrt(n) - 1;
        return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
    }

    return ParallelEnumerable.Range(start, count).Count(n => IsPrime(n));
}
Coarse-grained asynchrony
// Offload the whole synchronous call graph to a pooled thread in one go.
void Main()
{
    Task.Run(() => { DisplayPrimeCounts(); });
}

// Writes the prime count for ten consecutive blocks of one million numbers.
void DisplayPrimeCounts()
{
    const int BlockSize = 1000000;
    for (int block = 0; block < 10; block++)
    {
        int lower = block * BlockSize;
        int upper = (block + 1) * BlockSize - 1;
        int primes = GetPrimesCount(lower + 2, BlockSize);
        Console.WriteLine(primes + " primes between " + lower + " and " + upper);
    }
    Console.WriteLine("Done!");
}

// Counts the primes among 'count' consecutive integers starting at 'start', using PLINQ.
int GetPrimesCount(int start, int count)
{
    // Trial division by every integer from 2 up to the square root.
    bool IsPrime(int n)
    {
        int divisorCount = (int)Math.Sqrt(n) - 1;
        return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
    }

    return ParallelEnumerable.Range(start, count).Count(n => IsPrime(n));
}
Fine-grained asynchrony
void Main()
{
    DisplayPrimeCounts();
}

// Kicks off the chain of asynchronous prime counts at block 0.
void DisplayPrimeCounts() => DisplayPrimeCountsFrom(0);
// Writes the prime count for block i and, when that result arrives, chains the next
// block from inside the continuation. Hand-written continuations like this are
// awkward: the loop has to be expressed as recursion.
void DisplayPrimeCountsFrom(int i)
{
    var awaiter = GetPrimesCountAsync(i * 1000000 + 2, 1000000).GetAwaiter();
    awaiter.OnCompleted(() =>
    {
        Console.WriteLine(awaiter.GetResult() + " primes between " + (i * 1000000) + " and " + ((i + 1) * 1000000 - 1));

        // Pre-increment so that exactly ten blocks (0..9) are processed, matching the
        // synchronous loop; a post-increment test would run an eleventh block.
        if (++i < 10)
        {
            DisplayPrimeCountsFrom(i);
        }
        else
        {
            Console.WriteLine("Done");
        }
    });
}
// Counts the primes among 'count' consecutive integers starting at 'start',
// running the PLINQ query inside Task.Run.
Task<int> GetPrimesCountAsync(int start, int count)
{
    // Trial division by every integer from 2 up to the square root.
    bool IsPrime(int n)
    {
        int divisorCount = (int)Math.Sqrt(n) - 1;
        return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
    }

    return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => IsPrime(n)));
}
Making DisplayPrimesCount asynchronous
void Main()
{
    DisplayPrimeCountsAsync();
}

// Starts the hand-written state machine at block 0 and returns the task it exposes.
Task DisplayPrimeCountsAsync()
{
    var stateMachine = new PrimesStateMachine();
    stateMachine.DisplayPrimeCountsFrom(0);
    return stateMachine.Task;
}
// Hand-written equivalent of an async method: processes the blocks one after another
// via continuations and completes Task when the last block is done. Even more awkward
// than the plain continuation version.
class PrimesStateMachine
{
    TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();

    // Completes once "Done" has been written.
    public Task Task => _tcs.Task;

    // Writes the prime count for block i, then continues with the next block.
    public void DisplayPrimeCountsFrom(int i)
    {
        var awaiter = GetPrimesCountAsync(i * 1000000 + 2, 1000000).GetAwaiter();
        awaiter.OnCompleted(() =>
        {
            Console.WriteLine(awaiter.GetResult());

            // Pre-increment so that exactly ten blocks (0..9) are processed;
            // a post-increment test would run an eleventh block.
            if (++i < 10)
            {
                DisplayPrimeCountsFrom(i);
            }
            else
            {
                Console.WriteLine("Done");
                _tcs.SetResult(null);
            }
        });
    }

    // Counts the primes among 'count' consecutive integers starting at 'start',
    // running the PLINQ query inside Task.Run.
    Task<int> GetPrimesCountAsync(int start, int count)
    {
        return Task.Run(() =>
            ParallelEnumerable.Range(start, count).Count(n =>
                Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
    }
}
Asynchronous functions to the rescue
void Main()
{
    DisplayPrimeCountsAsync();
}

// The same ten-block loop as the synchronous version, written with await.
async Task DisplayPrimeCountsAsync()
{
    const int BlockSize = 1000000;
    for (int block = 0; block < 10; block++)
    {
        int lower = block * BlockSize;
        int upper = (block + 1) * BlockSize - 1;
        int primes = await GetPrimesCountAsync(lower + 2, BlockSize);
        Console.WriteLine(primes + " primes between " + lower + " and " + upper);
    }
    Console.WriteLine("Done!");
}

// Counts the primes among 'count' consecutive integers starting at 'start',
// running the PLINQ query inside Task.Run.
Task<int> GetPrimesCountAsync(int start, int count)
{
    // Trial division by every integer from 2 up to the square root.
    bool IsPrime(int n)
    {
        int divisorCount = (int)Math.Sqrt(n) - 1;
        return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
    }

    return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => IsPrime(n)));
}
Asynchronous Functions in C# 5.0
Awaiting
void Main()
{
    DisplayPrimesCount();
}

// Awaits a single prime count and writes it.
async void DisplayPrimesCount()
{
    Console.WriteLine(await GetPrimesCountAsync(2, 1000000));
}

// Counts the primes among 'count' consecutive integers starting at 'start',
// running the PLINQ query inside Task.Run.
Task<int> GetPrimesCountAsync(int start, int count)
{
    // Trial division by every integer from 2 up to the square root.
    bool IsPrime(int n)
    {
        int divisorCount = (int)Math.Sqrt(n) - 1;
        return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
    }

    return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => IsPrime(n)));
}
Capturing local state
void Main()
{
    DisplayPrimeCounts();
}

// The loop counter keeps its value across each await.
async void DisplayPrimeCounts()
{
    int block = 0;
    while (block < 10)
    {
        int primes = await GetPrimesCountAsync(block * 1000000 + 2, 1000000);
        Console.WriteLine(primes);
        block++;
    }
}

// Counts the primes among 'count' consecutive integers starting at 'start',
// running the PLINQ query inside Task.Run.
Task<int> GetPrimesCountAsync(int start, int count)
{
    // Trial division by every integer from 2 up to the square root.
    bool IsPrime(int n)
    {
        int divisorCount = (int)Math.Sqrt(n) - 1;
        return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
    }

    return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => IsPrime(n)));
}
Awaiting in a UI - synchronous
void Main()
{
    new TestUI().ShowDialog();
}

// Synchronous version: the click handler computes every prime count itself,
// so the window is unresponsive while it works.
class TestUI : Window
{
    Button _button = new Button { Content = "Go" };
    TextBlock _results = new TextBlock();

    public TestUI()
    {
        var layout = new StackPanel();
        layout.Children.Add(_button);
        layout.Children.Add(_results);
        Content = layout;

        _button.Click += (s, e) => Go();
    }

    // Appends one line per block of a million numbers (blocks 1 to 4).
    void Go()
    {
        for (int block = 1; block < 5; block++)
        {
            int lower = block * 1000000;
            int upper = (block + 1) * 1000000 - 1;
            int primes = GetPrimesCount(lower, 1000000);
            _results.Text += primes + " primes between " + lower + " and " + upper + Environment.NewLine;
        }
    }

    // Counts the primes among 'count' consecutive integers starting at 'start', using PLINQ.
    int GetPrimesCount(int start, int count)
    {
        bool IsPrime(int n)
        {
            int divisorCount = (int)Math.Sqrt(n) - 1;
            return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
        }

        return ParallelEnumerable.Range(start, count).Count(n => IsPrime(n));
    }
}
Awaiting in a UI - asynchronous
void Main()
{
    new TestUI().ShowDialog();
}

// Asynchronous version: each prime count runs inside Task.Run and is awaited by the
// click handler, which disables the button while the work is in progress.
class TestUI : Window
{
    Button _button = new Button { Content = "Go" };
    TextBlock _results = new TextBlock();

    public TestUI()
    {
        var layout = new StackPanel();
        layout.Children.Add(_button);
        layout.Children.Add(_results);
        Content = layout;

        _button.Click += (s, e) => Go();
    }

    // Appends one line per block of a million numbers (blocks 1 to 4).
    async void Go()
    {
        _button.IsEnabled = false;

        for (int block = 1; block < 5; block++)
        {
            int lower = block * 1000000;
            int upper = (block + 1) * 1000000 - 1;
            _results.Text += await GetPrimesCountAsync(lower, 1000000) +
                " primes between " + lower + " and " + upper + Environment.NewLine;
        }

        _button.IsEnabled = true;
    }

    // Counts the primes among 'count' consecutive integers starting at 'start',
    // running the PLINQ query inside Task.Run.
    Task<int> GetPrimesCountAsync(int start, int count)
    {
        bool IsPrime(int n)
        {
            int divisorCount = (int)Math.Sqrt(n) - 1;
            return Enumerable.Range(2, divisorCount).All(d => n % d > 0);
        }

        return Task.Run(() => ParallelEnumerable.Range(start, count).Count(n => IsPrime(n)));
    }
}
Awaiting in a UI - IO-bound
void Main()
{
    new TestUI().ShowDialog();
}

// I/O-bound version: the click handler downloads three pages one after another,
// awaiting each download and reporting its length as it arrives.
class TestUI : Window
{
    Button _button = new Button { Content = "Go" };
    TextBlock _results = new TextBlock();

    public TestUI()
    {
        var panel = new StackPanel();
        panel.Children.Add(_button);
        panel.Children.Add(_results);
        Content = panel;
        _button.Click += (sender, args) => Go();
    }

    // Downloads each URL in turn and appends its length, then the total.
    // A WebException ends the loop and is reported in the results text;
    // the button is re-enabled whether or not the downloads succeed.
    async void Go()
    {
        _button.IsEnabled = false;
        string[] urls = "www.albahari.com www.oreilly.com www.linqpad.net".Split();
        int totalLength = 0;
        try
        {
            foreach (string url in urls)
            {
                var uri = new Uri("http://" + url);

                // WebClient is disposable; release it as soon as its download finishes.
                byte[] data;
                using (var client = new WebClient())
                {
                    data = await client.DownloadDataTaskAsync(uri);
                }

                _results.Text += "Length of " + url + " is " + data.Length + Environment.NewLine;
                totalLength += data.Length;
            }
            _results.Text += "Total length: " + totalLength;
        }
        catch (WebException ex)
        {
            _results.Text += "Error: " + ex.Message;
        }
        finally
        {
            _button.IsEnabled = true;
        }
    }
}
Awaiting in a UI - Comparison to coarse-grained concurrency
// Entry point: shows the test window modally.
void Main()
{
new TestUI().ShowDialog();
}
class TestUI : Window // Coarse-grained version: the whole of Go() runs inside Task.Run
{
Button _button = new Button { Content = "Go" };
TextBlock _results = new TextBlock();
public TestUI()
{
var panel = new StackPanel();
panel.Children.Add (_button);
panel.Children.Add (_results);
Content = panel;
// The click handler disables the button and starts Go() via Task.Run.
_button.Click += (sender, args) =>
{
_button.IsEnabled = false;
Task.Run (() => Go());
};
}
// Not called on the UI thread, so every UI update is queued through the Dispatcher.
void Go()
{
// Notice the race condition (run it and look at what's wrong with the results):
// the queued lambda captures the loop variable i, which the loop may already have
// advanced by the time the lambda actually runs.
for (int i = 1; i < 5; i++)
{
int result = GetPrimesCount (i * 1000000, 1000000);
Dispatcher.BeginInvoke (new Action (() =>
_results.Text += result + " primes between " + (i*1000000) + " and " + ((i+1)*1000000-1) + Environment.NewLine));
}
Dispatcher.BeginInvoke (new Action (() => _button.IsEnabled = true));
}
// Counts the primes among 'count' consecutive integers starting at 'start', using PLINQ
// and trial division up to the square root.
int GetPrimesCount (int start, int count)
{
return
ParallelEnumerable.Range (start, count).Count (n =>
Enumerable.Range (2, (int) Math.Sqrt(n)-1).All (i => n % i > 0));
}
}
Writing asynchronous functions
void Main()
{
    Go();
}

// Awaits PrintAnswerToLife, then writes "Done".
async Task Go()
{
    Task printing = PrintAnswerToLife();
    await printing;
    Console.WriteLine("Done");
}

// Returning Task rather than void is what lets the caller await this method.
async Task PrintAnswerToLife()
{
    await Task.Delay(5000);
    Console.WriteLine(21 * 2);
}
Returning Task of TResult
void Main()
{
    Go();
}

// Awaits PrintAnswerToLife, then writes "Done".
async Task Go()
{
    Task printing = PrintAnswerToLife();
    await printing;
    Console.WriteLine("Done");
}

// Awaits the answer and writes it.
async Task PrintAnswerToLife()
{
    Console.WriteLine(await GetAnswerToLife());
}

// Returning Task<int> lets an async method hand a value back to its awaiter.
async Task<int> GetAnswerToLife()
{
    await Task.Delay(5000);
    return 21 * 2;
}
Blocking versions of the above
void Main()
{
    Go();
}

// Blocking counterpart of the asynchronous call graph: same structure, no await.
void Go()
{
    PrintAnswerToLife();
    Console.WriteLine("Done");
}

// Gets the answer and writes it.
void PrintAnswerToLife()
{
    Console.WriteLine(GetAnswerToLife());
}

// Blocks the calling thread for five seconds, then returns 42.
int GetAnswerToLife()
{
    Thread.Sleep(5000);
    return 21 * 2;
}
Parallelism
void Main()
{
    Go();
}

// Starts both operations before awaiting either, so their delays overlap.
async Task Go()
{
    Task first = PrintAnswerToLife();
    Task second = PrintAnswerToLife();
    await first;
    await second;
    Console.WriteLine("Done");
}

// Awaits the answer and writes it.
async Task PrintAnswerToLife()
{
    Console.WriteLine(await GetAnswerToLife());
}

// Waits five seconds, then returns 42.
async Task<int> GetAnswerToLife()
{
    await Task.Delay(5000);
    return 21 * 2;
}
Asynchronous lambda expressions
// An ordinary named asynchronous method...
async Task NamedMethod()
{
    await Task.Delay(1000);
    Console.WriteLine("Foo");
}

async void Main()
{
    // ...and the same body written as an asynchronous lambda.
    Func<Task> unnamed = async () =>
    {
        await Task.Delay(1000);
        Console.WriteLine("Foo");
    };

    // Both are invoked and awaited in exactly the same way.
    await NamedMethod();
    await unnamed.Invoke();
}
Asynchronous lambda expressions - event handlers
// An asynchronous lambda used as an event handler: one second after a click,
// the button's caption changes to "Done".
var myButton = new Button();
myButton.Height = 30;
myButton.Content = "Wait...";

myButton.Click += async (source, e) =>
{
    await Task.Delay(1000);
    myButton.Content = "Done";
};

myButton.Dump();
Asynchronous lambda expressions - returning Task of TResult
// An asynchronous lambda that returns a value has type Func<Task<TResult>>.
Func<Task<int>> unnamed = async () =>
{
    await Task.Delay(1000);
    return 123;
};

Task<int> pending = unnamed();
int answer = await pending;
answer.Dump();
Optimizations - Completing synchronously
async void Main()
{
    const string Address = "http://www.linqpad.net";

    string html = await GetWebPageAsync(Address);
    html.Length.Dump("Characters downloaded");

    // The second request is answered from the cache, so it should be instant.
    html = await GetWebPageAsync(Address);
    html.Length.Dump("Characters downloaded");
}

static Dictionary<string,string> _cache = new Dictionary<string,string>();

// Returns the page text, downloading it only if it is not cached yet.
// On a cache hit nothing is awaited, so the method completes synchronously.
async Task<string> GetWebPageAsync(string uri)
{
    if (!_cache.TryGetValue(uri, out string html))
    {
        html = await new WebClient().DownloadStringTaskAsync(uri);
        _cache[uri] = html;
    }
    return html;
}
Optimizations - Caching Tasks
async void Main()
{
    const string Address = "http://www.linqpad.net";

    string html = await GetWebPageAsync(Address);
    html.Length.Dump("Characters downloaded");

    // The second request gets the cached task back, so it should be instant.
    html = await GetWebPageAsync(Address);
    html.Length.Dump("Characters downloaded");
}

static Dictionary<string,Task<string>> _cache =
    new Dictionary<string,Task<string>>();

// Caches the download TASK rather than the downloaded text: repeated calls
// for the same URI all receive the same task.
Task<string> GetWebPageAsync(string uri)
{
    if (!_cache.TryGetValue(uri, out Task<string> downloadTask))
    {
        downloadTask = new WebClient().DownloadStringTaskAsync(uri);
        _cache[uri] = downloadTask;
    }
    return downloadTask;
}
Optimizations - Caching Tasks fully threadsafe
async void Main()
{
    const string Address = "http://www.linqpad.net";

    string html = await GetWebPageAsync(Address);
    html.Length.Dump("Characters downloaded");

    // The second request gets the cached task back, so it should be instant.
    html = await GetWebPageAsync(Address);
    html.Length.Dump("Characters downloaded");
}

static Dictionary<string,Task<string>> _cache =
    new Dictionary<string,Task<string>>();

// Task-caching version with the cache lookup and insertion guarded by a lock.
// The lock is held only while the download is started, not for its duration.
Task<string> GetWebPageAsync(string uri)
{
    lock (_cache)
    {
        if (!_cache.TryGetValue(uri, out Task<string> downloadTask))
        {
            downloadTask = new WebClient().DownloadStringTaskAsync(uri);
            _cache[uri] = downloadTask;
        }
        return downloadTask;
    }
}
Optimizations - Avoiding excessive bouncing
void Main()
{
    A();
}

async void A()
{
    await B();
}

// Awaits C() a thousand times. ConfigureAwait(false) tells each await not to
// resume on the captured synchronization context.
async Task B()
{
    int remaining = 1000;
    while (remaining > 0)
    {
        await C().ConfigureAwait(false);
        remaining--;
    }
}

async Task C()
{
    /*...*/
}
Asynchronous Streams from C# 8.0
Asynchronous Stream vs Task of IEnumerable
// First consume the range through a single Task<IEnumerable<int>>: the loop
// cannot start until the awaited task has produced the whole sequence.
Console.WriteLine("Starting async Task<IEnumerable<int>>. Data arrives in one group.");
foreach (var data in await RangeTaskAsync(0, 10, 500))
    Console.WriteLine(data);

// Then consume it as an asynchronous stream: each element is written as soon as
// it is yielded. (The banner now names the type actually being demonstrated.)
Console.WriteLine("Starting async IAsyncEnumerable<int>. Data arrives as available.");
await foreach (var number in RangeAsync(0, 10, 500))
    Console.WriteLine(number);
// Builds the complete list of 'count' integers starting at 'start', waiting 'delay'
// milliseconds before adding each one, and returns it only once it is full.
static async Task<IEnumerable<int>> RangeTaskAsync(int start, int count, int delay)
{
    var collected = new List<int>();
    int end = start + count;

    int value = start;
    while (value < end)
    {
        await Task.Delay(delay);
        collected.Add(value);
        value++;
    }

    return collected;
}
// Yields 'count' integers starting at 'start', waiting 'delay' milliseconds
// before each one.
async IAsyncEnumerable<int> RangeAsync(int start, int count, int delay)
{
    int end = start + count;

    int value = start;
    while (value < end)
    {
        await Task.Delay(delay);
        yield return value;
        value++;
    }
}
Asynchronous Streams and LINQ
async Task Main()
{
    // LINQ operators applied to an asynchronous stream:
    // keep the even numbers, then multiply each by 10.
    IAsyncEnumerable<int> query = RangeAsync(0, 10, 500)
        .Where(i => i % 2 == 0)
        .Select(i => i * 10);

    await foreach (var number in query)
    {
        Console.WriteLine(number);
    }

    query.Dump();   // in LINQPad, an IAsyncEnumerable<T> can be dumped directly
}

// Yields 'count' integers starting at 'start', waiting 'delay' milliseconds
// before each one.
async IAsyncEnumerable<int> RangeAsync(int start, int count, int delay)
{
    int end = start + count;

    int value = start;
    while (value < end)
    {
        await Task.Delay(delay);
        yield return value;
        value++;
    }
}
Asynchronous Patterns
Cancellation
async void Main()
{
    var token = new CancellationToken();

    // Request cancellation once five seconds have elapsed.
    Task timer = Task.Delay(5000);
    timer.ContinueWith(finished => token.Cancel());

    await Foo(token);
}
// A simplified stand-in for the CancellationToken type in System.Threading:
// a flag that can be raised once, plus a helper that throws when it is raised.
class CancellationToken
{
    public bool IsCancellationRequested { get; private set; }

    public void Cancel() => IsCancellationRequested = true;

    public void ThrowIfCancellationRequested()
    {
        if (!IsCancellationRequested)
        {
            return;
        }

        throw new OperationCanceledException();
    }
}
// Writes 0..9 at one-second intervals, checking for cancellation after each wait.
async Task Foo(CancellationToken cancellationToken)
{
    int step = 0;
    while (step < 10)
    {
        Console.WriteLine(step);
        await Task.Delay(1000);
        cancellationToken.ThrowIfCancellationRequested();
        step++;
    }
}
Using the real CancellationToken
async void Main()
{
    var cancelSource = new CancellationTokenSource();

    // Request cancellation once five seconds have elapsed.
    Task timer = Task.Delay(5000);
    timer.ContinueWith(finished => cancelSource.Cancel());

    await Foo(cancelSource.Token);
}

// Writes 0..9 at one-second intervals, checking for cancellation after each wait.
async Task Foo(CancellationToken cancellationToken)
{
    int step = 0;
    while (step < 10)
    {
        Console.WriteLine(step);
        await Task.Delay(1000);
        cancellationToken.ThrowIfCancellationRequested();
        step++;
    }
}
Using the real CancellationToken - improved version
async void Main()
{
    // Passing a delay to the constructor schedules cancellation after 5 seconds.
    var cancelSource = new CancellationTokenSource(5000);
    CancellationToken token = cancelSource.Token;
    await Foo(token);
}

// Writes 0..9 at one-second intervals. The token is handed to Task.Delay,
// so no explicit cancellation check is written in the loop.
async Task Foo(CancellationToken cancellationToken)
{
    int step = 0;
    while (step < 10)
    {
        Console.WriteLine(step);
        await Task.Delay(1000, cancellationToken);
        step++;
    }
}
Progress reporting - with a delegate
async void Main()
{
    await Foo(percent => Console.WriteLine(percent + " %"));
}

// Runs a 1000-step job inside Task.Run and calls the delegate with the
// percentage complete (0..99) on every tenth step.
Task Foo(Action<int> onProgressPercentChanged)
{
    return Task.Run(() =>
    {
        int step = 0;
        while (step < 1000)
        {
            bool wholePercent = step % 10 == 0;
            if (wholePercent)
            {
                onProgressPercentChanged(step / 10);
            }

            // Do something compute-bound...
            step++;
        }
    });
}
Progress reporting - with IProgress
async void Main()
{
    // Progress<T> is the framework's IProgress<T> implementation: it wraps the handler
    // and invokes it through the synchronization context captured when it was constructed
    // (per the Progress<T> documentation), which is what makes it suitable for UI code.
    var progress = new Progress<int>(i => Console.WriteLine(i + " %"));
    await Foo(progress);
}

// IProgress<T>-based entry point: forwards each report to the delegate-based
// overload below, so both overloads share a single implementation.
Task Foo(IProgress<int> onProgressPercentChanged)
{
    return Foo(new Action<int>(onProgressPercentChanged.Report));
}

// Runs a 1000-step job inside Task.Run and calls the delegate with the
// percentage complete (0..99) on every tenth step. Kept so that callers
// passing a plain Action<int> continue to work.
Task Foo(Action<int> onProgressPercentChanged)
{
    return Task.Run(() =>
    {
        for (int i = 0; i < 1000; i++)
        {
            if (i % 10 == 0) onProgressPercentChanged(i / 10);
            // Do something compute-bound...
        }
    });
}
Task combinators - WhenAny
async void Main()
{
    // WhenAny completes as soon as the first of the tasks completes, and yields that task.
    Task<int>[] contenders = { Delay1(), Delay2(), Delay3() };
    Task<int> winningTask = await Task.WhenAny(contenders);

    Console.WriteLine("Done");
    Console.WriteLine(winningTask.Result);   // 1
}

// Three tasks that return 1, 2 and 3 after one, two and three seconds respectively.
async Task<int> Delay1()
{
    await Task.Delay(1000);
    return 1;
}

async Task<int> Delay2()
{
    await Task.Delay(2000);
    return 2;
}

async Task<int> Delay3()
{
    await Task.Delay(3000);
    return 3;
}
Task combinators - WhenAny - await winning task
async void Main()
{
    Task<int>[] contenders = { Delay1(), Delay2(), Delay3() };
    Task<int> winningTask = await Task.WhenAny(contenders);

    Console.WriteLine("Done");

    // Awaiting the winner (instead of reading Result) unwraps its value.
    int winner = await winningTask;
    Console.WriteLine(winner);   // 1
}

// Three tasks that return 1, 2 and 3 after one, two and three seconds respectively.
async Task<int> Delay1()
{
    await Task.Delay(1000);
    return 1;
}

async Task<int> Delay2()
{
    await Task.Delay(2000);
    return 2;
}

async Task<int> Delay3()
{
    await Task.Delay(3000);
    return 3;
}
Task combinators - WhenAny - in one step
async void Main()
{
    // The first await yields the winning task; the second yields that task's result.
    Task<int>[] contenders = { Delay1(), Delay2(), Delay3() };
    int answer = await await Task.WhenAny(contenders);
    answer.Dump();
}

// Three tasks that return 1, 2 and 3 after one, two and three seconds respectively.
async Task<int> Delay1()
{
    await Task.Delay(1000);
    return 1;
}

async Task<int> Delay2()
{
    await Task.Delay(2000);
    return 2;
}

async Task<int> Delay3()
{
    await Task.Delay(3000);
    return 3;
}
Task combinators - WhenAny - timeouts
async void Main()
{
    // Race the operation against a five-second delay; if the delay finishes first, time out.
    Task<string> task = SomeAsyncFunc();
    Task timeout = Task.Delay(5000);

    Task winner = await Task.WhenAny(task, timeout);
    if (winner != task)
    {
        throw new TimeoutException();
    }

    string result = await task;   // unwraps the result, or rethrows the task's exception
}

// Stand-in for a slow operation: returns "foo" after ten seconds.
async Task<string> SomeAsyncFunc()
{
    await Task.Delay(10000);
    return "foo";
}
Task combinators - WhenAll
async void Main()
{
    // WhenAll completes only when every one of the tasks has completed.
    Task<int>[] pending = { Delay1(), Delay2(), Delay3() };
    await Task.WhenAll(pending);
    "Done".Dump();
}

// Three tasks that return 1, 2 and 3 after one, two and three seconds respectively.
async Task<int> Delay1()
{
    await Task.Delay(1000);
    return 1;
}

async Task<int> Delay2()
{
    await Task.Delay(2000);
    return 2;
}

async Task<int> Delay3()
{
    await Task.Delay(3000);
    return 3;
}
Task combinators - WhenAll - exceptions
async void Main()
{
    // Both tasks fault with a NullReferenceException.
    Task first = Task.Run(() => { throw null; });
    Task second = Task.Run(() => { throw null; });

    Task all = Task.WhenAll(first, second);
    try
    {
        await all;
    }
    catch
    {
        // The combined task's Exception property holds every failure.
        int failures = all.Exception.InnerExceptions.Count;
        Console.WriteLine(failures);   // 2
    }
}
Task combinators - WhenAll - return values
async void Main()
{
    // With Task<TResult> inputs, WhenAll yields an array of the results.
    Task<int> one = Task.Run(() => 1);
    Task<int> two = Task.Run(() => 2);

    Task<int[]> combined = Task.WhenAll(one, two);
    int[] results = await combined;   // { 1, 2 }
    results.Dump();
}
Task combinators - WhenAll - web page downloads
async void Main()
{
    string[] uris = "http://www.linqpad.net http://www.albahari.com http://stackoverflow.com".Split();
    int totalSize = await GetTotalSize(uris);
    totalSize.Dump();
}

// Downloads every URI and returns the combined size in bytes. All downloaded
// arrays are held until the last download has finished.
async Task<int> GetTotalSize(string[] uris)
{
    IEnumerable<Task<byte[]>> downloadTasks =
        from uri in uris
        select new WebClient().DownloadDataTaskAsync(uri);

    byte[][] contents = await Task.WhenAll(downloadTasks);

    int total = 0;
    foreach (byte[] content in contents)
    {
        total += content.Length;
    }
    return total;
}
Task combinators - WhenAll - web page downloads improved
async void Main()
{
    string[] uris = "http://www.linqpad.net http://www.albahari.com http://stackoverflow.com".Split();
    int totalSize = await GetTotalSize(uris);
    totalSize.Dump();
}

// Downloads every URI and returns the combined size in bytes. Each task yields
// only the LENGTH of its download rather than the byte array itself.
async Task<int> GetTotalSize(string[] uris)
{
    async Task<int> LengthOf(string uri)
    {
        byte[] data = await new WebClient().DownloadDataTaskAsync(uri);
        return data.Length;
    }

    IEnumerable<Task<int>> downloadTasks = uris.Select(uri => LengthOf(uri));
    int[] contentLengths = await Task.WhenAll(downloadTasks);
    return contentLengths.Sum();
}
Custom combinators - WithTimeout
async void Main()
{
    // The operation takes ten seconds but is given only two.
    TimeSpan limit = TimeSpan.FromSeconds(2);
    string result = await SomeAsyncFunc().WithTimeout(limit);
    result.Dump();
}

// Stand-in for a slow operation: returns "foo" after ten seconds.
async Task<string> SomeAsyncFunc()
{
    await Task.Delay(10000);
    return "foo";
}
public static class Extensions
{
    /// <summary>
    /// Awaits <paramref name="task"/>, but gives up if it has not completed within
    /// <paramref name="timeout"/>.
    /// </summary>
    /// <returns>The result of <paramref name="task"/> when it finishes in time.</returns>
    /// <exception cref="TimeoutException">The delay elapsed before the task completed.</exception>
    /// <remarks>
    /// If the task itself faults or is cancelled in time, awaiting it rethrows that outcome.
    /// The underlying task is not stopped on timeout; it is simply no longer waited for.
    /// </remarks>
    public async static Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
    {
        // The delay is cancellable so that its timer can be released as soon as the
        // task wins, instead of staying alive for the remainder of the timeout.
        using (var cancelSource = new System.Threading.CancellationTokenSource())
        {
            Task delay = Task.Delay(timeout, cancelSource.Token);

            // This helper never touches UI state, so it has no need to resume on the caller's context.
            Task winner = await Task.WhenAny(task, delay).ConfigureAwait(false);
            if (winner != task)
            {
                throw new TimeoutException();
            }

            cancelSource.Cancel();
        }

        return await task.ConfigureAwait(false);   // unwrap the result or rethrow
    }
}
Custom combinators - WithCancellation
async void Main()
{
    // The token is cancelled after 3 seconds; the operation needs ten.
    var cts = new CancellationTokenSource(3000);
    CancellationToken token = cts.Token;

    string result = await SomeAsyncFunc().WithCancellation(token);
    result.Dump();
}

// Stand-in for a slow operation: returns "foo" after ten seconds.
async Task<string> SomeAsyncFunc()
{
    await Task.Delay(10000);
    return "foo";
}
public static class Extensions
{
    // Returns a task that mirrors 'task' (result, fault or cancellation) but is also
    // cancelled as soon as 'cancelToken' is cancelled, whichever happens first.
    // The original task is not stopped; it is merely no longer waited for.
    public static Task<TResult> WithCancellation<TResult>(this Task<TResult> task, CancellationToken cancelToken)
    {
        var proxy = new TaskCompletionSource<TResult>();

        // The Try* methods are used throughout because the token and the task race to
        // complete the proxy; whichever comes second is ignored.
        var registration = cancelToken.Register(() => proxy.TrySetCanceled());

        task.ContinueWith(finished =>
        {
            registration.Dispose();

            switch (finished.Status)
            {
                case TaskStatus.Canceled:
                    proxy.TrySetCanceled();
                    break;
                case TaskStatus.Faulted:
                    proxy.TrySetException(finished.Exception.InnerException);
                    break;
                default:
                    proxy.TrySetResult(finished.Result);
                    break;
            }
        });

        return proxy.Task;
    }
}
Custom combinators - WhenAllOrError
// Throws almost immediately: the first task faults at once, so WhenAllOrError
// does not wait the five seconds the second task needs.
async void Main()
{
    Task<int> failing = Task.Run(() => { throw null; return 42; });
    Task<int> slow = Task.Delay(5000).ContinueWith(ant => 53);

    int[] results = await WhenAllOrError(failing, slow);
}
// Like Task.WhenAll, except that it stops waiting as soon as ANY task faults or is
// cancelled, instead of waiting for all of them to finish.
async Task<TResult[]> WhenAllOrError<TResult>(params Task<TResult>[] tasks)
{
    // Completed only by the first failure or cancellation; never by a success.
    var killJoy = new TaskCompletionSource<TResult[]>();

    foreach (Task<TResult> candidate in tasks)
    {
        candidate.ContinueWith(finished =>
        {
            if (finished.IsFaulted)
            {
                killJoy.TrySetException(finished.Exception.InnerException);
            }
            else if (finished.IsCanceled)
            {
                killJoy.TrySetCanceled();
            }
        });
    }

    // Whichever finishes first wins: the failure signal or the complete result set.
    Task<TResult[]> allSucceeded = Task.WhenAll(tasks);
    Task<TResult[]> first = await Task.WhenAny(killJoy.Task, allSucceeded);
    return await first;
}