The Aggregator Service Orchestration



Background


Think of this implementation as a BizTalk orchestration that imitates a Windows Service: you can start, interrupt and stop the orchestration, and otherwise, like a service, it keeps on running (BizTalk dehydrates it when it is not processing). The orchestration asynchronously spawns other orchestrations and waits for call backs from them.


We used this implementation to aggregate results from multiple instances of asynchronously spawned orchestrations, each running its own long-running transactions, which could last for a number of days. The input messages were split up into different types by Worker orchestrations (started asynchronously), with each record being processed individually. The Worker orchestrations then called back the Aggregator Service orchestration, which aggregated the results into different message types before pushing the final aggregated messages out at scheduled times during the day.


Other implementations of this pattern are out there, but so far I’ve found none as simple and elegant as the one shown here.



Notes


              To keep things simple I have not shown how to actually aggregate messages; to learn this I suggest you google “BizTalk aggregator pattern”. There are a few implementations out there. I’m hounding Anil to blog his latest implementation of this pattern, which apparently performs better than the ones currently blogged.


              Orphans & Zombies!!! If you terminate the Aggregator Service you may end up with orphaned orchestrations and zombie messages: orchestrations which it started asynchronously but which have not yet called back to the service.


              Terminating the asynchronously spawned orchestrations will not hinder the running of the Aggregator Service, but obviously you will lose the message data that the terminated orchestration was processing.


              I wanted to have some fun so I re-wrote the sample in BizTalk 2006 Beta1, but it works fine in BizTalk 2004.


              As you will see, I have just provided a simple example of the pattern. I have not included scopes, compensation, exception handling or the aggregation of the messages.


              The example I have given here is for aggregating messages; however, there are many other uses.



Implementation


So let’s start with a few pictures and then I’ll explain how it all works.


 


Diagram 1: The AggregatorService orchestration


 


Diagram 2: The Worker orchestration


The AggregatorService orchestration looks complex at first, so we’ll divide it into two parts to make the explanation easier. The first part, shown in the following diagram, starts up the aggregator service and asynchronously spawns the worker orchestrations. Look familiar? It’s basically a sequential parallel convoy that kinda loops back on itself. I’ll go through it in order, shape by shape, in the following table; I’ve left out the group shapes as these are basically comments.


 


Diagram 3: Start service and asynchronously spawn worker orchestration


 Shape


Purpose


Receive (Start Command)


Start up the service. Like a Windows service, you start only one instance of this orchestration and leave it running continuously.


Send (Start command response)


Send back a response that the service has been started. The main purpose of this send, however, is to initialize a correlation set, which is used by the following two Receive shapes in the convoy.


Parallel actions left branch


The left-hand branch of the Parallel Actions shape asynchronously spawns one or more orchestrations to do some work with an incoming message (the input).


Loop


This is an infinite loop, which continues looping until the orchestration is terminated in HAT or a Stop command message is received on the second branch of the Listen shape.


Listen


Listens for either an input data message to process or a Stop command to terminate the orchestration “service”.


Receive (Left listen branch) (Input data)


Receive a data input


Start Orchestration (Left listen branch)


Asynchronously start the Worker orchestration, passing it a self-correlated port and the input message data. The self-correlated port acts like a delegate: the Worker orchestration uses it to call the AggregatorService orchestration back with a result, which may then be aggregated.


Receive (Right listen branch) (Stop command)


Receives a command to stop the orchestration


Terminate (Right listen branch)


Terminate the orchestration; see Orphans & Zombies in the notes above.
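BizTalk orchestrations are drawn rather than written, so the following is only a rough analogy in Python, not BizTalk code. It assumes asyncio queues as stand-ins for the ports: inbox for the port that receives input data and the Stop command, and callback for the self-correlated port handed to each worker. The names STOP, worker, dispatch and demo are all hypothetical.

```python
import asyncio

STOP = object()  # hypothetical stand-in for the Stop command message


async def worker(record, callback):
    """Stand-in for the Worker orchestration: process, then call back."""
    await asyncio.sleep(0.01)            # simulate some processing
    await callback.put(record.upper())   # reply on the port we were handed


async def dispatch(inbox, callback):
    """Left parallel branch: loop, listen for input or Stop, spawn workers."""
    spawned = []
    while True:
        message = await inbox.get()      # Listen: input data or Stop command
        if message is STOP:
            break                        # Terminate: unfinished workers are orphaned
        # Start Orchestration: fire and forget, passing the callback "port"
        spawned.append(asyncio.create_task(worker(message, callback)))
    return spawned


async def demo():
    inbox, callback = asyncio.Queue(), asyncio.Queue()
    for message in ("a", "b", STOP):
        inbox.put_nowait(message)
    tasks = await dispatch(inbox, callback)
    await asyncio.gather(*tasks)         # only the demo waits for the workers
    return sorted(callback.get_nowait() for _ in tasks)


results = asyncio.run(demo())
print(results)
```

Note that dispatch never waits for a worker to finish; the results only come back through the callback queue, which is the role the self-correlated port plays in the orchestration.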


 


The second part of the AggregatorService orchestration is concerned with receiving call backs from the Worker orchestrations containing results to be aggregated. A call back can arrive at any time in the future, whenever the worker has finished processing, which could be anywhere from seconds to weeks. Again I’ll go through the shapes one by one.


 


Diagram 4: Receive asynchronous call backs and aggregate messages


 Shape


Purpose


Parallel actions right branch


The purpose of this branch is to receive the processed messages from the callbacks and aggregate those messages


Construct message


Construct an empty message to aggregate into; this is an initialization step.


Message Assignment


Using a DOM document, load the message seed XML (usually the root node without any record data) and assign the DOM document to the aggregated message variable.


Loop


This is an infinite loop, which continues looping until the orchestration is terminated, either in HAT or when a Stop command message is received on the Listen shape in the left parallel branch.


Listen


We are either waiting for a call back message from the Worker orchestration or, if we haven’t received anything for 1 minute, pushing out the aggregated message and reinitializing it.


 


Yes, I know the logic isn’t foolproof here, but it’s just for the sake of example. It’s easy to implement better scheduling logic, e.g. put a test in the left branch of the Listen shape to see whether the scheduled time has passed, and if so send and reinitialize the aggregated message.


Receive (Left listen branch) (Call back)


Receive the processed data via a callback from the self-correlated port on the Worker orchestration


Construct message (Left listen branch)


Construct a temp message by aggregating the current aggregated message with the processed data we have just received


Transform message (Left listen branch)


Map using inline XSLT to aggregate the two messages


Construct message (Left listen branch)


Overwrite the current aggregate message with the temp message


Transform message (Left listen branch)


Simple message assignment


Delay (Right listen branch)


Wait for 1 minute


Decide (Left branch, right listen branch)


Test to see if there are any records in the aggregated message; there is no point sending out an empty message.


Send (Left decide branch, right listen branch)


Send out the aggregated message


Construct message (Left decide branch, right listen branch)


Construct an empty message to aggregate into; this is an initialization step.


Message Assignment (Left decide branch, right listen branch)


Using a DOM document, load the message seed XML (usually the root node without any record data) and assign the DOM document to the aggregated message variable.
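The shapes in this table can be approximated outside BizTalk too. The following is a minimal sketch in Python, not the sample's actual implementation: a thread-safe queue stands in for the self-correlated port, a queue timeout stands in for the Delay shape, and ElementTree stands in for the DOM document and the inline XSLT map. SEED_XML, the Record element, the aggregate function and its max_idle_periods parameter (added only so the demo stops) are all hypothetical.

```python
import queue
import xml.etree.ElementTree as ET

SEED_XML = "<Results/>"  # hypothetical seed: root node without any record data


def aggregate(callbacks, send, idle_seconds=60.0, max_idle_periods=1):
    """Right parallel branch: collect call backs, flush after an idle period."""
    root = ET.fromstring(SEED_XML)              # initialize the empty aggregate
    idle_periods = 0
    while idle_periods < max_idle_periods:      # the sample itself loops forever
        try:
            # Listen: either a call back arrives...
            record = callbacks.get(timeout=idle_seconds)
        except queue.Empty:
            # ...or the delay expires first
            if len(root):                       # Decide: skip empty aggregates
                send(ET.tostring(root, encoding="unicode"))
                root = ET.fromstring(SEED_XML)  # reinitialize from the seed
            idle_periods += 1
            continue
        ET.SubElement(root, "Record").text = record  # aggregate the new result


callbacks = queue.Queue()
for value in ("A", "B"):
    callbacks.put(value)

sent = []
aggregate(callbacks, sent.append, idle_seconds=0.05)
print(sent)
```

As in the sample, the flush here is driven purely by an idle timeout; a test against a scheduled time could be added where each call back is received if messages must go out at fixed times.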


Just briefly: all my Worker orchestration does is simulate some processing of the input data and wait for 10 seconds before calling back on the self-correlated port, which is one of its arguments.


R. Addis