Streaming database data to a .csv file efficiently: a practical implementation using Spring Boot and HikariCP
In this article I am going to explain how to download large files by streaming data directly from a single database using a main and a secondary connection pool.
Before that, I would like to go over some base concepts and give some context for our implementation.
Introduction
Connection Pooling Frameworks - HikariCP
In production environments where we expect thousands of concurrent requests from clients, opening and closing database connections for every single request can cause the database to perform poorly.
We can solve this problem by pooling database connections. Instead of creating a new connection for every request, a connection pool reuses a limited set of existing connections, avoiding the overhead of establishing a new connection to the database each time.
By simply reusing connections from a pool, we save the cost of repeatedly opening expensive database connections, boosting the overall performance of our database-driven applications.
HikariCP is a very lightweight (roughly 130 KB) and lightning-fast JDBC connection pooling framework developed by Brett Wooldridge around 2012. HikariCP is already included in the spring-boot-starter-data-jpa and spring-boot-starter-jdbc packages.
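As a minimal standalone illustration (not part of the example project, and with placeholder connection details), this is all it takes to create and use a Hikari pool:

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import java.sql.Connection;
import java.sql.SQLException;

public class HikariQuickstart {

    public static void main(String[] args) throws SQLException {
        // Minimal standalone pool; URL and credentials are placeholders
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/mydb");
        config.setUsername("user");
        config.setPassword("secret");
        config.setMaximumPoolSize(10); // hard limit of pooled connections

        try (HikariDataSource dataSource = new HikariDataSource(config);
             Connection connection = dataSource.getConnection()) {
            // the connection is borrowed from the pool and returned on close()
            System.out.println("Connected: " + connection.isValid(1));
        }
    }
}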
Contextualization
Imagine you are building an application backend and you are required to implement:
- Core CRUD operations.
- Downloads — through database streaming.
- Executing import operations involving parsing and inserting large .csv datasets.
- Scheduled tasks over large database table rows.
Implementing these kinds of throughput-sensitive features without thinking about the usage of your connection pool can be problematic, considering that clients are concurrently executing transactions against your application every second.
Have you ever seen "Connection is not available, request timed out after x ms…"?
As already stated at the beginning of this article, connection pools are optimised through frameworks like HikariCP to create, maintain and deliver a limited number of database connections that are supposed to be used and returned to the pool within milliseconds. If you use those limited connections for long-running operations, you are going to exhaust the pool and face timeout exceptions.
Application-level database health checks use these same connections, of course, so what do you think is going to happen when the load balancers in your cloud infrastructure cannot obtain a connection to check whether your application and database are healthy?
Mayday, we’re going down.
Let’s just add more connections!
Please, don’t (or at least do it consciously regarding the available hardware)
Creating new connections just for the sake of it implies additional memory allocation and requires the username, password and TLS handshake to authenticate on every creation, causing overhead in both your application and your database.
On the other hand, the optimal number of connections is closely related to the number of cores of your database server's CPU.
“You want a small pool, saturated with threads waiting for connections”.
Extract from an excellent recommended article about pool sizing: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing
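As a rough starting point, that article suggests the formula connections = ((core_count * 2) + effective_spindle_count), and then tuning from measurements rather than adding connections speculatively.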
So what can we do?
OLAP and OLTP separation is what you are looking for, you may think, and you are right.
But let us imagine for a moment you're working at a recently founded start-up where separating analytical and transactional workloads through proper ETL processes into different datasources is not an option (at least at this point). The reality of start-ups (small development teams, low budgets, tight schedules, limited resources) sometimes does not allow us to take the risk of an architectural redefinition like this, which consumes time, hard work and money and jeopardizes delivering visible value to customers.
A solution
Why don’t we distribute operations into multiple pools?
Let’s define a Main Connection Pool, responsible for resolving all CRUD Core transactions, and a secondary fail-fast Processing Connection Pool that we can invoke to execute operations like downloads, imports or scheduled tasks, emulating an OLTP/OLAP distribution.
If all the connections of your secondary pool get exhausted by downloads, imports or scheduled tasks, your main pool is never affected and your application remains healthy and operational for its main purpose.
You would also be able to configure different attributes like timeouts or transaction isolation levels per connection pool.
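For instance, as a hedged sketch (not code from the example project), each HikariConfig can carry its own timeouts and isolation level:

// Illustrative per-pool tuning; the values are placeholders, not the example project's settings
var mainConfig = new HikariConfig();
mainConfig.setPoolName("Hikari-1-Main");
mainConfig.setConnectionTimeout(30_000);                        // wait up to 30s for a free connection
mainConfig.setTransactionIsolation("TRANSACTION_READ_COMMITTED");

var processingConfig = new HikariConfig();
processingConfig.setPoolName("HikariPool-Processing");
processingConfig.setConnectionTimeout(5_000);                   // fail fast if the processing pool is busy
processingConfig.setReadOnly(true);                             // processing connections default to read-only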
Yes… this implementation has a hardware resource limit. As stated, we cannot just add connections infinitely. An increase in demand on either the Processing Pool or the Main Pool would probably mean a hardware upgrade, more connections, and faster database processing so that connections are returned to the pool as quickly as possible.
Code please…
GitHub Repository: https://github.com/msampietro/spring-download-multiple-pools
Our example is a familiar Actor-Movie API (check out the ERD diagram in the repository). The purpose of the example is to trigger a browser download of a complete list of actors, initialized with 250k rows, and a complete list of movies, initialized with 550k rows.
Instead of copying and pasting raw code lines, we are going to refer to class names and specific code fragments in order to be as clear as possible. For setup, project structure and a detailed explanation of each class, please check out the repo README.md.
Configuration Classes
Let’s start defining our connection pool configuration classes:
MainDatasourceConfig.java (Primary) and ProcessingDatasourceConfig.java (Secondary)
- MainDatasourceConfig.java
public class MainDatasourceConfig {...} -> THE MAIN DATASOURCE AND CONNECTION POOL
In this class we are defining the main Datasource and a fresh new EntityManager and TransactionManager based on it.
I will explain some important methods.
protected static HikariConfig buildHikariConfig(Map<String, Object> dataSourceProperties) {...}
buildHikariConfig(…) manually references application.properties properties to build the HikariConfig object needed to instantiate the Datasource. (You can do the same directly from the properties file itself if you want to, but I prefer to do it programmatically. We are going to reuse this method later.)
The most important properties to consider in this method (for now) are: the pool name "Hikari-1-Main", max connections (also known as max pool size) and min idle connections, defined as 20 and 10 respectively in the application.properties file.
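A rough sketch of what this method does; the property key names below are illustrative rather than exact, so check the repository for the real implementation:

protected static HikariConfig buildHikariConfig(Map<String, Object> dataSourceProperties) {
    // Key names are illustrative; the repository reads them from application.properties
    var config = new HikariConfig();
    config.setJdbcUrl((String) dataSourceProperties.get("url"));
    config.setUsername((String) dataSourceProperties.get("username"));
    config.setPassword((String) dataSourceProperties.get("password"));
    config.setPoolName("Hikari-1-Main");
    config.setMaximumPoolSize(20);   // max connections
    config.setMinimumIdle(10);       // min idle connections
    return config;
}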
@Primary
@Bean(name = "dataSource")
public DataSource dataSource() {
    var hikariConfig = buildHikariConfig(getDataSourceProperties());
    return new HikariDataSource(hikariConfig);
}
dataSource() actually instantiates the Datasource using the defined properties.
@Primary
@Bean(name = "entityManagerFactory")
public LocalContainerEntityManagerFactoryBean entityManagerFactory(EntityManagerFactoryBuilder builder,
                                                                   @Qualifier("dataSource") DataSource dataSource) {
    return builder
            .dataSource(dataSource)
            .packages("com.msampietro.springdownloadmultiplepools.module")
            .properties(getJpaProperties())
            .build();
}
entityManagerFactory(…) creates the EntityManagerFactory for the Main Datasource and scans the entities and repositories of our project packages using the .packages(…) builder method.
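The class also defines the @Primary TransactionManager mentioned above; a typical definition looks roughly like this (the repository may differ in details):

@Primary
@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager(
        @Qualifier("entityManagerFactory") EntityManagerFactory entityManagerFactory) {
    // Standard JPA transaction manager bound to the main EntityManagerFactory
    return new JpaTransactionManager(entityManagerFactory);
}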
- ProcessingDatasourceConfig.java
public class ProcessingDatasourceConfig {...} -> THE SECONDARY DATASOURCE AND FIXED CONNECTION POOL

@Bean(name = "processingDataSource")
public DataSource processingDataSource(@Qualifier("dataSourceProperties") Map<String, Object> dataSourcePropertiesHolder) {
    Map<String, Object> dataSourceProperties = (Map<String, Object>) dataSourcePropertiesHolder.get("dataSourceProperties");
    var config = MainDatasourceConfig.buildHikariConfig(dataSourceProperties);
    config.setMaximumPoolSize(config.getMinimumIdle());
    config.setPoolName("HikariPool-Processing");
    config.getDataSourceProperties().remove("socketTimeout");
    return new HikariDataSource(config);
}
processingDataSource(…) receives the same configuration as the Main Datasource through the "dataSourceProperties" @Bean, but redefines the pool name to "HikariPool-Processing" and the number of connections. In this case, if the Main Pool's min idle connections is 10, then the secondary pool's max connections will also be 10. As we are not specifying a different min idle for this pool, it defaults to a fixed size (max = min idle).
You can think of tons of different options and approaches for these configuration properties in accordance with your requirements. This was just the approach proposed for this example.
@Bean(name = "processingEntityManagerFactory")
public LocalContainerEntityManagerFactoryBean processingEntityManagerFactory(EntityManagerFactoryBuilder builder,
@Qualifier("processingDataSource") DataSource processingDataSource,
@Qualifier("jpaProperties") Map<String, String> jpaProperties) {
return builder
.dataSource(processingDataSource)
.persistenceUnit("processing")
.packages("com.msampietro.springdownloadmultiplepools.module")
.properties(jpaProperties)
.build();
}
processingEntityManagerFactory(…) creates the EntityManagerFactory for the Secondary Datasource and scans the same entities and repositories of our project packages using the .packages(…) builder method.
The EntityManager in this case is named using .persistenceUnit(“processing”). We are going to use this persistence unit name when injecting the secondary EntityManager in our services.
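ProcessingDatasourceConfig.java also defines the processingTransactionManager that we will reference later from the export service; a rough sketch of that bean (details may differ slightly from the repository):

@Bean(name = "processingTransactionManager")
public PlatformTransactionManager processingTransactionManager(
        @Qualifier("processingEntityManagerFactory") EntityManagerFactory processingEntityManagerFactory) {
    // Transaction manager bound to the secondary (processing) EntityManagerFactory
    return new JpaTransactionManager(processingEntityManagerFactory);
}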
Clarification: As we do not want to repeat entities or repositories, every save, update, find, derived query or any other method executed through Jpa Repositories is going to use the @Primary EntityManager, taking connections from the main pool. This happens because we define the same .packages(…) scan (meaning the same entities and repositories) for both pools, and the primary one takes precedence. In order to make the secondary EntityManager effective we are going to explicitly specify that EntityManager in manual Typed Queries in our services. You could also specify a different .packages scan pointing to repositories in a different location of the project structure, duplicating entities and repositories but keeping the default Jpa Repository methods without having to build manual Typed Queries.
Finally, we are required to add the following configuration annotations in our secondary datasource config class:
@Configuration
@PropertySource({"classpath:application.properties"})
@EnableTransactionManagement
@EnableJpaRepositories(entityManagerFactoryRef = "processingEntityManagerFactory",
transactionManagerRef = "processingTransactionManager",
basePackages = "com.msampietro.springdownloadmultiplepools.module")
public class ProcessingDatasourceConfig {...}
We do not need these annotations in our main datasource config class because Spring uses the default bean names for the EntityManagerFactory and the TransactionManager.
If everything is properly configured, we should see the startup logs of both pools in the console.
Download Classes
- CSVWriterWrapper.java
This class is just an abstraction over the CSV library selected for this example (univocity-parsers -> strongly recommended!). You can plug in whatever CSV library you want through the wrapper.
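A rough sketch of what such a wrapper can look like on top of univocity-parsers (the repository's version may differ in details):

import com.univocity.parsers.csv.CsvWriter;
import com.univocity.parsers.csv.CsvWriterSettings;

import java.io.OutputStream;

public class CSVWriterWrapper implements AutoCloseable {

    private final CsvWriter csvWriter;

    public CSVWriterWrapper(OutputStream outputStream) {
        // Delegates to univocity-parsers; swap this for any other CSV library if preferred
        this.csvWriter = new CsvWriter(outputStream, new CsvWriterSettings());
    }

    public void writeNext(String[] row) {
        csvWriter.writeRow(row);
    }

    public void flush() {
        csvWriter.flush();
    }

    @Override
    public void close() {
        csvWriter.close();
    }
}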
- BaseExportService.java
The BaseExportService is an abstract class that builds a Typed Query based on a list of Selections and streams the query results directly to the OutputStream using the CSVWriterWrapper. Each implementation class (ActorExportServiceImpl and MovieExportServiceImpl) is responsible for defining the file header, the query selections and the result associations.
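As an illustration, a hypothetical implementation could look like the sketch below; the Actor attribute names and the exact abstract hooks are assumptions, so check the repository classes for the real signatures:

// Hypothetical sketch; "id" and "fullName" are assumed attribute names, and the
// abstract hooks may be declared slightly differently in the repository
public class ActorExportServiceImpl extends BaseExportService<Actor> {

    @Override
    protected List<Selection<?>> buildSelections(Root<Actor> root, CriteriaQuery<Tuple> query, CriteriaBuilder builder) {
        // One Selection per CSV column, in the same order as the file header
        List<Selection<?>> selections = new ArrayList<>();
        selections.add(root.get("id"));
        selections.add(root.get("fullName"));
        return selections;
    }

    @Override
    protected String[] toStringArray(Tuple tuple) {
        // Maps one result Tuple to one CSV row
        return new String[]{String.valueOf(tuple.get(0)), String.valueOf(tuple.get(1))};
    }
}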
Let’s explain the base method where the magic occurs:
@Transactional(value = "processingTransactionManager", propagation = Propagation.NOT_SUPPORTED)
@Override
public void exportStreamToCsv(OutputStream outputStream) {
    log.info("({}) - HikariPool-Processing Idle Connections before exportStreamToCsv: {}", this.getClass(),
            this.getProcessingDataSourcePoolMetadata().getIdle());
    var start = Instant.now();
    boolean autoCommit = getCurrentSessionAutoCommitPropertyAndSetFalse();
    boolean readOnly = getCurrentSessionReadOnlyPropertyAndSetTrue();
    TypedQuery<Tuple> typedQuery = buildTypedQuery();
    try (var csvWriter = new CSVWriterWrapper(outputStream);
         Stream<Tuple> streamData = typedQuery.getResultStream()) {
        log.info("({}) - HikariPool-Processing Idle Connections during exportStreamToCsv: {}", this.getClass(),
                this.getProcessingDataSourcePoolMetadata().getIdle());
        csvWriter.writeNext(headerNames);
        streamData.forEach(d -> csvWriter.writeNext(this.toStringArray(d)));
        csvWriter.flush();
    }
    restoreAutoCommitAndReadOnly(autoCommit, readOnly);
    var end = Instant.now();
    log.info("({}) - Download processed in {} seconds", this.getClass(), Duration.between(start, end).toSeconds());
}
- First of all, we explicitly reference the secondary TransactionManager through the @Transactional annotation (remember the ProcessingDatasourceConfig.java class), with propagation disabled (NOT_SUPPORTED).
- The next thing to do is to read the current Session's auto-commit and read-only attributes and set them to the desired values; a sketch of how this can be done follows this list. In order to stream data successfully we need to specify a FETCH_SIZE hint, which requires auto-commit to be false and read-only to be true.
- The next step is to build the typed query and stream the results (based on each implementation) directly to the OutputStream using our CSVWriterWrapper.
- Finally, we restore auto-commit and read-only to their original values as a safety precaution.
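A sketch of one possible way to implement those helper methods using Hibernate's Session work API (the repository's actual helpers may differ):

// One possible implementation of the auto-commit helper; the read-only counterpart
// works the same way with connection.isReadOnly() / connection.setReadOnly(true)
private boolean getCurrentSessionAutoCommitPropertyAndSetFalse() {
    var session = this.getProcessingEntityManager().unwrap(Session.class);
    return session.doReturningWork(connection -> {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false);   // required for the FETCH_SIZE hint to take effect
        return previousAutoCommit;
    });
}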
Now let’s zoom in on the buildTypedQuery() method:
private TypedQuery<Tuple> buildTypedQuery() {
    var builder = this.getProcessingEntityManager().getCriteriaBuilder();
    CriteriaQuery<Tuple> query = builder.createTupleQuery();
    Root<T> root = query.from(modelType);
    query.multiselect(this.buildSelections(root, query, builder));
    if (this.getSort() != null && this.getSort().isSorted())
        query.orderBy(toOrders(sort, root, builder));
    TypedQuery<Tuple> typedQuery = this.getProcessingEntityManager().createQuery(query);
    Map<String, Object> queryHints = buildQueryHints();
    queryHints.forEach(typedQuery::setHint);
    return typedQuery;
}
Besides setting the implementation class's selection list (this.buildSelections(…)) and order (this.getSort(…)), the method effectively creates the Typed Query using the secondary persistence unit injected into the class (remember when we established the persistence unit name in the secondary entity manager factory):
@PersistenceContext(unitName = "processing")
private EntityManager processingEntityManager;
and then it calls buildQueryHints()
/**
 * HINT_FETCH_SIZE ignored if autoCommit = true.
 * If pool autoCommit is true then it should be disabled for method execution
 **/
private Map<String, Object> buildQueryHints() {
    Map<String, Object> queryHints = new HashMap<>();
    queryHints.put(HINT_FETCH_SIZE, "5000");
    queryHints.put(HINT_READONLY, "true");
    queryHints.put(HINT_CACHEABLE, "true");
    return queryHints;
}
which sets the FETCH_SIZE, READONLY and CACHEABLE hints required to achieve an efficient data stream from the database.
Finally, the controller endpoint that triggers the download:
- ActorController.java
@GetMapping(value = "/data")
public ResponseEntity<StreamingResponseBody> getData(HttpServletResponse response) {
    response.setCharacterEncoding(StandardCharsets.UTF_8.name());
    response.setHeader(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
    response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=actors_export.csv");
    StreamingResponseBody stream = actorExportService::exportStreamToCsv;
    return ResponseEntity.ok(stream);
}
The getData(HttpServletResponse response) method uses Spring's StreamingResponseBody, passing its OutputStream to our BaseExportService exportStreamToCsv(OutputStream outputStream) method. It also sets some required response headers.
When using StreamingResponseBody, I recommend configuring the TaskExecutor that Spring uses for asynchronous request processing. Take a look at the Task Executor configuration section in application.properties.
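In the repository this is done through application.properties; purely as an illustration, the equivalent programmatic configuration (a hypothetical class, not part of the example project) would look roughly like this:

@Configuration
public class AsyncWebConfig implements WebMvcConfigurer {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // Dedicated executor for async (StreamingResponseBody) request processing; sizes are illustrative
        var executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("download-");
        executor.initialize();
        configurer.setTaskExecutor(executor);
        configurer.setDefaultTimeout(120_000); // allow long-running downloads (2 minutes)
    }
}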
Now we can start our downloads!
Call http://localhost:8080/movies/data or http://localhost:8080/actors/data from your browser.
We can confirm in the console logs that before the query execution we had 10 idle connections in the secondary pool, and 9 during the download.
And that’s it!
Hope you've enjoyed it :)
A special thanks to Salvador Ribolzi for the article revision.
Contact Info
Email: sampietromartin1@gmail.com