Windows Azure Service Bus Splitter and Aggregator

This article will cover basic implementations of the Splitter and Aggregator patterns using the Windows Azure Service Bus. The content will be included in the next release of the “Windows Azure Service Bus Developer Guide”, along with some other patterns I am working on.

I’ve taken the pattern descriptions from the book “Enterprise Integration Patterns” by Gregor Hohpe. I bought a copy of the book in 2004, and recently dusted it off when I started to look at implementing the patterns on the Windows Azure Service Bus. Gregor also presented a session in 2011, “Enterprise Integration Patterns: Past, Present and Future”, which is well worth a look.

I’ll be covering more patterns in the coming weeks; I’m currently working on Wire-Tap and Scatter-Gather. There will no doubt be a section on implementing these patterns in my “SOA, Connectivity and Integration using the Windows Azure Service Bus” course.

There are a number of scenarios where a message needs to be divided into a number of sub-messages, and also where a number of sub-messages need to be combined to form one message. The Splitter and Aggregator patterns define how this can be achieved. This section will focus on the implementation of basic Splitter and Aggregator patterns using the Windows Azure Service Bus direct programming model.

In BizTalk Server, receive pipelines are typically used to implement the splitter pattern, with sequential convoy orchestrations often used to aggregate messages. In the current release of the Service Bus, there is no functionality in the direct programming model that implements these patterns, so it is up to the developer to implement them in the applications that send and receive messages.

Splitter

A message splitter takes a message and splits it into a number of sub-messages. As there are different scenarios for how a message can be split, message splitters are implemented using different algorithms.

The Enterprise Integration Patterns book describes the Splitter pattern as follows:

How can we process a message if it contains multiple elements, each of which may have to be processed in a different way?

Use a Splitter to break out the composite message into a series of individual messages, each containing data related to one item.

The Enterprise Integration Patterns website provides a description of the Splitter pattern here.

In some scenarios a batch message could be split into the sub-messages that are contained in the batch. The splitting of a message could be based on the message type of each sub-message, or on the trading partner that the sub-message is to be sent to.

Aggregator

An aggregator takes a stream of related messages and combines them to form one message.

The Enterprise Integration Patterns book describes the aggregator pattern as follows:

How do we combine the results of individual, but related messages so that they can be processed as a whole?

Use a stateful filter, an Aggregator, to collect and store individual messages until a complete set of related messages has been received. Then, the Aggregator publishes a single message distilled from the individual messages.

The Enterprise Integration Patterns website provides a description of the Aggregator pattern here.

A common example of the need for an aggregator is in scenarios where a stream of messages needs to be combined into a daily batch to be sent to a legacy line-of-business application. The BizTalk Server EDI functionality provides support for batching messages in this way using a sequential convoy orchestration.

Scenario

The scenario for this implementation of the splitter and aggregator patterns is the sending and receiving of large messages using a Service Bus queue. In the current release, the Windows Azure Service Bus supports a maximum message size of 256 KB, with a maximum header size of 64 KB. This leaves a safe maximum body size of 192 KB.

The BrokeredMessage class itself will support messages larger than 256 KB; in fact, the Size property is of type long, implying that very large messages may be supported at some point in the future. The 256 KB size restriction is enforced by the Service Bus components deployed in the Windows Azure data centers.

One of the ways of working around this size restriction is to split large messages into a sequence of smaller sub messages in the sending application, send them via a queue, and then reassemble them in the receiving application. This scenario will be used to demonstrate the pattern implementations.

Implementation

The splitter and aggregator will be used to provide functionality to send and receive large messages over the Windows Azure Service Bus. In order to make the implementations generic and reusable they will be implemented as a class library. The splitter will be implemented in the LargeMessageSender class and the aggregator in the LargeMessageReceiver class. A class diagram showing the two classes is shown below.

Implementing the Splitter

The splitter will take a large brokered message and split it into a sequence of smaller sub-messages that can be transmitted over the Service Bus messaging entities. The LargeMessageSender class provides a Send method that takes a large brokered message as a parameter. The implementation of the class is shown below; console output has been added to provide details of the splitting operation.

public class LargeMessageSender
{
    private static int SubMessageBodySize = 192 * 1024;
    private QueueClient m_QueueClient;

    public LargeMessageSender(QueueClient queueClient)
    {
        m_QueueClient = queueClient;
    }

    public void Send(BrokeredMessage message)
    {
        // Calculate the number of sub-messages required.
        long messageBodySize = message.Size;
        int nrSubMessages = (int)(messageBodySize / SubMessageBodySize);
        if (messageBodySize % SubMessageBodySize != 0)
        {
            nrSubMessages++;
        }

        // Create a unique session Id.
        string sessionId = Guid.NewGuid().ToString();
        Console.WriteLine("Message session Id: " + sessionId);
        Console.Write("Sending {0} sub-messages", nrSubMessages);

        Stream bodyStream = message.GetBody<Stream>();
        for (long streamOffset = 0; streamOffset < messageBodySize;
            streamOffset += SubMessageBodySize)
        {
            // Get the next chunk of the large message body.
            long arraySize = (messageBodySize - streamOffset) > SubMessageBodySize
                ? SubMessageBodySize : messageBodySize - streamOffset;
            byte[] subMessageBytes = new byte[arraySize];
            int result = bodyStream.Read(subMessageBytes, 0, (int)arraySize);
            MemoryStream subMessageStream = new MemoryStream(subMessageBytes);

            // Create a new sub-message, correlated by the session Id.
            BrokeredMessage subMessage = new BrokeredMessage(subMessageStream, true);
            subMessage.SessionId = sessionId;

            // Send the sub-message.
            m_QueueClient.Send(subMessage);
            Console.Write(".");
        }
        Console.WriteLine("Done!");
    }
}

The LargeMessageSender class is initialized with a QueueClient that is created by the sending application. When the large message is sent, the number of sub-messages is calculated based on the size of the body of the large message. A unique session Id is created to allow the sub-messages to be sent as a message session; this session Id will be used for correlation in the aggregator. A for loop is then used to create the sequence of sub-messages by reading chunks of data from the stream of the large message. The sub-messages are then sent to the queue using the QueueClient.
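The sub-message count is just a ceiling division of the body size by the 192 KB chunk size. As an illustrative sketch of that calculation (plain Python, independent of the Service Bus API; the payload size is the test file used later in this article):

```python
# Chunk size used by the splitter: the 192 KB safe maximum body size.
SUB_MESSAGE_BODY_SIZE = 192 * 1024  # 196,608 bytes

def number_of_sub_messages(body_size):
    """Ceiling division: one extra sub-message for any remainder."""
    count = body_size // SUB_MESSAGE_BODY_SIZE
    if body_size % SUB_MESSAGE_BODY_SIZE != 0:
        count += 1
    return count

# The 9,929,365-byte message body from the test section splits into 51 chunks.
print(number_of_sub_messages(9929365))  # → 51
```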

As sessions are used to correlate the messages, the queue used for message exchange must be created with the RequiresSession property set to true.

Implementing the Aggregator

The aggregator will receive the sub-messages in the message session that was created by the splitter, and combine them to form a single, large message. The aggregator is implemented in the LargeMessageReceiver class, with a Receive method that returns a BrokeredMessage. The implementation of the class is shown below; console output has been added to provide details of the aggregation operation.

public class LargeMessageReceiver
{
    private QueueClient m_QueueClient;

    public LargeMessageReceiver(QueueClient queueClient)
    {
        m_QueueClient = queueClient;
    }

    public BrokeredMessage Receive()
    {
        // Create a memory stream to store the large message body.
        MemoryStream largeMessageStream = new MemoryStream();

        // Accept a message session from the queue.
        MessageSession session = m_QueueClient.AcceptMessageSession();
        Console.WriteLine("Message session Id: " + session.SessionId);
        Console.Write("Receiving sub messages");

        while (true)
        {
            // Receive a sub-message.
            BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));
            if (subMessage != null)
            {
                // Copy the sub-message body to the large message stream.
                Stream subMessageStream = subMessage.GetBody<Stream>();
                subMessageStream.CopyTo(largeMessageStream);

                // Mark the sub-message as complete.
                subMessage.Complete();
                Console.Write(".");
            }
            else
            {
                // The last message in the sequence is our completeness criteria.
                Console.WriteLine("Done!");
                break;
            }
        }

        // Create an aggregated message from the large message stream.
        BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
        return largeMessage;
    }
}

The LargeMessageReceiver is initialized with a QueueClient that is created by the receiving application. The Receive method creates a memory stream that will be used to aggregate the large message body. The AcceptMessageSession method on the QueueClient is then called; this will wait for the first message in a message session to become available on the queue. As AcceptMessageSession will throw a timeout exception if no message is available on the queue after 60 seconds, a real-world implementation should handle this accordingly.

Once the message session has been accepted, the sub-messages in the session are received, and their message body streams are copied to the memory stream. Once all the messages have been received, the memory stream is used to create a large message, which is then returned to the receiving application.
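Stripped of the transport, the split/aggregate round trip reduces to carving a byte stream into fixed-size chunks and concatenating them again in order. A minimal Python sketch of that logic (the queue and session plumbing are assumed away):

```python
import io

CHUNK_SIZE = 192 * 1024  # same 192 KB sub-message body size as the splitter

def split(payload):
    """Splitter side: carve the payload into fixed-size chunks."""
    stream = io.BytesIO(payload)
    chunks = []
    while True:
        chunk = stream.read(CHUNK_SIZE)
        if not chunk:
            break
        chunks.append(chunk)
    return chunks

def aggregate(chunks):
    """Aggregator side: copy each chunk body into one output stream, in order."""
    out = io.BytesIO()
    for chunk in chunks:
        out.write(chunk)
    return out.getvalue()

payload = bytes(1000000)  # a dummy 1,000,000-byte message body
assert aggregate(split(payload)) == payload  # the round trip is lossless
print(len(split(payload)))  # → 6
```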

Testing the Implementation

The splitter and aggregator are tested by creating a message sender and a message receiver application. The payload for the large message will be one of the webcast video files from http://www.cloudcasts.net/; at 9,697 KB, it is well over the 256 KB limit imposed by the Service Bus.

As the splitter and aggregator are implemented in a separate class library, the code used in the sender and receiver console applications is fairly basic. The implementation of the main method of the sending application is shown below.

static void Main(string[] args)
{
    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Create the MessagingFactory.
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Use the MessagingFactory to create a queue client.
    QueueClient queueClient = factory.CreateQueueClient(AccountDetails.QueueName);

    // Open the input file.
    FileStream fileStream = new FileStream(AccountDetails.TestFile, FileMode.Open);

    // Create a BrokeredMessage for the file.
    BrokeredMessage largeMessage = new BrokeredMessage(fileStream, true);

    Console.WriteLine("Sending: " + AccountDetails.TestFile);
    Console.WriteLine("Message body size: " + largeMessage.Size);
    Console.WriteLine();

    // Send the message with a LargeMessageSender.
    LargeMessageSender sender = new LargeMessageSender(queueClient);
    sender.Send(largeMessage);

    // Close the messaging factory.
    factory.Close();
}

The implementation of the main method of the receiving application is shown below.

static void Main(string[] args)
{
    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Create the MessagingFactory.
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Use the MessagingFactory to create a queue client.
    QueueClient queueClient = factory.CreateQueueClient(AccountDetails.QueueName);

    // Create a LargeMessageReceiver and receive the message.
    LargeMessageReceiver receiver = new LargeMessageReceiver(queueClient);
    BrokeredMessage largeMessage = receiver.Receive();

    Console.WriteLine("Received message");
    Console.WriteLine("Message body size: " + largeMessage.Size);

    string testFile = AccountDetails.TestFile.Replace(@"\In\", @"\Out\");
    Console.WriteLine("Saving file: " + testFile);

    // Save the message body as a file.
    Stream largeMessageStream = largeMessage.GetBody<Stream>();
    largeMessageStream.Seek(0, SeekOrigin.Begin);
    FileStream fileOut = new FileStream(testFile, FileMode.Create);
    largeMessageStream.CopyTo(fileOut);
    fileOut.Close();

    Console.WriteLine("Done!");
}

In order to test the application, the sending application is executed, which will use the LargeMessageSender class to split the message and place it on the queue. The output of the sender console is shown below.

The console shows that the body size of the large message was 9,929,365 bytes, and the message was sent as a sequence of 51 sub messages.

When the receiving application is executed the results are shown below.

The console application shows that the aggregator has received the 51 messages from the message sequence that was created in the sending application. The messages have been aggregated to form a message with a body of 9,929,365 bytes, which is the same size as the original large message. The message body is then saved as a file.

Improvements to the Implementation

The splitter and aggregator patterns in this implementation were created in order to show the usage of the patterns in a demo, which they do quite well. When implementing these patterns in a real-world scenario there are a number of improvements that could be made to the design.

Copying Message Header Properties

When sending a large message using these classes, it would be useful if the message header properties of the received message were copied from the message that was sent. The sending application may well add information to the message context that will be required in the receiving application.

When the sub-messages are created in the splitter, the header properties of the first sub-message could be set to the values in the original large message. The aggregator could then use the values from this first sub-message to set the properties in the message header of the large message during the aggregation process.
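As a sketch of this idea, with plain dictionaries standing in for the BrokeredMessage body and property bag (a modelling assumption, not the Service Bus API; the TradingPartner property is purely illustrative):

```python
def split_with_properties(properties, chunks):
    """Splitter side: stamp the original header properties on the first sub-message only."""
    return [{"body": chunk, "properties": properties if i == 0 else {}}
            for i, chunk in enumerate(chunks)]

def aggregate_with_properties(sub_messages):
    """Aggregator side: rebuild the body and restore properties from the first sub-message."""
    body = b"".join(m["body"] for m in sub_messages)
    return {"body": body, "properties": sub_messages[0]["properties"]}

original = {"body": b"abcdef", "properties": {"TradingPartner": "Contoso"}}
rebuilt = aggregate_with_properties(
    split_with_properties(original["properties"], [b"ab", b"cd", b"ef"]))
print(rebuilt == original)  # → True
```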

Using Asynchronous Methods

The current implementation uses the synchronous send and receive methods of the QueueClient class. It would be much more performant to use the asynchronous methods; however, doing so may well affect the order in which the sub-messages are enqueued, which would require implementing a resequencer in the aggregator to restore the correct message sequence.
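A resequencer for that case could buffer the sub-messages keyed by a sequence number and reassemble them in order once the set is complete. A hedged Python sketch (the per-sub-message sequence number is an assumption; the implementation above does not set one):

```python
def resequence(sub_messages):
    """Restore order for chunks that may have been enqueued out of sequence.

    Each sub-message is modelled as a (sequence_number, body) pair; a real
    implementation could carry the number as a brokered message property.
    """
    ordered = sorted(sub_messages, key=lambda m: m[0])
    return b"".join(body for _, body in ordered)

# Chunks arriving out of order are restored to the original byte sequence.
arrived = [(2, b"ef"), (0, b"ab"), (1, b"cd")]
print(resequence(arrived))  # → b'abcdef'
```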

Handling Exceptions

In order to keep the code readable no exception handling was added to the implementations. In a real-world scenario exceptions should be handled accordingly.

BizTalk360 Live Webcast – Current State and what’s coming in vNext

We released version 3.2 of BizTalk360 on the 7th of February with a bunch of interesting features. Straight after that we moved on to our 4.0 version, which is due to be released at the end of May.

Live Web Cast: 18th April 2012. Details and registration can be found here.

The main focus for the 4.0 version is to expand BizTalk360’s existing monitoring capabilities to other core infrastructure-related areas like disk space, NT service running state, SQL job running state, and system resources like CPU, memory, Event Log patterns etc.

The other important thing we included in 4.0 is a plug-in architecture: monitoring sections can be written in isolation and plugged into the BizTalk360 core monitoring system. This is super useful for customers who have very specific monitoring requirements. For example, one of our customers wants to perform a sequence of steps:

1. Get the value of an element in the config file using XPath.
2. Check the database for a relevant value based on the config file value.
3. Alert if the database value is not the expected value.
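The sequence of steps above could be sketched roughly as follows (Python purely for illustration; the config XML, XPath, and expected value are hypothetical, and the database lookup is stubbed):

```python
import xml.etree.ElementTree as ET

def lookup_database_value(config_value):
    """Stub for step 2: a real plug-in would query the database here."""
    return "Enabled" if config_value == "OrderFeed" else "Disabled"

def check(config_xml, xpath, expected):
    """Steps 1-3: read a config value via XPath, look up the related
    database value, and return an alert string on a mismatch."""
    config_value = ET.fromstring(config_xml).find(xpath).text   # step 1
    db_value = lookup_database_value(config_value)              # step 2
    if db_value != expected:                                    # step 3
        return "ALERT: expected %r, got %r" % (expected, db_value)
    return None

config = "<settings><feed><name>OrderFeed</name></feed></settings>"
print(check(config, "./feed/name", "Enabled"))   # → None (no alert raised)
```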

In the past, BizTalk customers have always built custom in-house solutions for these kinds of tasks. But now you can easily create such custom monitoring in BizTalk360 and make it part of your overall monitoring solution.

Why not SCOM Monitoring?

There is no question that an enterprise monitoring solution like System Center Operations Manager (SCOM), HP OpenView or something equivalent can do these kinds of jobs, but there are some real practical challenges.

1. They are complex products in themselves, and require considerable knowledge to set up.

2. They are general-purpose monitoring products, and they can look heavy and complicated for some of the basic scenarios.

3. It’s often time consuming to set up SCOM or HP OpenView in an organisation; the process can range from weeks to months.

With BizTalk360 our focus is to concentrate only on Microsoft BizTalk Server-related areas. We try to keep it as simple as possible. Our goal is to set up BizTalk environment monitoring in a few minutes or hours, NOT days, weeks or months.

Come and join our BizTalk360 live webcast on 18th April 2012, and see for yourself what we have to offer. We are doing two sessions on the same day to cover eastern and western time zones. Details of the event and registration can be found here.

Nandri!

Saravana Kumar

BizTalk Server 2010 Cookbook is now available

Finally the day that many of us were waiting for has arrived! BizTalk Server 2010 Cookbook was officially released by Packt Publishing on April 05, 2012. BizTalk Server 2010 Cookbook was written by Steef-Jan Wiggers, a Microsoft Most Valuable Professional (MVP) since 2010 and one of the most respected members of the BizTalk Server community for his […]
Blog Post by: Sandro Pereira

BizTalk Community series: Introducing Naushad Alam

Stories continue in the BizTalk Community Series, which brings BizTalk community members to the foreground. The ninth story is on Naushad Alam, who is currently one of the most active community members on the BizTalk forums. Naushad has not only been very active on the forums; he has also written a few extensive wiki articles for the TechNet wiki like:

A few weeks ago I had a chat with Naushad, and here’s his story.

Naushad is a BizTalk Developer/Administrator/Designer/Architect, working with one of the largest banks in the United Kingdom. He has more than 9 years of working experience with Microsoft products.

Naushad enjoys the BizTalk administrator and developer roles the most. He started working with BizTalk in early 2004 during a BizTalk 2002 project, and after that it seems he got married to the product. Nowadays he is working, thinking, and sharing his knowledge on the BizTalk forums.

Naushad has certainly shown his enthusiasm on the forums, the TechNet wiki and his blog. I quote:

“There are really great articles/forums (MSDN)/blogs evolved during last 3-4 years, I remember when I started working with BizTalk 2002, there was not much information on the internet, So I really had some difficulties find information, but things have been revolutionized in recent years. People should get benefit from this and make the BizTalk product the most popular and successful ever.”
 
His view on sharing knowledge, and I quote:

“I would like to say that it really takes time and passion to write and maintain technical blogs online with a consistency, People should try to get maximum benefits from the blogs/articles because it could give them a simple solution of their complex problems.”

He feels BizTalk is a great product and as he states:

“Certainly BizTalk is a great product and its been evolved a lot during last couple of years, the guys at Redmond are really shipping great features every time making it a better product. I would love to continue the rest my career around this product and other integration products of Microsoft.”

In his spare time Naushad likes to watch animated films like Cars and Toy Story with his little daughter, and he is an avid reader. He recently finished reading the Steve Jobs biography by Walter Isaacson, which aligns with his passion for Apple products. Naushad likes his Mac and loves to design simple user interfaces for it. Currently he is working on a user interface design for his personal media management software.

As for sports, Naushad loves to watch cricket; the Indian cricket team is his favorite team and Sachin Tendulkar his favorite player. For the last couple of months he has been enjoying playing badminton on the weekends.

Thanks, Naushad, for your time and your contributions to the forums and the TechNet Wiki.

Geospatial Support for Circular Data in SQL Server 2012

SQL Server 2012 adds many significant improvements to the spatial support that was first introduced with SQL Server 2008. In this blog post, I’ll explore one of the more notable enhancements: support for curves and arcs (circular data). SQL Server 2008 only supported straight lines, or polygons composed of straight lines. The three new shapes […]
Blog Post by: Lenni Lobel

BizTalk 2010 Cookbook Released

I am very happy to announce that the BizTalk 2010 Cookbook, written by the great MVP Steef-Jan, has been released. He has done a marvelous job writing this book; the contents of the book can be viewed on the Packt website. The book is mainly for developers and administrators who can gain knowledge in Development, Maintaining, […]
Blog Post by: Abdul Rafay

More Cookbooks: Microsoft Windows Server AppFabric Cookbook

I myself have written a cookbook on BizTalk Server, called BizTalk Server 2010 Cookbook, for Packt Publishing. Yet another cookbook is heading our way from this publisher. Fellow MVP from the “Connected Systems” discipline Rick G. Garibay and Microsoft architect Hammad Rajjoub have written a cookbook on Windows Server AppFabric.

Microsoft Windows Server AppFabric Cookbook
This book will:

  • Download, install, configure and get up and running with Windows Server AppFabric quickly
  • Learn how to take advantage of distributed caching for providing high performance and elastic scale on-premise today
  • Take advantage of the enhanced hosting capabilities that Windows Server AppFabric has to offer including Auto-Start and a greatly simplified configuration experience
  • Enable support for long-running composite applications that are resilient and fault-tolerant while maximizing computing resources
  • Gain insight into the health of your composite applications seamlessly, both proactively and when something goes wrong
  • Learn how to scale Windows Server AppFabric by leveraging farm deployments

I know both Rick and Hammad, who are very active community leaders and long-time MVPs, and they have done a great job. Windows Server AppFabric can be viewed as part of the Microsoft integration stack together with BizTalk, SQL Server and the Azure Service Bus. Basically, Windows Server AppFabric, BizTalk and SQL Server are part of the on-premise integration middleware (see the whitepaper by Gijs in ’t Veld).

For lightweight integration, Windows Server AppFabric together with WF (Windows Workflow Foundation) and WCF (Windows Communication Foundation) can be the right fit within your enterprise without using the more scalable, robust BizTalk Server. I believe this book will tell the complete story of how to use Windows Server AppFabric, how to set it up, and how to create robust, state-of-the-art solutions on it.

To conclude, I think the content of this book will be very valuable for WF/WCF developers and also for us BizTalk guys. I am looking forward to this book and have pre-ordered it, so now go and pre-order it too!

Three Software Updates to be Aware Of

In the past few days, there have been three sizable product announcements that should be of interest to the cloud/integration community. Specifically, there are noticeable improvements to Microsoft’s CEP engine StreamInsight, Windows Azure’s integration services, and Tier 3’s Iron Foundry PaaS. First off, the Microsoft StreamInsight team recently outlined changes that are coming in their […]
Blog Post by: Richard Seroter

BizTalk Server 2010 Cookbook Released

Finally my book with recipes for BizTalk Server 2010 is officially released. After a year of hard work it is now available. You can find the complete table of contents for this book on the Packt website.
Before starting this endeavor I asked Richard Seroter for guidance and discussed the opportunity with fellow MVP Randal van Splunteren during the MVP Summit 2011. Both motivated me to go for it. So I did, and I signed a book contract with Packt.

I did not want to write a straightforward book with recipes targeted only at the out-of-the-box capabilities of BizTalk Server 2010, so I decided to do it differently.

This book provides material on community tooling available on CodePlex, like the BizTalk BenchMark Wizard, and many other BizTalk-related tools. Microsoft has also built tooling for BizTalk, like the BizTalk Best Practice Analyzer, which can be very valuable in validating your BizTalk environment. The book provides a couple of recipes on topics that you would not find in other BizTalk (recipe) books, and some that you are familiar with. With all the recipes combined, I wanted to draw a contrast between what BizTalk offers out of the box and the alternatives offered through the community and third parties.
 
During the course of the book I had support from fellow MVPs Saravana Kumar, who contributed a recipe on alternative monitoring with BizTalk360, Mikael Håkansson, who helped with the recipe concerning the BizTalk BenchMark Wizard (BBW), and my colleague Douglas Skirving on security.

It was lots of fun to write this book, and I hope many of you will order it and enjoy reading it. I would like to thank my official technical reviewers Randal van Splunteren, Rene Brauwers, Abdul Rafay and Sandro Pereira, as well as Tord G. Nordahl, Richard Seroter, and Paul Gielens, for their insight, hints, tips, and keeping me on the right path. Finally, I would like to thank Packt Publishing for giving me this opportunity, and the team behind the scenes at Packt that I worked with: Dhwani, Chris, Prasad, Laxmi, Vishal, Rekha, Prachali and Alwin.

If you have any feedback on my work in this book, don’t hesitate to drop me a line.