Building a Cloud Application with Spring Cloud Stream and Apache Kafka
The goal of this tutorial is to create a cloud-ready application based on Spring Cloud Stream and Apache Kafka (as a messaging system).
Prerequisites
- Java 8: download it from here, or install it by running this command in your terminal:
sudo apt-get install openjdk-8-jre
- Apache Kafka 1.0.0 with Scala 2.12: download from here
- Familiarity with the Spring Framework
- Oracle VirtualBox: download from here
- docker-machine: download from here
Configure and run Apache Kafka
Apache Kafka consists of two main components:
- Zookeeper: used for configuration storage and cluster coordination
- Broker: the server that handles message producing and consuming
To start Kafka, execute the following commands in order:
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server.properties &
If you are using Windows, use the scripts from the bin\windows directory instead.
The configuration files for Zookeeper (config/zookeeper.properties) and the broker (config/server.properties) define the following main parameters:
- clientPort: the Zookeeper client port (default 2181)
- broker.id: a unique Kafka server identifier (default 0)
- zookeeper.connect: the Zookeeper connection URL (default localhost:2181)
- listeners: the broker listener port (default 9092)
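For reference, the relevant lines in the shipped configuration files look roughly like this (in config/server.properties the listeners entry is commented out by default and falls back to port 9092):

# config/zookeeper.properties
clientPort=2181

# config/server.properties
broker.id=0
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://:9092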
Test Messaging System
To test that everything is running and interconnected, we'll send and receive messages via the console. To do this, run the following command in a terminal:
> bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic testing
In a new terminal, launch the producer:
> bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic testing
If you type messages into the producer, they should appear in the consumer.
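For example, typing two lines at the producer prompt:
> hello
> world
should make the same two lines appear in the consumer terminal.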
Create a Cloud-Ready Application
To create this application we will be using the following technologies and languages:
- Java 8
- Spring Boot 1.5.10.RELEASE
- Apache Maven 3.5.2 (or later)
Http-request-consumer Micro-service
This micro-service consumes HTTP requests from users and sends them to the Apache Kafka broker.
The application itself is quite simple: there is only one REST controller, which accepts POST operations and sends the request body to the http-message Kafka topic. Additionally, a prefix taken from the centralized configuration server is prepended to each message:
package org.cynic.messaging.http.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
    private final Source source;
    private final String prefix;

    public MessageController(Source source, @Value("${http.prefix:''}") String prefix) {
        this.source = source;
        this.prefix = prefix;
    }

    @PostMapping("/send-message")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void sendMessage(@RequestBody String message) {
        source.output()
              .send(MessageBuilder.withPayload(String.join("|", prefix, message))
                                  .build());
    }
}
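For the output channel to be bound to Kafka, the Spring Boot application class enables the Source binding. A minimal sketch (the class and package names here are assumptions):

package org.cynic.messaging.http;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

// Binds Source.OUTPUT to the destination configured under spring.cloud.stream.bindings.output
@EnableBinding(Source.class)
@SpringBootApplication
public class HttpRequestConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(HttpRequestConsumerApplication.class, args);
    }
}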
Event-processor Micro-service
This micro-service consumes events from the Apache Kafka broker and processes them.
The application consumes messages from the http-message topic. Also note that the binding uses a consumer group: within a group, each message from the topic is delivered to exactly one consumer:
package org.cynic.messaging.consumer.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Component
public class EventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

    private final Sink sink;

    public EventProcessor(Sink sink) {
        this.sink = sink;
    }

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void processMessage(String message) {
        LOGGER.info("--------------------------");
        LOGGER.info("{}", message);
        LOGGER.info("--------------------------");
    }
}
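As on the producer side, the application class enables the Sink binding. A minimal sketch (names are assumptions):

package org.cynic.messaging.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

// Binds Sink.INPUT to the destination and group configured under spring.cloud.stream.bindings.input
@EnableBinding(Sink.class)
@SpringBootApplication
public class EventProcessorApplication {
    public static void main(String[] args) {
        SpringApplication.run(EventProcessorApplication.class, args);
    }
}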
Gateway Micro-service
This micro-service acts as a gateway for all HTTP requests and distributes them across the instances of the http-request-consumer micro-service.
package org.cynic.messaging.gateway;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;
@EnableDiscoveryClient
@EnableZuulProxy
@SpringBootApplication
public class GatewayApplication {
    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class, args);
    }
}
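The gateway, like the other services, registers with Eureka and pulls its settings from the configuration micro-service described below. A minimal bootstrap.yml sketch, assuming the config server is located via service discovery (the placeholder variables mirror the environment variables used later in the docker-compose file; the default values are assumptions):

spring:
  application:
    name: gateway
  cloud:
    config:
      discovery:
        enabled: true
        service-id: ${CONFIGURATION_SERVICE_NAME:configuration}
eureka:
  client:
    serviceUrl:
      defaultZone: ${REGISTRY_SERVICE_ZONE:http://127.0.0.1:8080/eureka/}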
Configuration Micro-service
To store all configuration in a unified, centralized way, we'll create a separate application based on Spring Cloud Config. All configuration will be stored locally (inside the micro-service).
The configuration properties of each micro-service are stored in a separate YML file. Common configuration properties, such as logging levels and error handling, are defined in service\application.yml.
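A sketch of the configuration service, assuming Spring Cloud Config's native (local classpath) backend; the package name and search location are assumptions:

package org.cynic.messaging.configuration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;

// Serves the locally stored YML files over HTTP to the other micro-services
@EnableConfigServer
@SpringBootApplication
public class ConfigurationApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConfigurationApplication.class, args);
    }
}

with its own application.yml enabling the native profile:

spring:
  profiles:
    active: native
  cloud:
    config:
      server:
        native:
          search-locations: classpath:/service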
Service Discovery Micro-service
To discover and balance requests among instances, each micro-service must register itself with service discovery. Spring Cloud uses Eureka as the service registry implementation for this functionality. To enable service discovery, the defaultZone property must be defined.
package org.cynic.messaging.discovery;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@EnableEurekaServer
@SpringBootApplication
public class DiscoveryApplication {
    public static void main(String[] args) {
        SpringApplication.run(DiscoveryApplication.class, args);
    }
}
As can be seen, the application is simple and needs no extra configuration.
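For a standalone registry, the server's YML configuration usually just disables self-registration and sets the listen port. A sketch, matching the 8080 port used throughout this tutorial:

server:
  port: 8080
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false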
Run application
The application must be started in the appropriate sequence:
- discovery.jar: service discovery
- configuration.jar: centralized configuration
- http-event-consumer.jar: bridge micro-service that emits events to the Apache Kafka topic
- event-consumer.jar: event-consuming application with the business logic
- gateway.jar: unified gateway to the application via HTTP
Now an HTTP POST operation can be performed against http://127.0.0.1:4000/api/send-message. The operation body will be used as the message content and will be processed in the event-consumer micro-service.
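For example, with curl:
> curl -X POST -H "Content-Type: text/plain" -d "body" http://127.0.0.1:4000/api/send-message
The event-processor log then shows the prefixed message: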
13:22:07.831 o.c.m.c.s.EventProcessor : -------------------------
13:22:07.831 o.c.m.c.s.EventProcessor : Prefix|body
13:22:07.831 o.c.m.c.s.EventProcessor : -------------------------
Configuration
The gateway micro-service listens for HTTP POST operations on port 4000. All requests to /api/** will be routed to the http-request-consumer micro-service. This routing is defined in the centralized configuration micro-service:
zuul:
  ignoredServices: '*'
  host:
    connect-timeout-millis: 60000
    socket-timeout-millis: 60000
  routes:
    api:
      path: /api/**
      serviceId: http-request-consumer
server:
  port: 4000
http-event-consumer works as a bridge micro-service and sends the consumed requests to the http-message topic on Apache Kafka:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: http-message
...
http:
  prefix: 'Prefix'
Additionally, there is a custom configuration property, http.prefix, whose value is prepended to each message.
event-processor consumes messages from the http-message topic and processes them. To ensure that each message is delivered only once, we introduce the http-messages group: any message inside the topic will be consumed by exactly one event-processor instance.
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: http-message
          group: http-messages
...
«Dockerize» with docker-compose and docker-machine
To «dockerize» the application, the following essential steps must be performed:
- Create images of each custom artifact; in this case five Docker images will be created: registry, configuration, gateway, event-processor, http-event-consumer (a Dockerfile sketch follows below).
- Create a docker-compose configuration file and define the dependencies between the services.
- Create Apache Kafka and Zookeeper services to run the messaging system inside Docker.
- Reconfigure the application to communicate with Apache Kafka and Zookeeper inside Docker.
- Create a docker-machine virtual host to run all the services inside.
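For the first step, a minimal Dockerfile sketch for one of the services (the base image, jar name, and paths are assumptions):

FROM openjdk:8-jre
# Copy the Maven-built artifact into the image
COPY target/gateway.jar /opt/app/gateway.jar
# Run the Spring Boot application
ENTRYPOINT ["java", "-jar", "/opt/app/gateway.jar"]

The docker-compose configuration file, messaging-system.yml, then defines the services and the dependencies between them: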
version: '2.1'
services:
  # Eureka
  registry:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://discovery:8080/eureka/
  # Configuration service
  configuration:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/
  # Zuul
  gateway:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/
      - CONFIGURATION_SERVICE_NAME=configuration
  # Application
  http-request-consumer:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/
      - CONFIGURATION_SERVICE_NAME=configuration
  event-processor:
    ...
    environment:
      - REGISTRY_SERVICE_ZONE=http://registry:8080/eureka/
      - CONFIGURATION_SERVICE_NAME=configuration
  # Zookeeper
  zookeeper:
    image: zookeeper:3.4
    ...
  # Kafka
  kafka:
    build: ./kafka/
    restart: always
    environment:
      - ZOOKEEPER_IP=zookeeper
    depends_on:
      zookeeper:
        condition: service_started
    networks:
      - messaging-system-network
...
Create docker machine
To create a virtual machine with 4 GB RAM and docker-compose inside, execute the following commands in a terminal:
> docker-machine create -d virtualbox --virtualbox-memory "4096" messaging-system
> docker-machine ssh messaging-system sudo curl -L "https://github.com/docker/compose/releases/download/1.10.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
> docker-machine ssh messaging-system sudo chmod +x /usr/local/bin/docker-compose
Now we have a virtual machine named messaging-system with docker and docker-compose installed and 4 GB RAM allocated.
The next step is to share the built application with the virtual machine. To do this, run Oracle VM VirtualBox, select messaging-system, and open its context menu:
Settings -> Shared Folders
To mount the app folder, execute:
> docker-machine ssh messaging-system sudo mkdir /mnt/app
> docker-machine ssh messaging-system sudo mount -t vboxsf app /mnt/app
Now the shared folder is accessible inside the virtual machine under the /mnt/app mount point.
Build docker-compose
To build the application, execute the Maven builds as follows:
> mvn clean package -q -f discovery\pom.xml
> mvn clean package -q -f configuration\pom.xml
> mvn clean package -q -f http-request-consumer\pom.xml
> mvn clean package -q -f event-processor\pom.xml
> mvn clean package -q -f gateway\pom.xml
and to build and start the docker containers, execute the following commands:
> docker-machine ssh messaging-system docker-compose -f /mnt/app/messaging-system.yml build
> docker-machine ssh messaging-system docker-compose -f /mnt/app/messaging-system.yml up -d
Now our application is running inside the messaging-system docker machine.
Running and testing
There are two exposed ports inside messaging-system:
- 80: the default port for communication with the application
- 8080: the Service Discovery application
To find out the messaging-system IP address (in my case I got 192.168.99.100), run:
> docker-machine ip messaging-system
Now we can call the application and inspect the Service Discovery dashboard.
To call the application, make an HTTP POST operation with a non-empty body to http://192.168.99.100/api/service/send-message.
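For example:
> curl -X POST -H "Content-Type: text/plain" -d "body" http://192.168.99.100/api/service/send-message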
Application scaling
Our cloud application is now ready and can easily be scaled using docker-compose:
> docker-machine ssh messaging-system docker-compose -f /mnt/app/messaging-system.yml scale http-request-consumer=2
In the same manner, the event-processor application can be scaled. Thanks to the Zuul gateway, our application automatically discovers new instances of services and balances requests among them.
Links
- GitHub Repository: application source code
- Spring Cloud Stream: message-driven microservices framework
- Spring Framework: Spring ecosystem landing page
- Spring Integration: Enterprise Integration Patterns family implementation
- Apache Kafka: project page and tutorials