My contributions to TechNet Gallery

As I like to research how BizTalk’s internals work, I often find myself writing queries against BizTalk’s databases. So far I’ve published a handful of those queries on the TechNet Gallery.

Here follows a description of those queries:

View Hosts and Host Instances Information from a SQL Query
This query shows information about the configured Hosts and the Host Instances within your BizTalk Group(s). The query is run against the BizTalkMgmtDb database.

It shows the following information:
The name of the BizTalk Group
The name of the Host
Host Type (In-process or Isolated)
32-bit only (Yes or No)
Tracking Host (Yes or No)
The name of the Server where the Instance exists
Name of the Host Instance
Disabled from starting (Yes or No)

The result is sorted on:
BizTalk Group
Host Name
Server Name

Show BizTalk Applications, Orchestrations, Ports and Pipelines with a SQL query
This query runs against the BizTalk Management database. In one view it shows the following information:
The BizTalk Applications from the BizTalk Group at hand
The Assemblies which are deployed to the Applications
The Orchestrations which are in the Applications
The Status of the Orchestrations:

  1 = Unenlisted
  2 = Stopped
  3 = Started

The Ports as defined in the Orchestrations
The Port Types
The Port Type Operations
The Receive Ports as they are bound to the Orchestrations
The Receive Locations as they belong to the Receive Ports
The Receive Pipelines as they are configured at the Receive Locations
The Send Ports as they are bound to the Orchestrations
The Send Pipelines as they are configured at the Send Ports
The Send Port Groups as they are bound to the Orchestrations

The output is sorted on:
Application
Assembly
Orchestration

Number of records in the MarkLog table
This query returns the number of records found in the MarkLog table of your BizTalk databases. This table is filled by the Backup BizTalk Server job, but is not cleaned up, unless you use the Terminator tool. Read this article to purge the records in the MarkLog table.

If you have more than 150,000 records in any of your MarkLog tables, you should use Terminator to purge these records.

The query can easily be extended with any custom databases which are also backed up by the Backup BizTalk Server job.

Show BizTalk Subscriptions
This download contains a .SQL query, which can be executed against your BizTalk Message Box. The results you get are the current subscriptions in the Message Box. Only the ‘Equals Predicates’ subscriptions are shown.
 
The following fields are returned:
Application – Name of the BizTalk Application which contains the orchestration for this subscription
Orchestration – Name of the orchestration which has the subscription
Created On – The timestamp when the subscription was created
Equals Predicate – The Equals Predicate (mostly the promoted property on a schema)
Value – The value part from the predicate

Get Orchestration Instances, Counts and States
When this script is run against a MessageBox, it gives you the following information:

Name of orchestration instances
Number of orchestration instances
State of the orchestration instances

The view is grouped by the first 2 columns and sorted on all 3 columns. To make it safe to execute this query against the MessageBoxDb, the WITH(NOLOCK) hint is used.

Number of Messages per MessageType in the TrackingDb
This query gives you insight into what information is stored in the BizTalk TrackingDb. When run against it, the query shows the following columns:

Schema
Number of Messages

The result is grouped by and sorted on the field Schema. To make it safe to execute this query against the TrackingDb, the WITH(NOLOCK) hint is used. If you query the BizTalk databases yourself, never forget to add the WITH(NOLOCK) hint.

I hope these queries are helpful to you. I’ll certainly add more queries to the TechNet Gallery.

How BizTalk Maps Work – Processing model (Part 1)

Introduction: Maps or transformations are one of the most common components in integration processes. They act as essential translators in the decoupling between the different systems to connect. This article aims to explain how maps are processed internally by the engine of the product as we explore the BizTalk Server map editor. This series […]
Blog Post by: Sandro Pereira

Integration Patterns utilizing the Windows Azure Service Bus-Part II

In the previous post I covered the Messaging Channel group of patterns. In this post, I will cover the Message Routing Group patterns.  This group contains:

Content Based Router, Message Filter, Recipient List, Splitter, Aggregator, Resequencer, Composed Message Processor, Scatter-Gather, Process Manager and Message Broker.

So, let’s dive right in to each of these patterns and see how they can be implemented.

 

  Content-Based Router

This pattern requires that the router inspect each message and direct it to different channels based on data elements contained in the message. The routing should be configurable to route based on different fields or on the existence or non-existence of a specific value, and the routing rules should be easy to maintain because they typically change frequently.

This pattern can be implemented in Azure using a Service Bus Topic. There are two parts that need to be implemented to put this pattern into use. The first is that subscriptions need to be set up, and the second is that a message needs to be created with the specific properties populated to match the subscription, as the diagram below shows:

 

The following code shows how to create the subscriptions as well as the submitted message. Let’s start by creating the subscriptions.

public static void CBRPatternExampleWithTopic(String topicName)
{
    String serviceNamespace = ....;
    String issuer = ....;
    String key = ....;

    SharedSecretCredential credential =....
    ....
    ServiceBusNamespaceClient namespaceClient = new ServiceBusNamespaceClient(serviceBusUri, credential);
    Topic topic = namespaceClient.GetTopic(topicName);
    // Using the AddSubscription(String, FilterExpression) method overload.
    // String literals inside a SQL filter must be wrapped in single quotes.
    topic.AddSubscription("PremiumMemberType", new SqlFilter("MemberType = 'Premium'"));
    topic.AddSubscription("OtherMemberTypes", new SqlFilter("MemberType <> 'Premium'"));
}

    

Then we will create the message and populate the property that the subscriptions filter on.

public static void SubmitMessageToTopic()
{
    String issuer = ....;
    String key = ....;
    TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuer, key);
    Uri uri = ServiceBusEnvironment.CreateServiceUri(.., .., ..);

    MessagingFactory factory = MessagingFactory.Create(uri, tp);
    MessageSender myTopic = factory.CreateMessageSender("<TopicName>");
    BrokeredMessage message = new BrokeredMessage();
    // This will create a message that will be routed to the first subscription above
    message.Properties.Add("MemberType", "Premium");
    // OR add the following property instead to have it routed to the other subscription above
    // message.Properties.Add("MemberType", "Gold");
    myTopic.Send(message);
}
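
To look at the routing decision in isolation, here is a minimal in-memory sketch that does not touch the Service Bus at all. The TopicSketch class and its AddSubscription and Publish methods are names I made up for illustration, and the filters are plain C# predicates standing in for the SqlFilter expressions above.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: an in-memory stand-in for a topic.
// None of these members belong to the Service Bus SDK.
public class TopicSketch
{
    // Subscription name -> predicate over the message properties.
    private readonly Dictionary<string, Func<IDictionary<string, object>, bool>> _filters =
        new Dictionary<string, Func<IDictionary<string, object>, bool>>();

    public void AddSubscription(string name, Func<IDictionary<string, object>, bool> filter)
    {
        _filters.Add(name, filter);
    }

    // Returns the names of every subscription whose filter accepts the message.
    public List<string> Publish(IDictionary<string, object> properties)
    {
        var matches = new List<string>();
        foreach (var entry in _filters)
        {
            if (entry.Value(properties))
            {
                matches.Add(entry.Key);
            }
        }
        return matches;
    }
}
```

Registering the two predicates p => (string)p["MemberType"] == "Premium" and p => (string)p["MemberType"] != "Premium" under the subscription names used above and publishing a message whose MemberType is "Gold" returns only "OtherMemberTypes".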

 

  Message Filter

This pattern requires that any message that doesn’t match the criteria of a subscription will be discarded.  This pattern is used to eliminate any messages that aren’t routed to a subscriber such as those setup using the Content Based Router pattern.  If the message doesn’t fall into a filter used by the subscriber then the message should be routed to a filter channel. 

When using an Azure Service Bus Topic you can utilize the RuleDescription object along with the FilterAction and FilterExpression objects to set up the subscriber filters. You can explicitly utilize the MatchNoneFilterExpression object to subscribe to messages that haven’t been routed to any other subscriber. Once a message is routed to this subscriber you can log that it wasn’t picked up and then either process it or discard it.

The following code shows how to add a subscriber to the Topic for a MatchNoneFilterExpression.

public static void AddFilterPatternSubscriptionToTopic(String topicName)
{
    String serviceNamespace = ....;
    String issuer = ....;
    String key = ....;

    SharedSecretCredential credential =....
    ....
    ServiceBusNamespaceClient namespaceClient = new ServiceBusNamespaceClient(serviceBusUri, credential);
    Topic topic = namespaceClient.GetTopic(topicName);
    // <place other subscriber code here>
    RuleDescription matchNoneRule = new RuleDescription()
    {
        FilterAction = new SqlFilterAction("set defer = 'yes';"),
        FilterExpression = new MatchNoneFilterExpression()
    };

    // Using the AddSubscription(String, RuleDescription) method overload
    Subscription matchNoneFilterSubscription =
        topic.AddSubscription("matchNoneFilterSubscription", matchNoneRule);
}
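
The behaviour described above (a message that no other subscriber wants ends up on a catch-all subscription) can be sketched in memory as follows. MatchNoneSketch and its Route method are hypothetical helpers I wrote for illustration; they are not part of the Service Bus SDK, and the filters are ordinary C# predicates.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: hypothetical helper, not part of the Service Bus SDK.
public static class MatchNoneSketch
{
    public const string CatchAll = "matchNoneFilterSubscription";

    // filters: subscription name -> predicate over the message properties.
    // Returns the matching subscription names, or only the catch-all
    // subscription when no filter accepts the message.
    public static List<string> Route(
        IDictionary<string, Func<IDictionary<string, object>, bool>> filters,
        IDictionary<string, object> properties)
    {
        List<string> matches = filters
            .Where(f => f.Value(properties))
            .Select(f => f.Key)
            .ToList();

        if (matches.Count == 0)
        {
            matches.Add(CatchAll);
        }
        return matches;
    }
}
```

The receiver on the catch-all subscription is then the single place where you decide whether to log, process or discard such a message.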
   

  Dynamic Router

This pattern provides for the scenario in which each recipient registers and lists the conditions or rules for the messages or types of messages it can handle. The conditions or rules are stored so that when a message arrives, the rules run, the message parameters are evaluated against them and the message is routed to the correct receiver. The benefit of this is that you can achieve predictive routing, with eased maintenance and less dependency on each receiver.

This pattern is implemented in Azure by default in both the Topic and the Queues.  When you create a subscription and a receiver for a topic you are registering with the service and as part of that registration process you also provide rules or conditions for the types of messages or the value of properties that you want to filter for. 

But what if we want to take this a step further and provide even more dynamic functionality? What if we could implement the functionality that is available in BizTalk called Promoted Properties, but do it completely within Azure? Promoted Properties is a way of taking data contained in the body of an object or message and putting it in a properties collection that can then be used to route messages. When working with Azure, the BrokeredMessage object contains a properties collection that can be used for routing; however, you have to set these properties manually at design time. Our goal is to take the values of properties in the object body itself and promote those to the BrokeredMessage properties collection so that they can be assigned and routed at run time.

The subscribers on the Topic still need to be created and registered but this functionality allows the data in the message to direct the flow of the message and provides further dynamic routing scenarios. 

I have been working with Michael Muta (MCS Consultant) and he came up with a great implementation of this pattern. Let’s look at how this can be accomplished.

First, we would create a new class that will be serialized as the message body of the BrokeredMessage.  In this case we will create a class for membership.

    [Serializable]
    public class Member
    {
        public Member()
        {
            
        }

        public bool NewMember { get; set; }
        public string Name { get; set; }
        public string Address { get; set; }

        [PromotedProperty("ReceiveEMail", PropertyDirection.ReadAndWrite)]   
        public string ReceiveEmailAds { get; set; }

        [PromotedProperty("Licensed", PropertyDirection.ReadOnly)]
        public bool Licensed
        {
            get { return (....); }
        }
    }

As you can see, we have decorated a number of the properties with custom attributes. We decorate every property that we want to add to the BrokeredMessage properties collection. In addition, we can set the PropertyDirection to either ReadOnly or ReadAndWrite. ReadOnly certainly makes sense – we just need to take the value of the property and promote it into the collection. However, we can take this beyond just static promotion. With ReadAndWrite we can provide the functionality to update the property as the message is received from the Topic. With the Subscriptions on our Topics we have the ability to perform an action in the rules. In that action we can set the value of a property, and in this case, we can push that modification all the way back to the internal property on the object. We can then use that for other downstream actions in our processing of the message. This gives us a lot more flexibility and functionality in our solution.

After we have our object decorated we can put together the code to send the message.  We can take the same method that we used earlier to submit a message but we will do a couple of different things. 

As you will notice, the code to submit the message is pretty much the same – we create the factory, create the message sender, create an instance of our member object and create a BrokeredMessage. In order to get our promoted properties code to work as an extension to the BrokeredMessage we needed to utilize the Extension Methods functionality in C#. We needed to do this because the BrokeredMessage class is sealed. One of the drawbacks of the Extension Methods functionality is that it cannot hook into the object’s constructor. Therefore, we need to call the PromoteProperties extension method on the BrokeredMessage explicitly. This method will interrogate all of the decorated properties on your object and promote them.

public static void SubmitMessageToTopic()
{
    String issuer = ....;
    String key = ....;
    TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuer, key);
    Uri uri = ServiceBusEnvironment.CreateServiceUri(.., .., ..);

    MessagingFactory factory = MessagingFactory.Create(uri, tp);
    MessageSender myTopic = factory.CreateMessageSender("<TopicName>");

    // Create our message body object
    Member memberItem = new Member();
    BrokeredMessage message = new BrokeredMessage(memberItem);
    // Now that we have created the message and passed in our object we need to do one more thing
    message.PromoteProperties(memberItem);
    // Even though we are promoting properties, we can still add properties at design time as usual, as shown below
    // message.Properties.Add("MemberType", "Gold");
    myTopic.Send(message);
}

One other item I want to mention in the code above is that even though you promote properties decorated in your object you can still add properties to the BrokeredMessage properties collection at design time as usual.

 

So, we now have our object decorated and we have the code to submit the message to the Topic. Let’s look at the extension method that promotes the properties.

public static void PromoteProperties(this BrokeredMessage msg, object serializableObject)
        {
            PropertyInfo[] properties = serializableObject.GetType().GetProperties();

            foreach (PropertyInfo prop in properties)
            {
                foreach (PromotedProperty promotedProp in prop.GetCustomAttributes(typeof(PromotedProperty), true))
                {
                    object value = prop.GetValue(serializableObject, null);
                    msg.Properties.Add(promotedProp.Name, value);
                }
            }
        }

This method loops through the properties in the object, finds the promoted properties and adds the value of each property, under the name provided in the attribute, to the BrokeredMessage’s property collection. There are other methods in the extension class for things such as setting the property’s value for our ReadAndWrite property direction, but this gives an overview of how we can add this functionality and provide not only a great implementation of this pattern but also extend the functionality of the Azure Topics and their routing.
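
The attribute class itself is not shown in this post, so here is a self-contained sketch of the same reflection technique that you can compile without the Service Bus assemblies. Everything in it is an assumption made for illustration: PromotedAttribute is my own simplified attribute (name only, no PropertyDirection), MemberSketch is a cut-down member class, and a plain Dictionary plays the role of the BrokeredMessage properties collection.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical attribute definition; the shape of the real attribute is
// not shown in the post, so this one is an assumption.
[AttributeUsage(AttributeTargets.Property)]
public sealed class PromotedAttribute : Attribute
{
    public PromotedAttribute(string name) { Name = name; }
    public string Name { get; private set; }
}

// Cut-down message body used only for this sketch.
public class MemberSketch
{
    public string Name { get; set; }

    [Promoted("ReceiveEMail")]
    public string ReceiveEmailAds { get; set; }

    [Promoted("Licensed")]
    public bool Licensed { get { return true; } }
}

public static class PromotionSketch
{
    // Copies every property marked with [Promoted] into a dictionary that
    // stands in for BrokeredMessage.Properties.
    public static Dictionary<string, object> Promote(object body)
    {
        var promoted = new Dictionary<string, object>();
        foreach (PropertyInfo prop in body.GetType().GetProperties())
        {
            foreach (PromotedAttribute attr in
                     prop.GetCustomAttributes(typeof(PromotedAttribute), true))
            {
                promoted[attr.Name] = prop.GetValue(body, null);
            }
        }
        return promoted;
    }
}
```

Calling PromotionSketch.Promote on a MemberSketch returns the two decorated values under the keys "ReceiveEMail" and "Licensed", while the undecorated Name property stays in the body only.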

 

  Recipient List

This pattern is related to the Content Based Router pattern in that the CBR pattern routes messages based on the message content.  The Recipient List pattern provides a means for the message to contain a list of one or more recipients to receive the message.  This differs from CBR in that we may want to submit a message to multiple order fulfillment suppliers in order to receive back prices thus allowing for the comparison of responses and the selection of the cheapest supplier as shown in the diagram below:

This pattern can be implemented in the Azure Service Bus Topic very similarly to the Content Based Router example. The difference is that we create the subscriptions using the SqlFilter but utilize the LIKE keyword. This way the sender can populate the property with a comma-separated list and the subscriptions will pick it up and route to any number of receivers.

So, if we created three subscriptions

// Each subscription needs its own name within the topic
topic.AddSubscription("PremiumMembers", new SqlFilter("MemberType LIKE '%Premium%'"));
topic.AddSubscription("GoldMembers", new SqlFilter("MemberType LIKE '%Gold%'"));
topic.AddSubscription("SilverMembers", new SqlFilter("MemberType LIKE '%Silver%'"));

and we wanted to create a message that would be routed to Premium and Gold we would create a BrokeredMessage and set the following property:

message.Properties.Add("MemberType", "Premium,Gold");

This would allow us to set, at the client, the recipients that we want to send the message to.
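
One thing to watch with this approach is that a LIKE '%...%' filter is a substring test, so a recipient name that is contained in another name will match as well. The sketch below is plain C# with no Service Bus calls; RecipientListSketch and its methods are hypothetical names, and I am assuming that a substring check is a fair stand-in for the LIKE filters above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: plain string checks standing in for the SQL filters.
public static class RecipientListSketch
{
    // Approximates  MemberType LIKE '%recipient%'  with a substring test.
    public static bool LikeContains(string memberTypes, string recipient)
    {
        return memberTypes.Contains(recipient);
    }

    // Stricter variant: split the comma-separated list and compare whole entries.
    public static bool ListContains(string memberTypes, string recipient)
    {
        return memberTypes
            .Split(',')
            .Select(entry => entry.Trim())
            .Contains(recipient);
    }

    // All recipients from 'known' that the substring test selects.
    public static List<string> Recipients(string memberTypes, IEnumerable<string> known)
    {
        return known.Where(r => LikeContains(memberTypes, r)).ToList();
    }
}
```

With the property value "Premium,Gold" the substring test selects Premium and Gold but not Silver. A value such as "GoldPlus" would also satisfy the Gold substring test, which is why recipient names should be chosen so that none is contained in another.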
 
  Splitter

This pattern provides the ability to break apart a composite message into its separate message parts and process the related message parts separately. This pattern is useful in scenarios where you might have an order that has many order items and each of the order items needs to be sent to a different receiver or a different inventory system.

This pattern can be implemented in Azure using the Service Bus Topic or Queue by utilizing the sessions feature. To set up a queue or topic for sessions we need to set the RequiresSession property in the QueueDescription or SubscriptionDescription. Then each message needs a SessionId in order to relate the individual messages. The receiver will read a message and accept the session. Once the session is accepted, the session is locked and other receivers won’t be able to accept it. This ensures that there are no competing receivers. One thing to note about sessions and Azure is that session ordering is guaranteed even if the receiver crashes. Another thing to think about is that this pattern comes in handy when you need to provide support for sending messages larger than the allowed message size. To send a larger message, you can chunk it into smaller pieces and use the session id in order to receive all the pieces in the same session and then reassemble them.
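
The chunking idea from the last two sentences can be sketched without any Service Bus code. In the sketch below the Chunk class and the ChunkingSketch helper are hypothetical; in a real solution each chunk would travel as its own BrokeredMessage with the SessionId set, and the index and total would go into custom properties.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: a chunk carries the session id plus its position.
public class Chunk
{
    public string SessionId { get; set; }
    public int Index { get; set; }
    public int Total { get; set; }
    public byte[] Data { get; set; }
}

public static class ChunkingSketch
{
    // Cuts the payload into pieces of at most chunkSize bytes.
    public static List<Chunk> Split(byte[] payload, int chunkSize, string sessionId)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");

        int total = (payload.Length + chunkSize - 1) / chunkSize;
        var chunks = new List<Chunk>();
        for (int i = 0; i < total; i++)
        {
            int offset = i * chunkSize;
            int length = Math.Min(chunkSize, payload.Length - offset);
            var data = new byte[length];
            Array.Copy(payload, offset, data, 0, length);
            chunks.Add(new Chunk { SessionId = sessionId, Index = i, Total = total, Data = data });
        }
        return chunks;
    }

    // Reassembles the chunks of one session, whatever order they arrive in.
    public static byte[] Reassemble(IEnumerable<Chunk> chunks)
    {
        return chunks.OrderBy(c => c.Index).SelectMany(c => c.Data).ToArray();
    }
}
```

Because every chunk knows its Index and the Total, the receiver can tell when the session is complete and can rebuild the payload even if it chooses not to rely on arrival order.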

There is a great example of this that Alan Smith has put together so I won’t duplicate the effort.  Take a look at his blog for an example of implementing this pattern.

 

  Aggregator

This pattern is related to the splitter pattern.  Whereas the splitter processes separate messages, the aggregator is used to bring all the separate messages back together.

This pattern can be implemented in Azure using the Service Bus Topic and correlation.  When setting up the subscription you can use the CorrelationFilterExpression just as we did with the FilterExpressions in the code samples shown above in previous patterns.  So the submitter would set the CorrelationId property of the BrokeredMessage object to an identifier.  Then this same identifier would be used in the CorrelationFilter object that was configured when creating the subscription.  Also, you will want to know when the processing is finished for this correlation set.  You could easily create a custom property on the BrokeredMessage object (as we did previously with the MemberType property) that lists the total number of messages that are in the set or you may just decide to create a flag that states that this is the last message in the set.  Once the total count has been received or the message containing the flag is received then you can perform whatever task is required to aggregate all the messages together.

You might ask, why use Correlation over Session? The CorrelationId property provides an exact match, the CorrelationId is saved in a hash table, and CorrelationId processing is better optimized for matching performance than the SQL expression filters.
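
To make the counting approach concrete, here is a small in-memory sketch of the receiving side. AggregatorSketch is a hypothetical class of mine; the correlationId and totalInSet arguments stand in for the CorrelationId and for a custom "total number of messages" property read from each received BrokeredMessage.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: collects message bodies per correlation id and
// releases the set once the announced total has arrived.
public class AggregatorSketch
{
    private readonly Dictionary<string, List<string>> _pending =
        new Dictionary<string, List<string>>();

    // Returns the completed set when 'body' was the last missing message,
    // otherwise null.
    public List<string> Add(string correlationId, int totalInSet, string body)
    {
        List<string> set;
        if (!_pending.TryGetValue(correlationId, out set))
        {
            set = new List<string>();
            _pending[correlationId] = set;
        }
        set.Add(body);

        if (set.Count < totalInSet)
        {
            return null;
        }

        // The set is complete: hand it over and forget about it.
        _pending.Remove(correlationId);
        return set;
    }
}
```

A real aggregator would also need a timeout for sets that never complete; I have left that out to keep the sketch short.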

There is a great example of this that Alan Smith has put together that is in the same article as his Splitter implementation so I won’t duplicate the effort. Take a look at his blog for an example of implementing this pattern.

 

  Resequencer

The Resequencer pattern provides guidance around putting messages back in order. Individual messages may be processed by a downstream system and returned faster than other downstream systems can process their messages, which leads to messages that are out of order. This pattern uses a buffer to store messages so that they can be resequenced. Typically the Resequencer does not modify the message in any way. Once in the required order, the messages can be delivered as shown in the following diagram.

 

This pattern can be implemented in Azure through a Service Bus Queue. The queue becomes the buffer to not only store the messages that are submitted but also to store the messages that are out of order. There are a number of steps required to put this pattern into practice. First, there is no change to the code that is used to submit the messages to the queue; however, you will need to populate the MessageId property of the BrokeredMessage object. While there is nothing out of the ordinary on the sending side, there is a bit of work to be done by the receiver. Next, we are going to take advantage of the Defer() method, which allows us to put a message that we detect is in the wrong sequence back on the queue. Then we can dequeue it later, when we need that message, by calling receive and passing in the message sequence. This means that we need to keep a list of message sequence numbers in our code. One thing to note is that once we defer a message and put it back on the queue, the only way that we can retrieve it again is to call receive and pass in the sequence number. Another thing to note is that the Service Bus sequencing may be different from the application’s sequence numbering. Therefore you may need to store both in either session state or an internal list.

Let’s look at the code to put this pattern into place.

First, let’s look at what is needed to send the message to the queue.

string issuer = "....";
string key = "....";

// Create the token provider and URI that are needed to send to the MessagingFactory
TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuer, key);
Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", "<ServiceNamespace>", string.Empty);

// Retrieve the MessageSender for the SeqQueue within the namespace
MessagingFactory factory = MessagingFactory.Create(uri, tp);
MessageSender SeqQueue = factory.CreateMessageSender("SeqQueue");

// Create a message and use a string for the body. We will put the sequence here as well as in a property.
BrokeredMessage message1 = new BrokeredMessage("MessageSequence 1");
// Set some additional custom app-specific properties
message1.Properties["MessageNumber"] = "1";
message1.Properties["TotalMsgsInSequence"] = "3";
// Send the first message to the queue
SeqQueue.Send(message1);

// Create the next message we send. This message will be out of order.
BrokeredMessage message3 = new BrokeredMessage("MessageSequence 3");
message3.Properties["MessageNumber"] = "3";
message3.Properties["TotalMsgsInSequence"] = "3";
// Send message 3 to the queue ahead of message 2
SeqQueue.Send(message3);

// Create the last message we send, which is number 2 in the sequence.
BrokeredMessage message2 = new BrokeredMessage("MessageSequence 2");
message2.Properties["MessageNumber"] = "2";
message2.Properties["TotalMsgsInSequence"] = "3";
// Send message 2 to the queue
SeqQueue.Send(message2);

Now we have three messages in our queue that are out of order. Next, let’s look at what is needed to receive the messages and process them in order. By the way, I will be utilizing the QueueClient object as this supports more advanced features (including sessions).

private static void ReadMessagesFromQueue()
{
    ....
    // Read messages from the queue until the queue is empty
    MessageReceiver Qreceiver = queueClient.CreateReceiver();
    List<MessageReceipt> MessageReceiptsDeferred = new List<MessageReceipt>();

    // The sequence number we expect to process next. It only advances
    // once the expected message has actually been processed.
    int nextExpected = 1;

    BrokeredMessage receivedMessage;
    while (Qreceiver.TryReceive(TimeSpan.FromSeconds(10), out receivedMessage))
    {
        // Check the received message and determine if it is the one we need next
        if (receivedMessage.Properties["MessageNumber"].ToString() == nextExpected.ToString())
        {
            ProcessEachMessage(receivedMessage);
            nextExpected++;
        }
        else
        {
            receivedMessage.Defer();
            // Store the message receipt for later
            MessageReceiptsDeferred.Add(receivedMessage.MessageReceipt);
        }
    }

    // We have now processed messages 1 and 2. We need to go back and get message 3.
    // With more than one deferred message, the receipts would first have to be
    // put in MessageNumber order.
    foreach (MessageReceipt receipt in MessageReceiptsDeferred)
    {
        ProcessEachMessage(Qreceiver.Receive(receipt));
    }

    Qreceiver.Close();
}

private static void ProcessEachMessage(BrokeredMessage message)
{
    // This is where we do the work we need to on the messages
    // (i.e., reassemble, send off to a LOB system, etc.)
    // Do your work here

    message.Complete();
}
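
The receive loop above only has to cope with a single out-of-order message. For the general case, the buffering logic can be sketched in memory as follows. ResequencerSketch is a hypothetical class and its dictionary plays the role of the deferred messages that stay on the queue; a real receiver would keep the deferred receipts keyed by the application sequence number in the same way.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: an in-memory resequencer. The dictionary stands in for
// the deferred messages that remain on the queue.
public class ResequencerSketch
{
    private readonly Dictionary<int, string> _buffer = new Dictionary<int, string>();
    private int _nextExpected = 1;

    // Accepts a message in any order and returns the messages that can now
    // be released in sequence (possibly none).
    public List<string> Accept(int sequenceNumber, string body)
    {
        _buffer[sequenceNumber] = body;

        var released = new List<string>();
        string next;
        while (_buffer.TryGetValue(_nextExpected, out next))
        {
            released.Add(next);
            _buffer.Remove(_nextExpected);
            _nextExpected++;
        }
        return released;
    }
}
```

Feeding it the messages in the order 1, 3, 2 releases message 1 straight away, holds message 3, and releases 2 and 3 together once message 2 arrives.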
 

 

This example shows you how we can put this pattern into code.  As with all code examples, there is still work that needs to be done to this code if you are going to use this as a sample. 

As you think about putting this pattern into practice, it could be useful for situations in which you want to process messages that have high and low priority all on one queue.  You may not know what order the messages are submitted but you can receive all the messages, process the ones marked as high priority and defer the other submitted messages and then go back and process those messages after all the high priority messages have been drained.

 

  Composed Message Processor

This pattern provides guidance for message processing where there are a number of sub messages (i.e., order line items) that need to be routed to individual destinations, processed, returned and aggregated back into a single message.  This pattern is composed of a number of other patterns that include the Splitter, Content Based Router and Aggregator.

Because this pattern utilizes patterns already covered in this article I won’t duplicate the code as that can be found in each patterns implementation.  However, I will include a diagram that shows how these patterns can come together to create this complex pattern.

 

 

Scatter-Gather

This pattern outlines how a single message can be sent to many receivers, or a message may be split and its parts sent to many receivers so that one of many external providers may process the message.  Once the message has been processed and returned then the messages need to be aggregated and brought back into one message that will be processed.  Think of an order that could be fulfilled by a number of suppliers.  You can scatter the messages to those suppliers and gather all the responses and decide if you are going to take the first message returned and continue processing or if you analyze all the responses and take the lowest cost supplier.  Either way, once you have received all the messages you will take the message and send it on to the next step in the process.

To put this pattern in practice we can reuse implementations of other patterns. The scatter portion can be implemented through the use of the Splitter pattern whereas the gather portion can be implemented through the use of the Aggregator pattern. Both are covered above.

 

  Process Manager

The process manager pattern outlines how you send a message through a number of processing steps that aren’t known at design time. These steps may not even be sequential.  In addition, the sequence of the processing steps may also be modified based on the results of the previously executed processing step.

To put this pattern into practice you can utilize Windows Workflow. There are a number of blog articles showing how you can implement the currently released version of WF 4 in your code and deploy it to Azure. In addition, there was a CTP of WF in Azure that was released last summer. Lastly, there is also the Workflow Activity Pack for Windows Azure that allows you to run WF on-premises and interact with artifacts in Azure.

Blog Post by: Stephen Kaufman

Integration Patterns utilizing the Windows Azure Service Bus

For years we have all utilized Gregor Hohpe’s Enterprise Integration Patterns book (and the web site).  Many of these patterns, if not most, are well known and there are many resources out there that talk about how these patterns can be implemented.  There are also a number of resources that talk about how these patterns can be implemented in BizTalk or Windows Server AppFabric. 

However, there is very little that shows how these patterns can be implemented using the Windows Azure Service Bus. This two-part series will take a patterns-first look at how these different patterns can be implemented.

The patterns that will be covered in this post are those in the Messaging Channel group. This group contains:

Point to Point, Publish/Subscribe, Datatype Channel, Invalid Message Channel, Dead Letter Channel, Guaranteed Delivery, Channel Adapter, Messaging Bridge and Message Bus.  

In the next post we will focus on the patterns that fall in the Message Routing group, which contain:

Content Based Router, Message Filter, Recipient List, Splitter, Aggregator, Resequencer, Composed Message Processor, Scatter-Gather, Process Manager and Message Broker.

There is also the Messaging Systems group of patterns but those are being addressed with the Azure Integration Components (which are currently in CTP). As those components get closer to release I will cover those patterns as well.

Let’s start out by looking at each of the patterns in the Messaging Channel group, including a description of the pattern and then how it can be implemented.

Point-to-Point Channel requires that only one receiver gets the message. Even if there are multiple receivers, exactly one of them must get each message. This functionality can be done directly through WCF (the Service Bus can also be included by hosting the WCF service). This pattern can also be accomplished through the use of the Service Bus Queue and controlling what services are listening.

Publish-Subscribe Channel requires that the message or event sent to the channel will be delivered to each and every receiver. Once the receiver obtains a copy of the message it is removed from the channel. This can be implemented through the use of Service Bus Topics.

A Service Bus Topic can have multiple subscriptions, each of which can be managed individually. A subscriber receives all messages, based on filters, that are sent once the subscriber is created. If a message satisfies the filter criteria for more than one subscriber, the message will be delivered to each subscriber and then on to the receiver. When a message needs to be delivered to more than one subscriber, the message is not duplicated. The metadata for the filters/subscriptions is stored and is tracked based on which subscriber/receiver has received the message. In terms of removing it from the channel, think of the concept of ref counts. One other thing to keep in mind is that if the receiver is not online to receive the message, then it is stored until the receiver comes online.

Datatype Channel requires that each channel will contain the same type of data, so that when the receiver obtains a message from the channel it will always be the same type. This can be accomplished directly with WCF (the Service Bus can also be included by hosting the WCF service).

In addition, a Service Bus Queue can also be utilized. You can create a new queue for each type of message, so that each queue only ever carries one type.
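One way to keep the "one queue per message type" rule in a single place is a small lookup from message type to queue path. The sketch below is an illustration only: DatatypeChannelMap and the queue paths are hypothetical, and creating the actual queues is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: maps each message type to the one queue that carries it.
public class DatatypeChannelMap
{
    private readonly Dictionary<Type, string> paths = new Dictionary<Type, string>();

    // Register the queue path that will carry messages of type T.
    public void Register<T>(string queuePath)
    {
        paths[typeof(T)] = queuePath;
    }

    // Look up the queue path for type T; fail loudly if none was registered.
    public string PathFor<T>()
    {
        string path;
        if (!paths.TryGetValue(typeof(T), out path))
        {
            throw new InvalidOperationException(
                "No queue registered for " + typeof(T).Name);
        }
        return path;
    }
}
```

A sender would call PathFor with its message type before creating a queue client, so a message can never be sent to a queue that carries a different type.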

Invalid Message Channel requires that a message that is deemed to be invalid be moved to an Invalid Message Channel, where it will not be processed by the normal receiver.

The Service Bus Queue implements the dead letter queue concept and uses the $DeadLetterQueue subqueue to store invalid messages. Messages are placed in the dead letter subqueue:

When a message expires and deadlettering for expired messages is set to true in a queue or subscription.
When the max delivery count for a message is exceeded on a queue or subscription.
When a filter evaluation exception occurs in a subscription and deadlettering is enabled on filter evaluation exceptions.
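The three conditions above can be read as a simple decision rule. The following sketch restates them in code. DeadLetterSettings and DeadLetterRules are hypothetical names made up for this illustration and are not part of the Service Bus SDK, which applies these rules on the service side.

```csharp
using System;

// Hypothetical settings object mirroring the three conditions listed above.
public class DeadLetterSettings
{
    public bool DeadLetterOnExpiration { get; set; }
    public bool DeadLetterOnFilterException { get; set; }
    public int MaxDeliveryCount { get; set; }
}

public static class DeadLetterRules
{
    // Returns the reason a message would be dead-lettered,
    // or null if it stays on the normal queue or subscription.
    public static string GetDeadLetterReason(
        DeadLetterSettings settings,
        bool expired,
        int deliveryCount,
        bool filterFailed)
    {
        if (expired && settings.DeadLetterOnExpiration)
        {
            return "expired";
        }
        if (deliveryCount > settings.MaxDeliveryCount)
        {
            return "max delivery count exceeded";
        }
        if (filterFailed && settings.DeadLetterOnFilterException)
        {
            return "filter evaluation exception";
        }
        return null;
    }
}
```

Note that an expired message is only dead-lettered when the expiration setting is on. With the setting off, the rule returns null for that message.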

To get to the dead letter queue you would use:

string queueDeadletterPath = QueueClient.FormatDeadLetterPath("<your queue name>");

The same behavior is available with Topics. However, to get to the dead letter queue for a subscription on a topic you would use:

string subscriptionDeadletterPath = SubscriptionClient.FormatDeadLetterPath("<Your Topic Name>", "<Subscription Name>");

Dead Letter Channel requires that if the system can’t deliver a message there is a mechanism to move the message to a dead letter channel.  This capability is handled like the Invalid Message Channel above.  However, check out this link from MSDN on the QueuePolicy.PoisonMessageDrop property.  It points to the prerelease documentation but it is interesting to read what is being planned. 

Guaranteed Delivery requires that the system persists the message so that it is not lost if there is a crash or outage. When using Service Bus Queues, delivery is guaranteed because the queue uses SQL Azure as the backing store.

Channel Adapter requires that there is an interface so that any application can connect and be integrated with other applications.  The Service Bus Queues and Topics both provide a full API that can be utilized by a number of different technologies.

Messaging Bridge provides a connection between different messaging systems, essentially to transmit a message that was intended for another messaging system. This can be done through either Service Bus Queues or Topics, with the receiver getting the message and forwarding it to endpoints provided by the other messaging system. In addition, the receiver can get the message and forward it on to on-premises messaging systems through WCF.

Message Bus provides a means for separate applications to work together in a decoupled manner, with the ability for applications to be added or removed with no effect on any other applications. The ability to be decoupled while still allowing the applications to work together, share data and operate in a unified manner is the basis for this pattern. This pattern can be easily implemented through the Azure Service Bus and WCF endpoints. We will come back to this pattern again when we spend more time covering the integration components.

In this post I have talked about the Service Bus Queue, but there is still some confusion between the Windows Azure Queue and the Service Bus Queue.  Yes, there are two Queues and they are different and have different uses and purposes.  Everything I have been referring to here is in reference to the Service Bus Queue.  You can find more information on the differences between Azure Queues and Azure Service Bus Queues in this article – Windows Azure Queues and Windows Azure Service Bus Queues – Compared and Contrasted.

Lastly, Gregor Hohpe's patterns book uses a number of diagrams to represent the patterns. These diagrams can be downloaded for use in Visio from http://www.eaipatterns.com/downloads.html

Blog Post by: Stephen Kaufman

Windows Azure Service Bus Scatter-Gather Implementation

One of the more challenging enterprise integration patterns that developers may wish to implement is the Scatter-Gather pattern. In this article I will show a basic implementation of the scatter-gather pattern using the topic-subscription model of the Windows Azure Service Bus. I’ll be using the implementation in demos, and also as a lab in my training courses, and the pattern will also be included in the next release of my free e-book, the “Windows Azure Service Bus Developer Guide”.

The Scatter-Gather pattern answers the following scenario.

How do you maintain the overall message flow when a message needs to be sent to multiple recipients, each of which may send a reply?

Use a Scatter-Gather that broadcasts a message to multiple recipients and re-aggregates the responses back into a single message.

The Enterprise Integration Patterns website provides a description of the Scatter-Gather pattern here.

The scatter-gather pattern uses a composite of the publish-subscribe channel pattern and the aggregator pattern. The publish-subscribe channel is used to broadcast messages to a number of receivers, and the aggregator is used to gather the response messages and aggregate them together to form a single message.
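Before looking at the Service Bus version, the composition of broadcast and aggregation can be sketched without any messaging infrastructure. This is a simplified, synchronous illustration only: the ScatterGather class and its Run method are hypothetical, and each recipient is modeled as a plain function rather than a subscriber on a channel.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, synchronous sketch of scatter-gather:
// broadcast one request, collect every reply, aggregate into one result.
public static class ScatterGather
{
    public static TResult Run<TRequest, TReply, TResult>(
        TRequest request,
        IEnumerable<Func<TRequest, TReply>> recipients,
        Func<IList<TReply>, TResult> aggregate)
    {
        // Scatter: every recipient gets the same request.
        List<TReply> replies = new List<TReply>();
        foreach (Func<TRequest, TReply> recipient in recipients)
        {
            replies.Add(recipient(request));
        }

        // Gather: combine the replies into a single message.
        return aggregate(replies);
    }
}
```

In the real implementation the recipients reply in their own time over a queue, so the aggregator has to update its result as replies arrive instead of waiting for a complete list.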

Scatter-Gather Scenario

The scenario for this scatter-gather implementation is an application that allows users to answer questions in a poll based voting scenario. A poll manager application will be used to broadcast questions to users, the users will use a voting application that will receive and display the questions and send the votes back to the poll manager. The poll manager application will receive the users’ votes and aggregate them together to display the results. The scenario should be able to scale to support a large number of users.

Scatter-Gather Implementation

The diagram below shows the overall architecture for the scatter-gather implementation.

Messaging Entities

Looking at the scatter-gather pattern diagram it can be seen that the topic-subscription architecture is well suited for broadcasting a message to a number of subscribers. The poll manager application can send the question messages to a topic, and each voting application can receive the question message on its own subscription. The static limit of 2,000 subscriptions per topic in the current release means that 2,000 voting applications can receive question messages and take part in voting.

The vote messages can then be sent to the poll manager application using a queue. The voting applications will send their vote messages to the queue, and the poll manager will receive and process the vote messages.

The questions topic and answer queue are created using the Windows Azure Developer Portal. Each instance of the voting application will create its own subscription in the questions topic when it starts, allowing the question messages to be broadcast to all subscribing voting applications.

Data Contracts

Two simple data contracts will be used to serialize the questions and votes as brokered messages. The code for these is shown below.

[DataContract]
public class Question
{
    [DataMember]
    public string QuestionText { get; set; }
}

To keep the implementation of the voting functionality simple and focus on the pattern implementation, the users can only vote yes or no to the questions.

[DataContract]
public class Vote
{
    [DataMember]
    public string QuestionText { get; set; }

    [DataMember]
    public bool IsYes { get; set; }
}

Poll Manager Application

The poll manager application has been implemented as a simple WPF application; the user interface is shown below.

A question can be entered in the text box, and sent to the topic by clicking the Add button. The topic and subscriptions used for broadcasting the messages are shown in a TreeView control. The questions that have been broadcast and the resulting votes are shown in a ListView control.

When the application is started, any existing subscriptions are cleared from the topic. Clients are then created for the questions topic and votes queue, along with background workers for receiving and processing the vote messages and for updating the display of subscriptions.

public MainWindow()
{
    InitializeComponent();

    // Create a new results list and data bind it.
    Results = new ObservableCollection<Result>();
    lsvResults.ItemsSource = Results;

    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Clear out any old subscriptions.
    NamespaceManager = new NamespaceManager(serviceBusUri, credentials);
    IEnumerable<SubscriptionDescription> subs =
        NamespaceManager.GetSubscriptions(AccountDetails.ScatterGatherTopic);
    foreach (SubscriptionDescription sub in subs)
    {
        NamespaceManager.DeleteSubscription(sub.TopicPath, sub.Name);
    }

    // Create the MessagingFactory
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Create the topic and queue clients.
    ScatterGatherTopicClient =
        factory.CreateTopicClient(AccountDetails.ScatterGatherTopic);
    ScatterGatherQueueClient =
        factory.CreateQueueClient(AccountDetails.ScatterGatherQueue);

    // Start the background worker threads.
    VotesBackgroundWorker = new BackgroundWorker();
    VotesBackgroundWorker.DoWork += new DoWorkEventHandler(ReceiveMessages);
    VotesBackgroundWorker.RunWorkerAsync();

    SubscriptionsBackgroundWorker = new BackgroundWorker();
    SubscriptionsBackgroundWorker.DoWork += new DoWorkEventHandler(UpdateSubscriptions);
    SubscriptionsBackgroundWorker.RunWorkerAsync();
}

When the poll manager user enters a question in the text box and clicks the Add button, a question message is created and sent to the topic. This message will be broadcast to all the subscribing voting applications. An instance of the Result class is also created to keep track of the votes cast. This is then added to an observable collection named Results, which is data-bound to the ListView control.

private void btnAddQuestion_Click(object sender, RoutedEventArgs e)
{
    // Create a new result for recording votes.
    Result result = new Result()
    {
        Question = txtQuestion.Text
    };
    Results.Add(result);

    // Send the question to the topic
    Question question = new Question()
    {
        QuestionText = result.Question
    };
    BrokeredMessage msg = new BrokeredMessage(question);
    ScatterGatherTopicClient.Send(msg);

    txtQuestion.Text = "";
}

The Result class is implemented as follows.

public class Result : INotifyPropertyChanged
{
    public string Question { get; set; }

    private int m_YesVotes;
    private int m_NoVotes;

    public event PropertyChangedEventHandler PropertyChanged;

    public int YesVotes
    {
        get { return m_YesVotes; }
        set
        {
            m_YesVotes = value;
            NotifyPropertyChanged("YesVotes");
        }
    }

    public int NoVotes
    {
        get { return m_NoVotes; }
        set
        {
            m_NoVotes = value;
            NotifyPropertyChanged("NoVotes");
        }
    }

    // Raise PropertyChanged so that data-bound controls refresh.
    private void NotifyPropertyChanged(string prop)
    {
        if (PropertyChanged != null)
        {
            PropertyChanged(this, new PropertyChangedEventArgs(prop));
        }
    }
}

The INotifyPropertyChanged interface is implemented so that changes to the number of yes and no votes will be updated in the ListView control.

Receiving the vote messages from the voting applications is done asynchronously, using a background worker thread.

// This runs on a background worker.
private void ReceiveMessages(object sender, DoWorkEventArgs e)
{
    while (true)
    {
        // Receive a vote message from the queue
        BrokeredMessage msg = ScatterGatherQueueClient.Receive();
        if (msg != null)
        {
            // Deserialize the message.
            Vote vote = msg.GetBody<Vote>();

            // Update the results.
            foreach (Result result in Results)
            {
                if (result.Question.Equals(vote.QuestionText))
                {
                    if (vote.IsYes)
                    {
                        result.YesVotes++;
                    }
                    else
                    {
                        result.NoVotes++;
                    }
                    break;
                }
            }

            // Mark the message as complete.
            msg.Complete();
        }
    }
}

When a vote message is received, the result that matches the vote question is updated with the vote from the user. The message is then marked as complete.
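The aggregation step itself is independent of the Service Bus and of WPF, so it can be isolated and tested on its own. The sketch below is one possible way to do that and is not the code used in the poll manager: VoteTally and its members are hypothetical names, and the lock is an assumption added because votes arrive on a background thread.

```csharp
using System.Collections.Generic;

// Hypothetical aggregator: keeps yes/no counts per question.
public class VoteTally
{
    // Index 0 holds the yes votes, index 1 holds the no votes.
    private readonly Dictionary<string, int[]> counts = new Dictionary<string, int[]>();
    private readonly object sync = new object();

    // Register a question so that votes for it can be counted.
    public void AddQuestion(string question)
    {
        lock (sync)
        {
            if (!counts.ContainsKey(question))
            {
                counts[question] = new int[2];
            }
        }
    }

    // Count one vote; returns false when the question is unknown,
    // mirroring the loop above, which ignores votes with no matching result.
    public bool Record(string question, bool isYes)
    {
        lock (sync)
        {
            int[] tally;
            if (!counts.TryGetValue(question, out tally))
            {
                return false;
            }
            tally[isYes ? 0 : 1]++;
            return true;
        }
    }

    public int YesVotes(string question) { return Read(question, 0); }

    public int NoVotes(string question) { return Read(question, 1); }

    private int Read(string question, int index)
    {
        lock (sync)
        {
            int[] tally;
            return counts.TryGetValue(question, out tally) ? tally[index] : 0;
        }
    }
}
```

A dictionary lookup replaces the linear search through the results collection, which may matter if many questions are open at once.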

A second background thread is used to update the display of subscriptions in the TreeView, with a dispatcher used to update the user interface.

// This runs on a background worker.
private void UpdateSubscriptions(object sender, DoWorkEventArgs e)
{
    while (true)
    {
        // Get a list of subscriptions.
        IEnumerable<SubscriptionDescription> subscriptions =
            NamespaceManager.GetSubscriptions(AccountDetails.ScatterGatherTopic);

        // Update the user interface.
        // SimpleDelegate is assumed to be a parameterless void delegate
        // declared elsewhere in the project.
        SimpleDelegate setQuestion = delegate()
        {
            trvSubscriptions.Items.Clear();
            TreeViewItem topicItem = new TreeViewItem()
            {
                Header = AccountDetails.ScatterGatherTopic
            };
            foreach (SubscriptionDescription subscription in subscriptions)
            {
                TreeViewItem subscriptionItem = new TreeViewItem()
                {
                    Header = subscription.Name
                };
                topicItem.Items.Add(subscriptionItem);
            }
            trvSubscriptions.Items.Add(topicItem);
            topicItem.ExpandSubtree();
        };
        this.Dispatcher.BeginInvoke(DispatcherPriority.Send, setQuestion);

        // Refresh the list every three seconds.
        Thread.Sleep(3000);
    }
}

Voting Application

The voting application is implemented as another WPF application. This one is more basic, and allows the user to vote “Yes” or “No” for the questions sent by the poll manager application. The user interface for that application is shown below.

When an instance of the voting application is created it will create a subscription in the questions topic using a GUID as the subscription name. The application can then receive copies of every question message that is sent to the topic.

Clients for the new subscription and the votes queue are created, along with a background worker to receive the question messages. The voting application is set to receiving mode, meaning it is ready to receive a question message from the subscription.

public MainWindow()
{
    InitializeComponent();

    // Set the mode to receiving.
    IsReceiving = true;

    // Create a token provider with the relevant credentials.
    TokenProvider credentials =
        TokenProvider.CreateSharedSecretTokenProvider
        (AccountDetails.Name, AccountDetails.Key);

    // Create a URI for the service bus.
    Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri
        ("sb", AccountDetails.Namespace, string.Empty);

    // Create the MessagingFactory
    MessagingFactory factory = MessagingFactory.Create(serviceBusUri, credentials);

    // Create a subscription for this instance
    NamespaceManager mgr = new NamespaceManager(serviceBusUri, credentials);
    string subscriptionName = Guid.NewGuid().ToString();
    mgr.CreateSubscription(AccountDetails.ScatterGatherTopic, subscriptionName);

    // Create the subscription and queue clients.
    ScatterGatherSubscriptionClient = factory.CreateSubscriptionClient
        (AccountDetails.ScatterGatherTopic, subscriptionName);
    ScatterGatherQueueClient =
        factory.CreateQueueClient(AccountDetails.ScatterGatherQueue);

    // Start the background worker thread.
    BackgroundWorker = new BackgroundWorker();
    BackgroundWorker.DoWork += new DoWorkEventHandler(ReceiveMessages);
    BackgroundWorker.RunWorkerAsync();
}

I took the inspiration for creating the subscriptions in the voting application from the chat application that uses topics and subscriptions blogged by Ovais Akhter here.

The method that receives the question messages runs on a background thread. If the application is in receive mode, a question message will be received from the subscription, the question will be displayed in the user interface, the voting buttons enabled, and IsReceiving set to false to prevent more questions from being received before the current one is answered.

// This runs on a background worker.
private void ReceiveMessages(object sender, DoWorkEventArgs e)
{
    while (true)
    {
        if (IsReceiving)
        {
            // Receive a question message from the topic.
            BrokeredMessage msg = ScatterGatherSubscriptionClient.Receive();
            if (msg != null)
            {
                // Deserialize the message.
                Question question = msg.GetBody<Question>();

                // Update the user interface.
                SimpleDelegate setQuestion = delegate()
                {
                    lblQuestion.Content = question.QuestionText;
                    btnYes.IsEnabled = true;
                    btnNo.IsEnabled = true;
                };
                this.Dispatcher.BeginInvoke(DispatcherPriority.Send, setQuestion);
                IsReceiving = false;

                // Mark the message as complete.
                msg.Complete();
            }
        }
        else
        {
            Thread.Sleep(1000);
        }
    }
}

When the user clicks on the Yes or No button, the btnVote_Click method is called. This will create a new Vote data contract with the appropriate question and answer and send the message to the poll manager application using the votes queue. The user voting buttons are then disabled, the question text cleared, and the IsReceiving flag set to true to allow a new message to be received.

private void btnVote_Click(object sender, RoutedEventArgs e)
{
    // Create a new vote.
    Vote vote = new Vote()
    {
        QuestionText = (string)lblQuestion.Content,
        IsYes = ((sender as Button).Content as string).Equals("Yes")
    };

    // Send the vote message.
    BrokeredMessage msg = new BrokeredMessage(vote);
    ScatterGatherQueueClient.Send(msg);

    // Update the user interface.
    lblQuestion.Content = "";
    btnYes.IsEnabled = false;
    btnNo.IsEnabled = false;
    IsReceiving = true;
}

Testing the Application

In order to test the application, an instance of the poll manager application is started; the user interface is shown below.

As no instances of the voting application have been created there are no subscriptions present in the topic. When an instance of the voting application is created the subscription will be displayed in the poll manager.

Now that a voting application is subscribing, a question can be sent from the poll manager application. When the message is sent to the topic, the voting application will receive the message and display the question.

The voter can then answer the question by clicking on the appropriate button. The results of the vote are updated in the poll manager application.

When two more instances of the voting application are created, the poll manager will display the new subscriptions. More questions can then be broadcast to the voting applications.

As the question messages are queued up in the subscription for each voting application, the users can answer the questions in their own time. The vote messages will be received by the poll manager application and aggregated to display the results. The screenshots of the applications part way through voting are shown below.

The messages for each voting application are queued up in sequence on the voting application subscriptions, allowing the questions to be answered at different speeds by the voters.

WCF-SQL Adapter and permissions

If you are using a WCF SQL adapter in your solution, it is imperative to set permissions on the stored procedures you call from within BizTalk. However, if you haven't set permissions, the error message can be a little misleading. The adapter failed to transmit message going to send port “WcfSendPort_SqlAdapterBinding_TypedProcedures_MyDB” with URL “mssql://MYSRVSQ01//MyDB?”. It […]
Blog Post by: DipeshA