Code Listings
Chapter 23: Asynchronous Methods
Asynchronator (Visual Studio Test Project)
Blocking sockets server
using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;
public class Server
public void Serve (IPAddress address, int port)
ThreadPool.SetMinThreads (50, 50); // Refer to Chapter 21
ThreadPool.SetMaxThreads (50, 50); // Refer to Chapter 21
TcpListener listener = new TcpListener (address, port);
while (true)
TcpClient c = listener.AcceptTcpClient();
ThreadPool.QueueUserWorkItem (Accept, c);
void Accept (object clientObject)
using (TcpClient client = (TcpClient) clientObject)
using (NetworkStream n = client.GetStream())
byte[] data = new byte [5000];
int bytesRead = 0; int chunkSize = 1;
while (bytesRead < data.Length && chunkSize > 0)
bytesRead +=
chunkSize = n.Read
(data, bytesRead, data.Length - bytesRead); // BLOCKS
Array.Reverse (data);
n.Write (data, 0, data.Length); // BLOCKS
Non-blocking sockets server
public class Server
public void Serve (IPAddress address, int port)
ThreadPool.SetMinThreads (50, 50);
TcpListener listener = new TcpListener (address, port);
while (true)
TcpClient c = listener.AcceptTcpClient();
ThreadPool.QueueUserWorkItem (ReverseEcho, c);
void ReverseEcho (object client)
new ReverseEcho().Begin ((TcpClient)client);
class ReverseEcho
TcpClient _client;
NetworkStream _stream;
byte[] _data = new byte [5000];
int _bytesRead = 0;
internal void Begin (TcpClient c)
_client = c;
_stream = c.GetStream();
catch (Exception ex) { ProcessException (ex); }
void Read() // Read in a nonblocking fashion.
while (true)
IAsyncResult r = _stream.BeginRead
(_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);
// This will nearly always return in the next line:
if (!r.CompletedSynchronously) return; // Handled by callback
if (!EndRead (r)) break;
void ReadCallback (IAsyncResult r)
if (r.CompletedSynchronously) return;
if (EndRead (r))
Read(); // More data to read!
catch (Exception ex) { ProcessException (ex); }
bool EndRead (IAsyncResult r) // Returns false if there’s no more data
int chunkSize = _stream.EndRead (r);
_bytesRead += chunkSize;
return chunkSize > 0 && _bytesRead < _data.Length; // More to read
void Write()
Array.Reverse (_data);
_stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
void WriteCallback (IAsyncResult r)
try { _stream.EndWrite (r); }
catch (Exception ex) { ProcessException (ex); }
void ProcessException (Exception ex)
Console.WriteLine ("Error: " + ex.Message);
void Cleanup()
if (_stream != null) _stream.Close();
if (_client != null) _client.Close();
Non-blocking server: with Tasks
public class Server
public void Serve (IPAddress address, int port)
ThreadPool.SetMinThreads (50, 50);
TcpListener listener = new TcpListener (address, port);
while (true)
TcpClient c = listener.AcceptTcpClient();
new ReverseEcho().BeginAsync (c);
class ReverseEcho
TcpClient _client;
NetworkStream _stream;
byte[] _data = new byte [5000];
int _bytesRead = 0;
internal void BeginAsync (TcpClient c)
_client = c;
_stream = c.GetStream();
var task = Task.Factory.StartNew (Read);
// Set up centralized error handling and cleanup:
task.ContinueWith (ant =>
Console.WriteLine ("Error: " + ant.Exception.Message),
task.ContinueWith (ant =>
if (_stream != null) _stream.Close();
if (_client != null) _client.Close();
void Read() // This will create a child task.
Task<int> readChunk = Task<int>.Factory.FromAsync (
_stream.BeginRead, _stream.EndRead,
_data, _bytesRead, _data.Length - _bytesRead, null,
readChunk.ContinueWith (Write, TaskContinuationOptions.NotOnFaulted
| TaskCreationOptions.AttachedToParent);
void Write (Task<int> readChunk)
_bytesRead += readChunk.Result;
if (readChunk.Result > 0 && _bytesRead < _data.Length)
Read(); // More data to read!
Array.Reverse (_data);
Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
_data, 0, _data.Length, null,
Async methods and iterators
public class Server
public void Serve (IPAddress address, int port)
ThreadPool.SetMinThreads (50, 50);
TcpListener listener = new TcpListener (address, port);
while (true)
TcpClient c = listener.AcceptTcpClient();
Task.Factory.Iterate (ReverseEcho(c)).ContinueWith (t =>
Console.WriteLine ("Error: " + t.Exception.Message),
IEnumerable<Task> ReverseEcho (TcpClient client)
using (client)
using (var stream = client.GetStream())
byte[] data = new byte[Program.MessageLength];
int bytesRead = 0;
while (true)
// ReadASync is an extension method in the samples.
Task<int> readChunk = stream.ReadAsync
(data, bytesRead, data.Length - bytesRead);
yield return readChunk;
bytesRead += readChunk.Result;
if (readChunk.Result <= 0 || bytesRead >= data.Length)
yield return stream.WriteAsync (data, 0, bytesRead);
Writing async methods
public class MessagingServices
public static IAsyncResult BeginReverseEcho (TcpClient client,
AsyncCallback callback,
object userState)
var re = new ReverseEcho();
re.Begin (client, callback, userState);
return re;
public static byte[] EndReverseEcho (IAsyncResult r)
return ((ReverseEcho)r).End();
class ReverseEcho : IAsyncResult
TcpClient _client;
NetworkStream _stream;
object _userState;
ManualResetEvent _waitHandle = new ManualResetEvent (false);
int _bytesRead = 0;
byte[] _data = new byte [5000];
Exception _exception;
internal ReverseEcho() { }
// IAsyncResult members:
public object AsyncState { get { return _userState; } }
public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
public bool CompletedSynchronously { get { return false; } }
public bool IsCompleted
get { return _waitHandle.WaitOne (0, false); }
internal void Begin (TcpClient c, AsyncCallback callback, object state)
_client = c;
_userState = state;
_stream = _client.GetStream();
Task.Factory.StartNew (Read).ContinueWith (ant =>
_exception = ant.Exception; // In case an exception occurred.
if (_stream != null)
try { _stream.Close(); }
catch (Exception ex) { _exception = ex; };
if (callback != null) callback (this);
internal byte[] End() // Wait for completion + rethrow any error.
if (_exception != null) throw _exception;
return _data;
void Read()
Task<int> readChunk = Task<int>.Factory.FromAsync (
_stream.BeginRead, _stream.EndRead,
_data, _bytesRead, _data.Length - _bytesRead, null);
readChunk.ContinueWith (ContinueRead,
| TaskContinuationOptions.AttachedToParent);
void ContinueRead (Task<int> readChunk)
_bytesRead += readChunk.Result;
if (readChunk.Result > 0 && _bytesRead < _data.Length)
Read(); // More data to read!
Array.Reverse (_data);
Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
_data, 0, _data.Length, null);