Workflow on Windows Azure Research

Our team is busily working on some great new stuff to bring first class support for Workflow to Windows Azure.  We want to hear from you so we can be sure to deliver on what you need.

Drop me a line if you want to chat and you:

  • Have workflow (WF3/WF4) solutions running on Azure or other cloud systems today
  • Have workflow (WF3/WF4) solutions on-premises and are moving that solution to Azure in the next 3 months
  • Have solutions running on Azure today and are contemplating adding Workflow to your solution

Follow me on Twitter

Come Join My Team!

One of the best career decisions I’ve made was to join the Windows Azure Platform Evangelism team last July.  Yes, it was a lot of work to uproot the family and move to Seattle, but it was absolutely worth it.  I pinch myself nearly every day to make sure that it’s not just a dream – […]

BizTalk Server 2006 Oracle Adapter Setup in Windows Server 2003 R2 (64 Bit)

Recently I had to integrate with an Oracle database using the BizTalk 2006 Oracle adapter (note: the vanilla Oracle adapter, not the WCF one). I experienced significant challenges setting it up on a Windows 2003 64-bit machine, mainly around getting the correct Oracle drivers, using the 64-bit version of the ODBC data source interface, giving permission to certain folders, etc.

We had to do this on a production server in a limited window, so it’s crucial to get the steps spot on. Here are the required steps.

  1. Download Oracle Data Access Component
  2. Install Oracle Data Access Component
  3. Create/Configure the tnsnames.ora file with connection details
  4. Create a System DSN and test the connection
  5. Install Oracle Adapter
  6. Configure Oracle Adapter
  7. Give permission to Oracle Installation folder
  8. Restart Host Instances
  9. Configure a Receive Port/Receive Location

1 Download Oracle Data Access Component

The first step in configuring the BizTalk 2006 Oracle adapter is to make sure the underlying Oracle ODAC components are correctly configured. You can download the required files from the following location:

http://www.oracle.com/technetwork/database/windows/downloads/utilsoft-087491.html

The page will display various links for various flavours. The one we are interested in (the one we tested successfully) is:

Download the Oracle Universal Installer version – ODAC112012.zip

The key thing to look out for in the list is the “Oracle ODBC Driver 11.2.0.1.0”

The other important point is not to get confused by the 64-bit and 32-bit versions. You only need the 32-bit version of the ODAC installer.

2 Install Oracle Data Access Component

Once you have downloaded ODAC112012.zip (around 270 MB), extract the contents and locate the setup.exe file. Double-click it to start the installation process.

On the first screen select the option “Oracle Data Access Components for Oracle Client 11.2.0.1.2”

On the next screen you need to specify the path; by default it appends the current user name at the end. Just replace the username with “Oracle”.

In the component selection screen, select only the following components

  1. Oracle Data Provider for .NET 11.2.0.1.2, and
  2. Oracle Instant Client 11.2.0.1.0

Note: When deselecting components, work from the bottom up; otherwise you will get dependency errors.

3 Create/Configure the tnsnames.ora file with connection details

Once the Oracle ODAC components are successfully installed, you need to configure the tnsnames.ora file (found under your install folder, e.g. d:\app\Oracle\Network\admin) with the environment-specific connection string. Example:

    ORACLEWEB =
      (DESCRIPTION =
        (ADDRESS = (PROTOCOL = TCP)(HOST = yourservername)(PORT = 1523))
        (CONNECT_DATA =
          (SERVER = DEDICATED)
          (SERVICE_NAME = FILEMFU1)
        )
      )

Oracle DBAs should be able to provide these details.

4 Create a System DSN and test the connection

Next we need to create a System DSN to connect to the Oracle database. On a 64-bit system it’s important to use the 32-bit (WOW64) ODBC Data Sources interface instead of the standard one present under Administrative Tools\Data Sources (ODBC).

In order to do that, open the odbcad32 application found at C:\WINDOWS\SysWOW64\odbcad32.exe

Navigate to the System DSN tab and click the “Add” button. Then select “Oracle in OraClient11g_home1” and click Finish.

In the next screen, set:

Data Source Name = ORACLEWEB

TNS Service Name = ORACLEWEB (this should correspond to the name provided in the tnsnames.ora file)

UserID = <<environment specific>>

Then click the “Test Connection” button, provide the password, and make sure the connection is successful.
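If you want to double-check the DSN from code as well as from the dialog, you can script a quick test against the same System DSN using .NET’s System.Data.Odbc classes. This is only a hedged sketch: the user ID and password are placeholders, and Open() will only succeed on a machine with the Oracle ODBC driver installed and the database reachable.

```csharp
using System;
using System.Data.Odbc;

class DsnCheck
{
    static void Main()
    {
        // Build a connection string against the System DSN created above.
        // The Uid/Pwd values are placeholders for your environment.
        var builder = new OdbcConnectionStringBuilder { Dsn = "ORACLEWEB" };
        builder["Uid"] = "your_user";       // placeholder
        builder["Pwd"] = "your_password";   // placeholder

        using (var conn = new OdbcConnection(builder.ConnectionString))
        {
            try
            {
                // Requires the Oracle ODBC driver and a reachable database
                conn.Open();
                Console.WriteLine("Connection successful");
            }
            catch (OdbcException ex)
            {
                Console.WriteLine("Connection failed: " + ex.Message);
            }
        }
    }
}
```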

5 Install Oracle Adapter

Once you have established a successful Oracle connection from the BizTalk machine, the next step is to install the BizTalk 2006 Oracle adapter. The installer files are on a separate CD called “Microsoft BizTalk Server Adapters for Enterprise”; if you can’t find the CD you can download it from MSDN using your subscription account.

Once downloaded, open the setup.exe file and select the “Install Microsoft BizTalk adapters for Enterprise Application” link.

In the next screen, select only “Oracle Database” (not Oracle E-Business Suite) and deselect all other options.

Follow the rest of the wizard with the standard steps and finish the installation.

6 Configure Oracle Adapter

Once the Oracle adapter setup is complete, open the BizTalk Administration console, navigate to Platform Settings\Adapters, right-click and select “New”.

In the new window, enter “OracleDB” for the name and select “Oracle Database” from the drop-down box as shown below:

Click OK.

7 Give permission to Oracle Installation folder

Navigate to the Oracle ODAC installation folder (e.g. “D:\app\Oracle”) in Windows Explorer. Right-click, select Properties, and open the Security tab. Add the service account under which the host instance is running and grant it Full Control.

8 Restart Host Instances

Make sure you restart the required host instances. Note: there is no requirement to create 32-bit hosts/host instances.

9 Configure a Receive Port/Receive Location

You can test the configuration by creating a new Receive Port and Receive Location.

Open “BizTalk Application 1” and

  1. Navigate to Receive Ports, right-click and select “New – One-Way Receive Port”. Name it “Oracle Test”.
  2. Select the “Receive Locations” tab and click “New”. Name it “Oracle Test”.
  3. Select “Oracle DB” from the Type combo box and click “Configure”. You need to configure 5 main values:
    1. Path: point to “D:\app\Oracle\product\11.2.0\client_1\bin”
    2. Service Name: the System DSN name configured earlier
    3. User Name:
    4. Password:
    5. Poll SQL Statement

As shown below:

Click “Apply” and then “OK”. Click the “Configure” button again and this time click the “Manage Events” option, which will pop up a new window. On the left-hand side, navigate to “Native SQL”.

Note: If you are able to see the left-hand tree expand as shown below, it’s a sign that everything is configured correctly.

Click “OK” all the way through, and enable the Receive Location. After a while you should see messages coming into BizTalk Server and getting suspended (since there are no subscribers). This proves the Oracle configuration is successful.

Gartner Names Windows Azure AppFabric “A Strategic Core of Microsoft’s Cloud Platform”

Earlier in October, at our Professional Developer Conference (PDC), we made some exciting announcements regarding the future and roadmap of Windows Azure AppFabric.

If you want to learn more regarding these announcements you can visit the Windows Azure AppFabric website, and also read content and watch sessions from PDC.

Today, we wanted to highlight a paper the leading analyst firm Gartner published following PDC regarding these roadmap announcements.

Here is a quick summary and link to the paper:

Windows Azure AppFabric: A Strategic Core of Microsoft’s Cloud Platform (Gartner, November 15, 2010) Examines Microsoft’s strategy with Windows Azure AppFabric and the AppFabric services, concluding that “continuing strategic investment in Windows Azure is moving Microsoft toward a leadership position in the cloud platform market, but success is not assured until user adoption confirms the company’s vision and its ability to execute in this new environment.”

This reinforces that the investments we are making in Windows Azure AppFabric are leading us in the right direction in the cloud.

Besides the exciting roadmap, Windows Azure AppFabric provides great capabilities and benefits already today. So be sure to try it out using our free trial offer.

Click the link below and get started!

StreamInsight: Creating a LINQPad data context

This blog post is for those folks comfortable with creating a data stream from an IEnumerable, IObservable or StreamInsight adapter data source.  If you’d like more details on creating those, have a look at this blog post, which covers creating an IEnumerable ’canned’ data source then wrapping a LINQPad context around it.

Creating a data context is very straightforward:

  • Create a new C# class library project in Visual Studio 2010 called SampleContext.
    • Ensure that the Framework type is set to .NET Framework 4 (and NOT .NET Framework 4 Client Profile)
    • Add a reference to Microsoft.ComplexEventProcessing.dll
    • Add a reference to Microsoft.ComplexEventProcessing.Observable.dll
      • This is required for the IObservable<T>.ToPointStream extension method.
    • Add a reference to Microsoft.ComplexEventProcessing.Adapters.dll
      • Optional – this is only required if you have an adapter embedded inside of your context
    • Add a reference to System.Reactive.dll
      • Optional – this is only required for creating different types of IObservable streams.
      • Note: System.Reactive.dll is part of the Reactive Extensions framework, which needs to be downloaded separately.
  • Derive a class from StreamInsightContext in the StreamInsightLinqPad.Samples assembly (available as part of the LINQPad driver for StreamInsight download).
  • Expose your stream as a property of the class.

 

Download the sample project here.

That’s it!  Here’s what this looks like in practice.  First we establish the basic class:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using StreamInsightLinqPad.Samples;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Linq;
    using StreamInsight.Samples.Adapters.DataGenerator;

    /// <summary>
    /// Define a sample LINQPad context for StreamInsight
    /// </summary>
    public class SampleContext : StreamInsightContext
    {
        public SampleContext(Server server)
            : base(server)
        { }
    }

Now we’ll go ahead and expose three types of streams.  The first will be one generated from an IEnumerable.

    SimpleEvent[] events = new SimpleEvent[]
    {
        new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 12, 0, 0), ID = 5, Message = "Test" },
        new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 13, 0, 0), ID = 6, Message = "Test2" },
        new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 14, 0, 0), ID = 7, Message = "Test3" },
        new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 15, 0, 0), ID = 5, Message = "Test4" },
    };

    /// <summary>
    /// Expose a stream created from an IEnumerable
    /// </summary>
    public CepStream<SimpleEvent> SimpleStream
    {
        get
        {
            return events.ToPointStream(this.Application, t =>
                PointEvent.CreateInsert<SimpleEvent>(t.Timestamp, t),
                AdvanceTimeSettings.IncreasingStartTime);
        }
    }

Next, we’ll use IObservable (created via Observable.Interval) to generate a “live” stream of events:

    /// <summary>
    /// Expose a stream created from an IObservable
    /// </summary>
    public CepStream<SimpleEvent> ObservableStream
    {
        get
        {
            var rand = new Random();

            // Create a simple observable that returns a random event every
            // 250 ms
            var interval = Observable.Interval(TimeSpan.FromMilliseconds(250))
                .Select(i => new SimpleEvent
                {
                    ID = rand.Next(10),
                    Timestamp = DateTime.UtcNow,
                    Message = "Observable message!"
                });
            return interval.ToPointStream(Application, s =>
                PointEvent.CreateInsert(s.Timestamp, s),
                AdvanceTimeSettings.IncreasingStartTime,
                null);
        }
    }

Finally, we’ll leverage the DataGenerator sample adapter (from http://streaminsight.codeplex.com/) to demonstrate creating a LINQPad stream from an adapter.

    /// <summary>
    /// Expose a stream created from an adapter instance
    /// </summary>
    public CepStream<GeneratedEvent> AdapterStream
    {
        get
        {
            var generatorConfig = new GeneratorConfig()
            {
                CtiFrequency = 1,
                DeviceCount = 3,
                EventInterval = 250,
                EventIntervalVariance = 10,
                MaxValue = 100
            };

            var inputStream = CepStream<GeneratedEvent>.Create(Application,
                "inputStream", typeof(GeneratorFactory), generatorConfig,
                EventShape.Point);
            return inputStream;
        }
    }

To test, we’ll need to configure LINQPad with our assembly as a data context.  To do this:

  1. Add Connection, then select Microsoft StreamInsight from the list of drivers.
  2. Select Custom Context from the Context Kind dropdown.
  3. Click on the ellipsis for Assembly Path (…), then browse to the directory containing SampleContext.dll.
  4. Click OK to finish selecting the new context.

The context streams should now be visible in LINQPad, similar to the screenshot below.  Create a new Query, and select C# Statements as the Language, and StreamInsight: SampleContext as the database.

Let’s dump our streams, using this LINQ code:

    // Dump out the enumerable stream. Since this is an IEnumerable, it
    // will eventually complete and return
    SimpleStream.ToEnumerable().Dump("Enumerable stream");

    // Dump out the observable stream. As this is an IObservable without
    // an end condition, we will have to stop the query (as it will not
    // stop on its own)
    ObservableStream.ToObservable().Dump("Observable stream");

This will show us our enumerable and observable streams:

Finally, let’s dump out some data from our adapter stream:

    // Dump out the adapter stream. As this is an adapter without
    // an end condition, we will have to stop the query (as it will not
    // stop on its own)
    AdapterStream.ToObservable().Dump("Adapter stream");

Resulting in:

Note – I had to make one change to the adapter code to make it work with ToObservable.  The GeneratedEvent definition doesn’t define a default constructor (which is needed by ToObservable).   In GeneratedEvent.cs, on line 23 I added the code:

    public GeneratedEvent()
    { }

That’s it – a LINQPad data context in a nutshell.

Systematically Starting and Stopping host instances

I wanted to share a couple of scripts I put together to start and stop host instances using WMI in VBS files.

StartAllInProcessHostInstance
const HostInstServiceState_Stopped=1
Sub StartAllInProcessHostInstance ()
   On Error Resume Next
   Dim Query, HostInstSet, Inst
   ' Enumerate all InProcess type Host Instance
   Query = "SELECT * FROM MSBTS_HostInstance WHERE HostType =1 and IsDisabled='FALSE'"
   Set HostInstSet = GetObject("Winmgmts:!root\MicrosoftBizTalkServer").ExecQuery(Query)
   For Each Inst in HostInstSet
      ' If host instance is stopped, then it'll start it
      If( Inst.ServiceState=HostInstServiceState_Stopped ) Then
         wscript.echo "Starting host instance -> " & Inst.HostName
             Inst.Start   ' Calling MSBTS_HostInstance::Start() method
         CheckWMIError
         wscript.echo Inst.HostName & " has been started successfully on server " & Inst.RunningServer & VBNewLine
      End If
   Next
end Sub
'This subroutine deals with all errors using the WbemScripting object.  Error descriptions
'are returned to the user by printing to the console.
Sub   CheckWMIError()
   If Err <> 0   Then
      On Error Resume   Next
      Dim strErrDesc: strErrDesc = Err.Description
      Dim ErrNum: ErrNum = Err.Number
      Dim WMIError : Set WMIError = CreateObject("WbemScripting.SwbemLastError")
      If ( TypeName(WMIError) = "Empty" ) Then
         wscript.echo strErrDesc & " (HRESULT: "   & Hex(ErrNum) & ")."
      Else
         wscript.echo WMIError.Description & "(HRESULT: " & Hex(ErrNum) & ")."
         Set WMIError = nothing
      End   If  
      wscript.quit 0
   End If
End Sub

StopAllInProcessHostInstance
const HostInstServiceState_Started=4
Sub StopAllInProcessHostInstance ()
   On Error Resume Next
   Dim Query, HostInstSet, Inst
   ' Enumerate all InProcess type Host Instance
   Query = "SELECT * FROM MSBTS_HostInstance WHERE HostType =1 and IsDisabled='FALSE'"
   Set HostInstSet = GetObject("Winmgmts:!root\MicrosoftBizTalkServer").ExecQuery(Query)
   For Each Inst in HostInstSet
      ' If host instance is started, then it'll stop it
      If( Inst.ServiceState=HostInstServiceState_Started ) Then
         wscript.echo "Stopping host instance -> " & Inst.HostName
             Inst.Stop   ' Calling MSBTS_HostInstance::Stop() method
         CheckWMIError
         wscript.echo Inst.HostName & " has been stopped successfully on server " & Inst.RunningServer & VBNewLine
      End If
   Next
end Sub
'This subroutine deals with all errors using the WbemScripting object.  Error descriptions
'are returned to the user by printing to the console.
Sub   CheckWMIError()
   If Err <> 0   Then
      On Error Resume   Next
      Dim strErrDesc: strErrDesc = Err.Description
      Dim ErrNum: ErrNum = Err.Number
      Dim WMIError : Set WMIError = CreateObject("WbemScripting.SwbemLastError")
      If ( TypeName(WMIError) = "Empty" ) Then
         wscript.echo strErrDesc & " (HRESULT: "   & Hex(ErrNum) & ")."
      Else
         wscript.echo WMIError.Description & "(HRESULT: " & Hex(ErrNum) & ")."
         Set WMIError = nothing
      End   If  
      wscript.quit 0
   End If
End Sub

StreamInsight: Creating a custom data context for LINQPad, chock full of weathery goodness.

Following on the heels of my last query pattern blog post, I started to dig into creating a custom data context for use with StreamInsight and LINQPad.  The default contexts supplied out of box with the StreamInsight driver (including the Hitchhiker’s Guide context) are great, but don’t contain a large volume of data.  I wanted to put together a much larger context to have a baseline to play with larger and more complicated query patterns.  My goals for the context/data streams were:

  • Contain “real” process data, around a million “real” data records with accompanying metadata.
  • Be easily transferrable / downloadable
  • Be (relatively) human readable
  • Be performant (sustain at least 60k events read / sec on my desktop)

Building on the fun I had putting together the windchill sample, here’s what I decided on:

  • Use weather data from Environment Canada, who has kindly made a great deal of information available for download in XML format.
    • This includes both the “live” weather and reference data about the producing weather stations.  I chose to use airport weather information, as it is a lot more granular than your average weather station.
  • Create a custom data context which can pull information from compressed CSV files, and expose it as a CepStream.

 

If you’re already comfortable with exposing data via the IEnumerable interface, check out this blog post, which provides a concise summary of creating data contexts.

 

Why did I choose to store the canned data set as a compressed CSV file?  Based on a series of performance explorations, this turned out to be the best choice between file size (being downloadable), and performance (throughput).

For all of the gory performance exploration, check out this blog post. 
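As a minimal illustration of the compressed-CSV approach (a simplified sketch, not the actual store used in the post), here’s CSV text being written through a GZipStream and read back:

```csharp
using System;
using System.IO;
using System.IO.Compression;

class GzipCsvRoundTrip
{
    static void Main()
    {
        var lines = new[] { "Timestamp,StationCode,Temperature",
                            "2010-01-01T00:00:00Z,71628,-5.3" };
        byte[] compressed;

        // Compress: wrap the destination stream in a GZipStream and write
        // ordinary text through a StreamWriter.
        using (var ms = new MemoryStream())
        {
            using (var gz = new GZipStream(ms, CompressionMode.Compress))
            using (var writer = new StreamWriter(gz))
            {
                foreach (var line in lines)
                    writer.WriteLine(line);
            }
            compressed = ms.ToArray();
        }

        // Decompress: the reverse wrapping yields the original lines.
        using (var ms = new MemoryStream(compressed))
        using (var gz = new GZipStream(ms, CompressionMode.Decompress))
        using (var reader = new StreamReader(gz))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
                Console.WriteLine(line);
        }
    }
}
```

The same pattern scales to the weather files: swapping MemoryStream for a FileStream is the only change needed.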

This blog post walks through the various steps of creating the custom data context:

  • Preparing a canned data set (Downloading, parsing and formatting).
  • Reading back the canned data set
  • Creating a LINQPad data context which exposes the data set as a CepStream<T>.

If you’re comfortable with creating/accessing canned data sets, and you just want to create a custom LINQPad context for use with StreamInsight, skip directly to that section.

Before you start reading through the blog, download and follow along with the code samples:

  • Complete package (weather context and LINQ samples)

 

Preparing a canned data set – Downloading

Environment Canada rocks.  They really do (caveat: I am Canadian).  They’ve made a great deal of climatic data available for download in their National Climate Archives On-line.  From the National Climate Archives site, I chose to download ten years’ worth of information for ten airports in Canada (airport weather stations tend to record and archive data in 30-minute or hourly increments; most “other” weather stations seem to record 2-3x per day).  Since the goal is lots of time-series data, airports are ideal: hourly records give us 8,760 records per year per weather station, and 10 stations times 10 years gives us 876k records.  Close enough.

You can see the available stations in a given area and timeframe by looking at their customized search.  Being familiar with Canadian airport geography, I selected them by:

  • Searching by province for a specific date.
  • Selecting the data for an airport (I happened to know what they were).
  • From the specific screen, there’s a navigation option to download data in either CSV or XML format.  I chose to download in XML (this was before I had settled on using CSV as the store format of choice).
    • The station code and station metadata are also available from the URL for this view (for example, the station code for St. John’s, Newfoundland is 6720).

The web URL for downloading data was of the form:

    var urlQuery = "http://www.climate.weatheroffice.gc.ca/climateData/bulkdata_e.html?timeframe=1&Prov=XX&StationID={0}&Year={1}&Month={2}&Day={3}&format={4}&type=hly";

With the following variables:

  • {0} Station ID
  • {1} Year
  • {2} Month
  • {3} Day
  • {4} Format (XML, CSV)
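Filling in those placeholders is a single String.Format call. For example, using station code 6720 (St. John’s, mentioned earlier) for January 2010 in XML format:

```csharp
using System;

class BuildUrl
{
    static void Main()
    {
        // Template from the post; {0}..{4} are station, year, month, day, format.
        var urlQuery = "http://www.climate.weatheroffice.gc.ca/climateData/" +
            "bulkdata_e.html?timeframe=1&Prov=XX&StationID={0}&Year={1}" +
            "&Month={2}&Day={3}&format={4}&type=hly";

        // The day value is ignored by the service; each call returns a month.
        var url = String.Format(urlQuery, 6720, 2010, 1, 1, "xml");
        Console.WriteLine(url);
    }
}
```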

The next step was to script up a downloader that pulled down data for 2010 for each of my target weather stations.  I threw something quick together in LINQPad to accomplish this.  Each invocation of this URL will return data for the given month (i.e. the day value is ignored).

Download the LINQ script Download_WeatherData.

After running this script, I had available to me roughly 1.0 GB of raw XML weather data.  The contents took the form:

    <stationinformation>
        <name>OTTAWA MACDONALD-CARTIER INT&apos;LA</name>
        <province>ONTARIO</province>
        <latitude>45.32</latitude>
        <longitude>-75.67</longitude>
        <elevation>114.00</elevation>
        <climate_identifier>6106000</climate_identifier>
        <wmo_identifier>71628</wmo_identifier>
        <tc_identifier>YOW</tc_identifier>
        <note>All times are specified in Local Standard Time (LST). Add 1 hour to adjust for Daylight Saving Time where and when it is observed.</note>
    </stationinformation>
    <stationdata day="1" hour="0" minute="0" month="1" year="2010" quality=" ">
        <temp description="Temperature" units="°C">-5.30</temp>
        <dptemp description="Dew Point Temperature" units="°C">-6.00</dptemp>
        <visibility description="Visibility" units="km">4.80</visibility>
        <relhum description="Relative Humidity" units="%">95.00</relhum>
        <winddir description="Wind Direction" units="10&apos;s deg">9.00</winddir>
        <windspd description="Wind Speed" units="km/h">15.00</windspd>
        <stnpress description="Station Pressure" units="kPa">100.03</stnpress>
        <humidex description="Humidex"></humidex>
        <windchill description="Wind Chill">-11.00</windchill>
        <weather description="Weather">Snow</weather>
    </stationdata>

Note that we have both the station metadata and the “live” weather data available to us.  The next step will be to render this data into two separate data sets:

  • Sorted list of weather data across all of the weather stations, saved as a CSV file with headers.
  • List of weather station information (one record for each station), saved as a CSV file with headers.

Note that we will also have to shift the timestamp on each data record to the appropriate DateTimeOffset.  We’ll use another handy little LINQ nugget to do this (I’ve really gotten into using LINQPad as my general purpose C# scratchpad).

Download the LINQ script Parse_WeatherData.

Let’s walk through the main method before diving into the two key functions, ParseWeatherData and CsvSequenceStore.WriteEvents<T>.  We grab the list of files from the download directory (in this case we just take 5 to run quickly and work with a smaller amount of data for test purposes).  Next, iterate over the list of files, parsing each and inserting the results into the weatherData and stations lists.  The weatherData list is then sorted before being written into the output file via the CsvSequenceStore.  Note how we use the GZipStream to transparently write out a compressed stream. 

Note that the entire 1 GB worth of data is read into memory before being written out to the destination file.  This isn’t the world’s best coding practice.  What I should have done was:

  • Written out each weather data file to a common output .CSV file
  • Sorted the .CSV file.
  • Compressed the .CSV file.

However, as this was a one-shot, and I have lots of memory on my desktop, I brute-forced it.

    void Main()
    {
        // Get a list of all of the downloaded XML files.
        var fileList = Directory.GetFiles(@"C:\temp\weatherdata\")
            .Take(5) /* Take the first 5 for testing purposes */
            .Where(d => d.EndsWith(".xml"));

        // Define the output file
        var outputFile = @"C:\temp\weather_data_out.csv.gz";
        var stationFile = @"C:\temp\station_data_out.csv.gz";

        // Create holding lists for weather data and stations
        var weatherData = new List<WeatherReading>();
        var stations = new List<StationData>();

        // Iterate through the file list and parse for weather data
        var fileCount = (double)fileList.ToList().Count();
        var progress = 0.0;
        fileList.ToList().ForEach(fl =>
        {
            Console.WriteLine("{0:00.0} Parsing file {1}",
                (progress / fileCount * 100.0), fl);
            ParseWeatherData(fl, weatherData, stations);
            progress++;
        });

        // Sort the weather data into strict ascending time order
        weatherData = weatherData.OrderBy(d => d.Timestamp).ToList();

        // Use the CSV store
        CsvSequenceStore store = new CsvSequenceStore();

        // Write the data to a compressed file using the GZipStream class. Way cool.
        Console.WriteLine("Writing {0} events to file {1}",
            weatherData.Count, outputFile);

        using (FileStream fs = new FileStream(outputFile, FileMode.Create))
        {
            using (GZipStream zipStream = new GZipStream(
                fs, CompressionMode.Compress))
            {
                // Write out the weather data set to the file, using the Timestamp field
                // as the event timestamp
                store.WriteEvents<WeatherReading>(zipStream, weatherData,
                    e => e.Timestamp);
            }
        }

        using (FileStream fs = new FileStream(stationFile, FileMode.Create))
        {
            using (GZipStream zipStream = new GZipStream(
                fs, CompressionMode.Compress))
            {
                // Write out the station data set to the file, using a hard coded
                // timestamp
                store.WriteEvents<StationData>(zipStream, stations,
                    e => DateTime.Parse("2010-01-01 00:00:00"));
            }
        }
    }

Now to dive into the first of the two key functions being called – ParseWeatherData.

  1.  
  2. /// Parse through the file, extracting weather and station data
  3. void ParseWeatherData(string fileName,
  4. List<WeatherReading> weatherData, List<StationData> stations)
  5. {
  6. if (!System.IO.File.Exists(fileName))
  7. throw new ArgumentException("Could not load file" + fileName);
  8.  
  9. using (StreamReader sr = new StreamReader(fileName))
  10. {
  11. var xml = XElement.Load(sr);
  12.  
  13. // Parse the station information
  14. var q = from e in xml.Elements() where e.Name == "stationinformation" select e;
  15. var stationInfo = q.First();
  16. var stationName = GetValue<string>(stationInfo, "name");
  17. var stationCode = GetValue<int>(stationInfo, "wmo_identifier");
  18.  
  19. var newStation = new StationData()
  20. {
  21. Name = stationName,
  22. Elevation = GetValue<double>(stationInfo, "elevation"),
  23. Latitude = GetValue<double>(stationInfo, "latitude"),
  24. Longitude = GetValue<double>(stationInfo, "longitude"),
  25. TCID = GetValue<string>(stationInfo, "tc_identifier"),
  26. StationCode = GetValue<int>(stationInfo, "wmo_identifier"),
  27. Province = GetValue<string>(stationInfo, "province")
  28. };
  29.  
  30. // If this is a new station, add it to the list
  31. if (!stations.Where(s => s.Name == stationName).Any())
  32. stations.Add(newStation);
  33.  
  34. // Parse the station data
  35. var sds = from e in xml.Elements() where e.Name == "stationdata" select e;
  36. sds.ToList().ForEach((e) =>
  37. {
  38. if (!String.IsNullOrEmpty(GetValue<string>(e, "temp")))
  39. {
  40. // Assign the core values
  41. var wr = new WeatherReading()
  42. {
  43. StationCode = stationCode,
  44. Temperature = GetValue<double>(e, "temp"),
  45. DewpointTemperature = GetValue<double>(e, "dptemp"),
  46. Humidex = GetValue<double>(e, "humidex"),
  47. RelativeHumidity = GetValue<double>(e, "relhum"),
  48. StationPressure = GetValue<double>(e, "stnpress"),
  49. Visibility = GetValue<double>(e, "visibility"),
  50. WindChill = GetValue<double>(e, "windchill"),
  51. WindDirection = GetValue<double>(e, "winddir"),
  52. WindSpeed = GetValue<double>(e, "windspd"),
  53. };
  54.  
  55. // Convert the timestamp into universal time based
  56. // on the current station information (lookup based on
  57. // province)
  58. var tzi = tzMap[newStation.Province];
  59. // Assemble the timestamp
  60. var dt = new DateTimeOffset(
  61. GetAttribute<int>(e, "year"),
  62. GetAttribute<int>(e, "month"),
  63. GetAttribute<int>(e, "day"),
  64. GetAttribute<int>(e, "hour"),
  65. GetAttribute<int>(e, "minute"),
  66. 0, tzi.BaseUtcOffset);
  67. wr.Timestamp = dt.UtcDateTime;
  68. // Parse weather conditions – WeatherConditions is an enum
  69. // flags field
  70. string[] conditions = GetValue<string>(e, "weather")
  71. .Replace(' ', '_')
  72. .Split(new char[] { ',' });
  73.  
  74. WeatherConditions cond = WeatherConditions.None;
  75. WeatherConditions total = WeatherConditions.None;
  76. foreach (var s in conditions)
  77. {
  78. if (Enum.TryParse<WeatherConditions>(s, out cond))
  79. {
  80. total |= cond;
  81. }
  82. }
  83.  
  84. // Add the data to the list
  85. weatherData.Add(wr);
  86. }
  87. });
  88. }
  89. }

This looks like a lot, but it’s simply an XML parser, leveraging the XElement class.  After opening up the file, we pull the file into an XElement object, then pluck out the station and weather (station data) information.  The GetValue<T> and GetAttribute<T> functions are simple helpers to ease extracting typed data from the various nodes and coercing the types.

Note that I use Convert.ChangeType in the GetValue<T> and GetAttribute<T> functions.  This is a pretty slow method – I would have been better off using a switch() statement on the data type and XmlConvert for performance reasons. 

However, as this is code that only runs once, I didn’t bother to add the performance optimization.  I did spend a lot of time on performance optimizations for reading the data, which we’ll cover in the next section.
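For the curious, here’s a hedged sketch of what that faster variant could look like: switching on the target type and calling XmlConvert instead of Convert.ChangeType. The GetValue<T> name mirrors the helper in the script, but this is an illustrative reimplementation, not the original code.

```csharp
using System;
using System.Xml;
using System.Xml.Linq;

static class TypedXml
{
    // A sketch of a typed extraction helper using XmlConvert rather than
    // Convert.ChangeType; it supports only the types the weather data needs.
    public static T GetValue<T>(XElement parent, string name)
    {
        var element = parent.Element(name);
        var text = element == null ? String.Empty : element.Value;

        if (typeof(T) == typeof(string)) return (T)(object)text;
        if (String.IsNullOrEmpty(text)) return default(T);
        if (typeof(T) == typeof(int)) return (T)(object)XmlConvert.ToInt32(text);
        if (typeof(T) == typeof(double)) return (T)(object)XmlConvert.ToDouble(text);
        throw new NotSupportedException("Unsupported type: " + typeof(T));
    }
}

class Demo
{
    static void Main()
    {
        var xml = XElement.Parse(
            "<stationinformation><elevation>114.00</elevation>" +
            "<wmo_identifier>71628</wmo_identifier></stationinformation>");
        Console.WriteLine(TypedXml.GetValue<double>(xml, "elevation"));
        Console.WriteLine(TypedXml.GetValue<int>(xml, "wmo_identifier"));
    }
}
```

XmlConvert avoids the boxing and runtime type lookup of Convert.ChangeType, which is what makes this shape measurably faster on large parse runs.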

There are only two “custom” operations in ParseWeatherData, handling two special data parsing conditions:

  • Timezone offset.  Each of the timestamps in the file is in local time.  In order to adjust to UTC time we need to determine the time zone (based on the province field in the stations information, which we use to look up a TimeZoneInfo object from a static dictionary I defined at the bottom of the linq file).  This is used as an argument to DateTimeOffset to shift the date to UTC.
  • Weather conditions.  The input XML file can define weather conditions as having multiple values.  To support this in StreamInsight in a relatively straightforward fashion, I defined a bitwise [Flags] enum (WeatherConditions) that I fill in on lines 70-82.

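The flags-enum trick can be condensed into a standalone illustration (the enum members below are invented for the example – the real WeatherConditions enum has many more values): spaces in condition names are replaced with underscores so they match member names, then each parsed flag is OR’d into the total.

```csharp
using System;

[Flags]
public enum WeatherConditions
{
    None = 0,
    Rain = 1,
    Snow = 2,
    Fog = 4,
    Freezing_Drizzle = 8
}

public static class ConditionParser
{
    // Split a comma-separated condition string, normalize spaces to
    // underscores so tokens match enum member names, and OR the flags.
    public static WeatherConditions Parse(string weather)
    {
        var total = WeatherConditions.None;
        foreach (var s in weather.Replace(' ', '_').Split(','))
        {
            WeatherConditions cond;
            if (Enum.TryParse<WeatherConditions>(s, out cond))
                total |= cond;
        }
        return total;
    }
}
```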
Finally, we have our friendly neighborhood public void WriteEvents<T>(Stream stream, IEnumerable<T> events, Func<T, DateTimeOffset> dtFunc) method, which writes out each event to a .CSV file.  It takes as arguments:

  • Stream stream.  The destination stream – taking a stream rather than a file name made it easy to substitute the GZipStream writer to easily add in compression.
  • IEnumerable<T> events.  The list of events to write out to the file.
  • Func<T, DateTimeOffset> dtFunc.  In order to easily stream events into StreamInsight, we need to know the temporal properties of the event (i.e. the StartTime).  Since the CsvSequenceStore is generic, we hand in a function that identifies the temporal properties (in our case, returning e.Timestamp).
public void WriteEvents<T>(Stream stream, IEnumerable<T> events,
    Func<T, DateTimeOffset> dtFunc)
{
    var encodingArray = new byte[16000];
    var fields = typeof(T).GetProperties();
    var lastField = fields.Last();

    var sb = new StringBuilder();
    var str = String.Empty;
    var byteLen = 0;

    foreach (var e in events)
    {
        sb.Clear(); str = String.Empty; byteLen = 0;

        // Write out the event kind, start time and end time
        sb.Append("Point,");
        sb.Append(dtFunc(e).Ticks);
        sb.Append(",,");

        foreach (var o in fields)
        {
            sb.Append(o.GetValue(e, null).ToString());
            if (o != lastField)
                sb.Append(",");
        }
        sb.AppendLine();
        str = sb.ToString();

        byteLen = Encoding.ASCII.GetBytes(str, 0, str.Length, encodingArray, 0);
        stream.Write(encodingArray, 0, byteLen);
    }
}

This is a very straightforward function.  Using reflection, we obtain the list of public properties for the type (via GetProperties()), then iterate through them, using the reflection method GetValue to append each value’s string representation to a StringBuilder.  I wanted to avoid String.Join(), hence remembering lastField so the final field doesn’t get a trailing comma.

As a base Stream class only understands writing out bytes, we convert the line into a byte array and write it out.

Note that I use ASCII encoding here.  Really I should be using Unicode encoding, but I happen to know that the data is pure ASCII (at least it is THIS time).  Should a future blog feature Unicode data, this line will change.

After executing the script, I have a compressed .CSV file containing weather data.  Let’s have a look at the contents.  First step will be to unzip it into a plain .csv file (via the gzip.exe utility – I could write a script to do this, but we’ll get to using GzipStream to uncompress in the next section).

Now, we crack open our .CSV file with Notepad and have a look (note that these only contain 5 weather files’ worth of data, as I left the .Take() method uncommented on line 5).

Point,633979188000000000,,71628,1/1/2010 5:00:00 AM,-5.3,-6,95,9,15,4.8,100.03,0,-11,0

Excellent – we now have (mostly) human-readable data in a downloadable (i.e. small) format that we can extract quickly.  On to extraction!

Reading back the canned data set

We’ll stay in LINQPad for the time being (we’ll switch over to Visual Studio when it comes time to create the actual data context – feel free to work through this code in VS.NET if it’s more comfortable).  Now that we have the canned data set (as our compressed .CSV file), we need to be able to read it back out again in a form that can be turned into a CepStream<T>.

Download the LINQ script Read_WeatherData.

This is very similar to, yet a little simpler than, the code we used in the last section to write the data into the compressed CSV file.  The parsing code is the same (also using the CsvSequenceStore class), only now using the ReadEvents<T> method in place of the WriteEvents<T> method.  The new piece that has been introduced is the assign function, which defines how we map from the CSV format into POCO (plain old CLR object) format.

This doesn’t seem like a terribly generic way of implementing this, and it’s not.  The reason I’m using a function delegate rather than reflection (via PropertyInfo.SetValue() and Convert.ChangeType()) is that we’re now in performance-critical code, and those two methods were killing performance.

I’ll write the code for dynamically generating this function in a later blog, but for now if you’re interested in the performance numbers, head on over to my performance blog on this topic.
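For contrast, the reflection-based version that proved too slow would look roughly like this (a sketch only; it assumes the property order returned by GetProperties() matches the CSV column order, as it does in the writer):

```csharp
using System;
using System.Reflection;

public static class ReflectionFill
{
    // Sketch: generic CSV-to-object assignment via reflection.
    // PropertyInfo.SetValue and Convert.ChangeType are both expensive
    // per call, which is what kills throughput on large files.
    public static void Assign<T>(string[] tokens, T target)
    {
        PropertyInfo[] props = typeof(T).GetProperties();
        for (int i = 0; i < props.Length && i < tokens.Length; i++)
        {
            object value = Convert.ChangeType(tokens[i], props[i].PropertyType);
            props[i].SetValue(target, value, null);
        }
    }
}
```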

  1. void Main()
  2. {
  3.     CsvSequenceStore store = new CsvSequenceStore();
  4.     long count = 0;
  5.  
  6.     Action<string[], WeatherReading> assign = (str, e) =>
  7.     {
  8.         e.StationCode = Int32.Parse(str[0]);
  9.         e.Timestamp = DateTime.Parse(str[1]);
  10.         e.Temperature = Double.Parse(str[2]);
  11.         e.DewpointTemperature = Double.Parse(str[3]);
  12.         e.RelativeHumidity = Double.Parse(str[4]);
  13.  
  14.         e.WindDirection = Double.Parse(str[5]);
  15.         e.WindSpeed = Double.Parse(str[6]);
  16.         e.Visibility = Double.Parse(str[7]);
  17.         e.Humidex = Double.Parse(str[8]);
  18.         e.WindChill = Double.Parse(str[9]);
  19.         e.WeatherConditions = Int32.Parse(str[10]);
  20.     };
  21.     using (FileStream fs = new FileStream(@"C:\temp\weather_data_out.csv.gz", FileMode.Open))
  22.     {
  23.         using (GZipStream zipStream = new GZipStream(
  24.             fs, CompressionMode.Decompress))
  25.         {
  26.             var evts2 = store.ReadEvents<WeatherReading>(zipStream, assign);
  27.             var weatherStream = evts2.Select(e => e.Payload)
  28.                 .Take(50);
  29.             weatherStream.Dump("weather events");
  30.         }
  31.     }
  32. }

Executing this simple main function gives us the shiny weather events (50 of them, as limited by the Take() method on line 28):

The core of this approach lies in the ReadEvents<T> method, which is remarkably simple given the amount of code we’ve written to get to this point.  Remember that each line of CSV consists of the event shape (Point or Interval), the Start and End times, and then the payload.  We use a wrapper object, TypedEventPayload<T>, to encapsulate the payload along with the event characteristics.

public IEnumerable<TypedEventPayload<T>> ReadEvents<T>(Stream stream,
    Action<string[], T> assign) where T : new()
{
    StreamReader sr = new StreamReader(stream);
    String line = null;
    var fields = typeof(T).GetProperties();
    var lastField = fields.Last();

    while ((line = sr.ReadLine()) != null)
    {
        if (line.StartsWith("EventShape,"))
            continue;
        string[] tokens = line.Split(new char[] { ',' });
        TypedEventPayload<T> ep = new TypedEventPayload<T>();
        ep.Payload = new T();

        ep.Shape = (EventShape)Enum.Parse(typeof(EventShape), tokens[0]);
        ep.StartTime = new DateTimeOffset(Int64.Parse(tokens[1]), TimeSpan.FromSeconds(0));
        if (String.IsNullOrEmpty(tokens[2]))
            ep.EndTime = ep.StartTime.AddTicks(1);
        else
            ep.EndTime = new DateTimeOffset(Int64.Parse(tokens[2]), TimeSpan.FromSeconds(0));
        assign(tokens.Skip(3).ToArray(), ep.Payload);

        yield return ep;
    }
}

We now have an IEnumerable<T> which we can convert into a CepStream<T> via use of the ToPointStream method. 

The really critical piece here is returning the events as an IEnumerable<T>, not as an array or list.  If we had to load the entire file into memory before sending a single event to StreamInsight, that wouldn’t be performant from either a memory or a latency perspective.  Instead, as we pull and deserialize events from the CSV file we yield them back.

This will allow the CepStream<T> to consume events as they are raised (which we’ll do in the next step).
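The deferred behavior of yield return can be seen in a tiny standalone example: nothing is produced until the consumer asks for elements, and stopping early means the rest of the sequence is never materialized.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyDemo
{
    public static int Produced = 0;

    // The iterator body runs only as the consumer pulls elements;
    // Produced counts how many elements were actually materialized.
    public static IEnumerable<int> Events()
    {
        for (int i = 0; i < 1000000; i++)
        {
            Produced++;
            yield return i;
        }
    }
}
```

Pulling only three elements via LazyDemo.Events().Take(3).ToList() leaves Produced at 3: the remaining 999,997 iterations never execute, which is exactly the behavior we rely on when feeding StreamInsight from a large file.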

Finally, we convert the IEnumerable<T> into a CepStream<T> and run some basic queries over it:

var evts2 = store.ReadEvents<WeatherReading>(zipStream, assign);
var weatherStream = evts2.Select(e => e.Payload)
    .ToPointStream(Application, t =>
        PointEvent.CreateInsert(t.Timestamp, t),
        AdvanceTimeSettings.IncreasingStartTime);
var query = from e in weatherStream
                .TumblingWindow(TimeSpan.FromDays(1000), HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                Count = e.Count()
            };

Excellent – we now have a CepStream<T>.  The only remaining step to having a canned data set is simple packaging.

Wait!  You’re probably looking at the IEnumerable<TypedEventPayload<T>> ReadEvents method and thinking to yourself “Mark… that’s dumb.  That won’t work, because of the IEnumerable state machine, and how Dispose is handled!”.  And you’d be right.

Here’s the problem:

  • The method ReadEvents assumes that someone else is responsible for initializing and disposing of the stream (and any underlying resources – such as the file handle). 
  • IEnumerable, and the corresponding yield statement, create an internal state machine when compiled (i.e. the enumeration uses deferred execution – the behavior we want).  The IEnumerable is not finished with the underlying stream until it has enumerated through all of the available values (in our case read to the end of the file).
  • As such, the lifetime of the Stream needs to be tied to the lifetime of the enumerator (as only the code that manages the enumerator knows when it is complete).
  • Since this design has the stream managed separately from the enumerator, the underlying stream could be closed before the enumerator even begins to evaluate.

Our current code base works because we force the entire query to complete before exiting the using block that creates the stream.  In a LINQPad context, where we need to return the stream from a function and have no direct control over the lifetime of a query, this will have the effect of closing the underlying file before any events are read.

We need to redesign our code to encapsulate the file reading into the same class that exposes IEnumerable, and handles stream cleanup correctly.
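The failure mode is easy to reproduce with a MemoryStream standing in for the file (a contrived sketch): because the iterator body does not run until enumeration starts, by the time it touches the stream the using block has already disposed it, and enumeration throws.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

public static class DisposalTrap
{
    // Iterator that reads lines from a stream it does not own.
    public static IEnumerable<string> ReadLines(Stream stream)
    {
        var sr = new StreamReader(stream);
        string line;
        while ((line = sr.ReadLine()) != null)
            yield return line;
    }

    public static IEnumerable<string> GetLines()
    {
        using (var ms = new MemoryStream(Encoding.ASCII.GetBytes("a\nb\n")))
        {
            return ReadLines(ms); // returns BEFORE any line is read
        } // stream disposed here, enumerator not yet started
    }
}
```

Calling DisposalTrap.GetLines().ToList() throws, because the first MoveNext() tries to read from an already-closed stream.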

Reading back the canned data set – the Right Way!

Now we take the prototype we built in the last section and put it in a form suitable for embedding into a library (which the LINQPad context will leverage).  We’ll create a base class, SequenceBase, which encapsulates creating streams from files (including compressed files).  This will allow us to more easily plug in different types of “serializers”.

SequenceBase is very straightforward, simply providing a helper function to create a new stream upon demand (to allow the enumerator to wrap it in a using() statement).

public abstract class SequenceBase
{
    private string _fileName;
    private bool _compress;
    private bool _open;

    protected Stream GetStream()
    {
        // If opening the file: Open / read data / allow others to read
        // If writing a new file: Create / write data / deny others access
        FileMode fMode = (_open) ? FileMode.Open : FileMode.Create;
        FileAccess aMode = (_open) ? FileAccess.Read : FileAccess.Write;
        FileShare fShare = (_open) ? FileShare.Read : FileShare.None;

        // Set the compression mode
        CompressionMode cMode = (_open) ? CompressionMode.Decompress
            : CompressionMode.Compress;

        // Create the base file stream
        var fs = new FileStream(_fileName, fMode, aMode, fShare);
        Stream _stream = fs;

        // If using Gzip compression, wrap the file stream
        if (_compress)
        {
            var zs = new System.IO.Compression.GZipStream(fs, cMode);
            _stream = zs;
        }
        return _stream;
    }

    public SequenceBase(string fileName, bool compress, bool open)
    {
        _fileName = fileName;
        _compress = compress;
        _open = open;
    }
}

With this as a foundation, our Csv reader class becomes very straightforward to implement – essentially taking the contents of the ReadEvents<T> method from the previous section, and wrapping them up in a class derived from SequenceBase.

public class CsvSequenceReaderTyped<T> :
    SequenceBase, IEnumerable<IntervalEvent<T>>
    where T : new()
{
    protected Action<string[], T> _assign;

    public CsvSequenceReaderTyped(string fileName, bool compress,
        Action<string[], T> assign) : base(fileName, compress, true)
    {
        _assign = assign;
    }

    public IEnumerator<IntervalEvent<T>> GetEnumerator()
    {
        using (var stream = GetStream())
        {
            using (var sr = new StreamReader(stream))
            {
                String line = null;
                var fields = typeof(T).GetProperties();
                var lastField = fields.Last();

                while ((line = sr.ReadLine()) != null)
                {
                    if (line.StartsWith("EventShape,"))
                        continue;
                    string[] tokens = line.Split(new char[] { ',' });
                    var shape = (EventShape)Enum.Parse(typeof(EventShape), tokens[0]);
                    var startTime = new DateTimeOffset(Int64.Parse(tokens[1]), TimeSpan.FromSeconds(0));
                    DateTimeOffset endTime;

                    if (String.IsNullOrEmpty(tokens[2]))
                        endTime = startTime.AddTicks(1);
                    else
                        endTime = new DateTimeOffset(Int64.Parse(tokens[2]), TimeSpan.FromSeconds(0));

                    var ep = IntervalEvent<T>.CreateInsert(
                        startTime, endTime, new T());
                    _assign(tokens.Skip(3).ToArray(), ep.Payload);

                    yield return ep;
                }
            }
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

Note that we assume that the assignment Action<string[], T> is handed in when the reader is created.

The really critical piece here is creating (via GetStream()) and disposing of (via the using statement) our underlying file resources each time (which required that the resource management code – opening and closing files – be merged with the resource-using code).

That’s it – now that we have our IEnumerable<T> wrapped up in a class that can close out the underlying file stream on Dispose(), we are ready to wrap it up in a LINQPad data context.

Creating a LINQPad data context which exposes the data set as a CepStream<>

Creating the custom data context will be the shortest section of this blog post.  With a data source (either an IEnumerable<T>, an IObservable<T> or a StreamInsight adapter) ready to go, all that stands between you and LINQPad data context goodness is exposing the CepStream<T> object in a class.

Really – that’s it.  Here are the steps:

  • Create a class that derives from StreamInsightContext.
    • StreamInsightContext is defined in StreamInsightLinqPad.Samples.dll, which is typically found in C:\ProgramData\LINQPad\Drivers\DataContext\4.0\StreamInsightLinqPad (3d3a4b0768c9178e)
  • Expose the two streams (weather data and station data), based on the code we explored in the “Reading back the canned data set” section.

Or, to look at the code:

public class WeatherContext : StreamInsightContext
{
    private static readonly string weatherFile;
    private static readonly string stationFile;

    static WeatherContext()
    {
        // Look for the data files in the same directory as this assembly
        var assemblyDir = Path.GetDirectoryName(
            Assembly.GetExecutingAssembly().Location);
        weatherFile = Path.Combine(assemblyDir, @"weather_data_out.csv.gz");
        stationFile = Path.Combine(assemblyDir, @"station_data_out.csv.gz");
    }

    public WeatherContext(Server server)
        : base(server)
    { }

    public CepStream<WeatherReading> SmallWeather
    {
        get
        {
            return new CsvSequenceReaderTyped<WeatherReading>
                (weatherFile, true, WeatherReading.GetFill()).Take(50)
                .ToStream(Application, AdvanceTimeSettings.IncreasingStartTime);
        }
    }

    public CepStream<WeatherReading> LargeWeather
    {
        get
        {
            return new CsvSequenceReaderTyped<WeatherReading>
                (weatherFile, true, WeatherReading.GetFill())
                .ToStream(Application, AdvanceTimeSettings.IncreasingStartTime);
        }
    }

    public CepStream<StationData> Stations
    {
        get
        {
            return new CsvSequenceReaderTyped<StationData>
                (stationFile, true, StationData.GetFill())
                .ToStream(Application, AdvanceTimeSettings.IncreasingStartTime)
                .AlterEventLifetime(e => new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc),
                    e => TimeSpan.MaxValue);
        }
    }
}

There are a few changes from the last section, to clean things up a bit in a project:

  • The assignment functions are returned by static functions on the data types themselves (as opposed to public properties – I didn’t want a non-StreamInsight-compatible data type in the event type definitions).
  • Instead of raising a generic wrapper class and then converting into PointEvent<T>, we raise IntervalEvent<T> directly from the file (i.e. our enumerator is now IEnumerator<IntervalEvent<T>> GetEnumerator()).
  • The file names are inferred to be in the same directory as the context assembly.
  • In the Stations stream, we perform an AlterEventLifetime to convert the list of stations into a reference stream by extending the time from January 1st 2000 (the timestamp of the first record in the data set), to max time.
    • Ordinarily, this would be done with query logic in your application, but for ease of use in LINQPad (i.e. removing an extra step) I did it in the context.

Go ahead and compile the project to obtain WeatherData.LinqpadContext.dll.  We’ll load this into LINQPad by:

  • Add Connection, then select Microsoft StreamInsight from the list of drivers.
  • Select Custom Context from the Context Kind dropdown.
  • Click on the ellipsis for Assembly Path (…), then browse to the directory containing WeatherData.LinqpadContext.dll.
  • Click OK to finish selecting the new context.

Our weather context should now be available on the connections list.  Before we can start querying these streams, we will need to take our data files (weather_data_out.csv.gz and station_data_out.csv.gz from our first step) and copy them to the assembly directory (in this case, the bin directory for our project).

If you’re not sure which directory you need to place the files in, simply run a query against the data context.  The exception will show you the correct directory. 

Now that we have our custom context ready to go, let’s run some queries.

Running some Queries

With the query context loaded up, let’s go ahead and run some queries against it.

  • From LINQPad, click File, New Query.
  • From the Language drop-down, select C# Statements.
  • From the Database drop-down, select StreamInsight: WeatherContext

Now we’re ready to run some queries.  Let’s start with a couple of basic ones to look at the SmallWeather and Stations streams.

 

Excellent!  One last little query to close out our walkthrough of creating a data context – join the reference stream with the small data stream and filter for a specific airport.

Remember – you can execute just the selected text in LINQPad by highlighting it, then pressing F5 (or clicking Query -> Execute), which is why all of the screenshots show highlighted text.