Event Processing in the Cloud with StreamInsight Austin: Part I-Building an Azure AppFabric Adapter

StreamInsight is Microsoft’s (complex) event processing engine, which takes in data and does in-memory pattern matching with the goal of uncovering real-time insight into information.  The StreamInsight team at Microsoft recently announced an upcoming capability (code named “Austin”) to deploy StreamInsight applications to the Windows Azure cloud.  I got my hands on the early bits for Austin and thought I’d walk through an example of building, deploying and running a cloud-friendly StreamInsight application.  You can find the source code here.

You may recall that the StreamInsight architecture consists of input/output adapters and any number of “standing queries” that the data flows over.  In order for StreamInsight Austin to be effective, you need a way for the cloud instance to receive input data.  For instance, you could choose to poll a SQL Azure database or pull in a massive file from an Amazon S3 bucket.  The point is that the data needs to be internet accessible.  If you wish to push data into StreamInsight, then you must expose some sort of endpoint on the Azure instance running StreamInsight Austin.  Because we cannot directly host a WCF service on the StreamInsight Austin instance, our best bet is to use Windows Azure AppFabric to receive events.  In this post, I’ll show you how to build an Azure AppFabric adapter for StreamInsight.  In the next post, I’ll walk through the steps to deploy the on-premises StreamInsight application to Windows Azure and StreamInsight Austin.

As a reference point, the final solution looks like the picture below.  I have a client application which calls an Azure AppFabric Service Bus endpoint started up by StreamInsight Austin.  I then take the output of the StreamInsight query and send it through an adapter to an Azure AppFabric Service Bus endpoint that relays the message to a subscribing service.

[Image: architecture of the final solution (2011.7.5streaminsight18)]

I decided to use the product team’s WCF sample adapter as a foundation for my Azure AppFabric Service Bus adapter.  However, I did make a number of changes in order to simplify it a bit. I have one Visual Studio project that contains shared objects such as the input WCF contract, output WCF contract and StreamInsight Point Event structure.  The Point Event stores a timestamp and dictionary for all the payload values.

[DataContract]
public struct WcfPointEvent
{
    /// <summary>
    /// Gets the event payload in the form of key-value pairs.
    /// </summary>
    [DataMember]
    public Dictionary<string, object> Payload { get; set; }

    /// <summary>
    /// Gets the start time for the event.
    /// </summary>
    [DataMember]
    public DateTimeOffset StartTime { get; set; }

    /// <summary>
    /// Gets a value indicating whether the event is an insert or a CTI.
    /// </summary>
    [DataMember]
    public bool IsInsert { get; set; }
}

Each receiver of the StreamInsight event implements the following WCF interface contract.

[ServiceContract]
public interface IPointEventReceiver
{
    /// <summary>
    /// Attempts to dequeue a given point event. The result code indicates whether the operation
    /// has succeeded, the adapter is suspended -- in which case the operation should be retried later --
    /// or whether the adapter has stopped and will no longer return events.
    /// </summary>
    [OperationContract]
    ResultCode PublishEvent(WcfPointEvent result);
}

The service clients which send messages to StreamInsight via WCF must conform to this interface.

[ServiceContract]
public interface IPointInputAdapter
{
    /// <summary>
    /// Attempts to enqueue the given point event. The result code indicates whether the operation
    /// has succeeded, the adapter is suspended -- in which case the operation should be retried later --
    /// or whether the adapter has stopped and can no longer accept events.
    /// </summary>
    [OperationContract]
    ResultCode EnqueueEvent(WcfPointEvent wcfPointEvent);
}
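
The doc comment on this contract hints at how a caller should treat each result code.  Here is a minimal sketch of that handling, not code from the actual solution.  It assumes the ResultCode enum has Suspended and Stopped members (only Success shows up in this post), and EventSender, TrySend and ScriptedAdapter are hypothetical names.  The shared types are re-declared without their WCF attributes so the snippet compiles without any service references.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Assumed shape: only ResultCode.Success appears in this post. Suspended and
// Stopped are names inferred from the contract's doc comment.
public enum ResultCode { Success, Suspended, Stopped }

// Trimmed copies of the shared types (WCF attributes omitted).
public struct WcfPointEvent
{
    public Dictionary<string, object> Payload { get; set; }
    public DateTimeOffset StartTime { get; set; }
    public bool IsInsert { get; set; }
}

public interface IPointInputAdapter
{
    ResultCode EnqueueEvent(WcfPointEvent wcfPointEvent);
}

// Hypothetical client-side helper: retries while the adapter reports Suspended.
public static class EventSender
{
    public static bool TrySend(IPointInputAdapter adapter, WcfPointEvent evt,
                               int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            ResultCode code = adapter.EnqueueEvent(evt);
            if (code == ResultCode.Success)
            {
                return true;
            }
            if (code == ResultCode.Stopped)
            {
                return false; // the adapter will not accept any more events
            }
            if (attempt < maxAttempts)
            {
                Thread.Sleep(delay); // Suspended: wait, then try again
            }
        }
        return false; // still suspended after the last attempt
    }
}

// Hypothetical stand-in for the relay-hosted adapter. It replays a scripted
// list of result codes so the helper can be tried without a Service Bus endpoint.
public class ScriptedAdapter : IPointInputAdapter
{
    private readonly Queue<ResultCode> responses;

    public ScriptedAdapter(params ResultCode[] responses)
    {
        this.responses = new Queue<ResultCode>(responses);
    }

    public int Calls { get; private set; }

    public ResultCode EnqueueEvent(WcfPointEvent wcfPointEvent)
    {
        Calls++;
        return responses.Count > 0 ? responses.Dequeue() : ResultCode.Success;
    }
}
```

In a real client, the adapter argument would be a WCF channel created for the IPointInputAdapter contract rather than the scripted stand-in.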

I built a WCF service (which will be hosted through the Windows Azure AppFabric Service Bus) that implements the IPointEventReceiver interface and prints out one of the values from the dictionary payload.

public class ReceiveEventService : IPointEventReceiver
{
    public ResultCode PublishEvent(WcfPointEvent result)
    {
        // Print one value from the payload dictionary to show that the event arrived.
        Console.WriteLine("Event received: " + result.Payload["City"].ToString());
        return ResultCode.Success;
    }
}

Now, let’s get into the StreamInsight Azure AppFabric adapter project.  I’ve defined a “configuration object” which holds values that are passed into the adapter at runtime.  These include the service address to host (or consume) and the issuer name and secret (the Username and Password properties) used to host the Azure AppFabric service.

public struct WcfAdapterConfig
    {
        public string ServiceAddress { get; set; }
        public string Username { get; set; }
        public string Password { get; set; }
    }

Both the input and output adapters have the required factory classes and the input adapter uses the declarative CTI model to advance the application time.  For the input adapter itself, the constructor is used to initialize adapter values including the cloud service endpoint.

public WcfPointInputAdapter(CepEventType eventType, WcfAdapterConfig configInfo)
{
    this.eventType = eventType;
    this.sync = new object();

    // Initialize the service host. The host is opened and closed as the adapter is started
    // and stopped.
    this.host = new ServiceHost(this);

    //define cloud binding
    BasicHttpRelayBinding cloudBinding = new BasicHttpRelayBinding();
    //turn off inbound security
    cloudBinding.Security.RelayClientAuthenticationType = RelayClientAuthenticationType.None;

    //add endpoint
    ServiceEndpoint endpoint = host.AddServiceEndpoint(typeof(IPointInputAdapter), cloudBinding, configInfo.ServiceAddress);

    //define connection binding credentials
    TransportClientEndpointBehavior cloudConnectBehavior = new TransportClientEndpointBehavior();
    cloudConnectBehavior.CredentialType = TransportClientCredentialType.SharedSecret;
    cloudConnectBehavior.Credentials.SharedSecret.IssuerName = configInfo.Username;
    cloudConnectBehavior.Credentials.SharedSecret.IssuerSecret = configInfo.Password;
    endpoint.Behaviors.Add(cloudConnectBehavior);

    // Poll the adapter to determine when it is time to stop.
    this.timer = new Timer(CheckStopping);
    this.timer.Change(StopPollingPeriod, Timeout.Infinite);
}

On “Start()” of the adapter, I start up the WCF host (and connect to the cloud).  My Timer checks the state of the adapter and if the state is “Stopping”, the WCF host is closed.  When the “EnqueueEvent” operation is called by the service client, I create a StreamInsight point event and take all of the values in the payload dictionary and populate the typed class provided at runtime.

foreach (KeyValuePair<string, object> keyAndValue in payload)
{
    //populate values in runtime class with payload values
    int ordinal = this.eventType.Fields[keyAndValue.Key].Ordinal;
    pointEvent.SetField(ordinal, keyAndValue.Value);
}
pointEvent.StartTime = startTime;

//if the engine's queue is full, signal that the adapter is ready to resume
if (Enqueue(ref pointEvent) == EnqueueOperationResult.Full)
{
    Ready();
}

There is a fair amount of other code in there, but those are the main steps.  As for the output adapter, the constructor instantiates the WCF ChannelFactory for the IPointEventReceiver contract defined earlier.  The address passed in via the WcfAdapterConfig is applied to the Factory.  When StreamInsight invokes the Dequeue operation of the adapter, I pull out the values from the typed class and put them into the payload dictionary of the outbound message.

// Extract all field values to generate the payload.
result.Payload = this.eventType.Fields.Values.ToDictionary(
        f => f.Name,
        f => currentEvent.GetField(f.Ordinal));

//publish message to service
client = factory.CreateChannel();
client.PublishEvent(result);
((IClientChannel)client).Close();
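
The two translations above (payload dictionary to typed event on the way in, typed event to payload dictionary on the way out) can be tried outside of StreamInsight.  The following is a minimal sketch that uses plain reflection as a stand-in for the adapter's CepEventType field lookup; the real adapters use SetField and GetField with field ordinals as shown.  PayloadMapper, FromPayload and ToPayload are hypothetical names, and OrderEvent mirrors the event class defined later in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Same shape as the event class used by the test host later in this post.
public class OrderEvent
{
    public string City { get; set; }
    public string Product { get; set; }
}

// Hypothetical helper: reflection stands in for the CepEventType field lookup.
public static class PayloadMapper
{
    // Inbound direction: copy each payload entry onto the property with the same name.
    public static T FromPayload<T>(IDictionary<string, object> payload)
        where T : class, new()
    {
        T target = new T();
        foreach (KeyValuePair<string, object> keyAndValue in payload)
        {
            PropertyInfo property = typeof(T).GetProperty(keyAndValue.Key);
            if (property == null)
            {
                // Fail on an unknown key, as a dictionary lookup of a missing field would.
                throw new KeyNotFoundException("No field named " + keyAndValue.Key);
            }
            property.SetValue(target, keyAndValue.Value, null);
        }
        return target;
    }

    // Outbound direction: flatten every public property into a name/value dictionary.
    public static Dictionary<string, object> ToPayload<T>(T source)
    {
        return typeof(T).GetProperties()
            .ToDictionary(p => p.Name, p => p.GetValue(source, null));
    }
}
```

Calling PayloadMapper.FromPayload<OrderEvent>(payload) with a dictionary holding "City" and "Product" entries yields a populated OrderEvent, and PayloadMapper.ToPayload(order) turns it back into a dictionary keyed by property name.  The point of this design is that neither direction needs compile-time knowledge of the event type.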

I now have complete adapters to listen to the Azure AppFabric Service Bus and publish to endpoints hosted on the Azure AppFabric Service Bus.

I’ll now build an on-premises host to test that it all works.  If it does, then the solution can easily be transferred to StreamInsight Austin for cloud hosting.  I first created the typed class that defines my event.

public class OrderEvent
    {
        public string City { get; set; }
        public string Product { get; set; }
    }

Recall that my adapter doesn’t know about this class.  The adapter works with the dictionary object and the typed class is passed into the adapter and translated at runtime.  Next up is setup for the StreamInsight host.  After creating a new embedded application, I set up the configuration object representing both the input WCF service and output WCF service.

//create reference to embedded server
using (Server server = Server.Create("RSEROTER"))
{
    //create WCF service config
    WcfAdapterConfig listenWcfConfig = new WcfAdapterConfig()
    {
        Username = "ISSUER",
        Password = "PASSWORD",
        ServiceAddress = "https://richardseroter.servicebus.windows.net/StreamInsight/RSEROTER/InputAdapter"
    };

    WcfAdapterConfig subscribeWcfConfig = new WcfAdapterConfig()
    {
        Username = string.Empty,
        Password = string.Empty,
        ServiceAddress = "https://richardseroter.servicebus.windows.net/SIServices/ReceiveEventService"
    };

    //create new application on the server
    var myApp = server.CreateApplication("DemoEvents");

    //get reference to input stream
    var inputStream = CepStream<OrderEvent>.Create("input", typeof(WcfInputAdapterFactory), listenWcfConfig, EventShape.Point);

    //first query
    var query1 = from i in inputStream
                 select i;

    var siQuery = query1.ToQuery(myApp, "SI Query", string.Empty, typeof(WcfOutputAdapterFactory), subscribeWcfConfig, EventShape.Point, StreamEventOrder.FullyOrdered);

    siQuery.Start();
    Console.WriteLine("Query started.");

    //wait for keystroke to end
    Console.ReadLine();

    siQuery.Stop();
    //note: 'host' is not declared in this excerpt
    host.Close();
    Console.WriteLine("Query stopped. Press enter to exit application.");
    Console.ReadLine();
}

This is now a fully working, cloud-connected, on-premises StreamInsight application.  I can take in events from any internal/external service caller and publish output events to any internal/external service.  I find this to be a fairly exciting prospect.  Imagine taking events from your internal Line of Business systems and your external SaaS systems and looking for patterns across those streams.

Looking for the source code?  Well here you go.  You can run this application today, whether you have StreamInsight Austin or not.  In the next post, I’ll show you how to take this application and deploy it to Windows Azure using StreamInsight Austin.

Author: Richard Seroter
