Integration in the Cloud: Part 4 – Asynchronous Messaging Pattern

So far in this blog series we’ve been looking at how Enterprise Integration Patterns apply to cloud integration scenarios. We’ve seen that a Shared Database Pattern works well when you have common data (and schema) and multiple consumers who want consistent access.  The Remote Procedure Invocation Pattern is a good fit when one system desires synchronous access to data and functions sitting in other systems. In this final post in the series, I’ll walk through the Asynchronous Messaging Pattern and specifically demonstrate how to share data between clouds using this pattern.

What Is It?

While the remote procedure pattern provides looser coupling than the shared database pattern, it is still a blocking call and not particularly scalable.  Architects and developers use an asynchronous messaging pattern when they want to share data in the most scalable and responsive way possible.  Think of sending an email.  Your email client doesn’t sit and wait until the recipient has received and read the email message.  That would be atrocious. Instead, our email server does a multicast to recipients and allows our email client to carry on. This is somewhat similar to publish/subscribe where the publisher does not dictate which specific receiver will get the message.

So in theory, the sender of the message doesn’t need to know where the message will end up.  They also don’t need to know *when* a message is received or processed by another party.  This supports disconnected client scenarios where the subscriber is not online at the same time as the publisher.  It also supports the principle of replicable units where one receiver could be swapped out with no direct impact to the source of the message.  We see this pattern realized in Enterprise Service Bus or Integration Bus products (like BizTalk Server) which promote extreme loose coupling between systems.
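To make the decoupling concrete, here is a minimal sketch using Ruby's built-in thread-safe Queue. It is only an in-process illustration (the message names are made up, and nothing here is durable): the publisher finishes before any subscriber exists, and the subscriber later drains whatever is waiting.

```ruby
# Sketch: a publisher hands messages to a queue and carries on;
# a subscriber drains the queue later, on its own schedule.
inbox = Queue.new # thread-safe FIFO from Ruby's core library

# Publisher: each push returns immediately, no receiver needs to be running yet.
%w[inquiry-1 inquiry-2 inquiry-3].each { |msg| inbox << msg }
published_before_any_receiver = inbox.size

# Subscriber: starts afterwards and processes whatever is waiting.
received = []
subscriber = Thread.new do
  received << inbox.pop until inbox.empty?
end
subscriber.join

puts "published #{published_before_any_receiver}, received #{received.size}"
```

The publisher never references the subscriber, so the subscriber could be replaced by a different consumer without touching the publishing code.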

Challenges

There are a few challenges when dealing with this pattern.

  • There is no real-time consistency. Because the message source asynchronously shares data that will be processed at the convenience of the receiver, there is a low likelihood that the systems involved are simultaneously consistent.  Instead, you end up with eventual consistency between the players in the messaging solution.
  • Reliability / durability is required in some cases. Without a persistence layer, it is possible to lose data.  Unlike the remote procedure invocation pattern (where exceptions are thrown by the target and both caught and handled by the caller), problems in transmission or target processing do not flow back to the publisher.  What happens if the recipient of a message is offline?  What if the recipient is under heavy load and rejecting new messages? A durable component in the messaging tier can protect against such cases by doing store-and-forward type implementation that doesn’t remove the message from the durable store until it has been successfully consumed.
  • A router may be useful when transmitting messages. Instead of, or in addition to a durable store, a routing component can help manage the central subscriptions for pub/sub transmissions, help with protocol bridging, data transformation and workflow (e.g. something like BizTalk Server). This may not be needed in distributed ESB solutions where the receiver is responsible for most of that.
  • There is limited support for this pattern in packaged software products.  I’ve seen few commercial products that expose asynchronous inbound channels, and even fewer that have easy-to-configure ways to publish outbound events asynchronously.  It’s not that difficult to put adapters in front of these systems, or mimic asynchronous publication by polling a data tier, but it’s not the same.
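The store-and-forward idea from the reliability bullet can be sketched in a few lines of Ruby. The StoreAndForward class below is hypothetical and keeps messages in memory only; a real durable tier would persist them. The point it illustrates is that a message leaves the store only after the receiver has consumed it without error.

```ruby
# Hypothetical in-memory stand-in for a durable store; a real one would
# persist messages to disk or to a queue service.
class StoreAndForward
  def initialize
    @pending = []
  end

  def accept(message)
    @pending << message
  end

  # Try to hand every pending message to the receiver (any callable).
  # A message is removed only when the receiver returns without raising.
  # Returns the number of messages still waiting.
  def forward(receiver)
    @pending.reject! do |message|
      begin
        receiver.call(message)
        true
      rescue StandardError
        false
      end
    end
    @pending.size
  end
end

store = StoreAndForward.new
store.accept("1001-Where is my order?")
store.accept("1002-Please call me")

delivered = []
offline = ->(_msg) { raise IOError, "receiver offline" }
online  = ->(msg) { delivered << msg }

left_after_outage   = store.forward(offline) # nothing delivered, both retained
left_after_recovery = store.forward(online)  # both delivered, store is empty
```

The publisher only ever calls accept, so an offline or overloaded receiver delays delivery without losing data or surfacing errors to the sender.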

Cloud Considerations

What are things to consider when doing this pattern in a cloud scenario?

  • Spanning cloud and on-premises solutions requires creativity. I showed in the previous post how one can use Windows Azure AppFabric to expose on-premises endpoints to cloud applications. If we need to push data on-premises, and Azure AppFabric isn’t an option, then you’re looking at doing a VPN or internet-facing proxy service. Or, you could rely on aggressive polling of a shared queue (as I’ll show below).
  • Cloud provider limits and architecture will influence solution design. Some vendors, such as Salesforce.com, limit the frequency and amount of polling that they will do. This impacts the ability to poll a durable store used between cloud applications. The distributed nature of cloud services, and their embrace of the eventual consistency model, can change how one retrieves data.  For example, Amazon’s Simple Queue Service may not be first-in-first-out, and uses a sampling algorithm that COULD result in a query not returning all the messages in the logical queue.
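A consumer written against a sampled, unordered queue should treat a partial or empty response as normal and simply poll again. The sketch below simulates that behavior with a hypothetical SampledQueue class (it is not the SQS API, and the 50% sampling rate is an arbitrary assumption); the consumer loop makes no assumptions about ordering or about any single poll being complete.

```ruby
# Hypothetical stand-in for a distributed queue: each receive call looks at
# only a random subset of stored messages and returns them in any order.
class SampledQueue
  def initialize(messages, seed: 42)
    @messages = messages.dup
    @rng = Random.new(seed)
  end

  def receive(max)
    sample = @messages.select { @rng.rand < 0.5 } # visit a random subset
    sample.shuffle(random: @rng).first(max)
  end

  def delete(message)
    @messages.delete(message)
  end

  def empty?
    @messages.empty?
  end
end

all_messages = %w[m1 m2 m3 m4 m5 m6]
queue = SampledQueue.new(all_messages)
processed = []
polls = 0

# Keep polling until the queue is drained; the cap guards against looping forever.
until queue.empty? || polls >= 1000
  polls += 1
  queue.receive(5).each do |msg|
    processed << msg
    queue.delete(msg)
  end
end
```

Every message is eventually processed, but how many polls that takes, and in what order messages arrive, is not something the consumer can rely on.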

Solution Demonstration

Let’s say that the fictitious Seroter Corporation has a series of public websites and wants a consistent way to push customer inquiries from the websites to back end systems that process these inquiries.  Instead of pushing these inquiries directly into one or many CRM systems, or doing the low-tech email option, we’d rather put all the messages into a queue and let each interested party pull the ones they want.  Since these websites are cloud-hosted, we don’t want to explicitly push these messages into the internal network, but rather, asynchronously publish and poll messages from a shared queue hosted by Amazon Simple Queue Service (SQS). The polling applications could either be another cloud system (CRM system Salesforce.com) or an on-premises system, as shown below.

2011.11.14int01

So I’ll have a web page built using Ruby and hosted in Cloud Foundry, an SQS queue that holds inquiries submitted from that site, and both an on-premises .NET application and a SaaS Salesforce.com application that can poll that queue for messages.

Setting up a queue in SQS is so easy now that I won’t even make it a sub-section in this post.  The AWS team recently added SQS operations to their Management Console, and they’ve made it very simple to create, delete, secure and monitor queues. I created a new queue named Seroter_CustomerInquiries.

2011.11.14int02

Sending Messages from Cloud Foundry to Amazon Simple Queue Service

In my Ruby (Sinatra) application, I have a page where a user can ask a question.  When they click the submit button, I go into the following routine which builds up the SQS message (similar to the SimpleDB message from my previous post) and posts a message to the queue.

post '/submitted/:uid' do	# method call, on submit of the request path, do the following

   #--get user details from the URL string
	@userid = params[:uid]
	@message = CGI.escape(params[:message])
    #-- build message that will be sent to the queue
	@fmessage = @userid + "-" + @message.gsub("+", "%20")

	#-- define timestamp variable and format (must be UTC, since the format ends in a literal "Z")
	@timestamp = Time.now.utc
	@timestamp = @timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
	@ftimestamp = CGI.escape(@timestamp)

	#-- create signing string
	@stringtosign = "GET\n" + "queue.amazonaws.com\n" + "/084598340988/Seroter_CustomerInquiries\n" + "AWSAccessKeyId=ACCESS_KEY" + "&Action=SendMessage" + "&MessageBody=" + @fmessage + "&SignatureMethod=HmacSHA1" + "&SignatureVersion=2" + "&Timestamp=" + @ftimestamp + "&Version=2009-02-01"

	#-- create hashed signature
	@esignature = CGI.escape(Base64.encode64(OpenSSL::HMAC.digest('sha1',@@awskey, @stringtosign)).chomp)

	#-- create AWS SQS query URL
	@sqsurl = "https://queue.amazonaws.com/084598340988/Seroter_CustomerInquiries?Action=SendMessage" + "&MessageBody=" + @fmessage + "&Version=2009-02-01" + "&Timestamp=" + @ftimestamp + "&Signature=" + @esignature + "&SignatureVersion=2" + "&SignatureMethod=HmacSHA1" + "&AWSAccessKeyId=ACCESS_KEY"

	#-- load XML returned from query
	@doc = Nokogiri::XML(open(@sqsurl))

   #-- build result message which is formatted string of the inquiry text
	@resultmsg = @fmessage.gsub("%20", " ")

	haml :SubmitResult
end

The hard part when building these demos was getting my signature string and hashing exactly right, so hopefully this helps someone out.
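Since the signing string is the error-prone part, here is a small sketch that builds it from a parameter hash instead of by hand-ordered concatenation. It follows the same layout as the routine above (verb, host, path, then parameters sorted by name). The helper names aws_escape, string_to_sign and sign are my own, and "SECRET_KEY" is a placeholder, so treat this as an illustration rather than a drop-in client.

```ruby
require "openssl"
require "base64"
require "uri"

# Percent-encode per RFC 3986: spaces become %20, "~" stays literal.
def aws_escape(value)
  URI.encode_www_form_component(value.to_s)
     .gsub("+", "%20").gsub("*", "%2A").gsub("%7E", "~")
end

# Verb, host, path, then the query parameters sorted by name.
def string_to_sign(host, path, params)
  query = params.sort_by { |name, _| name.to_s }
                .map { |name, value| "#{aws_escape(name)}=#{aws_escape(value)}" }
                .join("&")
  ["GET", host.downcase, path, query].join("\n")
end

# HMAC-SHA1 over the string to sign, Base64 encoded (no trailing newline).
def sign(secret_key, host, path, params)
  digest = OpenSSL::HMAC.digest("sha1", secret_key, string_to_sign(host, path, params))
  Base64.strict_encode64(digest)
end

params = {
  "Action" => "SendMessage",
  "MessageBody" => "1001-Where is my order?",
  "AWSAccessKeyId" => "ACCESS_KEY",
  "SignatureMethod" => "HmacSHA1",
  "SignatureVersion" => "2",
  "Timestamp" => "2011-11-14T12:00:00Z",
  "Version" => "2009-02-01"
}
host = "queue.amazonaws.com"
path = "/084598340988/Seroter_CustomerInquiries"

to_sign = string_to_sign(host, path, params)
signature = sign("SECRET_KEY", host, path, params)
puts to_sign
```

Sorting in code means a new parameter cannot end up in the wrong position, which was the easiest mistake to make when assembling the string manually. The signature still has to be URL-encoded before it is appended to the request URL.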

After building and deploying the Ruby site to Cloud Foundry, I could see my page for inquiry submission.

2011.11.14int03

When the user hits the “Send Inquiry” button, the function above is called and, assuming that I published successfully to the queue, I see the acknowledgement page.  Since this is an asynchronous communication, my web app only has to wait for publication to the queue, not for a function call in a CRM system to complete.

2011.11.14int04

To confirm that everything worked, I viewed my SQS queue and could clearly see that I had a single message waiting in the queue.

2011.11.14int05

.NET Application Pulling Messages from an SQS Queue

With our message sitting safely in the queue, now we can go grab it.  The first consuming application is an on-premises .NET app.  In this very feature-rich application, I poll the queue and pull down any messages found.  When working with queues, you often have two distinct operations: read and delete (“peek” is also nice to have). I can read messages from a queue, but unless I delete them, they become available (after a timeout) to another consumer.  For this scenario, we’d realistically want to read all the messages, and ONLY process and delete the ones targeted for our CRM app.  Any others, we simply don’t delete, and they go back to waiting in the queue. I haven’t done that, for simplicity’s sake, but keep this in mind for actual implementations.
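The read-then-delete behavior is easier to reason about with a toy model. The VisibilityQueue class below is a hypothetical in-memory imitation of a queue with a visibility timeout (it is not the SQS client), and the "crm-" and "billing-" prefixes are an assumed way of marking which consumer a message is meant for. The clock is passed in explicitly so the timing is easy to follow.

```ruby
# Hypothetical in-memory model of a queue with a visibility timeout.
class VisibilityQueue
  Message = Struct.new(:handle, :body, :invisible_until)

  def initialize(visibility_timeout)
    @timeout = visibility_timeout
    @messages = []
    @next_handle = 0
  end

  def send_message(body)
    @messages << Message.new("h#{@next_handle += 1}", body, 0)
  end

  # Return currently visible messages and hide them for @timeout seconds.
  def receive(now)
    visible = @messages.select { |m| m.invisible_until <= now }
    visible.each { |m| m.invisible_until = now + @timeout }
    visible.map { |m| [m.handle, m.body] }
  end

  def delete(handle)
    @messages.reject! { |m| m.handle == handle }
  end
end

queue = VisibilityQueue.new(15)
queue.send_message("crm-1001-Where is my order?")
queue.send_message("billing-1002-Refund please")

# The CRM consumer reads everything but deletes only the messages meant for it.
first_read = queue.receive(0)
first_read.each { |handle, body| queue.delete(handle) if body.start_with?("crm-") }

during_timeout = queue.receive(5)  # billing message is still hidden
after_timeout  = queue.receive(20) # billing message is visible again
```

The message the CRM consumer skipped is never lost; it simply reappears for the next consumer once the timeout expires.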

In the example code below, I’m being a bit lame by only expecting a single message. In reality, when polling, you’d loop through each returned message, save its Handle value (which is required when calling the Delete operation) and do something with the message.  In my case, I only have one message, so I explicitly grab the “Body” and “Handle” values.  The code shows the “retrieve messages” button click operation which in turn calls “receive” operation and “delete” operation.

private void RetrieveButton_Click(object sender, EventArgs e)
        {
            lbQueueMsgs.Items.Clear();
            lblStatus.Text = "Status:";

            string handle = ReceiveFromQueue();
            if(handle!=null)
                DeleteFromQueue(handle);

        }

private string ReceiveFromQueue()
        {
            //timestamp formatting for AWS (UTC); escapes are upper-cased to match what AWS signs against
            string timestamp = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
            timestamp = HttpUtility.UrlEncode(timestamp).Replace("%3a", "%3A");

            //string for signing
            string stringToConvert = "GET\n" +
            "queue.amazonaws.com\n" +
            "/084598340988/Seroter_CustomerInquiries\n" +
            "AWSAccessKeyId=ACCESS_KEY" +
            "&Action=ReceiveMessage" +
            "&AttributeName=All" +
            "&MaxNumberOfMessages=5" +
            "&SignatureMethod=HmacSHA1" +
            "&SignatureVersion=2" +
            "&Timestamp=" + timestamp +
            "&Version=2009-02-01" +
            "&VisibilityTimeout=15";

            //hash the signature string
			  string awsPrivateKey = "PRIVATE KEY";
            Encoding ae = new UTF8Encoding();
            HMACSHA1 signature = new HMACSHA1();
            signature.Key = ae.GetBytes(awsPrivateKey);
            byte[] bytes = ae.GetBytes(stringToConvert);
            byte[] moreBytes = signature.ComputeHash(bytes);
            string encodedCanonical = Convert.ToBase64String(moreBytes);
            string urlEncodedCanonical = HttpUtility.UrlEncode(encodedCanonical).Replace("%3d", "%3D");

             //build up request string (URL)
            string sqsUrl = "https://queue.amazonaws.com/084598340988/Seroter_CustomerInquiries?Action=ReceiveMessage" +
            "&Version=2009-02-01" +
            "&AttributeName=All" +
            "&MaxNumberOfMessages=5" +
            "&VisibilityTimeout=15" +
            "&Timestamp=" + timestamp +
            "&Signature=" + urlEncodedCanonical +
            "&SignatureVersion=2" +
            "&SignatureMethod=HmacSHA1" +
            "&AWSAccessKeyId=ACCESS_KEY";

            //make web request to SQS using the URL we just built
            HttpWebRequest req = WebRequest.Create(sqsUrl) as HttpWebRequest;
            XmlDocument doc = new XmlDocument();
            using (HttpWebResponse resp = req.GetResponse() as HttpWebResponse)
            {
                StreamReader reader = new StreamReader(resp.GetResponseStream());
                string responseXml = reader.ReadToEnd();
                doc.LoadXml(responseXml);
            }

			 //do bad xpath and grab the body and handle
            XmlNode handle = doc.SelectSingleNode("//*[local-name()='ReceiptHandle']");
            XmlNode body = doc.SelectSingleNode("//*[local-name()='Body']");

            //if empty then nothing there; if not, then add to listbox on screen
            if (body != null)
            {
                //write result
                lbQueueMsgs.Items.Add(body.InnerText);
                lblStatus.Text = "Status: Message read from queue";
                //return handle to calling function so that we can pass it to "Delete" operation
                return handle.InnerText;
            }
            else
            {
                MessageBox.Show("Queue empty");
                return null;
            }
        }

private void DeleteFromQueue(string handle)
        {
            //timestamp formatting for AWS
            string timestamp = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
            timestamp = HttpUtility.UrlEncode(timestamp).Replace("%3a", "%3A");

            //URL-encode the receipt handle; EscapeDataString emits uppercase hex escapes
            string encodedHandle = Uri.EscapeDataString(handle);

            //string for signing
            string stringToConvert = "GET\n" +
            "queue.amazonaws.com\n" +
            "/084598340988/Seroter_CustomerInquiries\n" +
            "AWSAccessKeyId=ACCESS_KEY" +
            "&Action=DeleteMessage" +
            "&ReceiptHandle=" + encodedHandle +
            "&SignatureMethod=HmacSHA1" +
            "&SignatureVersion=2" +
            "&Timestamp=" + timestamp +
            "&Version=2009-02-01";

            //hash the signature string
            string awsPrivateKey = "PRIVATE KEY";
            Encoding ae = new UTF8Encoding();
            HMACSHA1 signature = new HMACSHA1();
            signature.Key = ae.GetBytes(awsPrivateKey);
            byte[] bytes = ae.GetBytes(stringToConvert);
            byte[] moreBytes = signature.ComputeHash(bytes);
            string encodedCanonical = Convert.ToBase64String(moreBytes);
            string urlEncodedCanonical = HttpUtility.UrlEncode(encodedCanonical).Replace("%3d", "%3D");

            //build up request string (URL)
            string sqsUrl = "https://queue.amazonaws.com/084598340988/Seroter_CustomerInquiries?Action=DeleteMessage" +
            "&ReceiptHandle=" + encodedHandle +
            "&Version=2009-02-01" +
            "&Timestamp=" + timestamp +
            "&Signature=" + urlEncodedCanonical +
            "&SignatureVersion=2" +
            "&SignatureMethod=HmacSHA1" +
            "&AWSAccessKeyId=ACCESS_KEY";

            //make web request to SQS using the URL we just built
            HttpWebRequest req = WebRequest.Create(sqsUrl) as HttpWebRequest;

            using (HttpWebResponse resp = req.GetResponse() as HttpWebResponse)
            {
                StreamReader reader = new StreamReader(resp.GetResponseStream());

                string responseXml = reader.ReadToEnd();
            }
        }
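The listing above grabs only the first message in the response. As a rough sketch of the loop a real poller would need, the Ruby below walks every Message element in a response and collects its body and receipt handle. The sample XML is made up but shaped like a ReceiveMessage response (using the same namespace the queue returns), extract_messages is a hypothetical helper name, and REXML is used here only because it ships with Ruby.

```ruby
require "rexml/document"

# Made-up sample shaped like an SQS ReceiveMessage response.
response_xml = <<~XML
  <ReceiveMessageResponse xmlns="http://queue.amazonaws.com/doc/2009-02-01/">
    <ReceiveMessageResult>
      <Message>
        <MessageId>id-1</MessageId>
        <ReceiptHandle>handle-1</ReceiptHandle>
        <Body>1001-Where is my order?</Body>
      </Message>
      <Message>
        <MessageId>id-2</MessageId>
        <ReceiptHandle>handle-2</ReceiptHandle>
        <Body>1002-Please call me</Body>
      </Message>
    </ReceiveMessageResult>
  </ReceiveMessageResponse>
XML

# Collect the body and receipt handle of every Message element.
def extract_messages(xml)
  doc = REXML::Document.new(xml)
  messages = []
  doc.root.each_recursive do |element|
    next unless element.name == "Message"
    fields = {}
    element.elements.each { |child| fields[child.name] = child.text }
    messages << { handle: fields["ReceiptHandle"], body: fields["Body"] }
  end
  messages
end

messages = extract_messages(response_xml)
messages.each do |message|
  puts "process #{message[:body]}, then delete using #{message[:handle]}"
end
```

Each handle would then be passed to the delete operation once its message has been processed.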

When the application runs and pulls the message that I sent to the queue earlier, it looks like this.

2011.11.14int06

Nothing too exciting on the user interface, but we’ve just seen the magic that’s happening underneath. After running this (which included reading and deleting the message), the SQS queue is predictably empty.

Force.com Application Pulling from an SQS Queue

I went ahead and sent another message from my Cloud Foundry app into the queue.

2011.11.14int07

This time, I want my cloud CRM users on Salesforce.com to pull these new inquiries and process them.  I’d like to automatically convert the inquiries to CRM Cases in the system.  A custom class in a Force.com application can be scheduled to execute at a regular interval. To account for that (as the solution below supports both on-demand and scheduled retrieval from the queue), I’ve added a couple things to the code.  Specifically, notice that my “case lookup” class implements the Schedulable interface (which allows it to be scheduled through the Force.com administrative tooling) and my “queue lookup” function uses the @future annotation (which allows asynchronous invocation).

Much like the .NET application above, you’ll find operations below that retrieve content from the queue and then delete the messages found.  The solution differs from the one above in that it DOES handle multiple messages (note that it loops through retrieved results and calls “delete” for each) and also creates a Salesforce.com “case” for each result.

//implement Schedulable to support scheduling
global class doCaseLookup implements Schedulable
{
	//required operation for Schedulable interfaces
    global void execute(SchedulableContext ctx)
    {
        QueueLookup();
    }

    @future(callout=true)
    public static void QueueLookup()
    {
	  //create HTTP objects and queue namespace
     Http httpProxy = new Http();
     HttpRequest sqsReq = new HttpRequest();
     String qns = 'http://queue.amazonaws.com/doc/2009-02-01/';

     //monkey with date format for SQS query
     Datetime currentTime = System.now();
     String formattedTime = currentTime.formatGmt('yyyy-MM-dd')+'T'+ currentTime.formatGmt('HH:mm:ss')+'.'+ currentTime.formatGmt('SSS')+'Z';
     formattedTime = EncodingUtil.urlEncode(formattedTime, 'UTF-8');

	  //build signing string
     String stringToSign = 'GET\nqueue.amazonaws.com\n/084598340988/Seroter_CustomerInquiries\nAWSAccessKeyId=ACCESS_KEY&' +
			'Action=ReceiveMessage&AttributeName=All&MaxNumberOfMessages=5&SignatureMethod=HmacSHA1&SignatureVersion=2&Timestamp=' +
			formattedTime + '&Version=2009-02-01&VisibilityTimeout=15';
     String algorithmName = 'HMacSHA1';
     Blob mac = Crypto.generateMac(algorithmName, Blob.valueOf(stringToSign),Blob.valueOf(PRIVATE_KEY));
     String macUrl = EncodingUtil.urlEncode(EncodingUtil.base64Encode(mac), 'UTF-8');

	  //build SQS URL that retrieves our messages
     String queueUrl = 'https://queue.amazonaws.com/084598340988/Seroter_CustomerInquiries?Action=ReceiveMessage&' +
			'Version=2009-02-01&AttributeName=All&MaxNumberOfMessages=5&VisibilityTimeout=15&Timestamp=' +
			formattedTime + '&Signature=' + macUrl + '&SignatureVersion=2&SignatureMethod=HmacSHA1&AWSAccessKeyId=ACCESS_KEY';

     sqsReq.setEndpoint(queueUrl);
     sqsReq.setMethod('GET');

     //invoke endpoint
     HttpResponse sqsResponse = httpProxy.send(sqsReq);

     Dom.Document responseDoc = sqsResponse.getBodyDocument();
     Dom.XMLNode receiveResponse = responseDoc.getRootElement();
     //receivemessageresult node which holds the responses
     Dom.XMLNode receiveResult = receiveResponse.getChildElements()[0];

     //collect the new cases so the insert happens once, after all callouts
     //(Apex rejects a callout made after uncommitted DML in the same transaction)
     List<Case> newCases = new List<Case>();

     //for each Message node
     for(Dom.XMLNode itemNode: receiveResult.getChildElements())
     {
        String handle= itemNode.getChildElement('ReceiptHandle', qns).getText();
        String body = itemNode.getChildElement('Body', qns).getText();

        //pull out customer ID
        Integer indexSpot = body.indexOf('-');
        String customerId = '';
        if(indexSpot > 0)
        {
           customerId = body.substring(0, indexSpot);
        }

        //delete this message
        DeleteQueueMessage(handle);

        //create a new case
        Case c = new Case();
        c.Status = 'New';
        c.Origin = 'Web';
        c.Subject = 'Web request: ' + body;
        c.Description = body;
        newCases.add(c);
     }

     //insert the case records into the system
     insert newCases;
  }

  static void DeleteQueueMessage(string handle)
  {
	 //create HTTP objects
     Http httpProxy = new Http();
     HttpRequest sqsReq = new HttpRequest();

     //encode handle value associated with queue message
     String encodedHandle = EncodingUtil.urlEncode(handle, 'UTF-8');

	 //format the date
     Datetime currentTime = System.now();
     String formattedTime = currentTime.formatGmt('yyyy-MM-dd')+'T'+ currentTime.formatGmt('HH:mm:ss')+'.'+ currentTime.formatGmt('SSS')+'Z';
     formattedTime = EncodingUtil.urlEncode(formattedTime, 'UTF-8');

		//create signing string
     String stringToSign = 'GET\nqueue.amazonaws.com\n/084598340988/Seroter_CustomerInquiries\nAWSAccessKeyId=ACCESS_KEY&' +
					'Action=DeleteMessage&ReceiptHandle=' + encodedHandle + '&SignatureMethod=HmacSHA1&SignatureVersion=2&Timestamp=' +
					formattedTime + '&Version=2009-02-01';
     String algorithmName = 'HMacSHA1';
     Blob mac = Crypto.generateMac(algorithmName, Blob.valueOf(stringToSign),Blob.valueOf(PRIVATE_KEY));
     String macUrl = EncodingUtil.urlEncode(EncodingUtil.base64Encode(mac), 'UTF-8');

	  //create URL string for deleting a mesage
     String queueUrl = 'https://queue.amazonaws.com/084598340988/Seroter_CustomerInquiries?Action=DeleteMessage&' +
					'Version=2009-02-01&ReceiptHandle=' + encodedHandle + '&Timestamp=' + formattedTime + '&Signature=' +
					macUrl + '&SignatureVersion=2&SignatureMethod=HmacSHA1&AWSAccessKeyId=ACCESS_KEY';

     sqsReq.setEndpoint(queueUrl);
     sqsReq.setMethod('GET');

	  //invoke endpoint
     HttpResponse sqsResponse = httpProxy.send(sqsReq);

     Dom.Document responseDoc = sqsResponse.getBodyDocument();
  }
}

When I view my custom APEX page which calls this function, I can see the button to query this queue.

2011.11.14int08

When I click the button, our function retrieves the message from the queue, deletes that message, and creates a Salesforce.com case.

2011.11.14int09

Cool!  This still required me to actively click a button, but we can also make this function run every hour.  In the Salesforce.com configuration screens, we have the option to view Scheduled Jobs.

2011.11.14int10

To actually create the job itself, I created an Apex class which schedules the job.

global class CaseLookupJobScheduler
{
    global void CaseLookupJobScheduler() {}

    public static void start()
    {
 		// takes in seconds, minutes, hours, day of month, month and day of week
		//the expression below runs at 5 minutes past each hour (1:00 through 23:00); SFDC only allows hourly
        System.schedule('Case Queue Lookup', '0 5 1-23 * * ?', new doCaseLookup());
    }
}

Note that I use the System.schedule operation. While I’d like the doCaseLookup function to run every 5 minutes, in reality, it won’t: the expression above names a single minute value, so the job fires at five minutes past each hour from 1:00 through 23:00.  Salesforce.com restricts these jobs from running too frequently and keeps jobs from running more than once per hour. One could technically game the system by using some of the ten allowable polling jobs to set off a series of jobs that start at different times of the hour. I’m not worrying about that here. To invoke this function and schedule the job, I first went to the System Log menu.
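For anyone who does want to stagger several hourly jobs, the expressions are easy to generate. This is a minimal Ruby sketch, assuming the same field order as the comment in the scheduler class (seconds, minutes, hours, day of month, month, day of week); staggered_hourly_crons is a hypothetical helper, and each resulting expression would still need to be registered as its own scheduled job with its own name.

```ruby
# Build one hourly cron expression per job, offset evenly across the hour.
# Field order: seconds, minutes, hours, day of month, month, day of week.
def staggered_hourly_crons(job_count)
  step = 60 / job_count
  (0...job_count).map { |i| "0 #{i * step} * * * ?" }
end

crons = staggered_hourly_crons(10)
crons.each_with_index do |expression, i|
  puts "Case Queue Lookup #{i + 1}: #{expression}"
end
```

With ten jobs, the queue would be polled every six minutes instead of once an hour, at the cost of using up every available scheduled job slot.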

2011.11.14int12

From here, I can execute Apex code.  So, I can call my start() function, which should schedule the job.

2011.11.14int13

Now, if I view the Scheduled Jobs view from the Setup screens, I can see that my job is scheduled.

2011.11.14int14

This job is now scheduled to run every hour.  This means that each hour, the queue is polled and any found messages are added to Salesforce.com as cases.  You could use a mix of both solutions and manually poll if you want to (through a button) but allow true asynchronous processing on all ends.

Summary

Asynchronous messaging is a great way to build scalable, loosely coupled systems. A durable intermediary helps provide assurances of message delivery, but this pattern works without it as well.  The demonstrations in this post show how two cloud solutions can asynchronously exchange data through the use of a shared queue that sits between them.  The publisher to the queue has no idea who will retrieve the message and the retrievers have no direct connection to those who publish messages.  This makes for a very maintainable solution.

My goal with these posts was to demonstrate that classic Integration patterns work fine in cloudy environments. I think it’s important to not throw out existing patterns just because new technologies are introduced. I hope you enjoyed this series.



Categories: .NET, AWS, Cloud, Cloud Foundry, General Architecture, Salesforce.com, SOA

6 replies

  1. If you have two consumers of the message, how do you decide who gets to delete it from the queue?

    • It’s a good question. If I were aggregating messages from multiple sites, I’d expect that a particular consumer would be looking at message metadata and only deleting the ones they kept. If ANY consumer could process the message, then you’d likely want first-one-wins rules. If multiple parties need the same message, then you might move to more of a pub/sub model where everyone gets their own copy.

