The Aggregator Service Orchestration
Background
Think of this implementation as a BizTalk orchestration that imitates a Windows service: you can start, interrupt and stop it, and otherwise, like a service, it keeps running (when it is not processing, BizTalk dehydrates it). The orchestration asynchronously spawns other orchestrations and waits for them to call it back.
We used this implementation to aggregate results from multiple asynchronously spawned orchestration instances, each running its own long-running transaction that could last for several days. Worker orchestrations (started asynchronously) split the input messages into different types and processed each record individually. The Worker orchestrations then called back the Aggregator Service orchestration, which aggregated the results into different message types before pushing the final aggregated messages out at scheduled times during the day.
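The scheduled push mentioned above comes down to asking whether a scheduled time of day has passed since the last push. The following is a minimal Python sketch of that check only, not BizTalk code; the function name `push_due` and the list-of-times schedule are assumptions for illustration.

```python
from datetime import datetime, time, timedelta
from typing import Sequence


def push_due(last_push: datetime, now: datetime, schedule: Sequence[time]) -> bool:
    """Hypothetical helper: True if any scheduled time of day falls in (last_push, now]."""
    day = last_push.date()
    # Walk every calendar day from the last push up to today.
    while day <= now.date():
        for t in schedule:
            slot = datetime.combine(day, t)
            if last_push < slot <= now:
                return True
        day += timedelta(days=1)
    return False
```

For example, with a schedule of 09:00 and 17:00, a last push at 08:00 and a current time of 09:00 the same day, `push_due` returns `True`; at 08:30 it returns `False`.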
Other implementations of this pattern are out there, but so far I’ve found none as simple and elegant as the one shown here.
Notes
• To keep things simple, I have not shown how to actually aggregate messages; to learn this, I suggest you google “BizTalk aggregator pattern”, as there are a few implementations out there. I’m hounding Anil to blog his latest implementation of this pattern, which apparently performs better than the ones currently blogged.
• Orphans & Zombies!!! If you terminate the Aggregator Service, you may end up with orphaned orchestrations (those it started asynchronously that have not yet called back to the service) and zombie messages.
• Terminating an asynchronously spawned orchestration will not stop the Aggregator Service from running, but you will obviously lose the message data that orchestration is processing.
• I wanted to have some fun, so I rewrote the sample in BizTalk 2006 Beta 1, but it works fine in 2004.
• As you will see, I have provided only a simple example of the pattern. I have not implemented scopes, compensation, exception handling or the aggregation of the messages.
• The example given here aggregates messages; however, the pattern has many other uses.
Implementation
So let’s start with a few pictures, and then I’ll explain how it all works.
Diagram 1: The AggregatorService orchestration
Diagram 2: The Worker orchestration
The AggregatorService orchestration looks complex at first, so we’ll divide it into two parts to make the explanation easier. The first part starts up the aggregator service and asynchronously spawns the Worker orchestrations, as shown in the following diagram. Look familiar? It’s basically a sequential parallel convoy that kinda loops back on itself. I’ll go through it in order, shape by shape, in the following table; I’ve left out the Group shapes, as these are basically comments.
Diagram 3: Start service and asynchronously spawn worker orchestration
Shape | Purpose |
Receive (Start command) | Start up the service. As with a Windows service, you start only one instance of this orchestration and leave it running continuously |
Send (Start command response) | Send back a response saying that the service has been started; however, the main purpose of this send is to initialize a correlation set that the two following Receive shapes in the convoy use |
Parallel actions left branch | The left-hand branch of the Parallel Actions shape asynchronously spawns one or more other orchestrations to do some work with an incoming message (the input) |
Loop | An infinite loop that keeps looping until the orchestration is terminated in HAT or a Stop command message is received on the second branch of the Listen shape |
Listen | Listen for either an input data message to process or a Stop command to terminate the orchestration “service” |
Receive (Left listen branch) (Input data) | Receive a data input |
Start Orchestration (Left listen branch) | Asynchronously start the Worker orchestration, passing it a self-correlated port and the input data message. The self-correlated port acts like a delegate: the Worker orchestration uses it to call the AggregatorService orchestration back with a result that may be aggregated |
Receive (Right listen branch) (Stop command) | Receive a command to stop the orchestration |
Terminate (Right listen branch) | Terminate the orchestration; see Orphans & Zombies in the notes above |
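To make the first part concrete outside BizTalk, here is a minimal Python sketch that mimics it with threads and queues. It is an analogy under stated assumptions, not how BizTalk works internally: `inbox` stands in for the receive port, a generated id stands in for the correlation set, `callback_port` stands in for the self-correlated port, and the names `run_service_front` and the message tuples are hypothetical.

```python
import queue
import threading
import uuid
from typing import Any, Callable

# Hypothetical worker signature: worker(input_data, callback_port)
Worker = Callable[[Any, queue.Queue], None]


def run_service_front(inbox: queue.Queue, reply: queue.Queue,
                      callback_port: queue.Queue, worker: Worker) -> int:
    """Start, loop on a Listen (input or stop), spawn workers; return the spawn count."""
    # Receive (Start command): the first message must be ("start",).
    first = inbox.get()
    if first[0] != "start":
        raise ValueError("the service must be started with a start command")

    # Send (Start command response): reply with an id that later messages must
    # carry, standing in for the correlation set the convoy follows.
    correlation_id = str(uuid.uuid4())
    reply.put(correlation_id)

    spawned = 0
    while True:  # Loop: runs until a Stop command arrives
        kind, msg_id, *payload = inbox.get()  # Listen: block for input or stop
        if msg_id != correlation_id:
            continue  # BizTalk would never route this message to this instance
        if kind == "input":
            # Start Orchestration: fire and forget, handing the worker the
            # callback queue as its "self-correlated port".
            threading.Thread(target=worker, args=(payload[0], callback_port),
                             daemon=True).start()
            spawned += 1
        elif kind == "stop":
            return spawned  # Terminate: workers still running become "orphans"
```

Running `run_service_front` in a background thread, putting `("start",)` on the inbox, reading the id from `reply`, then sending `("input", id, data)` messages and finally `("stop", id)` walks through the shapes in the table in order.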
The second part of the AggregatorService orchestration receives callbacks from the Worker orchestrations containing results to be aggregated. A callback can arrive at any time in the future, whenever a worker finishes processing, which could be seconds or weeks later. Again, I’ll go through the shapes one by one.
Diagram 4: Receive asynchronous call backs and aggregate messages
Shape | Purpose |
Parallel actions right branch | The purpose of this branch is to receive the processed messages from the callbacks and aggregate them |
Construct message | Construct an empty message to aggregate into; this is an initialization step |
Message Assignment | Load the message seed XML (usually just the root node without any record data) into a DOM document and assign the DOM document to the aggregated message variable |
Loop | An infinite loop that keeps looping until the orchestration is terminated in HAT or a Stop command message is received on the second branch of the Listen shape in the first part |
Listen | Wait either for a callback message from the Worker orchestration or, if nothing has been received for 1 minute, push out the aggregated message and reinitialize it. Yes, I know the logic isn’t foolproof here, but it’s just for the sake of example. It’s easy to implement better scheduling logic, e.g. put a test in the left branch of the Listen shape to see whether the scheduled time has passed and, if it has, send and reinitialize the aggregated message. |
Receive (Left listen branch) (Call back) | Receive the processed data from the Worker orchestration via a callback on the self-correlated port |
Construct message (Left listen branch) | Construct a temporary message that aggregates the current aggregated message with the processed data just received |
Transform message (Left listen branch) | Map the two messages using inline XSLT to aggregate them |
Construct message (Left listen branch) | Overwrite the current aggregated message with the temporary message |
Message Assignment (Left listen branch) | Simple message assignment |
Delay (Right listen branch) | Wait for 1 minute |
Decide (Left branch, right listen branch) | Test whether there are any records in the aggregated message; there is no point sending out an empty message |
Send (Left decide branch, right listen branch) | Send out the aggregated message |
Construct message (Left decide branch, right listen branch) | Construct an empty message to aggregate into; this is an initialization step |
Message Assignment (Left decide branch, right listen branch) | Load the message seed XML (usually just the root node without any record data) into a DOM document and assign the DOM document to the aggregated message variable |
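The second part can be sketched the same way. This Python sketch assumes callbacks arrive as XML strings whose root element wraps the records. It uses an ElementTree merge in place of the inline-XSLT map, which is a swapped-in technique and not the one used in the orchestration. It also treats a `None` on the port as a stop signal purely so that the sketch can end. `SEED_XML`, `merge` and `run_service_back` are hypothetical names, and `idle_seconds` plays the role of the 1-minute Delay.

```python
import queue
import xml.etree.ElementTree as ET
from typing import Callable, Optional

# Hypothetical seed: the root node without any record data.
SEED_XML = "<Aggregate/>"


def new_aggregate() -> ET.Element:
    """Construct message + Message Assignment: load the seed XML."""
    return ET.fromstring(SEED_XML)


def merge(aggregate: ET.Element, callback_xml: str) -> ET.Element:
    """Stand-in for the inline-XSLT map: copy the existing records and the
    records under the callback's root element into a new temporary message."""
    temp = new_aggregate()
    temp.extend(list(aggregate))
    temp.extend(list(ET.fromstring(callback_xml)))
    return temp


def run_service_back(port: queue.Queue, send: Callable[[str], None],
                     idle_seconds: float = 60.0) -> None:
    """Aggregate callbacks; after idle_seconds without one, send any non-empty
    aggregate and reinitialize it. A None on the port ends the loop."""
    aggregate = new_aggregate()
    while True:  # Loop
        try:
            msg: Optional[str] = port.get(timeout=idle_seconds)  # Listen
        except queue.Empty:  # Delay branch: nothing arrived in time
            if len(aggregate) > 0:  # Decide: skip empty messages
                send(ET.tostring(aggregate, encoding="unicode"))  # Send
                aggregate = new_aggregate()  # reinitialize
            continue
        if msg is None:
            return  # sketch-only stand-in for terminating the orchestration
        aggregate = merge(aggregate, msg)  # temp message overwrites the aggregate
```

Building a new element in `merge` rather than appending in place mirrors the table: a temporary message is constructed and then assigned over the current aggregate.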
Just briefly: all my Worker orchestration does is simulate some processing of the input data, wait for 10 seconds and then call back on the self-correlated port, which is one of its arguments.
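For completeness, here is a Python sketch of such a worker under the same thread-and-queue analogy. The worker "processes" the input by wrapping it in a hypothetical `Result`/`Record` XML message, sleeps, and then puts the result on a queue that stands in for the self-correlated port. The names and message shape are assumptions; `delay_seconds` defaults to the 10 seconds described above.

```python
import queue
import threading
import time
from xml.sax.saxutils import escape


def worker(data: str, callback_port: queue.Queue, delay_seconds: float = 10.0) -> None:
    # "Simulate some processing": wrap the input in a hypothetical result message.
    result = f"<Result><Record>{escape(data)}</Record></Result>"
    time.sleep(delay_seconds)   # wait before calling back
    callback_port.put(result)   # call back on the "self-correlated port"


def spawn(data: str, callback_port: queue.Queue,
          delay_seconds: float = 10.0) -> threading.Thread:
    """Start the worker asynchronously (fire and forget) and return its thread."""
    t = threading.Thread(target=worker, args=(data, callback_port, delay_seconds),
                         daemon=True)
    t.start()
    return t
```

The caller does not wait on the returned thread; as in the orchestration, the only link back is the callback queue.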
R. Addis