AWS Standard SQS Queue with Spring Boot
In this article, I will show how to use an Amazon SQS standard queue from a Spring Boot application.
What is Amazon SQS?
Amazon SQS (Simple Queue Service) is a fully managed message queuing service for microservices, distributed systems, and serverless applications.
The first 1,000,000 Amazon SQS requests per month are free.
Two Types of Queues:
- Standard Queues: Amazon SQS offers standard as the default queue type. Standard queues support a nearly unlimited number of API calls per second, per API action (SendMessage, ReceiveMessage, or DeleteMessage). Standard queues support at-least-once message delivery. However, occasionally (because of the highly distributed architecture that allows nearly unlimited throughput), more than one copy of a message might be delivered out of order.
- FIFO Queues: FIFO (First-In-First-Out) queues have all the capabilities of the standard queues, but are designed to enhance messaging between applications when the order of operations and events is critical, or where duplicates can't be tolerated.
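Because a standard queue may deliver the same message more than once, a consumer usually needs to tolerate duplicates. The following is a minimal sketch of one way to do that, assuming each message carries a unique id (as the Message class later in this article does). The IdempotentHandler class is a hypothetical helper, not part of the AWS SDK, and it keeps the seen ids in memory only.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper (not part of the AWS SDK): skips messages whose id was already handled.
final class IdempotentHandler {

    // Ids of messages that were already processed. In-memory and not thread-safe;
    // a real service would more likely use a shared, persistent store.
    private final Set<String> processedIds = new HashSet<>();
    private final Consumer<String> delegate;

    IdempotentHandler(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the body was passed to the delegate, false if the id was a duplicate.
    boolean handle(String messageId, String body) {
        if (!processedIds.add(messageId)) {
            return false; // Set.add returns false when the id is already present
        }
        delegate.accept(body);
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandler handler =
                new IdempotentHandler(body -> System.out.println("Processing: " + body));
        handler.handle("1", "first delivery");
        handler.handle("1", "second delivery of the same message"); // ignored as a duplicate
    }
}
```

Marking the id as seen before the delegate runs means a failed delegate call is not retried. Whether that is acceptable depends on the application.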
Tech Stack
- Spring Boot
- AWS SQS
Spring Boot Application
Dependencies
Spring Boot Version: 3.1.0
Java Version: 17
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sqs</artifactId>
        <version>1.12.632</version>
    </dependency>
</dependencies>
application.properties
I described how to create the access key and secret key in the post below.
aws.access.key=${AWS_ACCESS_KEY:your_access_key}
aws.secret.key=${AWS_SECRET_KEY:your_secret_key}
aws.queueName=spring-aws-queue
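The ${AWS_ACCESS_KEY:your_access_key} syntax is a Spring property placeholder with a default: if a property named AWS_ACCESS_KEY is available (for example, as an environment variable), its value is used; otherwise the text after the colon is used. The sketch below mimics that fallback rule in plain Java to make the behavior concrete. PlaceholderSketch and its resolve method are hypothetical names for illustration and are not how Spring implements resolution internally.

```java
import java.util.Map;

// Hypothetical illustration of the "${NAME:default}" fallback rule; not Spring's implementation.
final class PlaceholderSketch {

    // Resolves a single "${NAME}" or "${NAME:default}" expression against the given source.
    static String resolve(String expression, Map<String, String> source) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // plain literal such as "spring-aws-queue"
        }
        String inner = expression.substring(2, expression.length() - 1);
        int colon = inner.indexOf(':');
        String name = colon < 0 ? inner : inner.substring(0, colon);
        String fallback = colon < 0 ? null : inner.substring(colon + 1);

        String value = source.get(name);
        if (value != null) {
            return value; // a defined property wins over the default
        }
        if (fallback != null) {
            return fallback; // text after the colon
        }
        throw new IllegalArgumentException("Could not resolve placeholder: " + name);
    }

    public static void main(String[] args) {
        // Uses the real environment, so the output depends on whether AWS_ACCESS_KEY is set.
        System.out.println(resolve("${AWS_ACCESS_KEY:your_access_key}", System.getenv()));
    }
}
```

With the defaults shown in application.properties, the application still starts when the environment variables are missing, but calls to SQS would then be made with the placeholder credentials and are expected to fail.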
SqsConfig
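import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SqsConfig {

    @Value("${aws.access.key}")
    private String accessKey;

    @Value("${aws.secret.key}")
    private String secretKey;

    // Exposes a single SQS client bean that uses static credentials and a fixed region
    @Bean
    public AmazonSQS amazonSQSClient() {
        BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
        return AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
                .withRegion(Regions.EU_CENTRAL_1)
                .build();
    }
}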
BasicAWSCredentials:
Creates a new BasicAWSCredentials object using the accessKey and secretKey fields. This object is used to authenticate with AWS services.
AmazonSQSClientBuilder:
Begins building an AmazonSQS client with default settings, then applies the static credentials and the EU_CENTRAL_1 region before building the client.
Message
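import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

// Payload that is serialized to JSON and sent as the SQS message body
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
    private String id;
    private String content;
    private Date createdAt;
}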
Publisher
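import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Log4j2
public class Publisher {

    @Value("${aws.queueName}")
    private String queueName;

    private final AmazonSQS amazonSQSClient;
    private final ObjectMapper objectMapper;

    public Publisher(AmazonSQS amazonSQSClient, ObjectMapper objectMapper) {
        this.amazonSQSClient = amazonSQSClient;
        this.objectMapper = objectMapper;
    }

    public void publishMessage(String id) {
        try {
            // Look up the queue URL by queue name
            GetQueueUrlResult queueUrl = amazonSQSClient.getQueueUrl(queueName);
            var message = Message.builder()
                    .id(id)
                    .content("message")
                    .createdAt(new Date())
                    .build();
            // Serialize the message to JSON and send it as the message body
            amazonSQSClient.sendMessage(queueUrl.getQueueUrl(), objectMapper.writeValueAsString(message));
        } catch (Exception e) {
            log.error("Queue Exception Message: {}", e.getMessage());
        }
    }
}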
amazonSQSClient.sendMessage():
This line sends the created message, serialized to JSON, to the SQS queue as the message body.
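The publisher above issues one SendMessage request per message. SQS also offers a SendMessageBatch action that accepts up to 10 messages per request, which can reduce the number of requests when many messages are published at once. The sketch below only shows the grouping step in plain Java; BatchSketch and chunk are hypothetical names, and the actual batch call to the SDK is intentionally left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a list of message bodies into groups that fit one batch request.
final class BatchSketch {

    // SQS accepts at most 10 entries per SendMessageBatch request
    static final int MAX_BATCH_SIZE = 10;

    static <T> List<List<T>> chunk(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            // Copy the sublist so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> bodies = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            bodies.add("message-" + i);
        }
        for (List<String> batch : chunk(bodies, MAX_BATCH_SIZE)) {
            System.out.println("Batch of " + batch.size() + " messages");
        }
    }
}
```

For the 10 messages published by this article's application, the grouping yields a single batch.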
Consumer
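import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
@Log4j2
public class Consumer {

    @Value("${aws.queueName}")
    private String queueName;

    private final AmazonSQS amazonSQSClient;

    public Consumer(AmazonSQS amazonSQSClient) {
        this.amazonSQSClient = amazonSQSClient;
    }

    @Scheduled(fixedDelay = 5000) // Runs again 5 seconds after the previous run finishes.
    public void consumeMessages() {
        try {
            String queueUrl = amazonSQSClient.getQueueUrl(queueName).getQueueUrl();
            // With default settings, a receive call returns at most one message
            ReceiveMessageResult receiveMessageResult = amazonSQSClient.receiveMessage(queueUrl);
            if (!receiveMessageResult.getMessages().isEmpty()) {
                com.amazonaws.services.sqs.model.Message message = receiveMessageResult.getMessages().get(0);
                log.info("Read Message from queue: {}", message.getBody());
                // Delete the message so it is not delivered again
                amazonSQSClient.deleteMessage(queueUrl, message.getReceiptHandle());
            }
        } catch (Exception e) {
            log.error("Queue Exception Message: {}", e.getMessage());
        }
    }
}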
consumeMessages():
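The method runs with a fixed delay of 5 seconds between runs. Each run polls the queue, logs the message it receives (at most one per call with the default settings), and then deletes that message from the queue.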
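It can help to estimate how many requests this polling loop generates compared with the 1,000,000 free requests per month mentioned earlier. The sketch below is a rough upper bound under stated assumptions: a 30-day month, every API action counting as one request, and two requests per poll (getQueueUrl and receiveMessage), ignoring deleteMessage calls and the time each run takes. PollingCostSketch is a hypothetical name used only for this calculation.

```java
// Hypothetical back-of-the-envelope calculation; the inputs are assumptions, not measured values.
final class PollingCostSketch {

    // Upper bound on the number of polls in a month for a given delay between polls.
    static long pollsPerMonth(long fixedDelayMillis, int daysInMonth) {
        long millisPerMonth = daysInMonth * 24L * 60 * 60 * 1000;
        return millisPerMonth / fixedDelayMillis;
    }

    // Total requests when every poll issues a fixed number of API calls.
    static long requestsPerMonth(long fixedDelayMillis, int daysInMonth, int requestsPerPoll) {
        return pollsPerMonth(fixedDelayMillis, daysInMonth) * requestsPerPoll;
    }

    public static void main(String[] args) {
        long polls = pollsPerMonth(5000, 30);          // 518400 polls
        long requests = requestsPerMonth(5000, 30, 2); // 1036800 requests
        System.out.println("Polls per month: " + polls);
        System.out.println("Requests per month: " + requests);
    }
}
```

Under these assumptions, an idle consumer alone would approach the free quota. Looking up the queue URL once instead of on every run would roughly halve the count.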
Spring Boot Application Main Class
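import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // Required for the @Scheduled consumer method to run
public class SpringAwsSqsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringAwsSqsApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Publisher publisher) {
        return args -> {
            // Wait 3 seconds after startup, then publish 10 messages with ids 0..9
            Thread.sleep(3000);
            for (int i = 0; i < 10; i++) {
                publisher.publishMessage(String.valueOf(i));
            }
        };
    }
}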
ApplicationRunner runner():
This method creates an ApplicationRunner bean that, when executed, waits 3 seconds and then publishes 10 messages.
Run the Spring Boot Application
When the application runs, you should see logs like the ones below in the console.
GitHub Repository:
https://github.com/mertcakmak2/Medium-Stories-Projects/tree/master/spring-aws-sqs
References:
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html
https://javatodev.com/how-to-use-amazon-sqs-with-spring-boot/