Streaming Strategies in Mule 4

Manisha Patil
Another Integration Blog
Jul 24, 2024

Have you ever wondered how to process large data files in Mule 4? Have you run into out-of-memory errors, null pointer exceptions, or heap space allocation issues? Then it's time to focus on data streaming. In this blog, we will explore what streams are, their types, and how to use them to help applications work efficiently. So, let's start!

What is a Stream:

In simple terms, a stream is a continuous flow of data.

Let's understand this with an example. Suppose you have many records in a database. When you query the database, you get a result set. If that result set is large, loading it into memory all at once can cause an out-of-memory error, because the heap space is not sufficient to hold so many records. This is where streaming comes into the picture.

In streaming, a cursor is simply a pointer to a record in the database. The cursor is wrapped in an object, which could be a stream or an iterator. That object can in turn be wrapped in another stream, so streams can be chained. With streaming, data comes to you bit by bit, as and when it is needed, rather than being loaded into memory all at once.
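To make the cursor idea concrete outside Mule, here is a minimal Python sketch using the standard-library sqlite3 module. It is only an analogy and not how the Mule Database connector is implemented. The `employee` table and the `stream_rows` helper are hypothetical, and the rows mirror the demo data used later. Rows are pulled from the database cursor in small batches (`fetch_size`) instead of loading the whole result set at once.

```python
import sqlite3
from typing import Iterator, Tuple

# Hypothetical demo table holding the same five rows used later in the article.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employee (empname TEXT, empid INTEGER)")
conn.executemany(
    "INSERT INTO employee VALUES (?, ?)",
    [("Manisha", 2), ("Ramesh", 1), ("Divya", 3), ("priya", 4), ("Swapy", 5)],
)


def stream_rows(connection: sqlite3.Connection, query: str,
                fetch_size: int = 2) -> Iterator[Tuple]:
    """Hypothetical helper: yield rows lazily, holding at most fetch_size rows per batch."""
    cursor = connection.execute(query)
    try:
        while True:
            batch = cursor.fetchmany(fetch_size)  # pull only the next small batch
            if not batch:
                break
            yield from batch
    finally:
        cursor.close()  # release the cursor once iteration ends or is abandoned


for name, emp_id in stream_rows(conn, "SELECT empname, empid FROM employee ORDER BY empid"):
    print(emp_id, name)
```

Because `stream_rows` is a generator, nothing is fetched until the loop asks for the next row. A larger `fetch_size` trades memory for fewer round trips.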

Streaming Strategies:

There are two types of streaming strategies: non-repeatable and repeatable.

In the Non-Repeatable streaming strategy, the payload can be read only once. After one processor consumes the stream, it is no longer available to the next processor. This strategy is memory-efficient because the payload is not kept in memory: the data is read and then discarded. Use it when you need the data only once.
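The read-once behaviour can be sketched in Python with a generator, which can be iterated only once, like a non-repeatable stream. This is an illustrative analogy rather than Mule code. `fetch_employees` is a hypothetical stand-in for the Database select operation, returning the demo rows shown later.

```python
from typing import Dict, Iterator, List, Union

Row = Dict[str, Union[str, int]]

EMPLOYEES: List[Row] = [
    {"empname": "Manisha", "empid": 2},
    {"empname": "Ramesh", "empid": 1},
    {"empname": "Divya", "empid": 3},
    {"empname": "priya", "empid": 4},
    {"empname": "Swapy", "empid": 5},
]


def fetch_employees() -> Iterator[Row]:
    """Hypothetical stand-in for a non-repeatable query result: each row is yielded once."""
    for row in EMPLOYEES:
        yield row


payload = fetch_employees()
first_read = list(payload)   # the first "processor" consumes every row
second_read = list(payload)  # reading the same payload again yields nothing

print(len(first_read), len(second_read))
```

The same effect appears in the demo below, where the second log of the payload is empty.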

In the Repeatable streaming strategy, the payload can be read multiple times. Depending on the configuration, the buffered data is kept either in memory (for smaller payloads) or in a file on disk (for huge payloads). For the in-memory option, we can configure the initial buffer size, the buffer size increment, and the maximum buffer size as per the requirements.
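One way to picture a repeatable strategy is as a buffer sitting between the source and its readers. Each new reader gets its own cursor that starts at the beginning, and items that were already read are replayed from the buffer. The Python class below is a hypothetical sketch of that idea, not Mule's cursor-provider implementation, and it keeps everything in memory with no size limit.

```python
from typing import Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


class RepeatableIterable(Generic[T]):
    """Hypothetical sketch: buffer items as they are first read so any cursor can replay them."""

    def __init__(self, source: Iterable[T]) -> None:
        self._source = iter(source)
        self._buffer: List[T] = []

    def cursor(self) -> Iterator[T]:
        """Return a fresh cursor positioned at the first item."""
        position = 0
        while True:
            if position < len(self._buffer):
                item = self._buffer[position]   # already read once: replay from the buffer
            else:
                try:
                    item = next(self._source)   # first time this item is needed
                except StopIteration:
                    return
                self._buffer.append(item)
            yield item
            position += 1


repeatable = RepeatableIterable(name for name in ["Manisha", "Ramesh", "Divya"])
print(list(repeatable.cursor()))
print(list(repeatable.cursor()))  # the same items again, served from the buffer
```

Cursors are independent, so two processors can read the same payload at different speeds without consuming it for each other.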

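Repeatable file store stream is the default streaming strategy in Mule 4 Enterprise Edition. The Community Edition (Mule Kernel) defaults to the repeatable in-memory stream.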

Modules that support Streaming:

Streaming Supported Modules List

All the connectors listed above support streaming. To configure streaming, go to the Advanced section of the connector operation and select a streaming strategy from the drop-down.

Note: The HTTP connector and the File connector (for example, its On New or Updated File source) return stream objects, whereas other connectors, such as Database, return iterable objects.

Demo:

Let's understand this better with some demo scenarios:

Our use case involves querying a database, so first we will create a simple table to query. I am using MySQL Workbench, but you can use any database you prefer; only the connection configuration in the Mule code will vary based on the database selected.

Database Queries

1. Non-Repeatable Stream/Iterable:
Non-Repeatable stream Configured

As configured above, we first query the database, which returns the result in object format. We then convert it to JSON and print the result. Next, we print the payload a second time, just to show that a non-repeatable stream, once consumed, cannot be read again.

INFO  2024-07-19 16:54:46,671 [[MuleRuntime].uber.02: [streamingdemo].uber@org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource.lambda$reallyDoStart$17:462 @2ba36487] [processor: ; event: ] org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource: Message source 'listener' on flow 'streamingdemoFlow' successfully started
INFO 2024-07-19 16:55:53,088 [[MuleRuntime].uber.02: [streamingdemo].streamingdemoFlow.CPU_LITE @17888665] [processor: streamingdemoFlow/processors/0; event: a88f8f41-45c1-11ef-883c-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Request Received
INFO 2024-07-19 16:55:53,256 [[MuleRuntime].uber.08: [streamingdemo].streamingdemoFlow.BLOCKING @71b00165] [processor: streamingdemoFlow/processors/2; event: a88f8f41-45c1-11ef-883c-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: <<Non repeatable iterator>>
INFO 2024-07-19 16:55:53,359 [[MuleRuntime].uber.08: [streamingdemo].streamingdemoFlow.BLOCKING @71b00165] [processor: streamingdemoFlow/processors/3; event: a88f8f41-45c1-11ef-883c-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"empname": "Manisha",
"empid": 2
},
{
"empname": "Ramesh",
"empid": 1
},
{
"empname": "Divya",
"empid": 3
},
{
"empname": "priya",
"empid": 4
},
{
"empname": "Swapy",
"empid": 5
}
]
INFO 2024-07-19 16:55:53,359 [[MuleRuntime].uber.08: [streamingdemo].streamingdemoFlow.BLOCKING @71b00165] [processor: streamingdemoFlow/processors/4; event: a88f8f41-45c1-11ef-883c-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: <<Non repeatable iterator>>
INFO 2024-07-19 16:55:53,363 [[MuleRuntime].uber.08: [streamingdemo].streamingdemoFlow.BLOCKING @71b00165] [processor: streamingdemoFlow/processors/5; event: a88f8f41-45c1-11ef-883c-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [

]

Console logs showing that the payload did not print a second time because it had already been consumed.

Postman Response

So, the lesson is that a non-repeatable stream, once read, cannot be read a second time.

2. Repeatable In-memory Stream/Iterable:

Repeatable In-memory Stream/Iterable configured

Now the streaming strategy is changed to Repeatable in-memory iterable. This strategy requires the following fields to be configured.

Initial Buffer size: The number of instances the buffer can hold when it is first created.

Buffer Size Increment: How many instances the buffer grows by. Once the initial buffer fills up with the configured number of instances, it is expanded by this amount.

Max in memory instances: The maximum buffer size. If the data exceeds this value, Mule raises an error (STREAM_MAXIMUM_SIZE_EXCEEDED).
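The three settings can be modelled as a buffer with these rules:

- It starts at an initial capacity.
- It grows by a fixed increment whenever it fills up.
- It fails once it would need to grow beyond the maximum.

This is a simplified Python model under those assumptions, not Mule's code. The class and exception names are hypothetical, and capping the last growth step at the maximum is a choice made for the sketch.

```python
from typing import Any, List


class StreamMaximumSizeExceeded(Exception):
    """Hypothetical stand-in for Mule's STREAM_MAXIMUM_SIZE_EXCEEDED error."""


class InMemoryBuffer:
    """Capacity is counted in instances, like the iterable variant of the strategy."""

    def __init__(self, initial_size: int, increment: int, max_size: int) -> None:
        if not 0 < initial_size <= max_size:
            raise ValueError("expected 0 < initial_size <= max_size")
        self.capacity = initial_size
        self.increment = increment
        self.max_size = max_size
        self.items: List[Any] = []

    def append(self, item: Any) -> None:
        if len(self.items) == self.capacity:
            # Buffer is full: grow by `increment`, but never beyond `max_size`.
            grown = min(self.capacity + self.increment, self.max_size)
            if grown <= self.capacity:
                raise StreamMaximumSizeExceeded(
                    f"buffer already holds the maximum of {self.max_size} instances")
            self.capacity = grown
        self.items.append(item)


buffer = InMemoryBuffer(initial_size=2, increment=2, max_size=5)
for row_id in range(1, 6):
    buffer.append(row_id)  # capacity grows 2 -> 4 -> 5 as the buffer fills
print(buffer.capacity)

try:
    buffer.append(6)  # a sixth instance would exceed the maximum
except StreamMaximumSizeExceeded as error:
    print("error:", error)
```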

* Started app 'streamingdemo'                                        *
* Application plugins: *
* - Database : 1.14.6 *
* - Sockets : 1.2.3 *
* - HTTP : 1.8.0 *
* Application libraries: *
* - mysql-connector-java-8.0.30.jar *
* - protobuf-java-3.19.4.jar *
**********************************************************************
INFO 2024-07-19 18:53:42,186 [[MuleRuntime].uber.02: [streamingdemo].uber@org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource.lambda$reallyDoStart$17:462 @7a03f925] [processor: ; event: ] org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource: Message source 'listener' on flow 'streamingdemoFlow' successfully started
INFO 2024-07-19 18:53:47,853 [[MuleRuntime].uber.02: [streamingdemo].streamingdemoFlow.CPU_LITE @f68a721] [processor: streamingdemoFlow/processors/0; event: 21741881-45d2-11ef-8081-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Request Received
INFO 2024-07-19 18:53:47,966 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @34a9297b] [processor: streamingdemoFlow/processors/2; event: 21741881-45d2-11ef-8081-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [{empname=Manisha, empid=2}, {empname=Ramesh, empid=1}, {empname=Divya, empid=3}, {empname=priya, empid=4}, {empname=Swapy, empid=5}]
INFO 2024-07-19 18:53:47,980 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @34a9297b] [processor: streamingdemoFlow/processors/3; event: 21741881-45d2-11ef-8081-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"empname": "Manisha",
"empid": 2
},
{
"empname": "Ramesh",
"empid": 1
},
{
"empname": "Divya",
"empid": 3
},
{
"empname": "priya",
"empid": 4
},
{
"empname": "Swapy",
"empid": 5
}
]
INFO 2024-07-19 18:53:47,985 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @34a9297b] [processor: streamingdemoFlow/processors/4; event: 21741881-45d2-11ef-8081-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [{empname=Manisha, empid=2}, {empname=Ramesh, empid=1}, {empname=Divya, empid=3}, {empname=priya, empid=4}, {empname=Swapy, empid=5}]
INFO 2024-07-19 18:53:47,985 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @34a9297b] [processor: streamingdemoFlow/processors/5; event: 21741881-45d2-11ef-8081-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"empname": "Manisha",
"empid": 2
},
{
"empname": "Ramesh",
"empid": 1
},
{
"empname": "Divya",
"empid": 3
},
{
"empname": "priya",
"empid": 4
},
{
"empname": "Swapy",
"empid": 5
}
]

Logs showing that the payload can be read multiple times when the strategy is set to Repeatable in-memory stream/iterable. The whole data set is loaded into heap memory.

Postman Response

3. Repeatable file store stream/Iterable:

Next, let's look at the repeatable file store stream. With this strategy, data is kept in memory up to the configured limit, and anything beyond that is persisted to the file store.

Repeatable file store stream/Iterable

This time there is only one setting, In memory objects. With it set to 500, only 500 objects are kept in memory; if that limit is exceeded, the rest is persisted to the file store inside the Mule runtime.
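The spill-over behaviour can be modelled in a few lines of Python:

- Keep the first `in_memory_objects` items in a list.
- Write everything beyond that to a temporary file.
- Let every cursor replay memory first and then the file.

This is a hypothetical sketch, not Mule's file store. It assumes JSON-serializable items. For brevity, it also drains the source eagerly in the constructor, whereas Mule buffers only as the stream is consumed.

```python
import json
import os
import tempfile
from typing import Any, Iterable, Iterator, List


class FileStoreIterable:
    """Hypothetical sketch of a repeatable file-store iterable."""

    def __init__(self, source: Iterable[Any], in_memory_objects: int = 500) -> None:
        self.in_memory: List[Any] = []
        self.spilled = 0
        fd, self.path = tempfile.mkstemp(suffix=".jsonl")
        with os.fdopen(fd, "w", encoding="utf-8") as spill:
            for item in source:
                if len(self.in_memory) < in_memory_objects:
                    self.in_memory.append(item)           # still under the in-memory limit
                else:
                    spill.write(json.dumps(item) + "\n")  # overflow goes to disk
                    self.spilled += 1

    def cursor(self) -> Iterator[Any]:
        """Replay the in-memory items first, then the spilled ones, as often as needed."""
        yield from self.in_memory
        with open(self.path, encoding="utf-8") as spill:
            for line in spill:
                yield json.loads(line)

    def close(self) -> None:
        os.remove(self.path)  # delete the temporary file once the data is no longer needed


employees = [
    {"empname": "Manisha", "empid": 2},
    {"empname": "Ramesh", "empid": 1},
    {"empname": "Divya", "empid": 3},
    {"empname": "priya", "empid": 4},
    {"empname": "Swapy", "empid": 5},
]
store = FileStoreIterable(employees, in_memory_objects=3)  # small limit to force a spill
print(len(store.in_memory), store.spilled)
first_pass = list(store.cursor())
second_pass = list(store.cursor())
store.close()
```

Heap usage is bounded by `in_memory_objects`, at the cost of disk I/O for anything beyond it. This is why this strategy suits payloads too large for the in-memory option.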

* Started app 'streamingdemo'                                        *
* Application plugins: *
* - Database : 1.14.6 *
* - Sockets : 1.2.3 *
* - HTTP : 1.8.0 *
* Application libraries: *
* - mysql-connector-java-8.0.30.jar *
* - protobuf-java-3.19.4.jar *
**********************************************************************
INFO 2024-07-22 14:56:11,476 [[MuleRuntime].uber.09: [streamingdemo].uber@org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource.lambda$reallyDoStart$17:462 @75ee9cc1] [processor: ; event: ] org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource: Message source 'listener' on flow 'streamingdemoFlow' successfully started
INFO 2024-07-22 14:59:19,568 [[MuleRuntime].uber.09: [streamingdemo].streamingdemoFlow.CPU_LITE @91710e8] [processor: streamingdemoFlow/processors/0; event: df5805f1-480c-11ef-8205-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: Request Received
INFO 2024-07-22 14:59:19,621 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @4fe84322] [processor: streamingdemoFlow/processors/2; event: df5805f1-480c-11ef-8205-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [{empname=Manisha, empid=2}, {empname=Ramesh, empid=1}, {empname=Divya, empid=3}, {empname=priya, empid=4}, {empname=Swapy, empid=5}]
INFO 2024-07-22 14:59:19,626 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @4fe84322] [processor: streamingdemoFlow/processors/3; event: df5805f1-480c-11ef-8205-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"empname": "Manisha",
"empid": 2
},
{
"empname": "Ramesh",
"empid": 1
},
{
"empname": "Divya",
"empid": 3
},
{
"empname": "priya",
"empid": 4
},
{
"empname": "Swapy",
"empid": 5
}
]
INFO 2024-07-22 14:59:19,627 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @4fe84322] [processor: streamingdemoFlow/processors/4; event: df5805f1-480c-11ef-8205-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [{empname=Manisha, empid=2}, {empname=Ramesh, empid=1}, {empname=Divya, empid=3}, {empname=priya, empid=4}, {empname=Swapy, empid=5}]
INFO 2024-07-22 14:59:19,630 [[MuleRuntime].uber.03: [streamingdemo].streamingdemoFlow.BLOCKING @4fe84322] [processor: streamingdemoFlow/processors/5; event: df5805f1-480c-11ef-8205-44e517d72478] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
{
"empname": "Manisha",
"empid": 2
},
{
"empname": "Ramesh",
"empid": 1
},
{
"empname": "Divya",
"empid": 3
},
{
"empname": "priya",
"empid": 4
},
{
"empname": "Swapy",
"empid": 5
}
]

Logs showing that the payload is read multiple times and is therefore printed twice.

General considerations:

Here are some considerations to follow when dealing with very large files, to avoid out-of-memory errors:

1. Do not use the sizeOf(payload) function to check whether a file contains data or is empty; use the isEmpty(payload) function instead. sizeOf has to count every element, which means reading the entire payload.
2. Do not pass huge files directly to a Transform Message component. Instead, first split the data into chunks using For Each or Batch Job scopes.
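Both tips come down to not forcing the whole stream through at once. The Python sketch below is an analogy, not DataWeave:

- `size_of` has to pull every element to count it.
- `is_empty` peeks at a single element and hands back an equivalent iterator.
- `chunks` splits a stream into fixed-size batches, similar to processing records in groups inside a For Each scope.

All three function names are hypothetical.

```python
from itertools import chain, islice
from typing import Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def size_of(stream: Iterator[T]) -> int:
    """Counting pulls every element, so the whole stream is consumed."""
    return sum(1 for _ in stream)


def is_empty(stream: Iterator[T]) -> Tuple[bool, Iterator[T]]:
    """Peek at most one element, then put it back in front of the rest."""
    sentinel = object()
    first = next(stream, sentinel)
    if first is sentinel:
        return True, iter(())
    return False, chain([first], stream)


def chunks(stream: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` elements so only one chunk is held at a time."""
    it = iter(stream)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


records = iter(range(1, 8))
empty, records = is_empty(records)  # only the first record is read here
if not empty:
    for batch in chunks(records, 3):
        print("processing", batch)
```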

That's all from my side. If you want to play with the project used here, feel free to clone it from the Git repo below. You can also try other connectors, such as FTP/SFTP, to get more ideas on the topic.

Happy Learning!!

MuleSoft Mentor | 4x MuleSoft Certified Developer | MCIA, MCD L1, L2, MCI Associate Developer | Integration Developer