Simple Spring Boot Service to Kubernetes Application: Step 15

Brian Rook
4 min read · Apr 10, 2020


To publish messages, we need a message broker and some logic that uses it. We’re going to use a cloud-based SaaS offering as the broker and Spring Cloud Stream to interact with it.

Set Up the Message Broker

We can stand up a free broker at CloudAMQP. Register an account, then create a new instance. We’re going to create a free instance called medium; pick whatever region you want. When that is done, click on the instance to see its details. We’ll need the host, user and password from here.

Write the Publisher Logic

Create a new package called com.brianrook.medium.customer.messaging and, in that package, create two sub-packages: mapper and message. First create the message payload in message as a class called CustomerCreatedMessage:

package com.brianrook.medium.customer.messaging.message;

import lombok.Data;

@Data
public class CustomerCreatedMessage {
    private Long customerId;
    private String firstName;
    private String lastName;
    private String phoneNumber;
    private String email;
}

Create a mapper interface under mapper called CustomerMessageMapper with this content:

package com.brianrook.medium.customer.messaging.mapper;

import com.brianrook.medium.customer.messaging.message.CustomerCreatedMessage;
import com.brianrook.medium.customer.service.model.Customer;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

@Mapper
public interface CustomerMessageMapper {
    CustomerMessageMapper INSTANCE = Mappers.getMapper(CustomerMessageMapper.class);

    CustomerCreatedMessage customerToCustomerCreatedMessage(Customer customer);
}
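
MapStruct generates the implementation of this interface at compile time, matching properties by name. As a rough illustration of what that mapping amounts to, here is a hand-written equivalent. It is only a sketch: the Customer class below is a hypothetical stand-in that assumes the same five fields as the message (the real model is not shown in this step), the accessors Lombok would generate are written out by hand, and ManualCustomerMessageMapper is an illustrative name, not MapStruct's generated class.

```java
// Hypothetical stand-in for the article's Customer model; the field list is assumed.
final class Customer {
    private final Long customerId;
    private final String firstName;
    private final String lastName;
    private final String phoneNumber;
    private final String email;

    Customer(Long customerId, String firstName, String lastName, String phoneNumber, String email) {
        this.customerId = customerId;
        this.firstName = firstName;
        this.lastName = lastName;
        this.phoneNumber = phoneNumber;
        this.email = email;
    }

    Long getCustomerId() { return customerId; }
    String getFirstName() { return firstName; }
    String getLastName() { return lastName; }
    String getPhoneNumber() { return phoneNumber; }
    String getEmail() { return email; }
}

// Same fields as CustomerCreatedMessage, with the accessors written by hand instead of using @Data.
final class CustomerCreatedMessage {
    private Long customerId;
    private String firstName;
    private String lastName;
    private String phoneNumber;
    private String email;

    Long getCustomerId() { return customerId; }
    void setCustomerId(Long customerId) { this.customerId = customerId; }
    String getFirstName() { return firstName; }
    void setFirstName(String firstName) { this.firstName = firstName; }
    String getLastName() { return lastName; }
    void setLastName(String lastName) { this.lastName = lastName; }
    String getPhoneNumber() { return phoneNumber; }
    void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; }
    String getEmail() { return email; }
    void setEmail(String email) { this.email = email; }
}

// Hand-written equivalent of the mapping method (not MapStruct's actual generated source).
final class ManualCustomerMessageMapper {
    static CustomerCreatedMessage customerToCustomerCreatedMessage(Customer customer) {
        if (customer == null) {
            return null; // a null source maps to a null target
        }
        CustomerCreatedMessage message = new CustomerCreatedMessage();
        // Copy each property whose name matches on both sides.
        message.setCustomerId(customer.getCustomerId());
        message.setFirstName(customer.getFirstName());
        message.setLastName(customer.getLastName());
        message.setPhoneNumber(customer.getPhoneNumber());
        message.setEmail(customer.getEmail());
        return message;
    }
}
```

Letting MapStruct generate this keeps the mapping in step with the classes: a field added to both sides is picked up on the next build without touching the mapper.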

In com.brianrook.medium.customer.messaging create CustomerCreateBinding:

package com.brianrook.medium.customer.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CustomerCreateBinding {

    @Output("customerCreateChannel")
    MessageChannel customerCreate();
}

And in com.brianrook.medium.customer.messaging create CustomerCreatePublisher:

package com.brianrook.medium.customer.messaging;

import com.brianrook.medium.customer.messaging.mapper.CustomerMessageMapper;
import com.brianrook.medium.customer.messaging.message.CustomerCreatedMessage;
import com.brianrook.medium.customer.service.model.Customer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(CustomerCreateBinding.class)
public class CustomerCreatePublisher {

    @Autowired
    CustomerCreateBinding customerCreateBinding;

    public void publishCustomerCreate(Customer customer) {
        // map the domain object to the message payload, then wrap it in a Message
        CustomerCreatedMessage customerMessage = CustomerMessageMapper.INSTANCE.customerToCustomerCreatedMessage(customer);
        Message<CustomerCreatedMessage> msg = MessageBuilder.withPayload(customerMessage).build();

        customerCreateBinding.customerCreate().send(msg);
    }
}

Then hook it into our service, CustomerService:

...
@Autowired
CustomerCreatePublisher customerCreatePublisher;
...
public Customer saveCustomer(Customer customer) {
    if (customerExists(customer)) {
        throw new CreateCustomerException(String.format("customer with email: %s already exists", customer.getEmail()));
    }
    Customer savedCustomer = persistCustomer(customer);
    // publish only after the customer has been persisted
    customerCreatePublisher.publishCustomerCreate(savedCustomer);

    return savedCustomer;
}

Finally, we need to configure our application. Add this configuration to application.yaml:

spring:
  ...
  cloud:
    stream:
      bindings:
        output:
          destination: queue.customer.create
          content-type: application/json
  rabbitmq:
    addresses: amqp://shrimp.rmq.cloudamqp.com/c***j

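Here we’re telling Spring Boot which destination to create on the RabbitMQ instance and where to connect. Make sure the address contains no username or password (we supply those separately), and replace the host and vhost with the values from your own instance. Spring Cloud Stream looks up the settings under bindings by channel name, so if the destination is not picked up, check that the key (output here) matches the name given in @Output.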
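
The URL shown in the CloudAMQP console typically embeds the credentials in the form amqp://user:password@host/vhost. If you would rather not edit it by hand, a small helper along these lines can strip them. This is only a sketch using the JDK's java.net.URI; the class name AmqpAddress and the example host and credentials are made up for illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class AmqpAddress {

    /** Returns the given AMQP URL without its user:password part. */
    static String stripCredentials(String amqpUrl) {
        URI uri = URI.create(amqpUrl);
        try {
            // Rebuild the URI from its parts, passing null for the user info.
            URI stripped = new URI(uri.getScheme(), null, uri.getHost(),
                    uri.getPort(), uri.getPath(), null, null);
            return stripped.toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("not a valid AMQP URL: " + amqpUrl, e);
        }
    }

    public static void main(String[] args) {
        // prints amqp://host.example.com/myvhost
        System.out.println(stripCredentials("amqp://myuser:s3cret@host.example.com/myvhost"));
    }
}
```

The username and password are then supplied separately, so they never end up in application.yaml or in version control.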

We also need to add the rabbit binder libraries to the pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Testing

Update CustomerControllerTest to test that we can publish this message.

...
@Autowired
MessageCollector messageCollector;
@Autowired
CustomerCreateBinding customerCreateBinding;
...
@Test
public void testAddCustomerSuccess() throws URISyntaxException, JsonProcessingException {
    ...
    //validate AMQP
    Message<String> publishedMessage = (Message<String>) messageCollector
            .forChannel(customerCreateBinding.customerCreate())
            .poll();
    assertThat(publishedMessage).isNotNull();
    ObjectMapper om = new ObjectMapper().registerModule(new JavaTimeModule());

    CustomerCreatedMessage customerMessage = om.readValue(
            publishedMessage.getPayload(),
            CustomerCreatedMessage.class);
    assertThat(customerMessage.getCustomerId()).isGreaterThan(0L);
}
...
@Test
public void testAddCustomerConflict() throws URISyntaxException {
    ...

    //validate AMQP
    assertThat(messageCollector
            .forChannel(customerCreateBinding.customerCreate())
            .isEmpty()).isTrue();
}

Here we’re using the Spring application context to hook into the bindings used to talk to our AMQP broker. The MessageCollector captures whatever is sent on the output channel during the test, so we can pull messages out of the pipeline before they reach a real broker and inspect them. We can also confirm that no message is sent when the save fails.
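
If the capture idea is unfamiliar, the following plain-Java analogy may help. It is not Spring code: CapturingChannel and SketchCustomerService are hypothetical stand-ins, the "message" is just a string, and the in-memory set replaces the database. It only illustrates the two behaviours the tests check: a successful save leaves one message in the queue, and a conflict leaves none.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Stand-in for the output channel: it keeps messages in memory instead of sending them.
final class CapturingChannel implements Consumer<String> {
    private final BlockingQueue<String> captured = new LinkedBlockingQueue<>();

    @Override
    public void accept(String message) {
        captured.add(message);
    }

    BlockingQueue<String> messages() {
        return captured;
    }
}

// Stand-in for the service: check for a duplicate, persist, then publish.
final class SketchCustomerService {
    private final Set<String> savedEmails = new HashSet<>();
    private final Consumer<String> publisher;

    SketchCustomerService(Consumer<String> publisher) {
        this.publisher = publisher;
    }

    void saveCustomer(String email) {
        if (savedEmails.contains(email)) {
            // Thrown before anything is published, so no message goes out on a conflict.
            throw new IllegalStateException(
                    String.format("customer with email: %s already exists", email));
        }
        savedEmails.add(email);                            // "persist"
        publisher.accept("{\"email\":\"" + email + "\"}"); // publish only after the save
    }

    public static void main(String[] args) {
        CapturingChannel channel = new CapturingChannel();
        SketchCustomerService service = new SketchCustomerService(channel);

        service.saveCustomer("ada@example.com");
        System.out.println("captured after save: " + channel.messages().poll());

        try {
            service.saveCustomer("ada@example.com");
        } catch (IllegalStateException expected) {
            System.out.println("queue empty after conflict: " + channel.messages().isEmpty());
        }
    }
}
```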

We’ve also added significant functionality, so let’s update our application version using semantic versioning. Update the pom.xml:

<groupId>com.brianrook.medium</groupId>
<artifactId>medium-customer</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>medium-customer</name>
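
Under semantic versioning, new backward-compatible functionality increments the MINOR number and resets PATCH to 0, which is how we arrive at 0.1.0-SNAPSHOT. As a small sketch of that rule (the SemVer class and bumpMinor method are illustrative names, and the starting version 0.0.1-SNAPSHOT is an assumption about the previous pom.xml):

```java
final class SemVer {

    /** Increments MINOR, resets PATCH to 0 and keeps any qualifier such as -SNAPSHOT. */
    static String bumpMinor(String version) {
        String core = version;
        String qualifier = "";
        int dash = version.indexOf('-');
        if (dash >= 0) {
            core = version.substring(0, dash);
            qualifier = version.substring(dash);
        }
        String[] parts = core.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected MAJOR.MINOR.PATCH but got: " + version);
        }
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        return major + "." + (minor + 1) + ".0" + qualifier;
    }

    public static void main(String[] args) {
        // prints 0.1.0-SNAPSHOT
        System.out.println(bumpMinor("0.0.1-SNAPSHOT"));
    }
}
```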

I was also having trouble with the LoggingAspect trying to log system actions on startup, so I changed the pointcut configuration to look like this, in order to limit what was visible to aspectj:

@Around("within(com.brianrook.medium.customer.controller..*) " +
        "|| within(com.brianrook.medium.customer.service..*) " +
        "|| within(com.brianrook.medium.customer.messaging..*) " +
        "|| within(com.brianrook.medium.customer.dao..*)")

We can also start up the application locally if we add the following to the application startup config:

-Dspring.profiles.active=herokudb
-Dspring.datasource.username=<db username>
-Dspring.datasource.password=<db password>
-Dspring.rabbitmq.username=<cloudamqp username>
-Dspring.rabbitmq.password=<cloudamqp password>

If we save a customer, we should be able to see a message published in the cloudamqp manager console.

Kubernetes Configuration

That configuration only covers running locally, though. We also need to configure the application running in Kubernetes. We can do that by adding to our helm values.yaml file and creating another secret.

values.yaml

...
content: |-
  spring:
    ...
    cloud:
      stream:
        bindings:
          output:
            destination: queue.customer.create
            content-type: application/json
    rabbitmq:
      addresses: amqp://shrimp.rmq.cloudamqp.com/c***j
...
secretsToEnv:
  ...
  - name: SPRING_RABBITMQ_PASSWORD
    valueFrom:
      secretKeyRef:
        name: amqp-secrets
        key: SPRING_RABBITMQ_PASSWORD
  - name: SPRING_RABBITMQ_USERNAME
    valueFrom:
      secretKeyRef:
        name: amqp-secrets
        key: SPRING_RABBITMQ_USERNAME

Create the secret:

kubectl create secret generic amqp-secrets --from-literal=SPRING_RABBITMQ_PASSWORD=<cloud amqp password> --from-literal=SPRING_RABBITMQ_USERNAME=<cloud amqp user/vhost>
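
The secret keys are named the way they are because Spring Boot's relaxed binding maps environment variables onto properties: SPRING_RABBITMQ_USERNAME and SPRING_RABBITMQ_PASSWORD end up as spring.rabbitmq.username and spring.rabbitmq.password, the same properties we passed with -D when running locally. The documented convention for deriving the variable name is to replace dots with underscores, remove dashes and convert to uppercase. A minimal sketch of that convention (EnvNames and toEnvName are illustrative names, not part of Spring):

```java
import java.util.Locale;

final class EnvNames {

    /** Derives the environment variable name for a canonical Spring property name. */
    static String toEnvName(String property) {
        return property
                .replace(".", "_")          // dots become underscores
                .replace("-", "")           // dashes are dropped
                .toUpperCase(Locale.ROOT);  // everything is upper-cased
    }

    public static void main(String[] args) {
        // prints SPRING_RABBITMQ_USERNAME and SPRING_RABBITMQ_PASSWORD
        System.out.println(toEnvName("spring.rabbitmq.username"));
        System.out.println(toEnvName("spring.rabbitmq.password"));
    }
}
```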

Build and Commit

git checkout -b customer-updates
mvn clean install
git add .
git commit -m "new functionality and publishing"
git push --set-upstream origin customer-updates
git checkout master
git merge customer-updates
git push

Now let’s have the build pipeline generate a new chart version and use the updated chart to deploy to minikube.

After the build completes, go to the helm chart registry and locate the latest version.

In this case we want version 5.

Take a look at helm to see what is currently deployed:

$ helm list
NAME    NAMESPACE  REVISION  UPDATED                                STATUS    CHART              APP VERSION
medium  default    2         2020-04-04 13:56:58.2124564 -0600 MDT  deployed  medium-customer-3  0.0.1

Update the repositories:

helm repo update

Now list our repo contents:

$ helm search repo codefresh
NAME                       CHART VERSION  APP VERSION  DESCRIPTION
codefresh/medium-customer  5              0.0.1        A Helm chart for Kubernetes

We can see that the chart version matches the one in the registry, so we have pulled down the latest build. Let’s deploy it.

helm upgrade medium codefresh/medium-customer --version=5

We should now be able to verify any of the new functionality we added:

GET /customer/all HTTP/1.1
Host: 172.17.144.73:30001
Content-Type: application/json
