Building a Cloud Application with Spring Cloud Stream and Apache Kafka

Kiril Nugmanov
Published in Zenitech
Feb 13, 2018 (7 min read)

The goal of this tutorial is to create a cloud-ready application based on Spring Cloud Stream and Apache Kafka (as a messaging system).

Prerequisites

  • Java 8: download it from here or run this command in your terminal
sudo apt-get install openjdk-8-jre
  • Apache Kafka 1.0.0 built for Scala 2.12: download from here
  • Familiarity with the Spring Framework
  • Oracle VirtualBox: download from here
  • docker-machine: download from here

Configure and run Apache Kafka

Apache Kafka consists of two main components:

  • ZooKeeper: stores the cluster metadata and coordinates the brokers
  • Broker: the Kafka server that accepts messages from producers and serves them to consumers

To start them, run the following commands in order (ZooKeeper first, then the broker):

> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server.properties &

If you are using Windows, use the scripts from the bin\windows directory.

The configuration files for ZooKeeper and the broker define the following main parameters:

  • ZooKeeper client port (clientPort, default 2181)
  • broker ID (broker.id), a unique Kafka server identifier (default 0)
  • ZooKeeper connection string (zookeeper.connect, default localhost:2181)
  • broker listener port (default 9092)
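
As a rough illustration, these parameters can be read with plain java.util.Properties. The key names (clientPort, broker.id, zookeeper.connect, listeners) are the ones found in the stock config/zookeeper.properties and config/server.properties files; the class name, the inline sample text and the fallback handling are assumptions made for this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: reads the main ZooKeeper/broker settings and
// falls back to the defaults listed above when a key is missing.
final class KafkaConfigSketch {

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    static int zookeeperClientPort(Properties zookeeper) {
        return Integer.parseInt(zookeeper.getProperty("clientPort", "2181"));
    }

    static int brokerId(Properties broker) {
        return Integer.parseInt(broker.getProperty("broker.id", "0"));
    }

    static String zookeeperConnect(Properties broker) {
        return broker.getProperty("zookeeper.connect", "localhost:2181");
    }

    static String listeners(Properties broker) {
        return broker.getProperty("listeners", "PLAINTEXT://:9092");
    }

    public static void main(String[] args) {
        Properties zookeeper = parse("clientPort=2181\n");
        // Lines starting with '#' are comments, so the listener default applies here
        Properties broker = parse("broker.id=0\nzookeeper.connect=localhost:2181\n#listeners=PLAINTEXT://:9092\n");
        System.out.println("ZooKeeper client port: " + zookeeperClientPort(zookeeper));
        System.out.println("Broker id: " + brokerId(broker));
        System.out.println("ZooKeeper connect: " + zookeeperConnect(broker));
        System.out.println("Listeners: " + listeners(broker));
    }
}
```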

Test Messaging System

To test that everything is running and connected, we’ll send and receive messages via the console. First, start a console consumer in a terminal:

> bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic testing
Figure: Running consumer

In a new terminal, launch a producer:

> bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic testing
Figure: Running producer

Messages typed into the producer should now appear in the consumer.

Figure: Running consumer and producer in console mode

Create Cloud Ready Application

To create this application we will be using the following technologies and languages:

  • Java 8
  • Spring Boot 1.5.10.RELEASE
  • Apache Maven 3.5.2 (or later)

Http-request-consumer Micro-service

This micro-service consumes HTTP requests from users and sends them to the Apache Kafka broker.

The application itself is quite simple: there is only one REST controller, which accepts POST requests and sends the request body to the http-message Kafka topic. Additionally, a prefix taken from the centralised configuration server is prepended to each message:

package org.cynic.messaging.http.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    // Output channel bound to the Kafka topic configured for this service
    private final Source source;

    // Prefix loaded from the configuration server (property http.prefix)
    private final String prefix;

    // "${http.prefix:}" falls back to an empty string;
    // "${http.prefix:''}" would fall back to two literal quote characters
    public MessageController(Source source, @Value("${http.prefix:}") String prefix) {
        this.source = source;
        this.prefix = prefix;
    }

    @PostMapping("/send-message")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void sendMessage(@RequestBody String message) {
        // Publish "<prefix>|<request body>" to the output channel
        source.output()
                .send(MessageBuilder
                        .withPayload(String.join("|", prefix, message))
                        .build());
    }
}
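
To make the payload format concrete, here is a stand-alone sketch of the same String.join call with no Spring involved. The class and method names are made up for the illustration.

```java
// Stand-alone illustration of the payload built in sendMessage.
final class PayloadSketch {

    // Same expression as in the controller: prefix and body joined by '|'
    static String payload(String prefix, String message) {
        return String.join("|", prefix, message);
    }

    public static void main(String[] args) {
        System.out.println(payload("Prefix", "body")); // Prefix|body
        System.out.println(payload("", "body"));       // |body
    }
}
```

Note that with an empty prefix the separator is still emitted, so consumers see "|body" rather than "body".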

Event-processor Micro-service

This micro-service consumes events from Apache Kafka Broker and processes them.

The application consumes messages from the http-message topic. Also note that the binding uses a consumer group: each message from the topic is delivered to exactly one of the consumers that belong to the same group:

package org.cynic.messaging.consumer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class EventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
    private final Sink sink;

    public EventProcessor(Sink sink) {
        this.sink = sink;
    }

    // Invoked for every message that arrives on the input channel
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void processMessage(String message) {
        LOGGER.info("--------------------------");
        LOGGER.info("{}", message);
        LOGGER.info("--------------------------");
    }
}

Gateway Micro-service

This micro-service acts as a gateway for all HTTP requests and distributes them to different instances of http-request-consumer micro-service.

package org.cynic.messaging.gateway;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;

@EnableDiscoveryClient
@EnableZuulProxy
@SpringBootApplication
public class GatewayApplication {

    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class, args);
    }
}

Configuration Micro-service

To keep all configuration in one central place, we’ll create a separate application based on Spring Cloud Config. All configuration files are stored locally, inside the configuration micro-service itself.

The configuration properties of each micro-service are stored in a separate YML file. Common properties such as logging level and error handling are defined in service\application.yml.

Service Discovery Micro-Service

To discover instances and balance requests among them, each micro-service must register itself with the service registry. Spring Cloud uses Netflix Eureka as the registry implementation. To enable service discovery, each client must be given the registry URL through the defaultZone property (eureka.client.serviceUrl.defaultZone).

package org.cynic.messaging.discovery;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@EnableEurekaServer
@SpringBootApplication
public class DiscoveryApplication {

    public static void main(String[] args) {
        SpringApplication.run(DiscoveryApplication.class, args);
    }
}

As can be seen, the application is simple and needs no extra configuration.

Run application

The application must be started in the appropriate sequence:

  1. discovery.jar: service discovery
  2. configuration.jar: centralized configuration
  3. http-event-consumer.jar: bridge micro-service that emits events to the Apache Kafka topic
  4. event-consumer.jar: event consuming application with the business logic
  5. gateway.jar: unified gateway to the application over HTTP
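
When the services are started from a script, it can help to wait until the previous one accepts connections before launching the next. The helper below is a minimal sketch of that idea and is not part of the original project; the class name, the timeouts and the use of port 4000 (the gateway port from this tutorial) as the default are assumptions. An open TCP port only shows that the process is listening, not that the service has finished initialising.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: polls a TCP port until it accepts connections.
final class WaitForPort {

    // Returns true as soon as host:port accepts a connection, false on timeout.
    static boolean await(String host, int port, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 500);
                return true;
            } catch (IOException notReadyYet) {
                Thread.sleep(250); // back off briefly before the next attempt
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 4000;
        System.out.println(await(host, port, 30_000) ? "up" : "timed out");
    }
}
```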

Now an HTTP POST request can be sent to http://127.0.0.1:4000/api/send-message. The request body is used as the message content and is processed in the event-processor micro-service.

13:22:07.831 o.c.m.c.s.EventProcessor    : -------------------------
13:22:07.831 o.c.m.c.s.EventProcessor : Prefix|body
13:22:07.831 o.c.m.c.s.EventProcessor : -------------------------
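
Any HTTP client can be used to send the request. As one possible sketch, the following class posts a plain-text body with the JDK's HttpURLConnection. The class name and the content type are assumptions; the URL is the one given above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Minimal client sketch for the send-message endpoint.
final class SendMessageClient {

    // Posts the body as plain text and returns the HTTP status code.
    static int post(String url, String body) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) URI.create(url).toURL().openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        try (OutputStream out = connection.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        int status = post("http://127.0.0.1:4000/api/send-message", "body");
        // The controller is annotated with @ResponseStatus(HttpStatus.ACCEPTED)
        System.out.println("HTTP " + status);
    }
}
```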

Configuration

The gateway micro-service listens for HTTP requests on port 4000. All requests to /api/** are routed to the http-request-consumer micro-service. These settings are defined in the centralized configuration micro-service:

zuul:
  ignoredServices: '*'
  host:
    connect-timeout-millis: 60000
    socket-timeout-millis: 60000
  routes:
    api:
      path: /api/**
      serviceId: http-request-consumer
server:
  port: 4000
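
The effect of the api route can be pictured with a toy matcher. This is a sketch of the mapping only, not Zuul's implementation, and it assumes Zuul's default behaviour of stripping the route prefix before forwarding; the class name is made up.

```java
import java.util.Optional;

// Toy model of a single "prefix/**" route such as /api/** -> http-request-consumer.
final class RouteSketch {
    private final String prefix;    // "/api" for the pattern "/api/**"
    private final String serviceId;

    RouteSketch(String pattern, String serviceId) {
        if (!pattern.endsWith("/**")) {
            throw new IllegalArgumentException("only prefix/** patterns are modelled");
        }
        this.prefix = pattern.substring(0, pattern.length() - 3);
        this.serviceId = serviceId;
    }

    // Returns the service id plus the remaining path when the request matches.
    Optional<String> forward(String requestPath) {
        if (requestPath.equals(prefix) || requestPath.startsWith(prefix + "/")) {
            return Optional.of(serviceId + requestPath.substring(prefix.length()));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        RouteSketch api = new RouteSketch("/api/**", "http-request-consumer");
        System.out.println(api.forward("/api/send-message").orElse("no route")); // http-request-consumer/send-message
        System.out.println(api.forward("/health").orElse("no route"));           // no route
    }
}
```

Under that assumption, a request to /api/send-message reaches the controller's /send-message mapping.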

http-request-consumer works as a bridge micro-service and sends the consumed requests to the http-message topic on Apache Kafka:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: http-message
          ...
  ...
http:
  prefix: 'Prefix'

In addition, the custom property http.prefix defines the prefix that is added to each message.

event-processor consumes messages from the http-message topic and processes them. To make sure each message is processed only once across all running instances, we introduce the http-messages consumer group: any message in the topic is consumed by only one event-processor instance.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: http-message
          group: http-messages
          ...
  ...
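
The group semantics can be illustrated with a toy in-memory model. It does not talk to Kafka, and the round-robin choice of a member is an assumption made for the sketch (Kafka assigns topic partitions to group members instead); the second group name, audit, is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: every group sees each message, but only one member per group receives it.
final class ConsumerGroupSketch {
    // group name -> inboxes of the group's members (one inbox per running instance)
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages dispatched so far, used for round-robin
    private final Map<String, Integer> dispatched = new LinkedHashMap<>();

    // Registers a new member in the group and returns its inbox.
    List<String> join(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        dispatched.putIfAbsent(group, 0);
        return inbox;
    }

    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int count = dispatched.get(entry.getKey());
            members.get(count % members.size()).add(message);
            dispatched.put(entry.getKey(), count + 1);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        List<String> first = topic.join("http-messages");
        List<String> second = topic.join("http-messages");
        List<String> audit = topic.join("audit");
        topic.publish("m1");
        topic.publish("m2");
        System.out.println(first + " " + second + " " + audit); // [m1] [m2] [m1, m2]
    }
}
```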

«Dockerize» with docker-compose and docker-machine

To «dockerize» the application, the following essential steps must be performed:

  1. Create an image for each custom artifact. In this case, 5 Docker images are created: registry, configuration, gateway, event-processor, http-request-consumer.
  2. Create a docker-compose configuration file and define the dependencies between the services.
  3. Create Apache Kafka and ZooKeeper services to run the messaging system inside Docker.
  4. Reconfigure the application to communicate with Apache Kafka and ZooKeeper inside Docker.
  5. Create a docker-machine virtual host to run all services inside.

An abridged version of the resulting docker-compose file (messaging-system.yml):

version: '2.1'
services:
  # Eureka
  registry:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://discovery:8080/eureka/

  # Configuration service
  configuration:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/

  # Zuul
  gateway:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/
      - CONFIGURATION_SERVICE_NAME=configuration

  # Application
  http-request-consumer:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/
      - CONFIGURATION_SERVICE_NAME=configuration

  event-processor:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/
      - CONFIGURATION_SERVICE_NAME=configuration

  # Zookeeper
  zookeeper:
    image: zookeeper:3.4
    ...

  # Kafka
  kafka:
    build: ./kafka/
    restart: always
    environment:
      - ZOOKEEPER_IP=zookeeper
    depends_on:
      zookeeper:
        condition: service_started
    networks:
      - messaging-system-network
    ...
...

Create docker machine

To create a virtual machine with 4GB of RAM and docker-compose installed, execute the following commands in a terminal:

> docker-machine create -d virtualbox --virtualbox-memory "4096" messaging-system
> docker-machine ssh messaging-system sudo curl -L "https://github.com/docker/compose/releases/download/1.10.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
> docker-machine ssh messaging-system sudo chmod +x /usr/local/bin/docker-compose

Now we have a virtual machine named messaging-system with 4GB of RAM, docker and docker-compose.

The next step is to share the application directory with the virtual machine. To do so, start Oracle VM VirtualBox, select messaging-system and open its context menu:

Settings -> Shared Folders
Figure: Create shared folder in virtual machine

To mount the app folder, execute:

> docker-machine ssh messaging-system sudo mkdir /mnt/app
> docker-machine ssh messaging-system sudo mount -t vboxsf app /mnt/app

Now the shared folder is accessible inside the virtual machine at the /mnt/app mount point.

Build docker-compose

To build the application, run Maven for each module as follows:

> mvn clean package -q -f discovery\pom.xml
> mvn clean package -q -f configuration\pom.xml
> mvn clean package -q -f http-request-consumer\pom.xml
> mvn clean package -q -f event-processor\pom.xml
> mvn clean package -q -f gateway\pom.xml

Then build the Docker images and start the containers with the following commands:

> docker-machine ssh messaging-system docker-compose -f /mnt/app/messaging-system.yml build
> docker-machine ssh messaging-system docker-compose -f /mnt/app/messaging-system.yml up -d

Now our application is running inside the messaging-system docker machine.

Running and testing

The messaging-system machine exposes two ports:

  • 80 — default port for communication with application
  • 8080 — Service Discovery application

To find out the IP address of messaging-system (in my case 192.168.99.100), run:

> docker-machine ip messaging-system

Now we can open the Service Discovery application on port 8080 and see the registered services:

Figure: Registered applications, CONFIGURATION and HTTP-REQUEST-CONSUMER

To call the application, send an HTTP POST request with a non-empty body to http://192.168.99.100/api/service/send-message.

Application scale

Our application is now cloud-ready and can be easily scaled using docker-compose operations:

> docker-machine ssh messaging-system docker-compose -f /mnt/app/messaging-system.yml scale http-request-consumer=2
Figure: Scaled HTTP-REQUEST-CONSUMER

The event-processor application can be scaled in the same manner. Because the Zuul gateway looks up services in the service registry, it automatically discovers new instances and balances requests among them.
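
How requests might be spread over two instances can be sketched with a toy round-robin chooser. This is an illustration only: in the real setup the choice is made by the client-side load balancer behind Zuul using the instance list from the registry, and both the round-robin rule and the instance names below are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy round-robin chooser over a fixed list of service instances.
final class RoundRobinSketch {
    private final List<String> instances;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one instance is required");
        }
        this.instances = instances;
    }

    String choose() {
        // floorMod keeps the index non-negative even after the counter overflows
        return instances.get(Math.floorMod(counter.getAndIncrement(), instances.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch balancer = new RoundRobinSketch(Arrays.asList("instance-1", "instance-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(balancer.choose()); // instance-1, instance-2, instance-1, instance-2
        }
    }
}
```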
