Using StreamInsight 2.1 and IEnumerable to Process SQL Server Data

Posted on August 3, 2012


The StreamInsight team recently released a major new version of their Complex Event Processing engine, and I’ve finally gotten a chance to start playing around with it. StreamInsight 2.1 introduces a new programming model that elevates IEnumerable/IObservable as event sources and sinks and de-emphasizes the traditional adapter model. If IEnumerable/IObservable objects are truly going to replace adapters, we need to prove that they support the same interactions with sources and sinks. My first test of that idea inspired this post: I’m going to try to retrieve data (events) stored in a Microsoft SQL Server database.

Before we get started, I’ll let you know that my complete Visual Studio project is available on GitHub. Feel free to browse it, fork it, suggest changes or make fun of it.

The first thing I have is a SQL Server database. Let’s assume that server logs are loaded into this database and analyzed at a later time. For each log event, I store an ID, server name, event level (“Information”, “Warning”, “Error”) and the timestamp.


Fortunately for us, the .NET Framework makes it relatively easy to get an IEnumerable from a SQL Server result set. In order to write a good LINQ query, I also wanted the results to be in a strongly typed collection. So, I took advantage of the useful Translate method that comes with the LINQ to SQL DataContext class. First, I defined a class that maps to the database table.

public class ServerEvent
{
        public int Id { get; set; }
        public string ServerName { get; set; }
        public string Level { get; set; }
        public DateTime Timestamp { get; set; }
}

In the method defined below (“GetEvents()”), I connect to my database, execute a command, and return a strongly typed IEnumerable collection.

//requires: using System.Data.Linq; using System.Data.SqlClient;
private static IEnumerable<ServerEvent> GetEvents()
{
   //define connection string
   string connString = "Data Source=.;Initial Catalog=DemoDb;Integrated Security=SSPI;";

   //create enumerable to hold results
   IEnumerable<ServerEvent> result;

   //define DataContext object which is used later for translating results to objects
   DataContext dc = new DataContext(connString);
   //get the underlying connection and open it
   SqlConnection conn = (SqlConnection)dc.Connection;
   conn.Open();

   //return all events stored in the SQL Server table
   SqlCommand command = new SqlCommand("select ID, ServerName, Level, Timestamp From ServerEvent", conn);
   //get the database results and set the connection to close after results are read
   SqlDataReader dataReader = command.ExecuteReader(System.Data.CommandBehavior.CloseConnection);

   //use "translate" to flip the reader stream to an Enumerable of my custom object type
   result = dc.Translate<ServerEvent>(dataReader);
   return result;
}
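
To make the row-by-row nature of this approach concrete, here is a minimal sketch of a hand-written mapping from a data reader to a lazy IEnumerable. It is not how DataContext.Translate is implemented. The ReaderExtensions class and its ToServerEvents method are hypothetical names, and an in-memory DataTable stands in for the SQL Server table so the sample runs without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public class ServerEvent
{
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

public static class ReaderExtensions
{
    // Hypothetical helper: yields one ServerEvent per row, only as the caller enumerates.
    public static IEnumerable<ServerEvent> ToServerEvents(this IDataReader reader)
    {
        // Dispose the reader once enumeration finishes or is abandoned.
        using (reader)
        {
            // Resolve column positions once, before reading any rows.
            int id = reader.GetOrdinal("ID");
            int server = reader.GetOrdinal("ServerName");
            int level = reader.GetOrdinal("Level");
            int timestamp = reader.GetOrdinal("Timestamp");

            while (reader.Read())
            {
                yield return new ServerEvent
                {
                    Id = reader.GetInt32(id),
                    ServerName = reader.GetString(server),
                    Level = reader.GetString(level),
                    Timestamp = reader.GetDateTime(timestamp)
                };
            }
        }
    }
}

public static class ReaderDemo
{
    // Sample rows (made up for illustration) shaped like the ServerEvent table.
    public static DataTable BuildSampleTable()
    {
        var table = new DataTable("ServerEvent");
        table.Columns.Add("ID", typeof(int));
        table.Columns.Add("ServerName", typeof(string));
        table.Columns.Add("Level", typeof(string));
        table.Columns.Add("Timestamp", typeof(DateTime));
        table.Rows.Add(1, "web01", "Information", new DateTime(2012, 8, 3, 10, 0, 30));
        table.Rows.Add(2, "web02", "Error", new DateTime(2012, 8, 3, 10, 1, 0));
        table.Rows.Add(3, "web01", "Warning", new DateTime(2012, 8, 3, 10, 4, 10));
        return table;
    }

    public static void Main()
    {
        IDataReader reader = BuildSampleTable().CreateDataReader();
        foreach (ServerEvent ev in reader.ToServerEvents())
        {
            Console.WriteLine(ev.ServerName + ": " + ev.Level);
        }
    }
}
```

Because the method is an iterator, no row is read until something enumerates the result, which is the same reason the reader in GetEvents() has to stay open after the method returns.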

Now let’s take a peek at the StreamInsight code. After creating an embedded server and application (see the GitHub code for the full source), I instantiated my event source. This command is new in StreamInsight 2.1. Basically, I’m defining a point stream that invokes my “GetEvents()” method and treats each IEnumerable entry as a new point event (“CreateInsert”) with a timestamp derived from the data itself.

//define the (point event) source by creating an enumerable from the GetEvents operation
 var source = app.DefineEnumerable<ServerEvent>(() => GetEvents()).
         ToPointStreamable<ServerEvent, ServerEvent>(
               e => PointEvent.CreateInsert<ServerEvent>(e.Timestamp, e),
               //assumed setting: generate CTIs as event start times increase
               AdvanceTimeSettings.StrictlyIncreasingStartTime);

After that, I defined my initial query. This is nothing more than a passthrough query, and doesn’t highlight anything unique to StreamInsight. Baby steps first!

//write LINQ query against event stream
 var query = from ev in source
                    select ev;

Next, I have my event sink, or output. This uses an observer (an IObserver built with Observer.Create) that writes each output event to a console window.

//create observer as sink and write results to console
 var sink = app.DefineObserver(() =>
                   Observer.Create<ServerEvent>(x => Console.WriteLine(x.ServerName + ": " + x.Level)));
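
Observer.Create builds the observer from a single delegate. For comparison, here is a minimal sketch of a hand-written observer that uses only the IObserver interface from the base class library. The ConsoleSink class and its Received counter are hypothetical names for illustration, the error and completion handling are my own choices rather than what Observer.Create does, and the demo drives the observer by hand instead of through StreamInsight.

```csharp
using System;

public class ServerEvent
{
    public int Id { get; set; }
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

// Hypothetical sink: prints each event and keeps a running count.
public sealed class ConsoleSink : IObserver<ServerEvent>
{
    public int Received { get; private set; }
    public bool Completed { get; private set; }

    // Called once per output event.
    public void OnNext(ServerEvent value)
    {
        Console.WriteLine(value.ServerName + ": " + value.Level);
        Received++;
    }

    // Called if the source fails; no further events arrive afterwards.
    public void OnError(Exception error)
    {
        Console.WriteLine("Source failed: " + error.Message);
    }

    // Called when the source has no more events.
    public void OnCompleted()
    {
        Completed = true;
        Console.WriteLine("Received " + Received + " event(s).");
    }
}

public static class SinkDemo
{
    public static void Main()
    {
        var sink = new ConsoleSink();
        sink.OnNext(new ServerEvent { Id = 1, ServerName = "web01", Level = "Error", Timestamp = DateTime.Now });
        sink.OnNext(new ServerEvent { Id = 2, ServerName = "web02", Level = "Warning", Timestamp = DateTime.Now });
        sink.OnCompleted();
    }
}
```

An instance of a class like this can be used anywhere an IObserver<ServerEvent> is expected, which is handy once the sink needs state or cleanup logic that does not fit comfortably in a lambda.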

Finally, I bind the query to the sink and run it.

//bind the query to the sink
using (IDisposable proc = query.Bind<ServerEvent>(sink).Run("MyProcess"))
{
       Console.WriteLine("Press [Enter] to close the application.");
       //keep the process alive until the user presses Enter
       Console.ReadLine();
}

When I run the application, I can see each event printed out.


Let’s try something more complicated. Let’s skip to a query that uses both groups and windows and highlights the value of using StreamInsight to process this data. In this third query (you can view the second one in the source code), I group the events by event level (e.g. “Error”) and create three-minute tumbling windows. The result should be a count of occurrences for each event level during each window.

var query3 = from ev in source
                     group ev by ev.Level into levelgroup
                     from win in levelgroup.TumblingWindow(TimeSpan.FromMinutes(3))
                     select new EventSummary
                     {
                           EventCount = win.Count(),
                           EventMessage = levelgroup.Key
                     };
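
To picture what this grouped, windowed count produces, here is a rough approximation in plain LINQ to Objects with no StreamInsight involved. It is a sketch under stated assumptions: windows are taken to be fixed three-minute buckets aligned to multiples of the window size counted from DateTime.MinValue, which may not match how the engine aligns its windows, and all events are available up front rather than arriving as a stream. The WindowCount and TumblingWindowSketch names and the sample events are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class ServerEvent
{
    public string ServerName { get; set; }
    public string Level { get; set; }
    public DateTime Timestamp { get; set; }
}

// Hypothetical result type: one row per (window, level) pair.
public class WindowCount
{
    public DateTime WindowStart { get; set; }
    public string Level { get; set; }
    public int Count { get; set; }
}

public static class TumblingWindowSketch
{
    public static List<WindowCount> CountByLevel(IEnumerable<ServerEvent> events, TimeSpan size)
    {
        return events
            // Round each timestamp down to the start of its fixed-size bucket.
            .GroupBy(e => new
            {
                WindowStart = new DateTime(e.Timestamp.Ticks - (e.Timestamp.Ticks % size.Ticks)),
                e.Level
            })
            .OrderBy(g => g.Key.WindowStart)
            .ThenBy(g => g.Key.Level, StringComparer.Ordinal)
            .Select(g => new WindowCount
            {
                WindowStart = g.Key.WindowStart,
                Level = g.Key.Level,
                Count = g.Count()
            })
            .ToList();
    }

    public static List<ServerEvent> SampleEvents()
    {
        return new List<ServerEvent>
        {
            new ServerEvent { ServerName = "web01", Level = "Error", Timestamp = new DateTime(2012, 8, 3, 10, 0, 30) },
            new ServerEvent { ServerName = "web02", Level = "Information", Timestamp = new DateTime(2012, 8, 3, 10, 1, 0) },
            new ServerEvent { ServerName = "web01", Level = "Error", Timestamp = new DateTime(2012, 8, 3, 10, 2, 59) },
            new ServerEvent { ServerName = "web03", Level = "Error", Timestamp = new DateTime(2012, 8, 3, 10, 3, 0) },
            new ServerEvent { ServerName = "web02", Level = "Warning", Timestamp = new DateTime(2012, 8, 3, 10, 4, 10) }
        };
    }

    public static void Main()
    {
        foreach (WindowCount row in CountByLevel(SampleEvents(), TimeSpan.FromMinutes(3)))
        {
            Console.WriteLine(row.WindowStart.ToString("HH:mm") + " " + row.Level + ": " + row.Count);
        }
    }
}
```

With the sample data, the two errors at 10:00:30 and 10:02:59 land in the same window, while the error at exactly 10:03:00 starts the next one. The difference with StreamInsight is that the engine emits each window's result as application time advances instead of waiting for the whole data set.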

When I run the application again, I see each grouping and count. Imagine using this data in real time to detect an emerging trend and proactively prevent a widespread outage.


I have a lot more to learn about how the new object model in StreamInsight 2.1 works, but it looks promising. I previously built a SQL Server StreamInsight adapter that polled a database (for more real-time results), and would love to figure out a way to make that happen with IObservable.

Download StreamInsight 2.1, take a walk through the new product team samples, and let me know if you come up with cool new ways to pull and push data into this engine!

Posted in: StreamInsight