Adding Voice To Event Processing Applications Using Microsoft StreamInsight and Twilio

I recently did an in-person demonstration of how to use the cool Twilio service to send voice messages when Microsoft StreamInsight detected a fraud condition. In this blog post, I’ll walk through how I built the StreamInsight adapter, Twilio handler service and plugged it all together.

Here is what I built, with each numbered activity explained below.

2012.06.07twilio01

  1. Expense web application sends events to StreamInsight Austin. I built an ASP.NET web site that I deployed to the Iron Foundry environment that is provided by Tier 3’s Web Fabric offering. This web app takes in expense records from users and sends those events to the yet-to-be-released StreamInsight Austin platform. StreamInsight is Microsoft’s complex event processing engine that is capable of processing hundreds of thousands of events per second through a set of deployed queries. StreamInsight code-named Austin is the Windows Azure hosted version of StreamInsight that will be generally available in the near future. The events are sent by the Expense application to the HTTP endpoint provided by StreamInsight Austin.
  2. StreamInsight adapter triggers a call to the Twilio service. When a query pattern is matched in StreamInsight, the custom output adapter is called. This adapter uses the Twilio SDK for .NET to either initiate a phone call or send an SMS text message.
  3. Twilio service hits a URL that generates the call script. The Twilio VOIP technology works by calling a URL and getting back the Twilio Markup Language (TwiML) that describes what to say to the phone call recipient. Instead of providing a static TwiML (XML) file that instructs Twilio to say the same thing in each phone call, I built a simple WCF Handler Service that takes in URL parameters and returns a customized TwiML message.
  4. Return TwiML message to Twilio service. That TwiML that the WCF service produces is retrieved and parsed by Twilio.
  5. Place phone call to target. When StreamInsight invokes the Twilio service (step 2), it passes in the phone number of the call recipient. Now that Twilio has called the Handler Service and gotten back the TwiML instructions, it can ring the phone number and read the message.

Sound interesting?  I’m going to tackle this in order of execution (from above), not necessary order of construction (where you’d realistically build them in this order: (1) Twilio Handler Service, (2) StreamInsight adapter, (3) StreamInsight application, (4) Expense web site). Let’s dive in.

1. Sending events from the Expense web application to StreamInsight

This site is a simple ASP.NET website that I’ve deployed up to Tier 3’s hosted Iron Foundry environment.

2012.06.07twilio02

Whenever you provision a StreamInsight Austin environment in the current “preview” mode, you get an HTTP endpoint for receiving events into the engine. This HTTP endpoint accepts JSON or XML messages. In my case, I’m throwing a JSON message at the endpoint. Right now the endpoint expects a generic event message, but in the future, we should see StreamInsight Austin be capable of taking in custom event formats.

//pull Austin URL from configuration file
string destination = ConfigurationManager.AppSettings["EventDestinationId"];
//build JSON message consisting of required headers, and data payload
string jsonPayload = "{\"DestinationID\":\"http:\\/\\/sample\\/\",\"Payload\":[{\"Key\":\"CustomerName\",\"Value\":\""+ txtRelatedParty.Text +"\"},{\"Key\":\"InteractionType\",\"Value\":\"Expense\"}],\"SourceID\":\"http:\\/\\/dummy\\/\",\"Version\":{\"_Build\":-1,\"_Major\":1,\"_Minor\":0,\"_Revision\":-1}}";

//update URL with JSON flag
string requestUrl = ConfigurationManager.AppSettings["AustinEndpoint"] + "json?batching=false";
HttpWebRequest request = HttpWebRequest.Create(requestUrl) as HttpWebRequest;

//set HTTP headers
request.Method = "POST";
request.ContentType = "application/json";

using (Stream dataStream = request.GetRequestStream())
 {
     string postBody = jsonPayload;

     // Create POST data and convert it to a byte array.
     byte[] byteArray = Encoding.UTF8.GetBytes(postBody);
     dataStream.Write(byteArray, 0, byteArray.Length);
  }

HttpWebResponse response = null;

try
{
    response = (HttpWebResponse)request.GetResponse();
 }
 catch (Exception ex)
 { }

2. Building the StreamInsight application and Twilio adapter

The Twilio adapter that I built is a “typed adapter” which means that it expects a specific payload. That “Fraud Alert Event” object that the adapter expects looks like this:

public class FraudAlertEvent
    {
        public string CustomerName { get; set; }
        public string ExpenseDate { get; set; }
        public string AlertMessage { get; set; }
    }

Next, I built up the actual adapter. I used NuGet to discover and add the Twilio SDK to my Visual Studio project.

2012.06.07twilio03

Below is the code for my adapter, with comments inline. Basically, I dequeue events that matched the StreamInsight query I deployed, and then use the Twilio API to either initiate a phone call or send a text message.

public class TwilioPointOutputAdapter : TypedPointOutputAdapter
    {
        //member variables
        string acctId = string.Empty;
        string acctToken = string.Empty;
        string url = string.Empty;
        string phoneNum = string.Empty;
        string phoneOrMsg = string.Empty;
        TwilioRestClient twilioProxy;

        public TwilioPointOutputAdapter(AdapterConfig config)
        {
            //set member variables using values from runtime config values
            this.acctId = config.AccountId;
            this.acctToken = config.AuthToken;
            this.phoneOrMsg = config.PhoneOrMessage;
            this.phoneNum = config.TargetPhoneNumber;
            this.url = config.HandlerUrl;
        }

        ///
<summary> /// When the adapter is resumed by the engine, start dequeuing events again /// </summary>
        public override void  Resume()
        {
            DequeueEvent();
        }

        ///
<summary> /// When the adapter is started up, begin dequeuing events /// </summary>
        public override void  Start()
        {
            DequeueEvent();
        }

        ///
<summary> /// Function that pulls events from the engine and calls the Twilio service /// </summary>
        void DequeueEvent()
        {
		var twilioProxy = new TwilioRestClient(this.acctId, this.acctToken);

            while (true)
            {
                try
                {
                    //if the SI engine has issued a command to stop the adapter
                    if (AdapterState.Stopping == AdapterState)
                    {
                        Stopped();

                        return;
                    }

                    //create an event
                    PointEvent currentEvent = default(PointEvent);

                    //dequeue the event from the engine
                    DequeueOperationResult result = Dequeue(out currentEvent);

                    //if there is nothing there, tell the engine we're ready for more
                    if (DequeueOperationResult.Empty == result)
                    {
                        Ready();
                        return;
                    }

                    //if we find an event to process ...
                    if (currentEvent.EventKind == EventKind.Insert)
                    {
                        //append event-specific values to the Twilio handler service URL
                        string urlparams = "?val=0&action=Please%20look%20at%20" + currentEvent.Payload.CustomerName + "%20expenses";

                        //create object that holds call criteria
                        CallOptions opts = new CallOptions();
                        opts.Method = "GET";
                        opts.To = phoneNum;
                        opts.From = "+14155992671";
                        opts.Url = this.url + urlparams;

                        //if a phone call ...
                        if (phoneOrMsg == "phone")
                        {
                            //make the call
                            var call = twilioProxy.InitiateOutboundCall(opts);
                        }
                        else
                        {
                            //send an SMS message
                            var msg = twilioProxy.SendSmsMessage(opts.From, opts.To, "Fraud has occurred with " + currentEvent.Payload.CustomerName);
                        }
                    }
                    //cleanup the event
                    ReleaseEvent(ref currentEvent);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
        }
    }

Next, I created my StreamInsight Austin application. Instead of using the command line sample provided by the StreamInsight team, I created a little WinForm app that handles the provisioning of the environment, the deployment of the query, and the sending of test event messages.

2012.06.07twilio04

The code that deploys the “fraud detection” query takes care of creating the LINQ query, defining the StreamInsight query that uses the Twilio adapter, and starting up the query in the StreamInsight Austin environment. My Expense web application sends events that contain a CustomerName and InteractionType (e.g. “sale”, “complaint”, etc).

private void CreateQueries()
{
		...

		//put inbound events into 30-second windows
     var custQuery = from i in allStream
          group i by new { Name = i.CustomerName, iType = i.InteractionType } into CustomerGroups
          from win in CustomerGroups.TumblingWindow(TimeSpan.FromSeconds(30), HoppingWindowOutputPolicy.ClipToWindowEnd)
          select new { ct = win.Count(), Cust = CustomerGroups.Key.Name, Type = CustomerGroups.Key.iType };

     //if there are more than two expenses for the same company in the window, raise event
     var thresholdQuery = from c in custQuery
                   where c.ct > 2 && c.Type == "Expense"
                   select new FraudAlertEvent
                   {
                          CustomerName = c.Cust,
                          AlertMessage = "Too many expenses!",
                          ExpenseDate = DateTime.Now.ToString()
                    };

      //call DeployQuery which instantiates StreamInsight Query
      Query query5 = DeployQuery(thresholdQuery, "Threshold Query");
       query5.Start();
		...
}

private Query DeployQuery(CepStream queryStream, string queryName)
{
      //setup Twilio adapter configuration settings
      var outputConfig = new AdapterConfig
       {
            AccountId = ConfigurationManager.AppSettings["TwilioAcctID"],
            AuthToken = ConfigurationManager.AppSettings["TwilioAcctToken"],
            TargetPhoneNumber = "+1111-111-1111",
            PhoneOrMessage = "phone",
            HandlerUrl = "http://twiliohandlerservice.ironfoundry.me/Handler.svc/Alert/Expense%20Fraud"
       };

      //add logging message
      lbMessages.Items.Add(string.Format("Creating new query '{0}'...", queryName));

      //define StreamInsight query that uses this output adapter and configuration
      Query query = queryStream.ToQuery(
            queryName,
            "",
            typeof(TwilioAdapterOutputFactory),
            outputConfig,
            EventShape.Point,
            StreamEventOrder.FullyOrdered);

      //return query to caller
      return query;
}

3. Creating the Twilio Handler Service hosted in Tier 3’s Web Fabric environment

If you’re an eagle-eyed reader, you may have noticed my “HandlerUrl” property in the adapter configuration above. That URL points to a public address that the Twilio service uses to retrieve the speaking instructions for a phone call. Since I wanted to create a contextual phone message, I decided to build a WCF service that returns valid TwiML generated on demand. My WCF contract returns an XMLElement and takes in values that help drive the type of content in the TwiML message.

[ServiceContract]
    public interface IHandler
    {
        [OperationContract]
        [WebGet(
            BodyStyle = WebMessageBodyStyle.Bare,
            RequestFormat = WebMessageFormat.Xml,
            ResponseFormat = WebMessageFormat.Xml,
            UriTemplate = "Alert/{thresholdType}?val={thresholdValue}&action={action}"
            )]
        XmlElement GenerateHandler(string thresholdType, string thresholdValue, string action);
    }

The implementation of this service contract isn’t super interesting, but, I’ll include it anyway. Basically, if you provide a “thresholdValue” of zero (e.g. it doesn’t matter what value was exceeded), then I create a TwiML message that uses a woman’s voice to tell the call recipient that a threshold was exceeded and some action is required. If the “thresholdValue” is not zero, then this pleasant woman tells the call recipient about the limit that was exceeded.

        public XmlElement GenerateHandler(string thresholdType, string thresholdValue, string action)
        {
            string xml = string.Empty;

            if (thresholdValue == "0")
            {
                xml = "<!--?xml version='1.0' encoding='utf-8' ?-->" +
            "" +
            "" +
                "The " + thresholdType + " alert was triggered. " + action + "." +
                "" +
            "";
            }
            else
            {
                xml = "<!--?xml version='1.0' encoding='utf-8' ?-->" +
            "" +
            "" +
                "The " + thresholdType + " value is " + thresholdValue + " and has exceeded the threshold limit. " + action + "." +
                "" +
            "";
            }

            XmlDocument d = new XmlDocument();
            d.LoadXml(xml);

            return d.DocumentElement;
        }
    }

I then did a quick push of this web service to my Web Fabric / Iron Foundry environment.

2012.06.07twilio05

I confirmed that my service was online (and you can too as I’ve left this service up) by hitting the URL and seeing valid TwiML returned.

2012.06.07twilio06

4. Test the solution and confirm the phone call

Let’s commit some fraud on my website! I went to my Expense website, and according to my StreamInsight query, if I submitted more than 2 expenses for single client (in this case, “Microsoft”) within a 30 second window, a fraud event should be generated, and I should receive a phone call.

2012.06.07twilio07

After submitting a handful of events, I can monitor the Twilio dashboard and see when a phone call is being attempted and completed.

2012.06.07twilio08

Sure enough, I received a phone call. I captured the audio, which you can listen to here.

Summary

So what did we see? We saw that our Event Processing Engine in the cloud can receive events from public websites and trigger phone/text messages through the sweet Twilio service. One of the key benefits to StreamInsight Austin (vs. an onsite StreamInsight deployment) is the convenience of having an environment that can be easily reached by both on-premises and off-premises (web) applications. This can help you do true real-time monitoring vs. doing batch loads from off-premises apps into the on-premises Event Processing engine. And, the same adapter framework applies to either the onsite or cloud StreamInsight environment, so my Twilio adapter works fine, regardless of deployment model.

The Twilio service provides a very simple way to inject voice into applications. While not appropriate for all cases, obviously, there are a host of interesting use cases that are enhanced by this service. Marrying StreamInsight and Twilio seems like a useful way to make very interactive CEP notifications possible!

Author: Richard Seroter

Richard Seroter is currently the Chief Evangelist at Google Cloud and leads the Developer Relations program. He’s also an instructor at Pluralsight, a frequent public speaker, the author of multiple books on software design and development, and a former InfoQ.com editor plus former 12-time Microsoft MVP for cloud. As Chief Evangelist at Google Cloud, Richard leads the team of developer advocates, developer engineers, outbound product managers, and technical writers who ensure that people find, use, and enjoy Google Cloud. Richard maintains a regularly updated blog on topics of architecture and solution design and can be found on Twitter as @rseroter.

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.