Chapter 21 - Advanced Threading
Exclusive Locking
Simple use of lock
static readonly object _locker = new object();   // Guards _val1 and _val2
static int _val1, _val2;

// Repeatedly runs three competing Go threads to show that the lock in Go
// prevents a DivideByZeroException.
static void Main()
{
    for (int i = 1; i <= 1000; i++)
    {
        if (i % 100 == 0) Console.WriteLine ($"Tried {i} times to get DivideByZeroException");

        // Re-arm the shared state so that every iteration really exercises the
        // guarded division. Without this, _val2 is always 0 and Go never divides.
        // No lock is needed here: the previous iteration's threads have been joined
        // and this iteration's threads have not started yet.
        _val1 = 1;
        _val2 = 1;

        var t1 = new Thread (Go); t1.Start();
        var t2 = new Thread (Go); t2.Start();
        var t3 = new Thread (Go); t3.Start();
        t1.Join(); t2.Join(); t3.Join();
    }
}
// Divides _val1 by _val2 (when _val2 is non-zero) and then zeroes _val2.
// The test and the division happen under one lock, so another thread cannot
// zero _val2 between them.
static void Go()
{
    lock (_locker)
    {
        int divisor = _val2;
        if (divisor != 0)
        {
            Console.WriteLine (_val1 / divisor);
        }
        _val2 = 0;
    }
}
Nested locking
// Shows that a thread can re-enter a lock it already holds.
var locker = new object();

lock (locker)
{
    // AnotherMethod locks the same object again on this thread.
    AnotherMethod();
    // The outer lock is still held here; it is released at the closing brace.
}

// Takes the same lock a second time and prints a message.
void AnotherMethod()
{
    lock (locker)
    {
        Console.WriteLine ("Another method");
    }
}
Deadlocks
// Demonstrates a deadlock: two threads take the same two locks in opposite order.
object locker1 = new object();
object locker2 = new object();
// Worker thread: takes locker1, pauses, then tries to take locker2.
new Thread (() => {
lock (locker1)
{
Thread.Sleep (1000); // Gives the main thread time to take locker2
lock (locker2) { } // Deadlock: locker2 is held by the main thread
}
}).Start();
// Main thread: takes locker2, pauses, then tries to take locker1.
lock (locker2)
{
Thread.Sleep (1000); // Gives the worker thread time to take locker1
lock (locker1) { } // Deadlock: locker1 is held by the worker thread
}
Mutex
// To test this in LINQPad, run the query then clone the query (Shift+Ctrl+C) and run the copy at the same time.
//
// Ensures only one instance of the program runs at a time by using a named Mutex.
static void Main()
{
    // Naming a Mutex makes it available computer-wide. Use a name that's
    // unique to your company and application (e.g., include your URL).
    using (var mutex = new Mutex (false, "oreilly.com OneAtATimeDemo"))
    {
        // Wait a few seconds if contended, in case another instance
        // of the program is still in the process of shutting down.
        if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false))
        {
            Console.WriteLine ("Another instance of the app is running. Bye!");
            return;
        }

        try
        {
            RunProgram();
        }
        finally
        {
            // Disposing the Mutex only closes the handle; it does not give up
            // ownership. Release explicitly so a waiting instance does not
            // inherit an abandoned mutex.
            mutex.ReleaseMutex();
        }
    }
}
// Stand-in for the real program: prints a prompt and blocks until Enter is pressed.
static void RunProgram()
{
    Console.WriteLine ("Running. Press Enter to exit");
    _ = Console.ReadLine();   // The typed text is not used
}
Locking and Thread Safety
Thread safety and Framework types
static List <string> _list = new List <string>();   // Shared list; also used as its own lock object

// Starts two threads that both run AddItem against the shared list.
static void Main()
{
    for (int started = 0; started < 2; started++)
    {
        new Thread (AddItem).Start();
    }
}
// Appends one item to the shared list, then prints a snapshot of the list.
// Both the write and the copy are done under a lock on the list; the printing
// works on the private copy and so needs no lock.
static void AddItem()
{
    lock (_list)
    {
        _list.Add ("Item " + _list.Count);
    }

    string[] snapshot;
    lock (_list)
    {
        snapshot = _list.ToArray();
    }

    for (int index = 0; index < snapshot.Length; index++)
    {
        Console.WriteLine (snapshot [index]);
    }
}
// Note: In LINQPad, press Shift+F5 to clear static variables.
Thread safety in application servers
// Starts three threads that each look up user 1 in the cache and dump the result.
void Main()
{
    for (int started = 0; started < 3; started++)
    {
        new Thread (() => UserCache.GetUser (1).Dump()).Start();
    }
}
// Cache of User objects keyed by ID. The dictionary is locked only around
// individual reads and writes, not around the slow retrieval.
static class UserCache
{
    static Dictionary <int, User> _users = new Dictionary <int, User>();

    // Returns the cached user for the given ID, retrieving and caching it on a miss.
    // The lock is not held during retrieval, so concurrent callers that all miss
    // each call RetrieveUser; the last one to store wins.
    internal static User GetUser (int id)
    {
        lock (_users)
        {
            if (_users.TryGetValue (id, out User cached))
            {
                return cached;
            }
        }

        User fetched = RetrieveUser (id);   // Method to retrieve from database

        lock (_users)
        {
            _users [id] = fetched;
        }
        return fetched;
    }

    // Simulates a slow lookup and returns a new User carrying the requested ID.
    static User RetrieveUser (int id)
    {
        Thread.Sleep (1000);   // simulate a time-consuming operation
        var user = new User();
        user.ID = id;
        return user;
    }
}
// Minimal user record used by the cache sample.
class User
{
    public int ID;   // Identifier of the user
}
Thread safety in application servers - enhanced
// Requests user 1 from three threads (dumping the returned task each time),
// then awaits the same lookup directly and dumps the resulting user.
async Task Main()
{
    for (int started = 0; started < 3; started++)
    {
        new Thread (() => UserCache.GetUserAsync (1).Dump()).Start();
    }

    // You can also await this method:
    (await UserCache.GetUserAsync (1)).Dump();
}
// Cache that stores the retrieval task for each user ID rather than the user.
static class UserCache
{
    static Dictionary <int, Task<User>> _userTasks =
        new Dictionary <int, Task<User>>();

    // Returns the task that produces the user for the given ID. On a miss the
    // retrieval is started with Task.Run and its task is cached, all inside one
    // lock, so later callers for the same ID get the same task. Task.Run returns
    // without waiting for the retrieval, so the lock is not held while it runs.
    internal static Task<User> GetUserAsync (int id)
    {
        lock (_userTasks)
        {
            if (!_userTasks.TryGetValue (id, out Task<User> pending))
            {
                pending = Task.Run (() => RetrieveUser (id));
                _userTasks [id] = pending;
            }
            return pending;
        }
    }

    // Simulates a slow lookup and returns a new User carrying the requested ID.
    static User RetrieveUser (int id)
    {
        Thread.Sleep (1000);   // simulate a time-consuming operation
        var user = new User();
        user.ID = id;
        return user;
    }
}
// Minimal user record used by the task-based cache sample.
class User
{
    public int ID;   // Identifier of the user
}
Non-exclusive Locking
Semaphore
static SemaphoreSlim _sem = new SemaphoreSlim (3);    // Capacity of 3

// Starts five threads, numbered 1 to 5, that all try to enter the semaphore.
static void Main()
{
    int id = 1;
    while (id <= 5)
    {
        new Thread (Enter).Start (id);
        id++;
    }
}
// Thread body: waits for a semaphore slot, stays "inside" for id seconds, then leaves.
// id is the boxed int passed to Thread.Start.
static void Enter (object id)
{
    Console.WriteLine (id + " wants to enter");
    _sem.Wait();
    try
    {
        Console.WriteLine (id + " is in!");           // Only three threads
        Thread.Sleep (1000 * (int) id);               // can be here at
        Console.WriteLine (id + " is leaving");       // a time.
    }
    finally
    {
        // Always return the slot, even if the body throws (e.g. a bad cast of id);
        // otherwise the semaphore's capacity shrinks permanently.
        _sem.Release();
    }
}
Async semaphores and locks
SemaphoreSlim _semaphore = new SemaphoreSlim (4);    // 4 downloads at a time

// Kicks off 50 simulated downloads; the semaphore limits how many run at once.
void Main()
{
    Util.AutoScrollResults = true;
    for (int i = 0; i < 50; i++)
    {
        // The URI string is built before the call, so no copy of the loop
        // variable is needed.
        DownloadWithSemaphoreAsync ("http://SomeUri/" + i);
    }
}
// Simulates one download. A semaphore slot is held only for the simulated delay;
// the completion message is dumped after the slot has been released.
async void DownloadWithSemaphoreAsync (string uri)
{
    using (IDisposable slot = await _semaphore.EnterAsync())
    {
        await Task.Delay (500);   // Simulate delay while downloading
    }
    $"Downloaded {uri}".Dump();
}
// Extension that lets a SemaphoreSlim be used with a 'using' statement.
static class Extensions
{
    // Asynchronously waits for a slot on the semaphore and returns a disposable
    // whose disposal action calls Release on that semaphore.
    public static async Task<IDisposable> EnterAsync (this SemaphoreSlim ss)
    {
        await ss.WaitAsync().ConfigureAwait (false);
        Action release = () => ss.Release();
        return Disposable.Create (release);
    }
}
Parallel.ForEachAsync
Util.AutoScrollResults = true;

// These are the 50 URIs that we want to download.
var uris = Enumerable.Range (0, 50).Select (i => "http://SomeUri/" + i);

var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 5,              // Limit concurrency to a maximum of 5 downloads at a time.
    CancellationToken = QueryCancelToken     // Use LINQPad's cancellation token. Test by pressing Stop button on toolbar.
};

await Parallel.ForEachAsync (uris, options, async (uri, cancelToken) =>
{
    var progressBar = new Util.ProgressBar ("Downloading " + uri, true).Dump();

    // Simulate delay while downloading: eleven 100 ms steps, reporting 0% to 100%.
    for (int percent = 0; percent <= 100; percent += 10)
    {
        await Task.Delay (100, cancelToken);
        progressBar.Percent = percent;
    }

    $"Downloaded {uri}".Dump();
});
ReaderWriterLockSlim
static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();   // Guards _items
static List<int> _items = new List<int>();
static Random _rand = new Random();                              // Shared; locked in GetRandNum

// Starts three reader threads, then two writer threads labelled "A" and "B".
static void Main()
{
    for (int readers = 0; readers < 3; readers++)
    {
        new Thread (Read).Start();
    }

    foreach (string label in new[] { "A", "B" })
    {
        new Thread (Write).Start (label);
    }
}
// Reader thread body: loops forever, walking the shared list under a read lock
// and sleeping 10 ms per element.
static void Read()
{
    while (true)
    {
        _rw.EnterReadLock();
        try
        {
            foreach (int i in _items) Thread.Sleep (10);
        }
        finally
        {
            // Release even if enumeration throws, so writers are not blocked forever.
            _rw.ExitReadLock();
        }
    }
}
// Writer thread body: loops forever, adding a random number below 100 to the
// shared list under the write lock, then reporting it and pausing for 100 ms.
// threadID is the label passed to Thread.Start; it is used only in the message.
static void Write (object threadID)
{
    while (true)
    {
        int newNumber = GetRandNum (100);
        _rw.EnterWriteLock();
        try
        {
            _items.Add (newNumber);
        }
        finally
        {
            // Release even if Add throws, so readers and writers are not blocked forever.
            _rw.ExitWriteLock();
        }
        Console.WriteLine ("Thread " + threadID + " added " + newNumber);
        Thread.Sleep (100);
    }
}
// Returns the next value from the shared Random, passing max to Random.Next.
// Access to the shared Random is serialized with a lock on the instance.
static int GetRandNum (int max)
{
    lock (_rand)
    {
        return _rand.Next (max);
    }
}
ReaderWriterLockSlim - upgradeable locks
static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();   // Guards _items
static List<int> _items = new List<int>();
static Random _rand = new Random();                              // Shared; locked in GetRandNum

// Starts three reader threads, then two writer threads labelled "A" and "B".
static void Main()
{
    for (int readers = 0; readers < 3; readers++)
    {
        new Thread (Read).Start();
    }

    foreach (string label in new[] { "A", "B" })
    {
        new Thread (Write).Start (label);
    }
}
// Reader thread body: loops forever, walking the shared list under a read lock
// and sleeping 10 ms per element.
static void Read()
{
    while (true)
    {
        _rw.EnterReadLock();
        try
        {
            foreach (int i in _items) Thread.Sleep (10);
        }
        finally
        {
            // Release even if enumeration throws, so writers are not blocked forever.
            _rw.ExitReadLock();
        }
    }
}
// Writer thread body: loops forever, adding a random number below 100 to the
// shared list only if it is not already present, then pausing for 100 ms.
// The membership test runs under an upgradeable read lock, which is upgraded to
// a write lock only when an insert is needed.
// threadID is the label passed to Thread.Start; it is used only in the message.
static void Write (object threadID)
{
    while (true)
    {
        int newNumber = GetRandNum (100);
        _rw.EnterUpgradeableReadLock();
        try
        {
            if (!_items.Contains (newNumber))
            {
                _rw.EnterWriteLock();
                try
                {
                    _items.Add (newNumber);
                }
                finally
                {
                    // Drop back to the upgradeable lock even if Add throws.
                    _rw.ExitWriteLock();
                }
                Console.WriteLine ("Thread " + threadID + " added " + newNumber);
            }
        }
        finally
        {
            // Always release, so other upgradeable/write requests are not blocked forever.
            _rw.ExitUpgradeableReadLock();
        }
        Thread.Sleep (100);
    }
}
// Returns the next value from the shared Random, passing max to Random.Next.
// Access to the shared Random is serialized with a lock on the instance.
static int GetRandNum (int max)
{
    lock (_rand)
    {
        return _rand.Next (max);
    }
}
ReaderWriterLockSlim - lock recursion
// With SupportsRecursion, the same thread may take a lock it already holds.
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);

// Read lock taken twice and released twice.
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();

// Read lock taken while the write lock is held.
rw.EnterWriteLock();
rw.EnterReadLock();
bool readHeld = rw.IsReadLockHeld;
Console.WriteLine (readHeld);      // True
bool writeHeld = rw.IsWriteLockHeld;
Console.WriteLine (writeHeld);     // True
rw.ExitReadLock();
rw.ExitWriteLock();
Signaling with Event Wait Handles
AutoResetEvent
static EventWaitHandle _waitHandle = new AutoResetEvent (false);   // Starts unsignaled

// Starts the waiter thread, pauses for a second, then signals it.
static void Main()
{
    var waiter = new Thread (Waiter);
    waiter.Start();

    Thread.Sleep (1000);    // Pause for a second...
    _waitHandle.Set();      // Wake up the Waiter.
}
// Thread body: announces that it is waiting, blocks on the wait handle,
// then announces that it was notified.
static void Waiter()
{
    Console.WriteLine ("Waiting...");
    _ = _waitHandle.WaitOne();        // Wait for notification; result not used
    Console.WriteLine ("Notified");
}
Two-way signaling
static EventWaitHandle _ready = new AutoResetEvent (false);   // Worker -> main: "ready for a message"
static EventWaitHandle _go = new AutoResetEvent (false);      // Main -> worker: "message is set"
static readonly object _locker = new object();                // Guards _message
static string _message;

// Hands the worker "ooo", then "ahhh", then null (the exit signal). Before each
// hand-over it waits for the worker to report that it is ready.
static void Main()
{
    new Thread (Work).Start();

    foreach (string next in new[] { "ooo", "ahhh", null })
    {
        _ready.WaitOne();                   // Wait until worker is ready
        lock (_locker) { _message = next; }
        _go.Set();                          // Tell worker to go
    }
}
// Worker loop: signals readiness, waits to be told to go, then prints the shared
// message. A null message makes the worker return.
static void Work()
{
    for (;;)
    {
        _ready.Set();        // Indicate that we're ready
        _go.WaitOne();       // Wait to be kicked off...

        lock (_locker)
        {
            if (_message == null)
            {
                return;      // Gracefully exit
            }
            Console.WriteLine (_message);
        }
    }
}
CountdownEvent
var countdown = new CountdownEvent (3);   // Initialize with "count" of 3.

// Start three speakers: "I am thread 1" .. "I am thread 3".
for (int n = 1; n <= 3; n++)
{
    new Thread (SaySomething).Start ("I am thread " + n);
}

countdown.Wait();   // Blocks until Signal has been called 3 times
Console.WriteLine ("All threads have finished speaking!");

// Thread body: sleeps for a second, prints the object passed to Thread.Start,
// then signals the countdown once.
void SaySomething (object thing)
{
    Thread.Sleep (1000);
    Console.WriteLine (thing);
    countdown.Signal();
}
Wait Handles and continuations
// Registers Go as a thread-pool callback for when 'starter' is signaled,
// so no thread blocks while waiting.
var starter = new ManualResetEvent (false);

RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject (
    starter,            // Handle to wait on
    Go,                 // Callback
    "Some Data",        // State object handed to the callback
    Timeout.Infinite,   // No timeout (-1)
    true);              // Execute the callback only once

Thread.Sleep (5000);
Console.WriteLine ("Signaling worker...");
starter.Set();
Console.ReadLine();
reg.Unregister (starter);   // Clean up when we're done.

// Callback: prints the state object that was supplied at registration.
// timedOut is not used here.
void Go (object data, bool timedOut)
{
    Console.WriteLine ("Started - " + data);
    // Perform task...
}
The Barrier Class
Barrier
var barrier = new Barrier (3);   // Three participants

for (int started = 0; started < 3; started++)
{
    new Thread (Speak).Start();
}

// Thread body: writes 0 to 4, waiting at the barrier after each number so that
// the three threads advance together.
void Speak()
{
    int i = 0;
    while (i < 5)
    {
        Console.Write (i + " ");
        barrier.SignalAndWait();
        i++;
    }
}
Barrier - post-phase action
// Barrier for three participants; the post-phase action writes a newline
// each time a phase completes.
static Barrier _barrier = new Barrier (3, barrier => Console.WriteLine());

// Starts the three speaker threads.
static void Main()
{
    for (int started = 0; started < 3; started++)
    {
        new Thread (Speak).Start();
    }
}
// Thread body: writes 0 to 4, waiting at the barrier after each number.
static void Speak()
{
    int i = 0;
    while (i < 5)
    {
        Console.Write (i + " ");
        _barrier.SignalAndWait();
        i++;
    }
}
Lazy Initialization
Intro
// Reading the property triggers the lazy construction; the result is dumped.
var foo = new Foo();
foo.Expensive.Dump();
// Holds an Expensive instance that is created on first access.
// The check-then-assign below takes no lock.
class Foo
{
    Expensive _expensive;

    // Lazily instantiate Expensive: create it only if the field is still null.
    public Expensive Expensive
    {
        get
        {
            _expensive ??= new Expensive();
            return _expensive;
        }
    }
}

class Expensive
{
    /* Suppose this is expensive to construct */
}
Intro - with lock
// Reading the property triggers the lazy construction; the result is dumped.
void Main()
{
    var foo = new Foo();
    foo.Expensive.Dump();
}
// Holds an Expensive instance that is created on first access, with the
// check and the assignment done under a lock.
class Foo
{
    Expensive _expensive;
    readonly object _expenseLock = new object();   // Guards _expensive

    // Every read takes the lock, creates the instance if the field is still null,
    // and returns it.
    public Expensive Expensive
    {
        get
        {
            lock (_expenseLock)
            {
                _expensive ??= new Expensive();
                return _expensive;
            }
        }
    }
}

class Expensive
{
    /* Suppose this is expensive to construct */
}
Lazy of T
// Reading the property triggers the lazy construction; the result is dumped.
void Main()
{
    var foo = new Foo();
    foo.Expensive.Dump();
}
// Holds an Expensive instance whose creation is delegated to Lazy<T>.
class Foo
{
    // ExecutionAndPublication is the mode selected by passing isThreadSafe: true.
    Lazy<Expensive> _expensive = new Lazy<Expensive> (
        () => new Expensive(),
        LazyThreadSafetyMode.ExecutionAndPublication);

    // The factory runs the first time Value is read.
    public Expensive Expensive => _expensive.Value;
}

class Expensive
{
    /* Suppose this is expensive to construct */
}
LazyInitializer
// Reading the property triggers the lazy construction; the result is dumped.
void Main()
{
    var foo = new Foo();
    foo.Expensive.Dump();
}
// Holds an Expensive instance that is created on first access via LazyInitializer.
class Foo
{
    Expensive _expensive;

    // EnsureInitialized runs the factory if the field is still null and returns
    // the field's value. NOTE(review): per the .NET documentation this two-argument
    // overload takes no lock -- competing threads may each run the factory and only
    // one result is stored -- so it is not double-checked locking.
    public Expensive Expensive =>
        LazyInitializer.EnsureInitialized (ref _expensive, () => new Expensive());
}

class Expensive
{
    /* Suppose this is expensive to construct */
}
Thread-local Storage
ThreadStatic
[ThreadStatic] static int _x;   // Each thread gets its own copy

// Starts three threads that sleep 1, 2 and 3 seconds respectively, then each
// increments and dumps its own copy of _x.
void Main()
{
    foreach (int delay in new[] { 1000, 2000, 3000 })
    {
        new Thread (() =>
        {
            Thread.Sleep (delay);
            _x++;
            _x.Dump();
        }).Start();
    }
}
ThreadLocal
// Per-thread value; the factory supplies 3 as each thread's initial value.
static ThreadLocal<int> _x = new ThreadLocal<int> (() => 3);

// Starts three threads that sleep 1, 2 and 3 seconds respectively, then each
// increments its own Value and dumps the ThreadLocal object.
void Main()
{
    foreach (int delay in new[] { 1000, 2000, 3000 })
    {
        new Thread (() =>
        {
            Thread.Sleep (delay);
            _x.Value++;
            _x.Dump();
        }).Start();
    }
}
GetData and SetData
// Shares one Test object between three threads; each thread sleeps (1, 2 and
// 3 seconds), increments SecurityLevel and dumps the value it then reads.
void Main()
{
    var test = new Test();

    foreach (int delay in new[] { 1000, 2000, 3000 })
    {
        new Thread (() =>
        {
            Thread.Sleep (delay);
            test.SecurityLevel++;
            test.SecurityLevel.Dump();
        }).Start();
    }
}
// Exposes a thread-local integer stored in a named data slot.
class Test
{
    // The same LocalDataStoreSlot object can be used across all threads.
    LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel");

    // This property has a separate value on each thread.
    public int SecurityLevel
    {
        get
        {
            object boxed = Thread.GetData (_secSlot);
            if (boxed == null)
            {
                return 0;   // null == uninitialized
            }
            return (int) boxed;
        }
        set
        {
            Thread.SetData (_secSlot, value);
        }
    }
}
AsyncLocal
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

// Sets the AsyncLocal value, awaits a delay, and prints the value afterwards.
// The managed thread ID is dumped before and after the await for comparison.
async Task Main()
{
    void DumpThreadId() => Thread.CurrentThread.ManagedThreadId.Dump ("Current Thread ID");

    DumpThreadId();
    _asyncLocalTest.Value = "test";

    await Task.Delay (1000);

    DumpThreadId();
    Console.WriteLine (_asyncLocalTest.Value);
}
AsyncLocal - concurrent
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

// Runs Test on two separate threads, with "one" and "two" as the values.
void Main()
{
    foreach (string label in new[] { "one", "two" })
    {
        new Thread (() => Test (label)).Start();
    }
}
// Stores value in the AsyncLocal, awaits a delay, then prints the argument
// next to what the AsyncLocal holds at that point.
async void Test (string value)
{
    _asyncLocalTest.Value = value;
    await Task.Delay (1000);
    string stored = _asyncLocalTest.Value;
    Console.WriteLine ($"{value} {stored}");
}
AsyncLocal - inherited value
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

// Sets the AsyncLocal value, then starts a thread that reads it.
void Main()
{
    _asyncLocalTest.Value = "test";
    var child = new Thread (AnotherMethod);
    child.Start();
}

// Runs on the new thread and prints the AsyncLocal value it sees.
void AnotherMethod()
{
    Console.WriteLine (_asyncLocalTest.Value);   // test
}
AsyncLocal - inherited value - copy
static AsyncLocal<string> _asyncLocalTest = new AsyncLocal<string>();

// Sets the AsyncLocal value, lets a child thread assign a different value,
// waits for the child to finish, then prints the value seen by this thread.
void Main()
{
    _asyncLocalTest.Value = "test";

    var child = new Thread (AnotherMethod);
    child.Start();
    child.Join();

    Console.WriteLine (_asyncLocalTest.Value);   // test (not ha-ha!)
}

// Runs on the child thread and assigns a new value to the AsyncLocal.
void AnotherMethod()
{
    _asyncLocalTest.Value = "ha-ha!";
}
AsyncLocal - inherited value - copy - limitation
static AsyncLocal<StringBuilder> _asyncLocalTest = new AsyncLocal<StringBuilder>();

// Stores a StringBuilder in the AsyncLocal, lets a child thread append to it,
// waits for the child to finish, then prints the builder's contents. The child
// mutates the object itself rather than assigning a new Value.
void Main()
{
    _asyncLocalTest.Value = new StringBuilder ("test");

    var child = new Thread (AnotherMethod);
    child.Start();
    child.Join();

    Console.WriteLine (_asyncLocalTest.Value.ToString());   // test ha-ha!
}

// Runs on the child thread and appends to the StringBuilder held in the AsyncLocal.
void AnotherMethod()
{
    _asyncLocalTest.Value.Append (" ha-ha!");
}
Timers
Asynchronous looping
Util.NewProcess = true;   // Tell LINQPad not to recycle the process
StartPeriodicOperation();

// Endless asynchronous loop: waits one second, prints "Tick", and repeats.
async void StartPeriodicOperation()
{
    for (;;)
    {
        await Task.Delay (1000);
        Console.WriteLine ("Tick");   // Do some action
    }
}
PeriodicTimer
Util.NewProcess = true;   // Tell LINQPad not to recycle the process
var timer = new PeriodicTimer (TimeSpan.FromSeconds (1));
StartPeriodicOperation();

// Prints "Tick" after each timer tick, for as long as WaitForNextTickAsync
// keeps returning true.
async void StartPeriodicOperation()
{
    while (true)
    {
        bool ticked = await timer.WaitForNextTickAsync();
        if (!ticked) break;
        Console.WriteLine ("Tick");   // Do some action
    }
}
PeriodicTimer - with disposal
var timer = new PeriodicTimer (TimeSpan.FromSeconds (1));
StartPeriodicOperation();

Console.WriteLine ("Press enter to stop");
Console.ReadLine();
timer.Dispose();   // Dispose the timer once the user presses Enter

// Prints "Tick" after each timer tick, for as long as WaitForNextTickAsync
// keeps returning true.
async void StartPeriodicOperation()
{
    while (true)
    {
        bool ticked = await timer.WaitForNextTickAsync();
        if (!ticked) break;
        Console.WriteLine ("Tick");   // Do some action
    }
}
Multithreaded timers - Threading Timer
// First interval = 5000ms; subsequent intervals = 1000ms
var tmr = new Timer (Tick, "tick...", dueTime: 5000, period: 1000);

Console.WriteLine ("Press Enter to stop");
Console.ReadLine();
tmr.Dispose();   // This both stops the timer and cleans up.

// Timer callback; 'data' is the state object supplied to the Timer constructor.
void Tick (object data)
{
    // This runs on a pooled thread
    Console.WriteLine (data);   // Writes "tick..."
}
Multithreaded timers - System.Timer
// System.Timers.Timer: configured through properties and an event.
var tmr = new System.Timers.Timer { Interval = 500 };   // Doesn't require any ctor args
tmr.Elapsed += tmr_Elapsed;   // Uses an event instead of a delegate

tmr.Start();                  // Start the timer
Console.ReadLine();

tmr.Stop();                   // Stop the timer
Console.ReadLine();

tmr.Start();                  // Restart the timer
Console.ReadLine();

tmr.Dispose();                // Permanently stop the timer

// Elapsed handler: prints "Tick" each time the timer fires.
static void tmr_Elapsed (object sender, EventArgs e) => Console.WriteLine ("Tick");
EXTRA - Wait and Pulse
Signaling with Wait and Pulse
// See http://www.albahari.com/threading/part4.aspx ("Signaling with Wait and Pulse") for the accompanying text.
static readonly object _locker = new object();   // Guards _go; also the Wait/Pulse object
static bool _go;                                 // The blocking condition

// Starts the worker (which blocks because _go is false), waits for Enter,
// then sets _go and pulses to wake the worker.
static void Main()
{
    var worker = new Thread (Work);
    worker.Start();

    Console.WriteLine ("Press Enter to signal");
    Console.ReadLine();          // Wait for user to hit Enter

    lock (_locker)
    {
        _go = true;              // Change the condition first...
        Monitor.Pulse (_locker); // ...then wake the waiting thread.
    }
}
// Worker body: waits until _go becomes true, then prints a message.
static void Work()
{
    lock (_locker)
    {
        // Re-check the flag each time we wake; the lock is released while waiting.
        while (!_go)
        {
            Monitor.Wait (_locker);
        }
    }
    Console.WriteLine ("Woken!!!");
}
How not to use Wait and Pulse
// Non-deterministic!
// Counter-example: Pulse and Wait are used without a flag recording that the
// signal was sent. If Main pulses before Work has started waiting, the pulse
// has no effect and Work then waits with nothing left to wake it.
static readonly object _locker = new object();
// Starts the worker and immediately pulses, without setting any condition.
static void Main()
{
new Thread (Work).Start();
lock (_locker) Monitor.Pulse (_locker);
}
// Waits for a pulse without checking any condition first, then prints a message.
static void Work()
{
lock (_locker) Monitor.Wait (_locker);
Console.WriteLine ("Woken!!!");
}
Producer-consumer queue
// Creates a queue with two workers, enqueues ten one-second tasks, then shuts
// the queue down and waits for the workers to finish.
static void Main()
{
    var q = new PCQueue (2);

    Console.WriteLine ("Enqueuing 10 items...");

    int next = 0;
    while (next < 10)
    {
        int itemNumber = next;   // Fresh variable per iteration, to avoid the captured variable trap
        Action work = () =>
        {
            Thread.Sleep (1000);   // Simulate time-consuming work
            Console.Write (" Task" + itemNumber);
        };
        q.EnqueueItem (work);
        next++;
    }

    q.Shutdown (true);
    Console.WriteLine();
    Console.WriteLine ("Workers complete!");
}
// Producer/consumer queue: a fixed set of worker threads executes queued
// Action delegates. Built on Monitor.Wait and Monitor.Pulse.
public class PCQueue
{
    readonly object _locker = new object();        // Guards _itemQ; also the Wait/Pulse object
    Thread[] _workers;
    Queue<Action> _itemQ = new Queue<Action>();

    // Creates and starts workerCount consumer threads.
    public PCQueue (int workerCount)
    {
        _workers = new Thread [workerCount];

        // Create and start a separate thread for each worker
        for (int slot = 0; slot < _workers.Length; slot++)
        {
            var worker = new Thread (Consume);
            _workers [slot] = worker;
            worker.Start();
        }
    }

    // Asks every worker to exit once the items already queued have been taken.
    // When waitForWorkers is true, blocks until all worker threads have ended.
    public void Shutdown (bool waitForWorkers)
    {
        // Enqueue one null item per worker to make each exit.
        for (int n = 0; n < _workers.Length; n++)
        {
            EnqueueItem (null);
        }

        if (!waitForWorkers) return;

        // Wait for workers to finish
        foreach (Thread worker in _workers)
        {
            worker.Join();
        }
    }

    // Adds an item to the queue and wakes one waiting worker.
    // A null item tells the worker that dequeues it to exit.
    public void EnqueueItem (Action item)
    {
        lock (_locker)
        {
            _itemQ.Enqueue (item);      // We must pulse because we're
            Monitor.Pulse (_locker);    // changing a blocking condition.
        }
    }

    // Worker loop: keeps taking and running items until a null item arrives.
    void Consume()
    {
        for (Action item = Take(); item != null; item = Take())
        {
            item();   // Execute item outside the lock.
        }
    }

    // Blocks until the queue is non-empty, then removes and returns the next item.
    Action Take()
    {
        lock (_locker)
        {
            while (_itemQ.Count == 0)
            {
                Monitor.Wait (_locker);
            }
            return _itemQ.Dequeue();
        }
    }
}
Two-way signaling and races
// Counter-example: signals in one direction only, so signals can be lost.
static readonly object _locker = new object();
static bool _go;
// Starts the worker, then sets _go and pulses five times in a row without
// ever waiting for the worker. Nothing here stops several iterations from
// running before the worker consumes a single signal; setting a flag that is
// already true loses that signal.
static void Main()
{
new Thread (SaySomething).Start();
for (int i = 0; i < 5; i++)
lock (_locker)
{
_go = true;
Monitor.PulseAll (_locker); }
}
// Worker: five times over, waits for _go to be true, clears it and prints
// "Wassup?". It completes all five iterations only if it observes _go set
// five separate times.
static void SaySomething()
{
for (int i = 0; i < 5; i++)
lock (_locker)
{
while (!_go) Monitor.Wait (_locker); // Lock is released while waiting
_go = false; // Consume the signal
Console.WriteLine ("Wassup?");
}
}
Two-way signaling and races - solution
static readonly object _locker = new object();   // Guards both flags; also the Wait/Pulse object
static bool _ready, _go;                         // _ready: worker -> main; _go: main -> worker

// Signals the worker five times, each time first waiting until the worker
// has reported that it is ready.
static void Main()
{
    new Thread (SaySomething).Start();

    for (int round = 0; round < 5; round++)
    {
        lock (_locker)
        {
            while (!_ready)
            {
                Monitor.Wait (_locker);
            }
            _ready = false;                 // Consume the "ready" signal
            _go = true;                     // Send the "go" signal
            Monitor.PulseAll (_locker);
        }
    }
}
// Worker: five times over, announces readiness, waits for the go signal,
// consumes it and prints "Wassup?".
static void SaySomething()
{
    for (int round = 0; round < 5; round++)
    {
        lock (_locker)
        {
            _ready = true;
            Monitor.PulseAll (_locker);

            // Monitor.Wait releases the lock while blocked and reacquires it
            // before returning.
            while (!_go)
            {
                Monitor.Wait (_locker);
            }
            _go = false;
            Console.WriteLine ("Wassup?");
        }
    }
}
Simulating a ManualResetEvent
// Starts a thread that calls Set after two seconds, then blocks in WaitOne
// until that happens.
void Main()
{
    var setter = new Thread (() =>
    {
        Thread.Sleep (2000);
        Set();
    });
    setter.Start();

    Console.WriteLine ("Waiting...");
    WaitOne();
    Console.WriteLine ("Signaled");
}
readonly object _locker = new object();   // Guards _signal; also the Wait/Pulse object
bool _signal;                             // True while the simulated event is "set"

// Blocks until _signal is true. Does not clear the flag.
void WaitOne()
{
    lock (_locker)
        while (!_signal)
            Monitor.Wait (_locker);
}

// Sets the flag and wakes all waiting threads.
void Set()
{
    lock (_locker)
    {
        _signal = true;
        Monitor.PulseAll (_locker);
    }
}

// Clears the flag. No pulse is issued here.
void Reset()
{
    lock (_locker)
    {
        _signal = false;
    }
}
Writing a CountdownEvent
// Creates a Countdown of 5, starts a thread that signals it once per second,
// and blocks until the count reaches zero.
void Main()
{
    var cd = new Countdown (5);

    var signaler = new Thread (() =>
    {
        int i = 0;
        while (i < 5)
        {
            Thread.Sleep (1000);
            cd.Signal();
            Console.WriteLine ("Signal " + i);
            i++;
        }
    });
    signaler.Start();

    Console.WriteLine ("Waiting");
    cd.Wait();
    Console.WriteLine ("Unblocked");
}
// Hand-written countdown event built on Monitor.Wait and Monitor.PulseAll.
// Wait blocks while the count is above zero.
public class Countdown
{
    object _locker = new object();   // Guards _value; also the Wait/Pulse object
    int _value;                      // Remaining count

    // Creates a countdown whose count starts at zero.
    public Countdown() { }

    // Creates a countdown whose count starts at initialCount.
    public Countdown (int initialCount)
    {
        _value = initialCount;
    }

    // Decrements the count by one.
    public void Signal() => AddCount (-1);

    // Adds amount (which may be negative) to the count, and wakes all waiters
    // when the count is zero or below afterwards.
    public void AddCount (int amount)
    {
        lock (_locker)
        {
            _value += amount;
            if (_value > 0) return;
            Monitor.PulseAll (_locker);
        }
    }

    // Blocks until the count is zero or below.
    public void Wait()
    {
        lock (_locker)
        {
            while (_value > 0)
            {
                Monitor.Wait (_locker);
            }
        }
    }
}
Thread rendezvous
// Two participants (Main and Mate) must both signal before either proceeds.
// (An unused '_locker' field was removed: nothing in this sample referenced it.)
static CountdownEvent _countdown = new CountdownEvent (2);

// Starts the Mate thread with a random delay, sleeps a random time itself,
// then signals and waits at the rendezvous before printing.
public static void Main()
{
    // Get each thread to sleep a random amount of time.
    Random r = new Random();
    new Thread (Mate).Start (r.Next (10000));
    Thread.Sleep (r.Next (10000));

    _countdown.Signal();   // Announce arrival...
    _countdown.Wait();     // ...and wait for the other thread to arrive.
    Console.Write ("Mate! ");
}
// Second participant: sleeps for the given time, then signals and waits at
// the rendezvous before printing. delay is the boxed int (milliseconds)
// passed to Thread.Start.
static void Mate (object delay)
{
    int pause = (int) delay;
    Thread.Sleep (pause);

    _countdown.Signal();   // Announce arrival...
    _countdown.Wait();     // ...and wait for the other thread to arrive.
    Console.Write ("Mate! ");
}