Versioning long running Workflow Services in WF4

One of the problems with the current version of Windows Workflow Foundation is how to handle different versions of your workflows. With short-running workflows this is no big deal: workflows do whatever they are supposed to do and finish, and you can deploy a newer, updated version of your XAMLX files whenever you want. Provided the public-facing SOAP interface doesn't change, no one will notice a difference and everything will work just fine.

However, as soon as we get into long-running workflows and the SQL Workflow Instance Store, things get quite a bit more complicated. As soon as you add or remove activities from your XAMLX, the workflow service definition, you can no longer load any of the workflow instances currently saved in the SQL Workflow Instance Store. This is a bit of a problem because it means you would either have to wait until all workflows are finished before upgrading your workflow definition, or you would have to abort all running instances; neither is an acceptable solution in most cases.

 

How workflow data is stored

The SQL Workflow Instance Store keeps track of the WCF address used to start a workflow and stores it along with the actual workflow state. It uses this data to differentiate between different workflow service definitions. This can actually help us fix our versioning problem: just leave the existing workflow definition as is and create a new one alongside it with the new definition.

So this solves the problem of separating the state of each workflow version, but it means that the client application needs to be updated each time a new version of the workflow service is deployed. Not only that, the client needs to keep track of which workflow was started using which service and send each future request to the same address. This puts a big extra burden on our client app, and that is something we don't want.

 

The WCF 4 RoutingService to the rescue

We can solve this problem by adding the WCF RoutingService, a new .NET 4 feature, to the mix. In this case the client only talks to the routing service and the routing service is aware of each workflow service version and knows how to route the request to the correct address. This way the client never knows when new workflow services are created, all it knows about is the WCF RoutingService address.

 

So how does the WCF RoutingService know where to send messages?

There are several ways this can be done, but the easiest is to have the workflow service return a version number from the initial request that started the workflow. This version number is also a required argument for each subsequent request into the workflow. The WCF RoutingService can now use this version part of the message, or the lack thereof, to determine where to route the message. If there is no version information, the message is always routed to the latest version of the workflow service, so new instance requests as well as WSDL requests are always sent to the most recent version.
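As a rough sketch of that routing decision in plain C#: the VersionRouter class below is purely hypothetical and only illustrates the lookup rule; in a real deployment the same rule would live in the RoutingService filter table, not in your own code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of the routing rule, NOT the RoutingService API.
public class VersionRouter
{
    private readonly SortedDictionary<int, Uri> endpoints =
        new SortedDictionary<int, Uri>();

    // Register the endpoint address of a deployed workflow service version.
    public void AddVersion(int version, Uri address)
    {
        endpoints[version] = address;
    }

    // Messages carrying a version number go to that version's service;
    // messages without one (new instances, WSDL requests) go to the latest.
    public Uri Resolve(int? version)
    {
        if (version.HasValue && endpoints.ContainsKey(version.Value))
        {
            return endpoints[version.Value];
        }
        return endpoints[endpoints.Keys.Max()];
    }
}
```

Deploying a new workflow version then amounts to registering one more address; instances started earlier keep resolving to their original endpoint through the version number they received on the initial reply.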

 

So does this solve all our problems?

Unfortunately not. This solves the problem of updating the workflow service definition and keeping the different versions apart, but it keeps already-running workflows on their old definition. That might be exactly what you want in some cases, but if there is a bug in the existing definition you still can't fix it. And that is a problem that can't really be solved properly with WF4. This feature was promised at the 2010 PDC for the next version of .NET, but that doesn't help us now.

 

Enjoy!

 

www.TheProblemSolver.nl
Wiki.WindowsWorkflowFoundation.eu

BizTalk EDI party management (C#)

A couple of weeks ago, I had to create a custom EDI party through code.
I noticed that the information available on this topic was rather sparse and that other people were experiencing the same problem as well.
After some research on this topic, I discovered it's actually rather easy to do these kinds of modifications through code.
Below you can find an example of how to create and modify your EDI party.

Prerequisites

You will need to add references to the following assemblies in order to create and modify an EDI party through code.

These assemblies can be found in the BizTalk installation folder.

Add new Party

The first step is to add a new party to your BizTalk Group.
Below is an example of how to add it through code, but it's perfectly possible to add it through the BizTalk Administration Console.

Modify the Party Settings

The party you just created will be used to create a new partner.
After that you can tweak and modify every setting needed for your party/partner.
There are tons of settings to adjust, but below you can see some basic modifications I made.

The result

When we open the BizTalk Administration console, we can view the result.

The source code of my example can be found here.
EDIPartyManagement.zip (18.43 kb)

Glenn Colpaert, CODit

AS2 Certificates

I deal quite a lot with AS2 and BizTalk. I sometimes lose track of the different types of files when dealing with the certificates. I found a nice description of all the file types and a nice little converter online.

Here is the good description:

PEM Format
The PEM format is the most common format that Certificate Authorities issue certificates in. PEM certificates usually have extensions such as .pem, .crt, .cer, and .key. They are Base64-encoded ASCII files and contain "-----BEGIN CERTIFICATE-----" and "-----END CERTIFICATE-----" statements. Server certificates, intermediate certificates, and private keys can all be put into the PEM format.

Apache and other similar servers use PEM format certificates. Several PEM certificates, and even the private key, can be included in one file, one below the other, but most platforms, such as Apache, expect the certificates and private key to be in separate files.

DER Format
The DER format is simply a binary form of a certificate instead of the ASCII PEM format. It sometimes has a file extension of .der but it often has a file extension of .cer so the only way to tell the difference between a DER .cer file and a PEM .cer file is to open it in a text editor and look for the BEGIN/END statements. All types of certificates and private keys can be encoded in DER format. DER is typically used with Java platforms. The SSL Converter can only convert certificates to DER format. If you need to convert a private key to DER, please use the OpenSSL commands on this page.

PKCS#7/P7B Format
The PKCS#7 or P7B format is usually stored in Base64 ASCII format and has a file extension of .p7b or .p7c. P7B certificates contain "-----BEGIN PKCS7-----" and "-----END PKCS7-----" statements. A P7B file only contains certificates and chain certificates, not the private key. Several platforms support P7B files, including Microsoft Windows and Java Tomcat.

PKCS#12/PFX Format
The PKCS#12 or PFX format is a binary format for storing the server certificate, any intermediate certificates, and the private key in one encryptable file. PFX files usually have extensions such as .pfx and .p12. PFX files are typically used on Windows machines to import and export certificates and private keys.

When converting a PFX file to PEM format, OpenSSL will put all the certificates and the private key into a single file. You will need to open the file in a text editor and copy each certificate and private key (including the BEGIN/END statements) to its own individual text file and save them as certificate.cer, CACert.cer, and privateKey.key respectively.
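Because a PEM certificate is just the DER bytes, Base64-encoded and wrapped in BEGIN/END markers, converting between the two formats is a re-encoding rather than a cryptographic operation. A small C# sketch using only BCL types (the byte array stands in for any DER-encoded certificate):

```csharp
using System;

public static class CertFormats
{
    // Wrap DER bytes in the PEM BEGIN/END markers.
    public static string DerToPem(byte[] der)
    {
        return "-----BEGIN CERTIFICATE-----\n"
            + Convert.ToBase64String(der, Base64FormattingOptions.InsertLineBreaks)
            + "\n-----END CERTIFICATE-----";
    }

    // Strip the markers and decode the Base64 body back to DER.
    // Convert.FromBase64String ignores the embedded line breaks.
    public static byte[] PemToDer(string pem)
    {
        string body = pem
            .Replace("-----BEGIN CERTIFICATE-----", "")
            .Replace("-----END CERTIFICATE-----", "")
            .Trim();
        return Convert.FromBase64String(body);
    }
}
```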

OpenSSL Commands to Convert SSL Certificates on Your Machine
It is highly recommended that you convert to and from .pfx files on your own machine using OpenSSL so you can keep the private key there. Use the following OpenSSL commands to convert SSL certificates to different formats on your own machine:

OpenSSL Convert PEM
Convert PEM to DER

openssl x509 -outform der -in certificate.pem -out certificate.der

Convert PEM to P7B

openssl crl2pkcs7 -nocrl -certfile certificate.cer -out certificate.p7b -certfile CACert.cer

Convert PEM to PFX

openssl pkcs12 -export -out certificate.pfx -inkey privateKey.key -in certificate.crt -certfile CACert.crt

OpenSSL Convert DER
Convert DER to PEM

openssl x509 -inform der -in certificate.cer -out certificate.pem

OpenSSL Convert P7B
Convert P7B to PEM

openssl pkcs7 -print_certs -in certificate.p7b -out certificate.cer

Convert P7B to PFX

openssl pkcs7 -print_certs -in certificate.p7b -out certificate.cer

openssl pkcs12 -export -in certificate.cer -inkey privateKey.key -out certificate.pfx -certfile CACert.cer

OpenSSL Convert PFX
Convert PFX to PEM

openssl pkcs12 -in certificate.pfx -out certificate.cer -nodes

Here is their little certificate converter:

https://www.sslshopper.com/ssl-converter.html

StreamInsight Query Pattern: Find the top category (using Order By, Take and ApplyWithUnion)

This is a question that has come up in a few customer engagements, and on several forum posts – how do I find the top category in a stream for a given group?  Several examples of this pattern of question are:

  • Given a stream of Product View events from a website, each containing a user id and product category
    • How do I determine the most popular category for each user in the past 8 hours?
    • How do I determine the top 2 most popular categories in the past 8 hours?
  • Given a stream of Sensor Data events from a process data collection system, each containing a sensor id and a sensor reading (for example, a power meter reading)
    • How do I determine the highest reading for each meter in the past hour?
    • How do I determine the meters with the three highest readings in the past hour?

Writing a StreamInsight query to answer this question typically follows one of two patterns:

  • Top X by grouping
  • Top X

Note: all of the source code, sample data, queries etc for this blog post may be found here.  Please download and follow along.

Find Top X

The 2-step process for finding the Top X is detailed below.  For the purposes of this example, we'll be using the question "How do I determine the top 2 most popular categories in the past 8 hours?" with some sample data and results.

  • Create the initial aggregate. In the case of the product view question, this would be a window of events containing count by category in the past 8 hours.
  • Order by, Take X.  Sort the initial aggregates and take the top 2 ranked results.

Ok, sounds reasonable – what does this look like in terms of a StreamInsight query?

Code Snippet

    // Determine the page view count (by category)
    // in the past eight hours
    var categoryCount = from e in pageViews
                        group e by e.SkuCategory into skuGroups
                        from win in skuGroups.TumblingWindow(
                            TimeSpan.FromHours(8),
                            HoppingWindowOutputPolicy.ClipToWindowEnd)
                        select new
                        {
                            SkuCategory = skuGroups.Key,
                            Count = win.Count()
                        };

This query creates the initial aggregates, being the window of time (start time / end time) and a set of events containing the SKU category and count.

Start Time     End Time      SkuCategory   Count
12:00:00 AM    8:00:00 AM    Zune          1
12:00:00 AM    8:00:00 AM    XBox          5
12:00:00 AM    8:00:00 AM    DVD           6

Then we’ll take a snapshot of the initial aggregates window, sort by the count and take the top 2 results.

Code Snippet

    // From the output of categoryCount, order by the Count
    // field and take the top 2 ranked events
    var topCategories = (from win in categoryCount
                             .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                         from e in win
                         orderby e.Count descending
                         select e).Take(2);

Resulting in the answer to our question:

Start Time     End Time      SkuCategory   Count
12:00:00 AM    8:00:00 AM    DVD           6
12:00:00 AM    8:00:00 AM    XBox          5

Putting it all together:

Code Snippet

    // Determine the page view count (by category)
    // in the past eight hours
    var categoryCount = from e in pageViews
                        group e by e.SkuCategory into skuGroups
                        from win in skuGroups.TumblingWindow(
                            TimeSpan.FromHours(8),
                            HoppingWindowOutputPolicy.ClipToWindowEnd)
                        select new
                        {
                            SkuCategory = skuGroups.Key,
                            Count = win.Count()
                        };

    // From the output of categoryCount, order by the Count
    // field and take the top 2 ranked events
    var topCategories = (from win in categoryCount
                             .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                         from e in win
                         orderby e.Count descending
                         select e).Take(2);
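If the windowing syntax is obscuring the pattern, the same group / order by / take shape can be written in plain LINQ to Objects. This is only a sketch for intuition: the PageView type is hypothetical and there are no time windows, but steps 1 and 2 are the same.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical event type standing in for the StreamInsight payload.
public class PageView
{
    public int UserId;
    public string SkuCategory;
}

public static class TopXDemo
{
    // Step 1: aggregate the count per category.
    // Step 2: order by the count and take the top 2 ranked results.
    public static List<KeyValuePair<string, int>> TopCategories(
        IEnumerable<PageView> pageViews)
    {
        return pageViews
            .GroupBy(e => e.SkuCategory)
            .Select(g => new KeyValuePair<string, int>(g.Key, g.Count()))
            .OrderByDescending(x => x.Value)
            .Take(2)
            .ToList();
    }
}
```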

Find Top X by Grouping

The 3-step process for finding the Top X by Grouping is detailed below.  For the purposes of this example, we'll be using the question "How do I determine the most popular category for each user in the past 8 hours?" with some sample data and results.

  • Create the initial aggregate. In the case of the product view question, this would be a window of events containing count by category and user id in the past 8 hours.
  • Group the initial aggregates.  In order to determine the top X by group, we need to re-group the aggregate results.  In the case of the question "How do I determine the most popular category for each user in the past 8 hours?", we will need to group by user before sorting and selecting the top X.
  • Order by, Take X.  Once we have our sub-groups created, we’ll sort each of them and take the top X ranked results.

Ok, sounds reasonable – what does this look like in terms of a StreamInsight query?

Code Snippet

    // Determine the page view count (by user and by category)
    // in the past eight hours
    var categoryCountByUser = from e in pageViews
                              group e by new { e.UserId, e.SkuCategory } into skuGroups
                              from win in skuGroups.TumblingWindow(
                                  TimeSpan.FromHours(8),
                                  HoppingWindowOutputPolicy.ClipToWindowEnd)
                              select new
                              {
                                  UserId = skuGroups.Key.UserId,
                                  SkuCategory = skuGroups.Key.SkuCategory,
                                  Count = win.Count()
                              };

This query creates the initial aggregates, being the window of time (start time / end time) and a set of events containing the user Id, SKU category and count.

Start Time     End Time      UserId   SkuCategory   Count
12:00:00 AM    8:00:00 AM    1        DVD           4
12:00:00 AM    8:00:00 AM    1        XBox          1
12:00:00 AM    8:00:00 AM    1        Zune          1
12:00:00 AM    8:00:00 AM    2        DVD           2
12:00:00 AM    8:00:00 AM    2        XBox          4

Next we’ll re-group this window of events before performing the order by / take X:

Code Snippet

    // Take these events and re-group them into user groups
    var userGroups = from e in categoryCountByUser
                     group e by e.UserId;

This logically splits the window of events up into groups based on the user ID.

Start Time     End Time      UserId   SkuCategory   Count
12:00:00 AM    8:00:00 AM    1        DVD           4
12:00:00 AM    8:00:00 AM    1        XBox          1
12:00:00 AM    8:00:00 AM    1        Zune          1

12:00:00 AM    8:00:00 AM    2        DVD           2
12:00:00 AM    8:00:00 AM    2        XBox          4

Finally, we use the ApplyWithUnion operator to perform an order by / take operation on each individual group, then union the results back together.

Code Snippet

    // (a) Apply this query to each group
    var topCategoriesByUser = userGroups.ApplyWithUnion(
        // (b) Create a snapshot window over the results of the previous tumbling window
        applyIn => (from win in applyIn
                        .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                    // (c) for the events in the window
                    from e in win
                    // (d) sort by the count field, largest to smallest
                    orderby e.Count descending
                    // (e) take the top ranked event(s)
                    select e).Take(1),
        // (f) the new event type will consist of the grouping key (user id),
        // the sku category and the event count
        e => new { UserId = e.Key, SkuCategory = e.Payload.SkuCategory,
                   Count = e.Payload.Count });

Resulting in the answer to our question:

Start Time     End Time      UserId   SkuCategory   Count
12:00:00 AM    8:00:00 AM    1        DVD           4
12:00:00 AM    8:00:00 AM    2        XBox          4

This syntax can be a little overwhelming at first glance, so let’s break it down and examine what’s going on.

  • (a) Apply this query to each group.  The ApplyWithUnion operator performs the 'apply' query on each group, then unions the results back into a single stream.  This can be thought of as the StreamInsight equivalent of foreach (var g in groups) { do stuff }.
  • (b) Create a snapshot window over the results of the previous tumbling window.  We use windows when we want to look at sets of events (as defined by a window of time).  In this case, we want to look at each window of events from the tumbling window in the previous query (which has been grouped by user id) to perform some sorting.
  • (c) For the events in the window, (d) sort by the count field, largest to smallest.  Here we look at the events in the group and sort them by the e.Count field in descending order.
  • (e) Take the top ranked event(s).  Remember that Take doesn't mean take the first event; it means take the first rank of events.  For example, if both DVD and XBox views happened to be four (4) for a given user in the same time window, which one of them is "largest"?  Since they would both occupy the top rank, both events would be returned from the query.
    • It would be the responsibility of the consuming system to determine how to handle situations where more than one event occupies the top rank (depending on the business logic, this could be as simple as taking the first event, or displaying both, etc.).
  • (f) The new event type will consist of the grouping key (user id), sku category and event count.  The events emitted from the group-and-apply branch have two fields – Key and Payload.  Attempting to pass the event directly out of the stream would result in passing a nested event, causing an exception, hence the e => new {} projection.

Putting it all together:

Code Snippet

    // Determine the page view count (by user and by category)
    // in the past eight hours
    var categoryCountByUser = from e in pageViews
                              group e by new { e.UserId, e.SkuCategory } into skuGroups
                              from win in skuGroups.TumblingWindow(
                                  TimeSpan.FromHours(8),
                                  HoppingWindowOutputPolicy.ClipToWindowEnd)
                              select new
                              {
                                  UserId = skuGroups.Key.UserId,
                                  SkuCategory = skuGroups.Key.SkuCategory,
                                  Count = win.Count()
                              };

    // Take these events and re-group them into user groups
    var userGroups = from e in categoryCountByUser
                     group e by e.UserId;

    // Determine the top ranked category for each user
    var topCategoriesByUser = userGroups.ApplyWithUnion(
        applyIn => (from win in applyIn
                        .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                    from e in win
                    orderby e.Count descending
                    select e).Take(1),
        e => new { UserId = e.Key, SkuCategory = e.Payload.SkuCategory,
                   Count = e.Payload.Count });
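Stripped of the windowing, the group / apply / union shape reduces to the following plain LINQ to Objects sketch (hypothetical names again; note that unlike StreamInsight's Take(1), First() here picks a single winner even when two categories tie for the top rank):

```csharp
using System.Collections.Generic;
using System.Linq;

public static class TopByUserDemo
{
    // Group by user (the grouping), rank categories inside each group and
    // keep the top one (the "apply" branch), then flatten the per-group
    // results back into one collection (the "union").
    public static Dictionary<int, string> TopCategoryByUser(
        IEnumerable<(int UserId, string SkuCategory)> pageViews)
    {
        return pageViews
            .GroupBy(e => e.UserId)
            .ToDictionary(
                userGroup => userGroup.Key,
                userGroup => userGroup
                    .GroupBy(e => e.SkuCategory)
                    .OrderByDescending(g => g.Count())
                    .First().Key);
    }
}
```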

Summary

StreamInsight provides rich capabilities for ranking (TopK) events within a window (or groups within a window) by using the OrderBy and TopK (Take) methods.  These can be used, in conjunction with the appropriate windows and aggregates, to find trends, "hot" patterns and other interesting occurrences within event streams.


Using Realistic Security For Sending and Listening to The AppFabric Service Bus

I can’t think of any demonstration of the Windows Azure platform AppFabric Service Bus that didn’t show authenticating to the endpoints using the default “owner” account. At the same time, I can’t imagine anyone wanting to do this in real life. In this post, I’ll show you how you should probably define the proper permissions […]

Windows Azure Platform Introductory Special Extended to March 31, 2011

Good news – the Windows Azure Platform Introductory Special offer (which includes the SQL Azure Free Trial) has been extended through March 31, 2011!  This promotional offer enables you to try a limited amount of the Windows Azure platform at no charge. The subscription includes a base level of monthly Windows Azure compute hours, storage, data transfers, AppFabric Access Control transactions, AppFabric Service Bus connections, and a SQL Azure Database, at no charge.

 Included each month at no charge:

  • Windows Azure
    • 25 hours of a small compute instance
    • 500 MB of storage
    • 10,000 storage transactions
  • Windows Azure AppFabric
    • 100,000 Access Control transactions
    • 2 Service Bus connections
  • SQL Azure
    • 1 GB Web Edition database (available for first 3 months only)
  • Data Transfers (per region)
    • 500 MB in
    • 500 MB out

 Any monthly usage in excess of the above amounts will be charged at the standard rates. This Introductory Special offer will end on March 31, 2011 and all usage will then be charged at the standard rates.

 Please visit http://www.microsoft.com/windowsazure/offers/ to see additional details on the Introductory Special as well as other offers currently available for the Windows Azure platform.

BizTalk SMTP adapter: Attaching multiple messages to an email as attachments.

I had a requirement to consume PDF files and attach all of those that arrived within 30 minutes to a single email as attachments. I created a multi-part message named msg_Email of type System.Xml.XmlDocument using a Construct Message shape, and loaded the body of the message with the .LoadXml() method. I then used a helper class named EmailAttacher to attach each new message, as an attachment, to the existing multi-part email message. Here is the helper class code.

using Microsoft.XLANGs.BaseTypes;

namespace EmailAttacher
{
    public class MessageHelper
    {
        public static void AddAttachment(XLANGMessage destination, XLANGMessage attachment, string filename)
        {
            try
            {
                int count = destination.Count;
                destination.AddPart(attachment[0], string.Format("Attachment_{0}", count));
                destination[count].SetPartProperty(typeof(MIME.FileName), filename);
            }
            finally
            {
                // Decrement the reference count on both messages
                destination.Dispose();
                attachment.Dispose();
            }
        }
    }
}

The message assignment shape has the following code to add the newest message to the existing message.

EmailAttacher.MessageHelper.AddAttachment(msg_Email, msg_ToAddAsAttachment, "NameOfAttachedFile");

Don’t forget to add this, to ensure all message parts that aren’t the body are attachments.

msg_Email(SMTP.MessagePartsAttachments) = 2;

StreamInsight: Obscure LINQ error – Stream other than apply input stream is cannot be referenced inside apply branch

Another little LINQ error you might encounter from time to time.  Ran into this yesterday while building out some queries, and figured it was worth a quick post.  Starting with a basic stream, I needed to group by a set of fields in the stream and calculate some basic aggregates.

Code Snippet

    // This query calculates the sum of all sensor values
    // for each sensor
    // for each 5 seconds worth of data.
    var query = from e in inputStream
                group e by e.SensorId into sensorGroups
                from window in inputStream.TumblingWindow(
                    TimeSpan.FromSeconds(5),
                    HoppingWindowOutputPolicy.ClipToWindowEnd)
                select new
                {
                    SensorId = sensorGroups.Key,
                    Sum = window.Sum(e => e.Value)
                };

Running this throws the error:

Microsoft.ComplexEventProcessing.Linq.QueryGenerationException was unhandled by user code  
Message=Stream other than apply input stream is cannot be referenced inside apply branch. The 
following expression is not supported: 

'sensorGroups => CreateAdapterStream("input", 
StreamInsight.Samples.Adapters.SimpleTextFileReader.TextFileReaderFactory, value
(StreamInsight.Samples.Adapters.SimpleTextFileReader.TextFileReaderConfig), Point, value
(Microsoft.ComplexEventProcessing.Linq.CepStreamCreationContext)).TumblingWindow(FromSeconds(5), 
HoppingWindowOutputPolicy.ClipToWindowEnd)'.

See the subtle, yet annoyingly obvious after the fact, mistake I made?  I grouped by sensorGroups, but windowed over inputStream.  Fixing the query to use the same stream for both the window and the group resolves the error.

Code Snippet

    var query = from e in inputStream
                group e by e.SensorId into sensorGroups
                from window in sensorGroups.TumblingWindow(
                    TimeSpan.FromSeconds(5),
                    HoppingWindowOutputPolicy.ClipToWindowEnd)
                select new
                {
                    SensorId = sensorGroups.Key,
                    Sum = window.Sum(e => e.Value)
                };

C# Method to match files to filemask

Every once in a while I need a little C# method to do some dirty work. Here is an example of how to use regular expressions to do Windows-style filemask matching.

private bool FitsMask(string fileName, string fileMask)
{
    string pattern =
        '^' +
        System.Text.RegularExpressions.Regex.Escape(fileMask.Replace(".", "__DOT__")
            .Replace("*", "__STAR__")
            .Replace("?", "__QM__"))
        .Replace("__DOT__", "[.]")
        .Replace("__STAR__", ".*")
        .Replace("__QM__", ".")
        + '$';
    return new System.Text.RegularExpressions.Regex(pattern,
        System.Text.RegularExpressions.RegexOptions.IgnoreCase).IsMatch(fileName);
}
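A quick way to sanity-check the mask-to-regex translation is to run a few masks through it. Here the method is wrapped in a static class so the snippet stands alone:

```csharp
using System.Text.RegularExpressions;

public static class FileMask
{
    // Same technique as the method above: protect mask metacharacters with
    // placeholders, escape everything else, then expand the placeholders
    // into their regex equivalents (* => .* and ? => .).
    public static bool FitsMask(string fileName, string fileMask)
    {
        string pattern =
            '^' +
            Regex.Escape(fileMask.Replace(".", "__DOT__")
                .Replace("*", "__STAR__")
                .Replace("?", "__QM__"))
            .Replace("__DOT__", "[.]")
            .Replace("__STAR__", ".*")
            .Replace("__QM__", ".")
            + '$';
        return new Regex(pattern, RegexOptions.IgnoreCase).IsMatch(fileName);
    }
}
```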