Introduction
In part one, I explained the role of SynchronizationContext within the .NET Framework 2.0. It is mostly used to let threads communicate with the UI thread. We learned in part one that SynchronizationContext by itself does nothing to marshal code between threads; in fact, this class should arguably have been an abstract class. The .NET Framework provides a version of this class that marshals code into the UI thread, but what about coding your own version of SynchronizationContext to do something else? It sounds like a daunting task, but it is really not that bad. In this article, I will show how to code your own SynchronizationContext for marshaling code from any thread into an STA thread. In part III of the series, I will leverage this class within WCF. But before we start running, we must learn to walk. So, let's start.
Switching work into STA threads
You might be wondering why I am coding this in the first place. STA is a threading model used by COM. A long time ago, before the age of .NET Remoting and WCF (it sounds like a hundred years ago), developers coded COM classes that could only run in a Single-Threaded Apartment (STA). Why? Because the COM runtime handled thread marshaling for the developer and always made sure that a COM object executed on the same thread. This way, the COM developer did not need to worry about multi-threading, mutexes, semaphores, events, and all the other multi-threading toys out there. Just for the record, COM also provided a Multi-Threaded Apartment (MTA) model (for the brave ones). With MTA, the developer had to worry about multi-threading issues, but had more control. There is a lot of documentation out there about MTA and STA; all you have to do is Google "STA MTA" and read all the history about it.
Thank God, I don't need to code COM anymore. However, there is a lot of COM out there, and within our company, a lot of business logic is coded into COM classes that can only execute on an STA thread. To be able to call these classes from any thread in .NET, I decided to code a custom STA thread synchronization context. After all the sweat and work put into it, it felt right to share it with you, hoping some poor developer out there might find it useful. Although this article explains how to marshal code into an STA thread, you can take the information in this article and have your own sync context do something else.
How do we switch between two threads?
The first question is how to manage marshaling between two running threads. This problem is typically solved by implementing some sort of common communication block that both threads can read from and write to. An ideal communication object between two threads is a queue. A queue lets us send work from one thread to another in invocation order. This is the typical producer/consumer model, where one thread plays the role of the consumer (reading from the queue) and another thread plays the role of the producer (writing items to the queue). To simplify things, let's see how this might work:
- Thread 1: Sends a message to a common queue.
- Thread 2: Listens for incoming messages from the common queue. In our case, Thread 2 will be an STA thread.
A closer look at the blocking queue
I wanted a queue to hold work items sent from thread X to my STA thread. I also wanted my thread to dequeue items only when there are items in the queue. If there are no items, I want the Dequeue method to wait until something pops into the queue. Typically, this is called a "Blocking Queue". Let's see the code:
using System;
using System.Collections.Generic;
using System.Threading;

internal interface IQueueReader<T> : IDisposable
{
    T Dequeue();
    void ReleaseReader();
}

internal interface IQueueWriter<T> : IDisposable
{
    void Enqueue(T data);
}

internal class BlockingQueue<T> : IQueueReader<T>,
    IQueueWriter<T>, IDisposable
{
    // use a .NET queue to store the data
    private Queue<T> mQueue = new Queue<T>();
    // create a semaphore that contains the items in the queue as resources.
    // initialize the semaphore to zero available resources (empty queue).
    private Semaphore mSemaphore = new Semaphore(0, int.MaxValue);
    // an event that gets triggered when the reader thread is exiting
    private ManualResetEvent mKillThread = new ManualResetEvent(false);
    // wait handles that are used to unblock a Dequeue operation.
    // Either when there is an item in the queue
    // or when the reader thread is exiting.
    private WaitHandle[] mWaitHandles;

    public BlockingQueue()
    {
        mWaitHandles = new WaitHandle[2] { mSemaphore, mKillThread };
    }

    public void Enqueue(T data)
    {
        lock (mQueue) mQueue.Enqueue(data);
        // add an available resource to the semaphore,
        // because we just put an item
        // into the queue.
        mSemaphore.Release();
    }

    public T Dequeue()
    {
        // wait until there is an item in the queue
        // or the reader has been released
        WaitHandle.WaitAny(mWaitHandles);
        lock (mQueue)
        {
            if (mQueue.Count > 0)
                return mQueue.Dequeue();
        }
        // the reader was released and the queue is empty
        return default(T);
    }

    public void ReleaseReader()
    {
        mKillThread.Set();
    }

    void IDisposable.Dispose()
    {
        if (mSemaphore != null)
        {
            mSemaphore.Close();
            // also close the kill event, which the original version left open
            mKillThread.Close();
            mQueue.Clear();
            mSemaphore = null;
        }
    }
}
- Because this queue is used by multiple threads, notice that I guard access to the queue with the lock statement.
- Normally, the Dequeue method blocks until there is an item in the queue. This works by having a semaphore whose count represents the items in the queue as resources. When the semaphore is first created, the queue is empty, so there are no resources available and calling Dequeue will block (notice the zero as the first argument, indicating no available resources, and a large number as the second argument, representing the maximum count): private Semaphore mSemaphore = new Semaphore(0, int.MaxValue);
- Notice that when I dequeue an item, I block on an array of WaitHandles (WaitHandle.WaitAny(mWaitHandles);). This code means "Wait until there is a message, or until the reader thread is marked to stop running." In the second case, the queue may be empty, and Dequeue returns default(T), which is null for reference types.
- I have not shown the actual thread yet; I will show it next. However, the STA thread will be the reading thread, spending most of its time waiting for a message on the queue or processing a message from the queue.
- Notice that when a message is enqueued, Enqueue releases the semaphore, indicating that a resource is available; this causes the Dequeue method to unblock.
- The semaphore has a max limit of int.MaxValue; we should never reach anything close to this limit as long as the thread is dequeuing more or less as fast as items are enqueued.
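To see the semaphore counting in action, here is a minimal sketch of one producer and one consumer sharing a queue. MiniBlockingQueue and BlockingQueueDemo are hypothetical names used only for this illustration; the mini queue is a cut-down stand-in for the BlockingQueue above (no kill event, no Dispose), not a replacement for it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo; MiniBlockingQueue is a simplified stand-in for the
// article's BlockingQueue<T> (no kill event, no disposal).
public static class BlockingQueueDemo
{
    private sealed class MiniBlockingQueue<T>
    {
        private readonly Queue<T> mQueue = new Queue<T>();
        // The semaphore count mirrors the number of queued items.
        private readonly Semaphore mSemaphore = new Semaphore(0, int.MaxValue);

        public void Enqueue(T item)
        {
            lock (mQueue) mQueue.Enqueue(item);
            mSemaphore.Release();   // one more item is available
        }

        public T Dequeue()
        {
            mSemaphore.WaitOne();   // blocks while the queue is empty
            lock (mQueue) return mQueue.Dequeue();
        }
    }

    public static List<int> Run()
    {
        var queue = new MiniBlockingQueue<int>();
        var received = new List<int>();

        // Consumer: blocks in Dequeue until the producer enqueues something.
        var consumer = new Thread(() =>
        {
            for (int i = 0; i < 3; i++)
                received.Add(queue.Dequeue());
        });
        consumer.Start();

        // Producer: the calling thread enqueues three items in order.
        for (int i = 1; i <= 3; i++)
            queue.Enqueue(i * 10);

        consumer.Join();
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints "10, 20, 30"
    }
}
```

With a single producer and a single consumer, the items come out in the order they went in, which is the property the STA thread relies on.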
The SendOrPostCallbackItem class
Notice that the blocking queue class is generic. This was done in case I decide to reuse it in another application (and you are free to use it for your needs as well). So, what are we planning to put into this queue? Considering that this queue is responsible for marshaling code from one thread to another, the ideal item to queue is a delegate, specifically a SendOrPostCallback delegate. Still, we need a little more than the delegate alone, so I wrap it in a small class.
using System;
using System.Threading;

internal enum ExecutionType
{
    Post,
    Send
}

internal class SendOrPostCallbackItem
{
    object mState;
    private ExecutionType mExeType;
    SendOrPostCallback mMethod;
    ManualResetEvent mAsyncWaitHandle = new ManualResetEvent(false);
    Exception mException = null;

    internal SendOrPostCallbackItem(SendOrPostCallback callback,
        object state, ExecutionType type)
    {
        mMethod = callback;
        mState = state;
        mExeType = type;
    }

    internal Exception Exception
    {
        get { return mException; }
    }

    internal bool ExecutedWithException
    {
        get { return mException != null; }
    }

    // this code must run on the STA thread
    internal void Execute()
    {
        if (mExeType == ExecutionType.Send)
            Send();
        else
            Post();
    }

    // calling thread will block until mAsyncWaitHandle is set
    internal void Send()
    {
        try
        {
            // invoke the delegate
            mMethod(mState);
        }
        catch (Exception e)
        {
            // keep the exception so the caller can rethrow it
            mException = e;
        }
        finally
        {
            mAsyncWaitHandle.Set();
        }
    }

    /// <summary>
    /// Unhandled exceptions will terminate the STA thread
    /// </summary>
    internal void Post()
    {
        mMethod(mState);
    }

    internal WaitHandle ExecutionCompleteWaitHandle
    {
        get { return mAsyncWaitHandle; }
    }
}
- SendOrPostCallbackItem contains the delegate we wish to execute on the STA thread.
- Send and Post are really helper methods. They are both responsible for launching the code, and they are both designed to be called from the STA thread. However, because Send is required to block and to report exceptions back to the calling thread (the non-STA thread), I use a ManualResetEvent that is set when the execution is complete. I also keep track of the exception; if there is one, it will be thrown on the non-STA thread (the producer thread).
- Post is simple. It just calls the method; there is no need to notify when it is done, and no need to track the exception either.
Overall, this class is responsible for two main tasks: storing the delegate to execute, and executing it in one of two possible modes, Send and Post. Send requires additional tracking (the exception and the notification of completion). Post just executes the method without doing anything else. If Post is executed on the STA thread, any unhandled exception thrown by the delegate will cause the thread to end. I will explain this more in part III of the series, when I introduce WCF into the mix. But for now, just keep this issue in mind.
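The Send mode described above boils down to three steps: run the delegate on another thread, signal an event when it finishes, and rethrow any captured exception on the waiting caller. Here is a minimal sketch of just that pattern, using a throwaway worker thread instead of the STA thread. SendExceptionDemo and RunAndWait are hypothetical names for this illustration and are not part of the article's classes.

```csharp
using System;
using System.Threading;

// Hypothetical demo of the capture-signal-rethrow pattern used by Send.
public static class SendExceptionDemo
{
    // Runs 'work' on a separate thread, blocks the caller until it has
    // finished, and rethrows any exception on the caller's thread.
    public static void RunAndWait(Action work)
    {
        Exception captured = null;
        var done = new ManualResetEvent(false);

        var worker = new Thread(() =>
        {
            try { work(); }
            catch (Exception e) { captured = e; }   // keep it for the caller
            finally { done.Set(); }                 // always unblock the caller
        });
        worker.Start();

        done.WaitOne();   // the caller blocks here, like Send
        worker.Join();
        done.Close();

        // Rethrowing a stored exception this way resets its stack trace.
        if (captured != null)
            throw captured;
    }

    public static string Run()
    {
        try
        {
            RunAndWait(() => { throw new InvalidOperationException("boom"); });
            return "no exception";
        }
        catch (InvalidOperationException e)
        {
            return "caught on caller: " + e.Message;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "caught on caller: boom"
    }
}
```

The finally block matters: if the event were set only on success, a failing delegate would leave the caller blocked forever.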
The STA thread and all its glory
Finally, we can show and explain the meat and potatoes of this sync context. Now that we have a queue, and we know what we are planning to push into it, let's look at the STA thread (the thread responsible for marshaling code).
internal class StaThread
{
    private Thread mStaThread;
    private IQueueReader<SendOrPostCallbackItem> mQueueConsumer;
    private ManualResetEvent mStopEvent = new ManualResetEvent(false);

    internal StaThread(IQueueReader<SendOrPostCallbackItem> reader)
    {
        mQueueConsumer = reader;
        mStaThread = new Thread(Run);
        mStaThread.Name = "STA Worker Thread";
        mStaThread.SetApartmentState(ApartmentState.STA);
    }

    internal void Start()
    {
        mStaThread.Start();
    }

    internal void Join()
    {
        mStaThread.Join();
    }

    private void Run()
    {
        while (true)
        {
            bool stop = mStopEvent.WaitOne(0);
            if (stop)
            {
                break;
            }
            SendOrPostCallbackItem workItem = mQueueConsumer.Dequeue();
            if (workItem != null)
                workItem.Execute();
        }
    }

    internal void Stop()
    {
        mStopEvent.Set();
        mQueueConsumer.ReleaseReader();
        mStaThread.Join();
        mQueueConsumer.Dispose();
    }
}
One of the most important parts of this class is in the constructor, so let's take a look at it again.
internal StaThread(IQueueReader<SendOrPostCallbackItem> reader)
{
    mQueueConsumer = reader;
    mStaThread = new Thread(Run);
    mStaThread.Name = "STA Worker Thread";
    mStaThread.SetApartmentState(ApartmentState.STA);
}
- This class takes an interface of type IQueueReader; this is really our blocking queue. I decided to put an interface here because this thread is a reading thread and should not have access to the writing methods.
- The thread is set up as an STA thread. The apartment state has to be set before the thread is started. Giving the thread a name helps when debugging with the debugger's Threads window.
- Notice that the thread is not started yet. A method called Start will start the thread, and this will happen within our StaSynchronizationContext class, which I will show soon.
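If you want to convince yourself that the constructor's setup really produces an STA thread, a small check like the following can help. This is a sketch under assumptions: ApartmentDemo is a hypothetical name, and I use TrySetApartmentState instead of SetApartmentState so that the sample does not throw on platforms without COM apartments (apartment states are only meaningful on Windows).

```csharp
using System;
using System.Threading;

// Hypothetical demo: request STA before starting a thread, then ask the
// thread which apartment it ended up in.
public static class ApartmentDemo
{
    // Returns true if the STA request was accepted. 'observed' receives
    // the apartment state the worker thread reported about itself.
    public static bool Run(out ApartmentState observed)
    {
        ApartmentState seen = ApartmentState.Unknown;

        var thread = new Thread(() =>
        {
            seen = Thread.CurrentThread.GetApartmentState();
        });

        // The apartment state has to be chosen before Start().
        bool accepted = thread.TrySetApartmentState(ApartmentState.STA);

        thread.Start();
        thread.Join();

        observed = seen;
        return accepted;
    }

    public static void Main()
    {
        ApartmentState observed;
        bool accepted = Run(out observed);
        Console.WriteLine("STA accepted: " + accepted +
            ", thread reports: " + observed);
    }
}
```

On Windows, I would expect the request to be accepted and the thread to report STA; elsewhere the request may simply be refused, which is why the sample reports both values instead of assuming success.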
Let's take a look at the Run method. The Run method represents our STA thread. Its main job is to dequeue items from our blocking queue and execute them. Executing any work items in the Run method means executing them on the STA thread. Therefore, it doesn't really matter which thread has placed them in the queue; what's important is that items are read within the STA thread and executed in the STA thread. If you think about this, this is, in fact, thread marshaling in action.
private void Run()
{
    while (true)
    {
        bool stop = mStopEvent.WaitOne(0);
        if (stop)
        {
            break;
        }
        SendOrPostCallbackItem workItem = mQueueConsumer.Dequeue();
        if (workItem != null)
            workItem.Execute();
    }
}
I tried to keep the Run method as simple as possible, but let's stress a few points.
- The STA thread is running all the time, so I have made a while(true) loop. Normally, I am not a fan of this type of loop, but I wanted the reader of the code to understand that this thread is not supposed to go down unless the context class is disposed. A while(true) sends this type of message.
- mStopEvent is a ManualResetEvent; it is signaled when the STA thread is marked to stop running. When the Stop() method is called, mStopEvent is set, causing the main loop to exit. The Stop method also releases any waiting dequeue operation by marking the queue to stop processing messages.
- mQueueConsumer.Dequeue() is responsible for reading work items from the queue. This method will block until a work item is in the queue. It returns null once the reader has been released and the queue is empty, which is why the loop checks for null before executing.
- When a work item is dequeued, it is executed. Execute(), if you remember, will execute the code in the delegate associated with the work item. It is during this Execute method that the code is marshaled onto the STA thread.
Creating the STA synchronization context class
We have almost all the pieces we need to have our STA sync context running. We have a work item that contains our delegate to execute on the STA thread. We have a nice little blocking queue to handle communication between the STA thread and any other thread. We even have our little STA Run method always looking at our queue, pumping messages out of it, and running any work items that are fetched. The only thing we are missing now is the actual synchronization context class itself. So, let's see it, and go over the code in detail.
StaSynchronizationContext.cs
using System;
using System.Threading;

public class StaSynchronizationContext : SynchronizationContext, IDisposable
{
    private BlockingQueue<SendOrPostCallbackItem> mQueue;
    private StaThread mStaThread;

    public StaSynchronizationContext()
        : base()
    {
        mQueue = new BlockingQueue<SendOrPostCallbackItem>();
        mStaThread = new StaThread(mQueue);
        mStaThread.Start();
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        // create an item for execution
        SendOrPostCallbackItem item = new SendOrPostCallbackItem(d, state,
            ExecutionType.Send);
        // queue the item
        mQueue.Enqueue(item);
        // wait for the item execution to end
        item.ExecutionCompleteWaitHandle.WaitOne();
        // if there was an exception, throw it on the caller thread, not the
        // sta thread.
        if (item.ExecutedWithException)
            throw item.Exception;
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        // queue the item and don't wait for its execution. This is risky because
        // an unhandled exception will terminate the STA thread. Use with caution.
        SendOrPostCallbackItem item = new SendOrPostCallbackItem(d, state,
            ExecutionType.Post);
        mQueue.Enqueue(item);
    }

    public void Dispose()
    {
        mStaThread.Stop();
    }

    public override SynchronizationContext CreateCopy()
    {
        // every copy shares the same queue and STA thread
        return this;
    }
}
This is really the class that uses all the other classes I have shown before. I have named it StaSynchronizationContext because it is responsible for marshaling code into an STA thread, allowing the caller to execute COM APIs that must run on an STA thread. Notice that this class inherits from SynchronizationContext, but overrides the default Send and Post methods. Let's look at the Send method, which is responsible for sending work to the STA thread.
public override void Send(SendOrPostCallback d, object state)
{
    // create an item for execution
    SendOrPostCallbackItem item =
        new SendOrPostCallbackItem(d, state, ExecutionType.Send);
    // queue the item
    mQueue.Enqueue(item);
    // wait for the item execution to end
    item.ExecutionCompleteWaitHandle.WaitOne();
    // if there was an exception, throw it on the caller thread, not the
    // sta thread.
    if (item.ExecutedWithException)
        throw item.Exception;
}
Notice that the send operation is a blocking operation: we block until the operation on the STA thread is complete. Remember, we have placed a ManualResetEvent on the SendOrPostCallbackItem class, so we know when the execution is done. We are also trapping and caching any exception within SendOrPostCallbackItem so we can throw it on the calling thread and not on the STA thread. Post, on the other hand, is not a waiting call, so all we do is queue the item without waiting for the delegate execution to finish.
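Because Post does not wait and does not track exceptions, one way to reduce the risk of a posted delegate taking down the target thread is to wrap the callback before posting it. The sketch below shows that idea against the base SynchronizationContext, which posts to the thread pool, so the sample can run on its own. PostSafely and SafePostDemo are hypothetical names, not part of the article's code, and the same wrapper should work with any context that honors Post.

```csharp
using System;
using System.Threading;

// Hypothetical helper: wrap a posted callback so that its exception is
// reported to a handler instead of escaping on the target thread.
public static class SafePostDemo
{
    public static void PostSafely(SynchronizationContext context,
        SendOrPostCallback callback, object state, Action<Exception> onError)
    {
        context.Post(delegate(object s)
        {
            try { callback(s); }
            catch (Exception e) { onError(e); }   // runs on the target thread
        }, state);
    }

    public static string Run()
    {
        // The base class posts to the thread pool; it stands in here for
        // a custom context such as the STA one.
        var context = new SynchronizationContext();
        string reported = null;
        var done = new ManualResetEvent(false);

        PostSafely(context,
            s => { throw new InvalidOperationException("failed: " + s); },
            "work item 1",
            e => { reported = e.Message; done.Set(); });

        done.WaitOne();   // only the demo waits; Post itself returned at once
        done.Close();
        return reported;
    }

    public static void Main()
    {
        Console.WriteLine(Run());   // prints "failed: work item 1"
    }
}
```

The error handler runs on the thread that executed the callback, so it should be short and must not throw itself.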
That's it. We now have a SynchronizationContext that will marshal code from any thread into a single STA thread. In my case, this will allow me to execute COM APIs within my STA thread, so COM classes can feel at home, as if they are running in VB6. To actually test this class, I have created a test program; here is the code:
using System;
using System.Diagnostics;
using System.Threading;

public class Params
{
    public string Output { get; set; }
    public int CallCounter { get; set; }
    public int OriginalThread { get; set; }
}

class Program
{
    // only touched on the STA thread, so no locking is needed
    private static int mCount = 0;
    private static StaSynchronizationContext mStaSyncContext = null;

    static void Main(string[] args)
    {
        mStaSyncContext = new StaSynchronizationContext();
        for (int i = 0; i < 100; i++)
        {
            ThreadPool.QueueUserWorkItem(NonStaThread);
        }
        Console.WriteLine("Processing");
        Console.WriteLine("Press Enter to dispose SyncContext");
        Console.ReadLine();
        mStaSyncContext.Dispose();
    }

    private static void NonStaThread(object state)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        for (int i = 0; i < 10; i++)
        {
            var param = new Params { OriginalThread = id, CallCounter = i };
            mStaSyncContext.Send(RunOnStaThread, param);
            Debug.Assert(param.Output == "Processed", "Unexpected behavior by STA thread");
        }
    }

    private static void RunOnStaThread(object state)
    {
        mCount++;
        Console.WriteLine(mCount);
        int id = Thread.CurrentThread.ManagedThreadId;
        var args = (Params)state;
        Trace.WriteLine("STA id " + id + " original thread " +
            args.OriginalThread + " call count " + args.CallCounter);
        args.Output = "Processed";
    }
}
The test program queues 100 work items to the thread pool. The thread pool threads then use the StaSynchronizationContext to execute the code within the method RunOnStaThread. Running this test program, I have blasted the STA thread with 100 work items, each marshaling RunOnStaThread 10 times. Notice in the results below that the STA thread ID is always the same (11), while the calls come from multiple thread pool threads (I have shortened the output).
STA id 11 original thread 7 call count 0
STA id 11 original thread 12 call count 0
STA id 11 original thread 7 call count 1
STA id 11 original thread 12 call count 1
STA id 11 original thread 7 call count 2
STA id 11 original thread 12 call count 2
STA id 11 original thread 7 call count 3
STA id 11 original thread 12 call count 3
STA id 11 original thread 7 call count 4
STA id 11 original thread 12 call count 4
STA id 11 original thread 7 call count 5
STA id 11 original thread 12 call count 5
STA id 11 original thread 7 call count 6
STA id 11 original thread 12 call count 6
STA id 11 original thread 7 call count 7
STA id 11 original thread 12 call count 7
STA id 11 original thread 7 call count 8
STA id 11 original thread 12 call count 8
STA id 11 original thread 7 call count 9
STA id 11 original thread 12 call count 9
STA id 11 original thread 7 call count 0
STA id 11 original thread 12 call count 0
STA id 11 original thread 7 call count 1
STA id 11 original thread 12 call count 1
STA id 11 original thread 7 call count 2
STA id 11 original thread 12 call count 2
STA id 11 original thread 7 call count 3
STA id 11 original thread 12 call count 3
STA id 11 original thread 7 call count 4
STA id 11 original thread 12 call count 4
STA id 11 original thread 7 call count 5
STA id 11 original thread 12 call count 5
STA id 11 original thread 7 call count 6
STA id 11 original thread 12 call count 6
STA id 11 original thread 7 call count 7
STA id 11 original thread 12 call count 7
STA id 11 original thread 7 call count 8
STA id 11 original thread 12 call count 8
STA id 11 original thread 7 call count 9
STA id 11 original thread 12 call count 9
STA id 11 original thread 7 call count 0
STA id 11 original thread 12 call count 0
STA id 11 original thread 7 call count 1
STA id 11 original thread 12 call count 1
STA id 11 original thread 7 call count 2
STA id 11 original thread 12 call count 2
STA id 11 original thread 7 call count 3
STA id 11 original thread 12 call count 3
STA id 11 original thread 7 call count 4
STA id 11 original thread 12 call count 4
STA id 11 original thread 7 call count 5
STA id 11 original thread 12 call count 5
STA id 11 original thread 7 call count 6
STA id 11 original thread 12 call count 6
Wait a minute, I can do the same thing without using a SynchronizationContext, so why bother?
True, there is really no need to build a synchronization context to marshal code into another thread. I could just use my queue and the STA thread to do all the work directly within my code. It's nice that I am playing by the rules of .NET and providing my own implementation so others can use it, but it is not really required. So, why bother? The reason is WCF. WCF allows you to provide a SynchronizationContext for the execution of operations within a service. This is a very powerful feature of WCF that is normally used to marshal WCF service calls into the UI thread. However, in my case, because I played nice and created a SynchronizationContext class and not just any class, I can now tell WCF to execute all my service methods on an STA thread. This was the main reason for writing the series on this class.
Conclusion
This article shows that you can create your own version of the SynchronizationContext class. I have shown that a blocking queue is a good candidate for communication between threads. We successfully marshaled code from any thread into a single STA thread. Now that we have our synchronization context class, we can put it to work within a WCF service. This will be the main focus of the third part of the series. Once you understand how to build your own synchronization context and apply it to a WCF service, you are in control of "where" your code is running when a WCF method is executed - a very powerful feature within a very powerful framework.
For those of you that want the code, I will provide a full VS2008 solution in the third part of this article series.