How to use the Kafka interface of Azure Event Hubs with Spring Cloud Stream

When I think of the word “imposter” my mind goes to movies where the criminal is revealed after their disguise is removed. But imposters don’t have to be evil geniuses. Sometimes imposters are good. You may buy generic types of pharmaceuticals or swap out beef for a meat-less hamburger. The goal is to get the experience of what you’re after, but through secondary means. In that sense, Azure Event Hubs is a fantastic imposter.

Now to be sure, Azure Event Hubs is a terrific standalone cloud service. If you need to reliably ingest tons of events in a scalable way, there aren’t many (any?) better options.

Microsoft’s gotten into the habit of putting facades onto their cloud services to turn them into credible imposters. For instance, Azure Cosmos DB has its own native interface, but also ones that mimic MongoDB and Apache Cassandra. Azure Event Hubs got into the action by recently adding an Apache Kafka interface. If you have apps or tools that use those interfaces, it’s much easier to adopt the Azure “equivalents” now. Azure isn’t actually offering MongoDB, Cassandra, or Kafka, but their first party services resemble them enough that you don’t have to change code to get the benefits of Azure’s global scale. Good strategy.

Java developers everywhere use Spring Cloud Stream to talk to popular message brokers like RabbitMQ and Apache Kafka. I thought it’d be fun to see if Spring Cloud Stream “just works” with the new Event Hubs interface. LET’S SEE WHAT HAPPENS.

Creating our Azure Event Hub

I started in the Microsoft Azure portal. From there, I chose to add a new Event Hubs namespace which holds all my actual Event Hubs. For kicks, I chose the “basic” pricing tier which allows a single consumer group and a hundred connections. I set the “enable Kafka” flag and configured three throughput units (each Event Hubs partition scales to a maximum of one throughput unit).

2018.05.29-kafka-01

Once I had an Event Hubs namespace, I added an Event Hub to it. I gave the Event Hub a name, and three partitions to spread the data. If I had chosen a plan besides “basic”, I would have also been able to set the message retention period beyond one day.

2018.05.29-kafka-02

That’s it. Might be the simplest possible way to create a Kafka-like service!

Spring Boot project setup

The whole point of Spring Boot is to eliminate boilerplate code and make it easier to focus on building robust apps. For example, you don’t want to mess with all that broker-specific logic when you want to pass messages or events around. Spring Boot and Spring Cloud Stream make it straightforward.

To get going, I went to start.spring.io. Devs create over 800,000 projects per month from here, so I added two more to the mix. My first project (source code here) added Spring Cloud Stream and Web dependencies. This is my message producer. Then I created a second project (source code here) with the Spring Cloud Stream dependency. This one acts as my message consumer.

2018.05.29-kafka-03

This is a Maven project, so I added one more dependency directly to the pom.xml file. This one tells my project to activate the Kafka-specific objects upon application startup.


  org.springframework.cloud
  spring-cloud-stream-binder-kafka

Configuring the producer

With my projects created, it was time to configure them. First up, the producer.

I made this one simple for demo purposes. First, I added Spring Boot annotations on the primary class. The @RestController one exposes annotated operations as HTTP endpoints. The second annotation was @EnableBinding(Source.class) which set this up as a Spring Cloud Stream object that used channels identified in the default “Source” class.

Next, I autowired a Source object that gets autoconfigured by the Spring Boot process at startup. Finally, I defined a method that responds to HTTP POST requests and sends a message to the “output” channel. This is where the message is sent to Event Hubs, but notice that my code is completely unaware of that fact.

@EnableBinding(Source.class)
@RestController
@SpringBootApplication
public class EventHubsKafkaApplication {

  public static void main(String[] args) {
	SpringApplication.run(EventHubsKafkaApplication.class, args);
  }

  @Autowired
  Source mySource;

  @RequestMapping(method=RequestMethod.POST, path="/")
  public String PublishMessage(@RequestBody String company) {
    mySource.output().send(MessageBuilder.withPayload(company).build());

    return "success";
  }
}

That’s all the code I needed to test this out. All that was left for my producer was to configure the application.properties file. This lets me set some of the properties needed to connect to my message broker. Before setting them, I went back to the Microsoft Azure portal and grabbed the connection string for my Event Hub.

2018.05.29-kafka-04
With that connection string in hand, I set up my application.properties file. First, I defined my brokers. In this case, it’s the fully qualified domain name of my Event Hubs namespace. Next I set the channel destination, in this case, the Event Hub named “eh1.” Finally, I configured my security settings, which includes the connection string.

spring.cloud.stream.kafka.binder.brokers=rseroter-eventhubs-tu1-cg1.servicebus.windows.net:9093
spring.cloud.stream.bindings.output.destination=eh1

spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://rseroter-eventhubs-tu1-cg1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

With that, the producer app was done.

Configuring the consumer

The consumer app is even simpler! Its main class also has an @EnableBinding annotation, and this one binds to the channels in the default Sink class. Then, it makes use of the very handy @StreamListener which grabs data from the sink and handles content negotiation.

@EnableBinding(Sink.class)
@SpringBootApplication
public class EventHubsKafkaConsumerApplication {

  public static void main(String[] args) {
    SpringApplication.run(EventHubsKafkaConsumerApplication.class, args);
  }

  @StreamListener(target=Sink.INPUT)
  public void logMessages(String msg) {
    System.out.println("message is: " + msg);
}

This was all the code needed to talk to a message broker or event stream processor and do something when a message arrives. Amazing.

The application properties for the consumer are virtually identical to the producer. The only difference is the second property that refers to the input channel, versus the producer that refers to the output channel.

spring.cloud.stream.kafka.binder.brokers=rseroter-eventhubs-tu1-cg1.servicebus.windows.net:9093
spring.cloud.stream.bindings.input.destination=eh1

spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://rseroter-eventhubs-tu1-cg1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

With that, I had two working apps.

Testing everything

I first ran a simple test. This involved starting up both the producer and the consumer apps. The producer noticed that I had three partitions, and handled accordingly. The consumer recognized the three partitions as well, and joined an anonymous consumer group (as we didn’t specify one).

2018.05.29-kafka-05

I kicked up Postman and posted a JSON message to the REST endpoint exposed by my app. I sent in messages with company names of company-1, company-2, and company-3. You can see here that they show up in my consumer app!

2018.05.29-kafka-08

To make sure this wasn’t some other witchcraft going on, I checked the Microsoft Azure portal, and sure enough, see the connections and messages counted.

2018.05.29-kafka-07

Simple, right?

BONUS: Messing with Kafka Consumer Groups

I wanted to try something else, too. Kafka offers “consumer groups” where a single consumer instance within the group gets the message. This lets you load balance by having multiple instances of your consumer app, without each one getting a copy of the message. Every consumer group gets the message, so the same message may get read many times by different consumer groups. Azure Event Hubs has the same concept, and it behaves the same way. Spring Cloud Stream also offers this abstraction, even when the underlying broker (e.g. RabbitMQ) doesn’t offer it.

I could set the consumer group property directly in the application.properties file. Just add “spring.cloud.stream.bindings.input.group=app1” to it. But I wanted to pass this in at application startup so that I could start up multiple instances of the app with different consumer group values.

I started up an instance of my consumer (java -jar event-hubs-kafka-consumer-0.0.1-SNAPSHOT.jar –spring.cloud.stream.bindings.input.group=app1) and passed in that property. Notice that it read the full log (because this is the first time the consumer group saw it), and the consumer group is indicated in the log.

2018.05.29-kafka-09

Interestingly, when I started a second instance, it didn’t grab what was already read by the consumer group (expected), but when I sent in three more events, BOTH instances got a copy (not expected). Saw some other weirdness when I set the “partitioned=true” on the consumer, and provided an “instanceIndex” number for each consumer app instance.

2018.05.29-kafka-10

What’s ALSO interesting is that even though I set up an Event Hubs namespace for a single consumer group, I can apparently create as many as I want with the Kafka interface. Here, I started up another instance of my consumer, but with a different consumer group ID (java -jar event-hubs-kafka-consumer-0.0.1-SNAPSHOT.jar –spring.cloud.stream.bindings.input.group=app2), and it also read the full log, as you’d expect from a new consumer group. In fact, I created two new ones (app2, app3) and both worked.

2018.05.29-kafka-11

So, it seems like the consumer group limits aren’t applying here. I checked the consumer groups in Azure to see if these were being created behind the scenes, but using the Azure API, I still only saw the $Default one there. I have no idea where the Kafka consumer groups show up but they’re clearly in place. Otherwise, I wouldn’t have seen the correct behavior as each new consumer group came online!

2018.05.29-kafka-12

I’ll chalk it up to one of two possible things: (1) I’m a mediocre programmer so I probably screwed something up, or (2) it’s an alpha product and everything might not be wired up just yet. Or both!

Regardless, it’s VERY simple to try out a Kafka-compatible interface on a cloud-hosted service thanks to Azure Event Hubs and Spring Cloud Stream. Kafka users should keep an eye on Azure Event Hubs as a legit option for a cloud-hosted event stream processor.

Advertisements


Categories: Cloud, Microservices, Microsoft Azure, OSS, Pivotal, Spring

5 replies

  1. I tried running your code and while the producer appears to work (although it never shows any output in the logs), the consumer does not work. It does a few things:
    1. It keeps printing out the ConsumerConfig values every second
    2. After some number of seconds it prints out the errors:
    org.apache.kafka.common.KafkaException: Unexpected error fetching metadata for topic input
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:348) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1489) ~[kafka-clients-1.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getPartitionInfo$3(KafkaMessageChannelBinder.java:577) [spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$2(KafkaTopicProvisioner.java:371) ~[spring-cloud-stream-binder-kafka-core-2.0.2.BUILD-20180802.140206-6.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:370) ~[spring-cloud-stream-binder-kafka-core-2.0.2.BUILD-20180802.140206-6.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getPartitionInfo(KafkaMessageChannelBinder.java:573) [spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:331) [spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:126) [spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:279) [spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:77) [spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:131) [spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$0(BindingService.java:154) [spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_161]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[na:1.8.0_161]
    at java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:1.8.0_161]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_161]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_161]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
    Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    and
    org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:391) ~[spring-cloud-stream-binder-kafka-core-2.0.2.BUILD-20180802.140206-6.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.getPartitionInfo(KafkaMessageChannelBinder.java:573) ~[spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:331) ~[spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:126) ~[spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:279) ~[spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:77) ~[spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:131) ~[spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$0(BindingService.java:154) ~[spring-cloud-stream-2.0.2.BUILD-20180717.170855-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_161]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[na:1.8.0_161]
    at java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:1.8.0_161]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_161]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_161]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]
    Caused by: org.apache.kafka.common.KafkaException: Unexpected error fetching metadata for topic input
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:348) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1489) ~[kafka-clients-1.0.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getPartitionInfo$3(KafkaMessageChannelBinder.java:577) ~[spring-cloud-stream-binder-kafka-2.0.2.BUILD-20180802.140413-5.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$2(KafkaTopicProvisioner.java:371) ~[spring-cloud-stream-binder-kafka-core-2.0.2.BUILD-20180802.140206-6.jar:2.0.2.BUILD-SNAPSHOT]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:370) ~[spring-cloud-stream-binder-kafka-core-2.0.2.BUILD-20180802.140206-6.jar:2.0.2.BUILD-SNAPSHOT]
    … 16 common frames omitted
    Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
    Do you know what’s wrong?

  2. Hey Richard –

    I’m on the Event Hubs team, just wanted to say thanks for trying out the Kafka protocol head back when it was in preview. GA deployments now support Kafka topic and Kafka consumer group auto-creation, and while max limit quotas apply to topics, but consumer groups aren’t limited – so we don’t actually expose Kafka consumer groups in the same way that regular EH consumer groups are.

    Again, thanks for testing the feature out and writing this great article. Feel free to reach out if you’ve got questions.

    Arthur

Trackbacks

  1. Here’s where you’ll find me over the Summer - BizTalkGurus
  2. Microsoft Integration Weekly Update: June 04, 2018 | Hooking Stuffs Together

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.

%d bloggers like this: