Integrating Integrators – BizTalk, Windows Azure, Windows Workflow, and Beyond
Join
Sign in
Search Options
Search Everything
Search BizTalk Blogs
Home
AppFabric
BizTalk Server
Windows Azure
Windows Workflow
Jobs (Hire A Guru)
More ...
Home
»
BizTalk Server
»
BizTalk Blogs
»
BizTalk Community Blogs via Syndication
»
Windows Azure Service Bus Resequencer
Windows Azure Service Bus Resequencer
BizTalk Blogs
This group is for blogs related to BizTalk Server. This includes Community Syndicated blogs and Stephen W. Thomas’s blog.
Get this RSS feed
Home
Blogs
Sitewide Application Navigation
Home
Blogs
Media
Forums
Wikis
Groups
Options
Share this
Monthly Archive List
Archives
May 2013
(41)
April 2013
(70)
March 2013
(65)
February 2013
(57)
January 2013
(79)
December 2012
(63)
November 2012
(68)
October 2012
(74)
September 2012
(66)
August 2012
(63)
July 2012
(77)
June 2012
(101)
May 2012
(64)
April 2012
(64)
March 2012
(68)
February 2012
(48)
January 2012
(34)
December 2011
(61)
November 2011
(44)
October 2011
(76)
September 2011
(66)
August 2011
(46)
July 2011
(66)
June 2011
(75)
May 2011
(58)
April 2011
(46)
March 2011
(65)
February 2011
(62)
January 2011
(76)
December 2010
(67)
November 2010
(140)
October 2010
(154)
September 2010
(143)
August 2010
(120)
July 2010
(86)
June 2010
(116)
May 2010
(91)
April 2010
(120)
March 2010
(98)
February 2010
(103)
January 2010
(107)
December 2009
(64)
November 2009
(118)
October 2009
(127)
September 2009
(89)
August 2009
(74)
July 2009
(115)
June 2009
(129)
May 2009
(134)
April 2009
(136)
March 2009
(161)
February 2009
(100)
January 2009
(107)
December 2008
(107)
November 2008
(106)
October 2008
(173)
September 2008
(146)
August 2008
(139)
July 2008
(101)
June 2008
(115)
May 2008
(120)
April 2008
(134)
March 2008
(104)
February 2008
(136)
January 2008
(106)
December 2007
(73)
November 2007
(135)
October 2007
(143)
September 2007
(138)
August 2007
(144)
July 2007
(139)
June 2007
(139)
May 2007
(166)
April 2007
(199)
March 2007
(200)
February 2007
(188)
January 2007
(182)
December 2006
(151)
November 2006
(149)
October 2006
(184)
September 2006
(147)
August 2006
(124)
July 2006
(125)
June 2006
(125)
May 2006
(89)
April 2006
(63)
March 2006
(83)
February 2006
(40)
January 2006
(42)
December 2005
(16)
November 2005
(32)
October 2005
(17)
September 2005
(29)
August 2005
(15)
July 2005
(11)
June 2005
(45)
May 2005
(39)
April 2005
(28)
March 2005
(14)
February 2005
(16)
January 2005
(18)
December 2004
(14)
November 2004
(13)
October 2004
(11)
September 2004
(26)
August 2004
(8)
July 2004
(9)
June 2004
(2)
May 2004
(2)
April 2004
(2)
March 2004
(2)
February 2004
(1)
Tags
.Net
.NET Framework
AppFabric
Architecture
ASP.NET
Azure
BizTalk
BizTalk 2006
BizTalk 2009
BizTalk 2010
BizTalk Server
Cloud
Community News
General
Microsoft
REST
SharePoint
SOA
Tellago
Uncategorized
Visual Studio
WCF
WCF/WF
Web Services
WF
BizTalk Community Blogs via Syndication
Numerous BizTalk Bloggers all in one spot. All content is property of the original blog owner.
RSS for posts
Windows Azure Service Bus Resequencer
Rate This
Anonymous
Fri, Mar 23 2012 4:47 AM
Comments
0
Introduction
I’ll be presenting a session at
Sweden Windows Azure Group
(SWAG) on Monday, as well as presenting on the Windows Azure Service Bus at various other events. I thought it would be fun to look at implementing some of the Enterprise Integration Patterns using the brokered messaging capabilities. I’ll use this article in the next release of
Windows Azure Service Bus Developer Guide
, and it will probably end up as a “challenge lab” for my
Service Bus course
.
As a long-time BizTalk developer I have seen many scenarios where the order of messages sent to a destination system needs is critical to business operations. Updating orders is a good example, if the first update to an order is sent after the second update then data may well be corrupted. Many BizTalk adapters, such as the file and FTP adapters have the potential to shuffle the sequence of messages as they enter BizTalk server, and message channels must be developed to resequence these messages.
The Enterprise Integration Patterns website provides a description of the Resequencer pattern
here
.
There are also scenarios where the sequence of messages can get mixed up when working with Service Bus brokered messaging. When sending a stream of messages using the asynchronous send method there is a good chance that the messages will be placed on the queue out of sequence. The following scenario will use this as an example, and a resequencer will be implemented to restore the order of the messages.
Resequencer Scenario
The scenario used for the resequencer implementation is the transfer of a photo over a Service Bus queue. The photo is broken down into 64 tiles (8 x 8), with each tile being sent in a separate message. When the tiles are received from the queue they are reassembled to form the original image.
I’ve have used this scenario previously when demoing resequencer patterns in BizTalk Server after seeing Shy Cohen use a similar scenario to demo reliable messaging in WCF. It’s great to use for presentations, and the use of an image makes it easy to see when messages are out of sequence.
The application that sends and receives the messages is built using Windows Presentation Foundation (WPF), with a basic user interface to show the original image, and the reassembled image after it has been sent and received on the queue. The application has the option to send messages using the synchronous or asynchronous send methods.
A screenshot of the application after sending the messages synchronously is shown below.
The received image has been assembled correctly from the sequence of message tiles, indicating that there was not a disturbance in the order of the messages. The sending of the messages, however, was not optimal. As they were sent synchronously, with the send operation on one having to complete before the next message can be sent, it took almost 7 seconds to send all 64 messages, at about 9.5 messages per second.
Sending messages asynchronously will provide much better throughput for the sending application. The results of this are shown below.
Using asynchronous send the 64 messages were sent in under half a second, at a rate of over 150 messages per second. In this test, sending messages asynchronously provides better than 15 times the throughput. Sending the messages asynchronously, however, has affected the order in which the messages were received. While most of the messages are in order, the first four messages containing the first half of the book title were the last four messages to be received.
In some scenarios the order of messages is not important, we are only concerned with throughput and reliability, but in this scenario it affects the display of the image. For these kinds of scenarios we need to implement a resequencer.
Resequencer Implementation
In this scenario the resequencer will be implemented as in intermediary service between the source system and the target system. I am using the Image Transfer WPF application to act as the source and target system, but the principle is the same.
The pseudo-code for a possible resequencer is shown below.
while (true)
{
ReceiveMessage;
if (message is in sequence)
{
ForwardMessage;
Forward any stored in sequence messages;
}
else
{
Store message;
}
}
The design decisions we have to make when implementing a resequencer are as follows:
·
How is the sequence of messages determined?
·
How should the message store be implemented?
Determining the Sequence of Messages
In order for a resequencer to work it must have a means of determining the correct sequence of messages. The BrokeredMessage class provides a SequenceId property, which is set by the queue or topic when the message is enqueued, starting with 1 for the first message enqueued.
In some scenarios the messages could have been enqueued in the correct order, and then the sequence changed by a multi-threaded receive. In those scenarios the SequenceId could be used to resequencer the messages.
In this scenario the messages are enqueued out of sequence by the multi-threaded asynchronous sending operations. This means that the SequenceId property of the dequeued messages will not reflect correct order of the messages. This means that the message sender will have to
provide some means of identifying the sequence of each message.
The following code shows the code used to create messages from the encoded image streams and assign an incrementing send session ID value to the Label property of the message header. The message is then sent synchronously or asynchronously depending on the selection made by the user.
for (int y = 0; y < ImageTiles; y++)
{
for (int x = 0; x < ImageTiles; x++)
{
MemoryStream
ms =
new
MemoryStream
();
// Use a delegate as we are accessing UI elements in a different thread.
SimpleDelegate getImageCrop = delegate()
{
// Create a cropped bitmap of the image tile.
CroppedBitmap cb = null;
cb = new CroppedBitmap(FileBitmapImage, new Int32Rect
(x * blockWidth, y * blockHeight, blockWidth, blockHeight));
// Encode the bitmap to the memory stream.
PngBitmapEncoder encoder = new PngBitmapEncoder();
encoder.Frames.Add(BitmapFrame.Create(cb));
encoder.Save(ms);
};
this.Dispatcher.Invoke(DispatcherPriority.Send, getImageCrop);
// Create a brokered message using the stream.
ms.Seek(0,
SeekOrigin
.Begin);
BrokeredMessage
blockMsg =
new
BrokeredMessage
(ms,
true
);
// Set the send sequence ID to the message lable.
blockMsg.Label = sendSequenceId.ToString();
// Send the message using either sync or async.
if
(SendAsync ==
true
)
{
queueClient.BeginSend(blockMsg, OnSendComplete,
new
Tuple
<
QueueClient
,
string
>(queueClient, blockMsg.MessageId));
}
else
{
queueClient.Send(blockMsg);
}
// Increment the send sequence ID
sendSequenceId++;
// Update the progress bar.
SimpleDelegate updateBar = delegate()
{
prgTransfer.Value++;
};
this.Dispatcher.BeginInvoke(DispatcherPriority.Send, updateBar);
}
}
Delegates have been used here as the code is running on a background worker thread and needs to access the UI elements in the WPF application.
Storing Messages
Any messages that are received out of sequence will need to be stored, and then forwarded to the target system once the previous message in the sequence has been received and forwarded. The message deferral functionality available when receiving messages from queues and subscriptions provides a nice way to store messages that are out of sequence. Provided the messages are received using the peek-lock receive mode the messages can be deferred, and then received again by specifying the appropriate SessionId in the receive method.
Implementing the Resequencer Loop
The resequencer is implemented as a separate WPF application. It receives messages from an inbound queue, resequences them, and sends them, and then sends them on an outbound queue. The code for the main loop of the resequencer is shown below. The receiveSequenceId variable is used to keep track of the sequence of received messages. If the sendSequenceId, which is retrieved from the Label property of the message matches the receiveSequenceId the message is cloned and forwarded and receiveSequenceId is incremented.
When a message has been forwarded, the next message in the sequence may have been received earlier and been deferred. The deferred messages are checked using the dictionary to see if the next message is present. If so it is received, copied, and forwarded, and the process repeated.
If the receiveSequenceId does not match sendSequenceId then the message is out of sequence. When this happens the message is deferred, and the SequenceId of the message added to a dictionary with the sendSequenceId used as a key. The SequenceId is required to receive the deferred message.
// Initialize the receive sequence ID.
long
receiveSequenceId = 1;
while
(
true
)
{
BrokeredMessage
msg = inbloudQueueClient.Receive(
TimeSpan
.FromSeconds(3));
if
(msg !=
null
)
{
long
sendSequenceId =
long
.Parse(msg.Label);
// Is the message in sequence?
if
(sendSequenceId == receiveSequenceId)
{
// Clone the message and forward it.
Debug
.WriteLine(
"Forwarding: "
+ sendSequenceId);
BrokeredMessage
outMsg = CloneBrokeredMessage(msg);
outboundQueueClient.Send(outMsg);
msg.Complete();
// Increment the receive sequence ID.
receiveSequenceId++;
// Check for deferred messages in sequence.
while
(
true
)
{
if
(deferredMessageSequenceNumbers.ContainsKey(receiveSequenceId))
{
Console
.WriteLine(
"Sending deferred message: "
+ receiveSequenceId);
// Receive the deferred message from the queue using the sequence ID
// retrieved from the dictionary.
long
deferredMessageSequenceNumber =
deferredMessageSequenceNumbers[receiveSequenceId];
BrokeredMessage
msgDeferred =
inbloudQueueClient.Receive(deferredMessageSequenceNumber);
// Clone the deferred message and send it.
BrokeredMessage
outMsgDeferred = CloneBrokeredMessage(msgDeferred);
outboundQueueClient.Send(outMsgDeferred);
msgDeferred.Complete();
receiveSequenceId++;
}
else
{
// The next message in the sequence is not deferred.
break
;
}
}
}
else
{
// Add the message sequence ID to the dictionary using the send sequence ID
// then defer the message. We will need the sequence id to receive it.
deferredMessageSequenceNumbers.Add(sendSequenceId, msg.SequenceNumber);
msg.Defer();
}
}
}
Testing the Implementation
In order to test the resequencer the image transfer application will send messages to the inbound queue, and receive them from the outbound queue. The results of testing with the application sending messages asynchronously is shown below.
When the application was tested with 16 tiles (4 x 4) with tracing added the forwarding and deferring of the messages can clearly be seen.
Deferring: 3
Deferring: 4
Deferring: 5
Deferring: 6
Deferring: 7
Deferring: 8
Deferring: 9
Deferring: 10
Deferring: 11
Deferring: 12
Deferring: 13
Deferring: 14
Deferring: 15
Deferring: 16
Forwarding: 1
Forwarding: 2
Sending deferred message: 3
Sending deferred message: 4
Sending deferred message: 5
Sending deferred message: 6
Sending deferred message: 7
Sending deferred message: 8
Sending deferred message: 9
Sending deferred message: 10
Sending deferred message: 11
Sending deferred message: 12
Sending deferred message: 13
Sending deferred message: 14
Sending deferred message: 15
Sending deferred message: 16
Issues with Cloning Messages
When receiving messages from one messaging entity and forwarding them to another the following code should not be used.
// Receive a message from the inbound queue.
BrokeredMessage
msg = inbloudQueueClient.Receive(
TimeSpan
.FromSeconds(3));
// Forward the message to the outbound queue.
outboundQueueClient.Send(outMsg);
It will result in an InvalidOperationException being thrown with the message “A received message cannot be directly sent to another entity. Construct a new message object instead.”.
The resequencer uses a quick and dirty message clone method, the code for this is shown below.
private
BrokeredMessage
CloneBrokeredMessage (
BrokeredMessage
msg)
{
Stream
stream = msg.GetBody<
Stream
>();
BrokeredMessage
clonedMsg =
new
BrokeredMessage
(stream,
true
);
clonedMsg.Label = msg.Label;
return
clonedMsg;
}
The code seems to work fine in this scenario, but care must be taken to ensure that the appropriate message properties are copied from the header of the source message to that of the destination message.
Alternative Resequencer Implementations
The implementation in the previous section has been developed to demonstrate the principles of a resequencer in a presentation. For this reason it is hosted in a WPF application, and no error handling code has been added. In a real world scenario the resequencer would be either hosted in a service, or alternatively in the target system. Using a worker role in Windows Azure would allow for a cloud-based solution, but the hourly costs may make this prohibitive.
Handling Errors
As well as the standard error handling on sending and receiving messages a resequencer should also handle a scenario when one of the messages in the sequence is missing. If this happens in my scenario all subsequent messages will be deferred, and the system will never recover. There are a number of ways that this could be handled better.
One option would be to set a threshold of a specific number of deferred messages or a specific time interval that would indicate that the missing message is probably lost. When this threshold is reached, an error or warning could be raised, and the sequence could be resumed. This could either be by an administrative action, or automatically. In either case, if the missing message does eventually arrive at the resequencer it can be dead-lettered and another error or warning raised.
Storing State of Deferred Messages
One of the disadvantages of the message deferral design is that the resequencer needs to hold the state of the SequenceId values for the deferred messages. If this is lost there is no way to receive these messages from the queue. In my demo scenario I use an in-memory dictionary for this. In a real-world implementation the resequencer should store the SequenceId values in a durable store.
Using SessionId for Send Sequence Id
An alternative to using message deferral to store messages the resequencer could be implemented using message sessions. Each session would contain one message, and the SessionId would be set to the sending sequence id. The resequencer (or the receiving application) could then use a session receiver and receive the message from the session by incrementing the value of the session it is listening for to receive the messages in order. The disadvantage of this design is that it would not be possible to use sessions for another purpose in the implementation.
Read the complete post at
geekswithblogs.net/.../149093.aspx
BizTalk
,
WCF/WF
,
Azure
,
VS2010
,
AppFabic
,
Windows Azure Service Bus