Following in the great tradition of learn-by-example LINQ pages such as 101 LINQ Samples, here’s the first in a series of posts on StreamInsight query examples. These will first show up on this blog, then get migrated over to a centralized site on the StreamInsight developer center. We’ve got a fairly hefty queue we’re working through, but if there are specific examples or query patterns of interest, please post them in the comments.
Each query pattern will consist of a description of the input set (.csv file samples for most of these, for ease of use), the full query and the output result. For more complicated query structures the query pattern will also be described visually, using a Visio stencil (to be described in future posts).
The full application framework that I use to run all of these queries, associate them with their data sources, etc., is available here. The techniques used in this codebase will be illustrated in a future post.
In this post:
- Filtering
  - How do I filter a stream, keeping only specific events?
  - How do I filter on multiple conditions?
- Windows and Aggregates
  - How can I calculate aggregates, such as averages, over a set of events?
  - How can I calculate the average of all events in the last 5 seconds?
  - How can I calculate the average of all events in the past 5 seconds, every 2 seconds?
  - How can I calculate the average of all events in the past 5 seconds, every time a new event arrives?
  - How can I calculate the average of the past 5 events?
  - How do I measure the rate of arriving events every two seconds?
  - How can I calculate the average of each group of events in the last 10 seconds?
  - How can I calculate the average of each group of events in the last 20 seconds, every 10 seconds?
  - How can I calculate the average of each group of events in the past 10 seconds, every time a new event in that group arrives?
  - How can I find the events with the largest value every 10 seconds?
  - How can I find the events with the two smallest values in the last 10 seconds, every 5 seconds?
  - How can I find the group of events with the highest average value in the last 10 seconds?
  - How can I find the group of events with the highest maximum value in the last 10 seconds, every 5 seconds?
  - How can I detect if two events with the same ID value are received within a specific time frame, such as a 5 minute window?
[Where] – How do I filter a stream, keeping only specific events?
var filterQuery = from e in inputStream
                  where e.Value > 20
                  select e;
[Input] Result:
SimpleFilter,Point,12:00:02.000,1001,77
SimpleFilter,Point,12:00:03.000,1001,44
SimpleFilter,Point,12:00:04.000,1001,22
SimpleFilter,Point,12:00:05.000,1001,51
SimpleFilter,Point,12:00:06.000,1001,46
SimpleFilter,Point,12:00:07.000,1001,71
SimpleFilter,Point,12:00:08.000,1001,37
SimpleFilter,Point,12:00:09.000,1001,45
How do I filter on multiple conditions?
var filterQuery = from e in inputStream
                  where e.Value > 70 && e.SensorId == 1001
                  select e;
[Input] Result:
MediumFilter,Point,12:00:01.001,1001,77
MediumFilter,Point,12:00:03.010,1001,72
MediumFilter,Point,12:00:12.082,1001,73
MediumFilter,Point,12:00:14.145,1001,75
MediumFilter,Point,12:00:14.154,1001,71
MediumFilter,Point,12:00:15.163,1001,74
MediumFilter,Point,12:00:15.172,1001,73
How can I calculate aggregates, such as averages, over a set of events?
All aggregates (averages, min, max, and so on) are calculated over a set of events. In StreamInsight, a set of events is defined as the events that fall into a window. Windows are defined either over a period of time (hopping, tumbling and sliding windows) or over a specific number of events (count windows).
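To make the time-based window definitions concrete, here is a minimal sketch in plain LINQ to Objects (not the StreamInsight API) of how events could be assigned to hopping windows. The `Reading` type and `HoppingAverage` helper are hypothetical names invented for this illustration, and the sketch assumes window starts are aligned to multiples of the hop size. A tumbling window is the special case where the hop size equals the window size.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape for this sketch; not a StreamInsight type.
public record Reading(DateTime Time, int SensorId, double Value);

public static class WindowSketch
{
    // Averages events per window [start, start + size), with window starts
    // aligned to multiples of hop. Tumbling is the special case hop == size.
    public static IEnumerable<(DateTime WindowStart, double Average)> HoppingAverage(
        IReadOnlyCollection<Reading> events, TimeSpan size, TimeSpan hop)
    {
        if (events.Count == 0) yield break;

        long first = events.Min(e => e.Time.Ticks);
        long last = events.Max(e => e.Time.Ticks);

        // Earliest aligned start whose window still covers the first event.
        long start = first - first % hop.Ticks;
        while (start - hop.Ticks + size.Ticks > first) start -= hop.Ticks;

        for (long w = start; w <= last; w += hop.Ticks)
        {
            var inWindow = events
                .Where(e => e.Time.Ticks >= w && e.Time.Ticks < w + size.Ticks)
                .ToList();

            // Empty windows produce no output.
            if (inWindow.Count > 0)
                yield return (new DateTime(w), inWindow.Average(e => e.Value));
        }
    }
}
```

With events at 12:00:00, 12:00:01 and 12:00:05, a 5 second window hopping every 2 seconds has its first window starting at 11:59:56, because that window still overlaps the first event. This is only an approximation of the engine's behavior over point events; it ignores event lifetimes and output policies.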
How can I calculate the average of all events in the last 5 seconds?
var query = from window in inputStream.TumblingWindow(
                TimeSpan.FromSeconds(5),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new { Average = window.Avg(e => e.Value) };
[Input] Result:
SimpleTumblingWindow,Point,12:00:00.000,32.2
SimpleTumblingWindow,Point,12:00:05.000,50
How can I calculate the average of all events in the past 5 seconds, every 2 seconds?
var query = from window in inputStream.HoppingWindow(
                TimeSpan.FromSeconds(5),
                TimeSpan.FromSeconds(2),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new { Average = window.Avg(e => e.Value) };
[Input] Result:
SimpleHoppingWindow,Point,11:59:56.000,14
SimpleHoppingWindow,Point,11:59:58.000,31.66666
SimpleHoppingWindow,Point,12:00:00.000,32.2
SimpleHoppingWindow,Point,12:00:02.000,48
SimpleHoppingWindow,Point,12:00:04.000,45.4
SimpleHoppingWindow,Point,12:00:06.000,49.75
SimpleHoppingWindow,Point,12:00:08.000,41
How can I calculate the average of all events in the past 5 seconds, every time a new event arrives?
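A sliding window is expressed by extending each event's duration to 5 seconds and then taking a snapshot window over the result, so that a new average is produced whenever the set of active events changes.
var query = from window in inputStream
                .AlterEventDuration(e => TimeSpan.FromSeconds(5))
                .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
            select new { Average = window.Avg(e => e.Value) };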
[Input] Result:
SimpleSlidingWindow,Point,12:00:00.000,14
SimpleSlidingWindow,Point,12:00:01.000,9
SimpleSlidingWindow,Point,12:00:02.000,31.66
SimpleSlidingWindow,Point,12:00:03.000,34.75
SimpleSlidingWindow,Point,12:00:04.000,32.2
SimpleSlidingWindow,Point,12:00:05.000,39.6
SimpleSlidingWindow,Point,12:00:06.000,48
SimpleSlidingWindow,Point,12:00:07.000,46.8
SimpleSlidingWindow,Point,12:00:08.000,45.4
SimpleSlidingWindow,Point,12:00:09.000,50
SimpleSlidingWindow,Point,12:00:10.000,49.75
SimpleSlidingWindow,Point,12:00:11.000,51
SimpleSlidingWindow,Point,12:00:12.000,41
SimpleSlidingWindow,Point,12:00:13.000,45
How can I calculate the average of the past 5 events?
Aggregates over a set number of events are produced using count windows. As of StreamInsight v1, count windows cannot use the built-in aggregate functions (Count, Avg, Min, Max); they must use user-defined aggregates instead. The code sample below pairs a count window with a simple user-defined averaging aggregate.
var query = from window in inputStream.CountByStartTimeWindow(
                5, CountWindowOutputPolicy.PointAlignToWindowEnd)
            select new { Average = window.UserDefinedAverage(e => e.Value) };

// The aggregate itself: averages the mapped payload values in the window.
public class SimpleAggregate : CepAggregate<double, double>
{
    public override double GenerateOutput(IEnumerable<double> payloads)
    {
        return payloads.Average();
    }
}

// LINQ extension method that makes the aggregate available in queries.
public static class MyExtensions
{
    [CepUserDefinedAggregate(typeof(SimpleAggregate))]
    public static double UserDefinedAverage<InputT>(
        this CepWindow<InputT> window,
        Expression<Func<InputT, double>> map)
    {
        // Only a signature for the query; the attribute points at SimpleAggregate.
        throw CepUtility.DoNotCall();
    }
}
[Input] Result:
SimpleCountWindow,Point,12:00:04.000,32.2
SimpleCountWindow,Point,12:00:05.000,39.6
SimpleCountWindow,Point,12:00:06.000,48
SimpleCountWindow,Point,12:00:07.000,46.8
SimpleCountWindow,Point,12:00:08.000,45.4
SimpleCountWindow,Point,12:00:09.000,50
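The shape of that output (one result per event once five events have been seen) can be pictured with a small plain LINQ to Objects sketch. This is not StreamInsight code: `CountWindowSketch` and `CountWindowAverages` are hypothetical names, and the sketch assumes every event has a distinct start time, so that counting start times is the same as counting events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CountWindowSketch
{
    // Yields the average of each run of n consecutive values,
    // starting as soon as n values are available.
    public static IEnumerable<double> CountWindowAverages(
        IReadOnlyList<double> values, int n)
    {
        for (int end = n; end <= values.Count; end++)
        {
            // Window covers positions [end - n, end).
            yield return values.Skip(end - n).Take(n).Average();
        }
    }
}
```

For the values 1 through 6 and a window of 5, this yields two averages: 3 (for 1..5) and 4 (for 2..6). No result is produced until the fifth value arrives, which matches the first output row above appearing at 12:00:04.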
How do I measure the rate of arriving events every two seconds?
var query = from window in inputStream.TumblingWindow(
                TimeSpan.FromSeconds(2),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new { Count = window.Count() };
[Input] Result:
SimpleEventCount,Point,12:00:00.000,54
SimpleEventCount,Point,12:00:02.000,54
SimpleEventCount,Point,12:00:04.000,54
SimpleEventCount,Point,12:00:06.000,54
SimpleEventCount,Point,12:00:08.000,54
SimpleEventCount,Point,12:00:10.000,54
SimpleEventCount,Point,12:00:12.000,54
SimpleEventCount,Point,12:00:14.000,54
SimpleEventCount,Point,12:00:16.000,54
How can I calculate the average of each group of events in the last 10 seconds?
For this example, we’ll calculate the average of each sensor every 10 seconds. We do this by first grouping the events by sensor ID, then applying the same tumbling window average used in the ungrouped example to each group (adding the group key, in this case the sensor ID, to the output).
var query = from e in inputStream
            group e by e.SensorId into sensorGroups
            from window in sensorGroups.TumblingWindow(
                TimeSpan.FromSeconds(10),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                SensorId = sensorGroups.Key,
                Average = window.Avg(e => e.Value)
            };
[Input] Result:
GroupedTumblingWindow,Point,12:00:00.000,42.1,1001
GroupedTumblingWindow,Point,12:00:00.000,34.6,1002
GroupedTumblingWindow,Point,12:00:00.000,39.7,1003
GroupedTumblingWindow,Point,12:00:00.000,50.9,1004
GroupedTumblingWindow,Point,12:00:00.000,30.7,1005
GroupedTumblingWindow,Point,12:00:00.000,38.3,1006
GroupedTumblingWindow,Point,12:00:00.000,36.8,1007
GroupedTumblingWindow,Point,12:00:00.000,41.7,1008
GroupedTumblingWindow,Point,12:00:00.000,34.4,1009
GroupedTumblingWindow,Point,12:00:10.000,46.375,1001
GroupedTumblingWindow,Point,12:00:10.000,40.2,1002
GroupedTumblingWindow,Point,12:00:10.000,36.1,1003
GroupedTumblingWindow,Point,12:00:10.000,37.3,1004
GroupedTumblingWindow,Point,12:00:10.000,31.8,1005
GroupedTumblingWindow,Point,12:00:10.000,41.4,1006
GroupedTumblingWindow,Point,12:00:10.000,40.3,1007
GroupedTumblingWindow,Point,12:00:10.000,43.5,1008
GroupedTumblingWindow,Point,12:00:10.000,43.3,1009
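For readers more familiar with LINQ to Objects, the group-then-window idea can be sketched over an in-memory list. This is an illustration only, not the StreamInsight API: `Reading` and `GroupedTumblingAverage` are hypothetical names, and the sketch assumes tumbling windows aligned to multiples of the window size.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape for this sketch; not a StreamInsight type.
public record Reading(DateTime Time, int SensorId, double Value);

public static class GroupedWindowSketch
{
    // One average per (tumbling window, sensor) pair.
    public static IEnumerable<(DateTime WindowStart, int SensorId, double Average)>
        GroupedTumblingAverage(IEnumerable<Reading> events, TimeSpan size)
    {
        return events
            // Round each timestamp down to the start of its window.
            .GroupBy(e => (
                WindowStart: new DateTime(e.Time.Ticks - e.Time.Ticks % size.Ticks),
                e.SensorId))
            .OrderBy(g => g.Key.WindowStart)
            .ThenBy(g => g.Key.SensorId)
            .Select(g => (
                g.Key.WindowStart,
                g.Key.SensorId,
                Average: g.Average(e => e.Value)));
    }
}
```

The composite key plays the role of the `group ... by e.SensorId` clause combined with the per-group window: each sensor gets its own independent sequence of windows.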
How can I calculate the average of each group of events in the last 20 seconds, every 10 seconds?
var query = from e in inputStream
            group e by e.SensorId into sensorGroups
            from window in sensorGroups.HoppingWindow(
                TimeSpan.FromSeconds(20),
                TimeSpan.FromSeconds(10),
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                SensorId = sensorGroups.Key,
                Average = window.Avg(e => e.Value)
            };
[Input] Result:
GroupedHoppingWindow,Point,11:59:50.000,42.1,1001
GroupedHoppingWindow,Point,11:59:50.000,34.6,1002
GroupedHoppingWindow,Point,11:59:50.000,39.7,1003
GroupedHoppingWindow,Point,11:59:50.000,50.9,1004
GroupedHoppingWindow,Point,11:59:50.000,30.7,1005
GroupedHoppingWindow,Point,11:59:50.000,38.3,1006
GroupedHoppingWindow,Point,11:59:50.000,36.8,1007
GroupedHoppingWindow,Point,11:59:50.000,41.7,1008
GroupedHoppingWindow,Point,11:59:50.000,34.4,1009
GroupedHoppingWindow,Point,12:00:00.000,44,1001
GroupedHoppingWindow,Point,12:00:00.000,37.0,1002
GroupedHoppingWindow,Point,12:00:00.000,38.1,1003
GroupedHoppingWindow,Point,12:00:00.000,44.8,1004
GroupedHoppingWindow,Point,12:00:00.000,31.2,1005
GroupedHoppingWindow,Point,12:00:00.000,39.7,1006
GroupedHoppingWindow,Point,12:00:00.000,38.4,1007
GroupedHoppingWindow,Point,12:00:00.000,42.5,1008
GroupedHoppingWindow,Point,12:00:00.000,38.4,1009
GroupedHoppingWindow,Point,12:00:10.000,46.3,1001
GroupedHoppingWindow,Point,12:00:10.000,40.2,1002
GroupedHoppingWindow,Point,12:00:10.000,36.1,1003
GroupedHoppingWindow,Point,12:00:10.000,37.3,1004
GroupedHoppingWindow,Point,12:00:10.000,31.8,1005
GroupedHoppingWindow,Point,12:00:10.000,41.4,1006
GroupedHoppingWindow,Point,12:00:10.000,40.3,1007
GroupedHoppingWindow,Point,12:00:10.000,43.5,1008
GroupedHoppingWindow,Point,12:00:10.000,43.3,1009
How can I calculate the average of each group of events in the past 10 seconds, every time a new event in that group arrives?
var query = from e in inputStream
                .AlterEventDuration(e => TimeSpan.FromSeconds(10))
            group e by e.SensorId into sensorGroups
            from window in sensorGroups
                .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
            select new
            {
                SensorId = sensorGroups.Key,
                Average = window.Avg(e => e.Value)
            };
[Input] Result:
<snip>
GroupedSlidingWindow,Point,12:00:17.017,39.3,1008
GroupedSlidingWindow,Point,12:00:17.018,39.4,1009
GroupedSlidingWindow,Point,12:00:17.019,43,1001
GroupedSlidingWindow,Point,12:00:17.020,43.1,1002
GroupedSlidingWindow,Point,12:00:17.021,37.8,1003
GroupedSlidingWindow,Point,12:00:17.022,41.8,1004
GroupedSlidingWindow,Point,12:00:17.023,33.7,1005
GroupedSlidingWindow,Point,12:00:17.024,39.2,1006
GroupedSlidingWindow,Point,12:00:17.025,40.3,1007
GroupedSlidingWindow,Point,12:00:17.026,38.9,1008
GroupedSlidingWindow,Point,12:00:17.027,40.8,1009
<snip>
How can I find the events with the largest value every 10 seconds?
var query = (from window in inputStream.TumblingWindow(
                 TimeSpan.FromSeconds(10),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             from e in window
             orderby e.Value descending
             select e).Take(1);
[Input] Result:
TopK_TumblingDescending,Point,12:00:00.000,1006,80
TopK_TumblingDescending,Point,12:00:00.000,1007,80
TopK_TumblingDescending,Point,12:00:00.000,1009,80
TopK_TumblingDescending,Point,12:00:10.000,1007,81
Note that for the first group of events, there are three events with the same maximum value (80) in the same window of time. As a result, all of the events with that maximum value are returned.
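The tie behavior can be pictured as a "take with ties" over each window. The sketch below is plain LINQ to Objects and is only one interpretation that is consistent with the output shown here, not a description of how the engine is implemented. `Reading` and `LargestWithTies` are hypothetical names, and `k` is assumed to be at least 1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape for this sketch; not a StreamInsight type.
public record Reading(DateTime Time, int SensorId, double Value);

public static class TopKSketch
{
    // Returns the k largest events in a window, plus any further events
    // that tie with the k-th largest value.
    public static List<Reading> LargestWithTies(IEnumerable<Reading> window, int k)
    {
        var sorted = window.OrderByDescending(e => e.Value).ToList();
        if (sorted.Count <= k) return sorted;

        // Everything at or above the k-th value is kept, ties included.
        double cutoff = sorted[k - 1].Value;
        return sorted.TakeWhile(e => e.Value >= cutoff).ToList();
    }
}
```

With three events sharing the maximum value of 80, `LargestWithTies(window, 1)` returns all three, where LINQ to Objects' own `Take(1)` would return just one.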
How can I find the events with the two smallest values in the last 10 seconds, every 5 seconds?
var query = (from window in inputStream.HoppingWindow(
                 TimeSpan.FromSeconds(10),
                 TimeSpan.FromSeconds(5),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             from e in window
             orderby e.Value ascending
             select e).Take(2);
[Input] Result:
TopK_HoppingAscending,Point,11:59:55.000,1004,2
TopK_HoppingAscending,Point,11:59:55.000,1007,1
TopK_HoppingAscending,Point,11:59:55.000,1008,2
TopK_HoppingAscending,Point,12:00:00.000,1001,1
TopK_HoppingAscending,Point,12:00:00.000,1001,1
TopK_HoppingAscending,Point,12:00:00.000,1003,1
TopK_HoppingAscending,Point,12:00:00.000,1007,1
TopK_HoppingAscending,Point,12:00:05.000,1001,1
TopK_HoppingAscending,Point,12:00:05.000,1001,1
TopK_HoppingAscending,Point,12:00:05.000,1003,1
TopK_HoppingAscending,Point,12:00:10.000,1004,2
TopK_HoppingAscending,Point,12:00:10.000,1004,2
TopK_HoppingAscending,Point,12:00:10.000,1006,2
TopK_HoppingAscending,Point,12:00:10.000,1007,2
TopK_HoppingAscending,Point,12:00:15.000,1004,2
TopK_HoppingAscending,Point,12:00:15.000,1007,2
Note the same effect here: multiple events in a window share the same minimum values, so more than two events are returned.
How can I find the group of events with the highest average value in the last 10 seconds?
This query is performed in two stages. In the first query we determine the average value of each sensor over the past 10 seconds (using a group by, a tumbling window and an Avg()). In the second query we take a snapshot window over the output of that aggregate (one snapshot per set of results from the first query), sort by the average and take the top value.
var avg = from e in inputStream
          group e by e.SensorId into sensorGroups
          from window in sensorGroups.TumblingWindow(
              TimeSpan.FromSeconds(10),
              HoppingWindowOutputPolicy.ClipToWindowEnd)
          select new
          {
              SensorId = sensorGroups.Key,
              Average = window.Avg(e => e.Value)
          };

var query = (from win in avg
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             from e in win
             orderby e.Average descending
             select e).Take(1);
[Input] Result:
TopK_GroupTumblingDescending,Point,12:00:00.000,50.9,1004
TopK_GroupTumblingDescending,Point,12:00:10.000,46.3,1001
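The two stages (aggregate per group, then rank the aggregates) can be sketched for a single window's worth of events in plain LINQ to Objects. This is an illustration rather than StreamInsight code: `Reading` and `HighestAverage` are hypothetical names, the input is assumed to be the non-empty set of events already falling into one 10 second window, and ties are not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape for this sketch; not a StreamInsight type.
public record Reading(DateTime Time, int SensorId, double Value);

public static class TopGroupSketch
{
    // Stage 1: average per sensor. Stage 2: pick the sensor with the top average.
    public static (int SensorId, double Average) HighestAverage(
        IEnumerable<Reading> window)
    {
        return window
            .GroupBy(e => e.SensorId)
            .Select(g => (SensorId: g.Key, Average: g.Average(e => e.Value)))
            .OrderByDescending(x => x.Average)
            .First(); // throws if the window is empty
    }
}
```

In the StreamInsight query the snapshot window does the job that the single in-memory collection does here: it gathers the per-sensor averages that are valid at the same time so they can be ordered against each other.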
How can I find the group of events with the highest maximum value in the last 10 seconds, every 5 seconds?
var avg = from e in inputStream
          group e by e.SensorId into sensorGroups
          from window in sensorGroups.HoppingWindow(
              TimeSpan.FromSeconds(10),
              TimeSpan.FromSeconds(5),
              HoppingWindowOutputPolicy.ClipToWindowEnd)
          select new
          {
              SensorId = sensorGroups.Key,
              Maximum = window.Max(e => e.Value),
              Minimum = window.Min(e => e.Value)
          };

var query = (from win in avg
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             from e in win
             orderby e.Maximum descending
             select e).Take(1);
[Input] Result:
TopK_GroupHoppingDescending,Point,11:59:55.000,80,6,1006
TopK_GroupHoppingDescending,Point,11:59:55.000,80,1,1007
TopK_GroupHoppingDescending,Point,11:59:55.000,80,4,1009
TopK_GroupHoppingDescending,Point,12:00:00.000,80,6,1006
TopK_GroupHoppingDescending,Point,12:00:00.000,80,6,1006
TopK_GroupHoppingDescending,Point,12:00:00.000,80,1,1007
TopK_GroupHoppingDescending,Point,12:00:00.000,80,1,1007
TopK_GroupHoppingDescending,Point,12:00:00.000,80,2,1009
TopK_GroupHoppingDescending,Point,12:00:00.000,80,4,1009
TopK_GroupHoppingDescending,Point,12:00:05.000,80,2,1006
TopK_GroupHoppingDescending,Point,12:00:05.000,80,6,1006
TopK_GroupHoppingDescending,Point,12:00:05.000,80,1,1007
TopK_GroupHoppingDescending,Point,12:00:05.000,80,2,1009
TopK_GroupHoppingDescending,Point,12:00:10.000,81,2,1007
TopK_GroupHoppingDescending,Point,12:00:15.000,81,2,1007
TopK_GroupHoppingDescending,Point,12:00:15.000,81,2,1007
TopK_GroupHoppingDescending,Point,12:00:20.000,81,2,1007
Again, note that the StreamInsight engine returns every group tied for the top maximum value (80 or 81).
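How can I detect if two events with the same ID value are received within a specific time frame, such as a 5 minute window?
To implement this query, we use a sliding window to look at arriving events in a 5 minute window, grouped by their ID value. We then count the number of events in the window with that ID, and keep only the groups that have 2 or more events.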
var eventCount = from e in inputStream.AlterEventDuration(e => TimeSpan.FromMinutes(5))
                 group e by e.SensorId into sensorGroups
                 from win in sensorGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                 select new
                 {
                     SensorId = sensorGroups.Key,
                     Count = win.Count()
                 };

var query = from e in eventCount
            where e.Count >= 2
            select e;
[Input] Result:
DetectMultipleInWindow,Point,12:05:16.206,4,1008
DetectMultipleInWindow,Point,12:05:16.207,4,1009
DetectMultipleInWindow,Point,12:05:16.208,3,1001
DetectMultipleInWindow,Point,12:05:16.209,3,1002
<snip>
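The underlying condition (two events with the same ID arriving less than 5 minutes apart) can be checked over an in-memory list with plain LINQ to Objects. This sketch is not StreamInsight code and does not reproduce the per-snapshot counts shown above; it only reports which IDs repeat. `Reading` and `SensorsWithRepeats` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape for this sketch; not a StreamInsight type.
public record Reading(DateTime Time, int SensorId, double Value);

public static class RepeatSketch
{
    // Returns the sensor IDs that have at least two events
    // separated by less than the given time span.
    public static IEnumerable<int> SensorsWithRepeats(
        IEnumerable<Reading> events, TimeSpan within)
    {
        return events
            .GroupBy(e => e.SensorId)
            .Where(g =>
            {
                var times = g.Select(e => e.Time).OrderBy(t => t).ToList();
                // Compare each event with the next one in time order.
                return times
                    .Zip(times.Skip(1), (a, b) => b - a)
                    .Any(gap => gap < within);
            })
            .Select(g => g.Key);
    }
}
```

Checking only adjacent events is sufficient: if any two events of a sensor are closer than the limit, the closest pair in time order is too. The strict comparison mirrors an event that lasts 5 minutes no longer being active exactly 5 minutes after it started.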