StreamInsight Query Pattern: Find the top category (using Order By, Take and ApplyWithUnion)

This question has come up in a few customer engagements, and in several forum posts: how do I find the top category in a stream for a given group?  Some examples of this pattern of question are:

  • Given a stream of Product View events from a website, each containing a user id and product category
    • How do I determine the most popular category for each user in the past 8 hours?
    • How do I determine the top 2 most popular categories in the past 8 hours?
  • Given a stream of Sensor Data events from a process data collection system, each containing a sensor id and a sensor reading (for example, a power meter reading)
    • How do I determine the highest reading for each meter in the past hour?
    • How do I determine the meters with the three highest readings in the past hour?

Writing a StreamInsight query to answer this question typically follows one of two patterns:

  • Top X by grouping
  • Top X

Note: all of the source code, sample data, queries etc for this blog post may be found here.  Please download and follow along.

Find Top X

The 2-step process for finding the Top X is detailed below.  For the purposes of this example, we’ll be using the question “How do I determine the top 2 most popular categories in the past 8 hours?”, with some sample data and results.

  • Create the initial aggregate. In the case of the product view question, this would be a window of events containing count by category in the past 8 hours.
  • Order by, Take X.  Sort the initial aggregates and take the top 2 ranked results.
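Before looking at the StreamInsight version, the two steps can be sketched over plain in-memory data (ordinary Python with made-up page views matching the sample data below; not the StreamInsight API):

```python
from collections import Counter

# Hypothetical Product View events for one 8-hour window;
# each entry is the SkuCategory of a single event.
page_views = ["Zune"] + ["XBox"] * 5 + ["DVD"] * 6

# Step 1: create the initial aggregate - a count per category.
category_count = Counter(page_views)

# Step 2: order by count descending and take the top 2.
top2 = sorted(category_count.items(), key=lambda kv: kv[1], reverse=True)[:2]
print(top2)  # [('DVD', 6), ('XBox', 5)]
```

The StreamInsight query does the same work, except that the "window" of events is defined by time rather than by a list in memory.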

Ok, sounds reasonable – what does this look like in terms of a StreamInsight query?

Code Snippet
// Determine the page view count (by category)
// in the past eight hours
var categoryCount = from e in pageViews
                    group e by e.SkuCategory into skuGroups
                    from win in skuGroups.TumblingWindow(
                        TimeSpan.FromHours(8),
                        HoppingWindowOutputPolicy.ClipToWindowEnd)
                    select new
                    {
                        SkuCategory = skuGroups.Key,
                        Count = win.Count()
                    };

This query creates the initial aggregates: a window of time (start time / end time) and a set of events, each containing the SKU category and count.

Category Counts
  Start Time   End Time    SkuCategory  Count
  12:00:00 AM  8:00:00 AM  Zune         1
  12:00:00 AM  8:00:00 AM  XBox         5
  12:00:00 AM  8:00:00 AM  DVD          6

Then we’ll take a snapshot of the initial aggregates window, sort by the count and take the top 2 results.

Code Snippet
// From the output of categoryCount, order by the Count
// field and take the top 2 ranked events
var topCategories = (from win in categoryCount
                         .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                     from e in win
                     orderby e.Count descending
                     select e).Take(2);

Resulting in the answer to our question:

  Start Time   End Time    SkuCategory  Count
  12:00:00 AM  8:00:00 AM  XBox         5
  12:00:00 AM  8:00:00 AM  DVD          6

Putting it all together:

Code Snippet
// Determine the page view count (by category)
// in the past eight hours
var categoryCount = from e in pageViews
                    group e by e.SkuCategory into skuGroups
                    from win in skuGroups.TumblingWindow(
                        TimeSpan.FromHours(8),
                        HoppingWindowOutputPolicy.ClipToWindowEnd)
                    select new
                    {
                        SkuCategory = skuGroups.Key,
                        Count = win.Count()
                    };

// From the output of categoryCount, order by the Count
// field and take the top 2 ranked events
var topCategories = (from win in categoryCount
                         .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                     from e in win
                     orderby e.Count descending
                     select e).Take(2);

Find Top X by Grouping

The 3-step process for finding the Top X by Grouping is detailed below.  For the purposes of this example, we’ll be using the question “How do I determine the most popular category for each user in the past 8 hours?”, with some sample data and results.

  • Create the initial aggregate. In the case of the product view question, this would be a window of events containing count by category and user id in the past 8 hours.
  • Group the initial aggregates.  In order to determine the top X by group, we need to re-group the aggregate results.  In the case of the question How do I determine the most popular category for each user in the past 8 hours we will need to group by user before sorting and selecting the top X.
  • Order by, Take X.  Once we have our sub-groups created, we’ll sort each of them and take the top X ranked results.
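The three steps can likewise be sketched over in-memory data (ordinary Python with made-up events matching the sample data below; not the StreamInsight API):

```python
from collections import Counter

# Hypothetical (UserId, SkuCategory) pairs for one 8-hour window.
page_views = [(1, "DVD")] * 4 + [(1, "XBox"), (1, "Zune")] \
           + [(2, "DVD")] * 2 + [(2, "XBox")] * 4

# Step 1: create the initial aggregate - a count per (user, category).
counts = Counter(page_views)

# Step 2: re-group the aggregates by user.
by_user = {}
for (user, category), n in counts.items():
    by_user.setdefault(user, []).append((category, n))

# Step 3: within each user group, order by count and take the top 1.
top_by_user = {user: max(cats, key=lambda kv: kv[1])
               for user, cats in by_user.items()}
print(top_by_user)  # {1: ('DVD', 4), 2: ('XBox', 4)}
```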

Ok, sounds reasonable – what does this look like in terms of a StreamInsight query?

Code Snippet
// Determine the page view count (by user and by category)
// in the past eight hours
var categoryCountByUser = from e in pageViews
                          group e by new { e.UserId, e.SkuCategory } into skuGroups
                          from win in skuGroups.TumblingWindow(
                              TimeSpan.FromHours(8),
                              HoppingWindowOutputPolicy.ClipToWindowEnd)
                          select new
                          {
                              UserId = skuGroups.Key.UserId,
                              SkuCategory = skuGroups.Key.SkuCategory,
                              Count = win.Count()
                          };

This query creates the initial aggregates: a window of time (start time / end time) and a set of events, each containing the user id, SKU category and count.

User Counts
  Start Time   End Time    UserId  SkuCategory  Count
  12:00:00 AM  8:00:00 AM  1       DVD          4
  12:00:00 AM  8:00:00 AM  1       XBox         1
  12:00:00 AM  8:00:00 AM  1       Zune         1
  12:00:00 AM  8:00:00 AM  2       DVD          2
  12:00:00 AM  8:00:00 AM  2       XBox         4

Next we’ll re-group this window of events before performing the order by / take X:

Code Snippet
// Take these events and re-group them into user groups
var userGroups = from e in categoryCountByUser
                 group e by e.UserId;

This logically splits the window of events up into groups based on the user ID.

User Counts
  Start Time   End Time    UserId  SkuCategory  Count
  12:00:00 AM  8:00:00 AM  1       DVD          4
  12:00:00 AM  8:00:00 AM  1       XBox         1
  12:00:00 AM  8:00:00 AM  1       Zune         1

  12:00:00 AM  8:00:00 AM  2       DVD          2
  12:00:00 AM  8:00:00 AM  2       XBox         4

Finally, we use the ApplyWithUnion operator to perform an order by / take operation on each individual group, then union the results back together.

Code Snippet
// (a) Apply this query to each group
var topCategoriesByUser = userGroups.ApplyWithUnion(
    // (b) Create a snapshot window over the results of the previous tumbling window
    applyIn => (from win in applyIn
                    .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                // (c) for the events in the window
                from e in win
                // (d) sort by the count field, largest to smallest
                orderby e.Count descending
                // (e) take the top ranked event(s)
                select e).Take(1),
    // (f) the new event type will consist of the grouping key (user id),
    // the sku category and the event count
    e => new { UserId = e.Key, SkuCategory = e.Payload.SkuCategory,
               Count = e.Payload.Count });

Resulting in the answer to our question:

Top Category by User
  Start Time   End Time    UserId  SkuCategory  Count
  12:00:00 AM  8:00:00 AM  1       DVD          4
  12:00:00 AM  8:00:00 AM  2       XBox         4

This syntax can be a little overwhelming at first glance, so let’s break it down and examine what’s going on.

  • (a) Apply this query to each group.  The ApplyWithUnion operator performs the 'apply' query on each group, then unions the results back into a single stream.  This can be thought of as the StreamInsight equivalent of foreach (var g in group) { do stuff }
  • (b) Create a snapshot window over the results of the previous tumbling window.  We use windows when we want to look at sets of events (as defined by a window of time).  In this case, we want to look at each window of events from the Tumbling window in the previous query (which has been grouped by user id) to perform some sorting.
  • (c) For the events in the window, (d) sort by the count field, largest to smallest.  Here we look at the events in the group, and sort them by the e.Count field in descending order.
  • (e) Take the top ranked event(s).  Remember that Take doesn’t mean take the first event; it means take the first rank of events.  For example, if the DVD and XBox view counts both happened to be four (4) for a given user in the same time window, which one of them is “largest”?  Since they would both occupy the top rank, both events would be returned from the query.
    • It is the responsibility of the consuming system to decide how to handle situations where more than one event occupies the top rank (depending on the business logic, this could be as simple as taking the first event, or displaying both).
  • (f) The new event type will consist of the grouping key (user id), sku category and event count.  The events emitted from the group and apply branch have two fields – Key and Payload.  Attempting to pass the event directly out of the stream would result in passing a nested event, causing an exception, hence the e => new {} projection.
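The rank-based Take semantics described in (e) can be sketched as a small hypothetical helper (plain Python, not the StreamInsight API): every event tied for a top-k rank is kept.

```python
def take_top_ranks(events, k=1):
    """Keep every (category, count) event whose count falls in the
    top k distinct ranks - ties at a rank are all returned."""
    top_counts = sorted({count for _, count in events}, reverse=True)[:k]
    return [e for e in events if e[1] in top_counts]

# Two events tied at the top rank: both come back from Take(1).
events = [("DVD", 4), ("XBox", 4), ("Zune", 1)]
print(take_top_ranks(events))  # [('DVD', 4), ('XBox', 4)]
```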

Putting it all together:

Code Snippet
// Determine the page view count (by user and by category)
// in the past eight hours
var categoryCountByUser = from e in pageViews
                          group e by new { e.UserId, e.SkuCategory } into skuGroups
                          from win in skuGroups.TumblingWindow(
                              TimeSpan.FromHours(8),
                              HoppingWindowOutputPolicy.ClipToWindowEnd)
                          select new
                          {
                              UserId = skuGroups.Key.UserId,
                              SkuCategory = skuGroups.Key.SkuCategory,
                              Count = win.Count()
                          };

// Take these events and re-group them into user groups
var userGroups = from e in categoryCountByUser
                 group e by e.UserId;

// Determine the top ranked category for each user
var topCategoriesByUser = userGroups.ApplyWithUnion(
    applyIn => (from win in applyIn
                    .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                from e in win
                orderby e.Count descending
                select e).Take(1),
    e => new { UserId = e.Key, SkuCategory = e.Payload.SkuCategory,
               Count = e.Payload.Count });

Summary

StreamInsight provides rich capabilities for ranking (top-K) events within a window (or groups within a window) by using the OrderBy and Take methods.  These can be used, in conjunction with the appropriate windows and aggregates, to find trends, “hot” patterns and other interesting occurrences within event streams.

Using Realistic Security For Sending and Listening to The AppFabric Service Bus

I can’t think of any demonstration of the Windows Azure platform AppFabric Service Bus that didn’t show authenticating to the endpoints using the default “owner” account. At the same time, I can’t imagine anyone wanting to do this in real life. In this post, I’ll show you how you should probably define the proper permissions […]

Windows Azure Platform Introductory Special Extended to March 31, 2011

Good news – the Windows Azure Platform Introductory Special offer (which includes the SQL Azure Free Trial) has been extended through March 31, 2011!  This promotional offer enables you to try a limited amount of the Windows Azure platform at no charge. The subscription includes a base level of monthly Windows Azure compute hours, storage, data transfers, AppFabric Access Control transactions, AppFabric Service Bus connections, and a SQL Azure Database, at no charge.

 Included each month at no charge:

  • Windows Azure
    • 25 hours of a small compute instance
    • 500 MB of storage
    • 10,000 storage transactions
    • Windows Azure AppFabric
      • 100,000 Access Control transactions
      • 2 Service Bus connections
  • SQL Azure
    • 1GB Web Edition database (available for first 3 months only)
  • Data Transfers (per region)
    • 500 MB in
    • 500 MB out

 Any monthly usage in excess of the above amounts will be charged at the standard rates. This Introductory Special offer will end on March 31, 2011 and all usage will then be charged at the standard rates.

 Please visit http://www.microsoft.com/windowsazure/offers/ to see additional details on the Introductory Special as well as other offers currently available for the Windows Azure platform.

BizTalk SMTP adapter: Attaching multiple messages to an email as attachments.

I had a requirement to consume PDFs and attach all of those that arrived within a 30-minute window to a single email as attachments. I created a multi-part message named msg_Email, of type System.Xml.XmlDocument, using a Construct Message shape, and loaded the body of the message using the .LoadXml() method. I then used a helper class (in the EmailAttacher namespace) to attach each new message to the existing multi-part email message as an attachment. Here is the helper class code.

using Microsoft.XLANGs.BaseTypes;  // XLANGMessage, MIME.FileName

namespace EmailAttacher
{
    public class MessageHelper
    {
        public static void AddAttachment(XLANGMessage destination, XLANGMessage attachment, string filename)
        {
            try
            {
                int count = destination.Count;
                destination.AddPart(attachment[0], string.Format("Attachment_{0}", count));
                destination[count].SetPartProperty(typeof(MIME.FileName), filename);
            }
            finally
            {
                // decrement reference count
                destination.Dispose();
                attachment.Dispose();
            }
        }
    }
}

The message assignment shape has the following code to add the newest message to the existing message.

EmailAttacher.MessageHelper.AddAttachment(msg_Email, msg_ToAddAsAttachment, "NameOfAttachedFile");

Don’t forget to add this, to ensure all message parts that aren’t the body are attachments.

msg_Email(SMTP.MessagePartsAttachments) = 2;
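For comparison, the same multi-attachment construction outside BizTalk can be sketched with Python's standard email package (the payload bytes and filenames here are made up):

```python
from email.message import EmailMessage

# Build a message body, then attach each collected PDF as a separate part.
msg = EmailMessage()
msg["Subject"] = "PDFs received in the last 30 minutes"
msg.set_content("See the attached documents.")

pdf_payloads = {"invoice1.pdf": b"%PDF-1.4 ...", "invoice2.pdf": b"%PDF-1.4 ..."}
for filename, data in pdf_payloads.items():
    msg.add_attachment(data, maintype="application", subtype="pdf",
                       filename=filename)

print(sum(1 for _ in msg.iter_attachments()))  # 2
```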

StreamInsight: Obscure LINQ error – Stream other than apply input stream is cannot be referenced inside apply branch

Another little LINQ error you might encounter from time to time.  Ran into this yesterday while building out some queries, and figured it was worth a quick post.  Starting with a basic stream, I needed to group by a set of fields in the stream and calculate some basic aggregates.

Code Snippet
// This query calculates the sum of all sensor values
// for each sensor
// for each 5 seconds worth of data.
var query = from e in inputStream
            group e by e.SensorId into sensorGroups
            from window in inputStream.TumblingWindow(
                TimeSpan.FromSeconds(5),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                SensorId = sensorGroups.Key,
                Sum = window.Sum(e => e.Value)
            };

Running this throws the error:

Microsoft.ComplexEventProcessing.Linq.QueryGenerationException was unhandled by user code  
Message=Stream other than apply input stream is cannot be referenced inside apply branch. The 
following expression is not supported: 

'sensorGroups => CreateAdapterStream("input", 
StreamInsight.Samples.Adapters.SimpleTextFileReader.TextFileReaderFactory, value
(StreamInsight.Samples.Adapters.SimpleTextFileReader.TextFileReaderConfig), Point, value
(Microsoft.ComplexEventProcessing.Linq.CepStreamCreationContext)).TumblingWindow(FromSeconds(5), 
HoppingWindowOutputPolicy.ClipToWindowEnd)'.

See the subtle, yet annoyingly obvious after the fact, mistake I made?  I grouped by sensorGroups, but windowed over inputStream.  Fixing the query to use the same stream for both the group and the window resolves the error.

Code Snippet
var query = from e in inputStream
            group e by e.SensorId into sensorGroups
            from window in sensorGroups.TumblingWindow(
                TimeSpan.FromSeconds(5),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                SensorId = sensorGroups.Key,
                Sum = window.Sum(e => e.Value)
            };

C# Method to match files to filemask

Every once in a while I need a little C# method to do some dirty work. Here is an example of how to use Regular Expressions to match Windows style filemask matching.

private bool FitsMask(string fileName, string fileMask)
{
    string pattern =
        "^" +
        System.Text.RegularExpressions.Regex.Escape(
            fileMask.Replace(".", "__DOT__")
                    .Replace("*", "__STAR__")
                    .Replace("?", "__QM__"))
            .Replace("__DOT__", "[.]")
            .Replace("__STAR__", ".*")
            .Replace("__QM__", ".")
        + "$";
    return new System.Text.RegularExpressions.Regex(
        pattern,
        System.Text.RegularExpressions.RegexOptions.IgnoreCase).IsMatch(fileName);
}
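As a point of comparison, Python ships this kind of wildcard matching in its standard library; a quick sketch with fnmatch ('*' matches any run of characters, '?' exactly one):

```python
from fnmatch import fnmatch

# fnmatch applies the same wildcard rules the C# method builds by hand
# (case folding follows the host OS's conventions).
print(fnmatch("report2011.pdf", "report*.pdf"))  # True
print(fnmatch("a.txt", "?.txt"))                 # True
print(fnmatch("ab.txt", "?.txt"))                # False
```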

PDC10: Introduction to Windows Azure AppFabric Caching (CS60)

In addition to building the Composite Application keynote demo presented by James Conard and Bob Muglia, I presented a session on the new Windows Azure AppFabric Caching service that’s now available as a CTP release in the AppFabric LABS environment.  You can find the presentation here: http://player.microsoftpdc.com/Session/1f607983-c6eb-4d9f-b644-55247e8adda6

A few interesting notes on the Caching service: […]