Streaming in Mule

Rukhsar Praveen
Another Integration Blog
9 min read · Dec 6, 2022

Process millions of records in a few seconds or minutes.

Objectives:

  • Streaming Overview
  • Advantages of streaming
  • Limitations
  • Use cases
  • Streaming with sample workflows
  • Testing metrics and execution report: Mule 4 testing metrics with 1M and 10M records, and Mule 3 testing metrics with 1M, 10M, and 20M records.

1. Streaming Overview

Streaming is the process of consuming data as its bytes arrive in the flow, rather than scanning and loading the entire document into memory in order to index it. Streaming speeds up the processing of large documents without loading all of the data into memory, which lets us process large data sets efficiently with low resource consumption. Let's try to understand how Mule supports end-to-end streaming and can process millions of records in a few seconds or minutes.

Streaming in MuleSoft:

i. Streaming enablement: Streaming can be enabled in Mule in two ways:

  • Through the streaming strategy properties available inside a connector's configuration (for example, choosing a repeatable or non-repeatable streaming strategy).
  • Through DataWeave configuration properties, for example deferred=true on the output directive or streaming=true as a reader property (see the sketch below).
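As a minimal sketch mirroring the transform used later in this post, the DataWeave directives look like this; streaming=true is the equivalent reader property for formats such as CSV:

%dw 2.0
@StreamCapable()                       // mark the input as safe for sequential, stream-style access
input payload application/csv          // or: input payload application/csv streaming=true
output application/json deferred=true  // pass the output downstream as a stream instead of materializing it
---
payload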

ii. Types of streaming: The streaming strategy can be configured on the connector operation, and there are three types (configuration sketches follow below):

a) Repeatable file-store stream (default): Mule 4 introduces repeatable streams as its default framework for handling streams. The stream is buffered in memory up to a configurable size and spilled to a temporary file on disk beyond that, which lets us read the stream more than once and access it concurrently.

b) Non-repeatable stream:
· The stream can only be read once.
· Non-repeatable streaming improves performance.
· Use it when very tight optimization of performance and resource consumption is needed.

c) Repeatable in-memory stream: This strategy keeps the whole stream in a memory buffer (it does not use a temporary file on disk) and is best suited to small payloads.
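For reference, this is roughly how each strategy is selected on a connector operation such as file:read; the buffer sizes shown are illustrative defaults and only one strategy should be set per operation:

<!-- a) repeatable file-store stream (default): buffer in memory up to inMemorySize, then spill to a temporary file on disk -->
<file:read config-ref="File_Config" path="input.csv">
    <repeatable-file-store-stream inMemorySize="512" bufferUnit="KB"/>
</file:read>

<!-- b) non-repeatable stream: read-once, lowest overhead -->
<file:read config-ref="File_Config" path="input.csv">
    <non-repeatable-stream/>
</file:read>

<!-- c) repeatable in-memory stream: everything stays in a memory buffer; the app fails if maxBufferSize is exceeded -->
<file:read config-ref="File_Config" path="input.csv">
    <repeatable-in-memory-stream initialBufferSize="512" bufferSizeIncrement="512" maxBufferSize="1024" bufferUnit="KB"/>
</file:read>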

iii. Data formats that support streaming:

CSV, JSON, XML

iv. Connectors that support streaming:

File, FTP, SFTP, Database, HTTP, etc.

2. Advantages of Streaming:

  • Streaming offers a huge advantage over fully loading the data into memory, since it prevents the Java heap from being exhausted by big input files. With a smaller allocated heap we can process large data sets.
  • Batch jobs and other scopes used only to cope with large data sets can be removed, which reduces code complexity.
  • Based on our previous PSR results, throughput increased about 30x (3,000 rec/sec vs 90,000 rec/sec).
  • Execution time for 20M records dropped from 1.5 hours in the previous (non-streaming) test to 3.35 minutes with streaming.
  • The maximum streaming memory can be increased in the wrapper configuration file to process very large data loads (see the snippet below).
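On a standalone Mule runtime this is typically done in conf/wrapper.conf. A minimal sketch, assuming the mule.max.streaming.memory system property (value in bytes) and a free wrapper.java.additional index:

# conf/wrapper.conf — raise the memory available to streaming buffers (here ~1 GB; index 20 is a placeholder)
wrapper.java.additional.20=-Dmule.max.streaming.memory=1073741824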

3. Limitations:

  • With non-repeatable streaming, the streamed data can be referenced only once: the streamed payload cannot be referenced from inside a nested lambda, and if a script tries to reference the same payload more than once the access fails with a "pipe closed" error.
  • Exception handling to capture faulty data from a streamed payload is not achievable.
  • The repeatable in-memory strategy defaults to a buffer size of 512 KB; for larger streams the buffer expands in 512 KB increments until it reaches the configured maximum buffer size, and if the stream exceeds that limit the application fails.
  • Grouping logic (for example, groupBy) is not supported, as per our observations.
  • The DataWeave lookup function does not work with deferred=true.

4. Use Cases:

· When little or no data validation is required.
· When the business logic is not very complex.
· When the data load to process is huge.

5. Streaming with sample workflow:

a) Mule 4 Observations:

Mule runtime: 4.4.0
Scenarios: full streaming and partial streaming

Full Streaming Flow:

Create a basic sample flow that reads an input CSV file with streaming, converts it to JSON format with streaming, and writes the content to a JSON file on the local file system.

Partial Streaming Flow:

Create a basic sample flow that reads an input CSV file with the non-repeatable streaming strategy, converts the data to JSON format with streaming, and writes it to the local file system in chunks of 1,000 records. Note that before writing the payload to a file we use a For Each scope to divide the data into chunks, which loads each chunk into memory; this is why we describe it as a partial streaming strategy.

Code:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
      http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
      http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
      http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd">
    <file:config name="File_Config" doc:name="File Config" doc:id="de7a47ca-8d15-4ddd-897d-6814acb68101">
        <file:connection workingDir="C:\Downloads\a_sample_test_streaming\input\" />
    </file:config>
    <file:config name="File_Config1" doc:name="File Config" doc:id="93ad232f-99b5-4654-ba49-1c9708ad695f">
        <file:connection workingDir="C:\Downloads\a_sample_test_streaming\output\" />
    </file:config>
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="608b8176-7c12-4098-a334-05e89504b46f">
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>
    <flow name="sample_app_streaming_full_streaming_Flow" doc:id="a88d073e-b3b8-45f2-82d5-955772212cab">
        <http:listener doc:name="Listener" doc:id="cf3b94dd-9e24-4a48-a0d8-062500b2d287" config-ref="HTTP_Listener_config" path="/test_streaming"/>
        <logger level="INFO" doc:name="Logger" doc:id="75cd6022-6982-4038-871b-abdbe5a1768d" message='#["start of the flow"]'/>
        <file:read doc:name="Read" doc:id="65f4313d-0763-44c4-be72-025c73fe62f9" config-ref="File_Config" path="input.csv">
            <non-repeatable-stream />
        </file:read>
        <ee:transform doc:name="Transform Message" doc:id="2609318a-e0c4-4a77-ae05-583c23a0c329">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
@StreamCapable()
input payload application/csv
output application/json deferred=true
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <file:write doc:name="Write" doc:id="4ffd10f8-5e1d-4f9e-8b56-a0154f22b84a" config-ref="File_Config1" path='#["output" ++ uuid() ++ ".json"]' mode="CREATE_NEW"/>
        <logger level="INFO" doc:name="Logger" doc:id="8ca51dcd-e669-41d4-961d-1f27c6754f6c" message='#["end of the flow"]'/>
    </flow>
    <flow name="sample_app_streaming_partial_streaming_flow" doc:id="ee465e85-3ffd-479f-a5b7-cf2200e3fe25">
        <http:listener doc:name="Listener" doc:id="05c809b1-86ac-4e65-aa1d-493636941311" config-ref="HTTP_Listener_config" path="/test_streaming/for-each"/>
        <logger level="INFO" doc:name="Logger" doc:id="75cd6022-6982-4038-871b-abdbe5a1768d" message='#["start of the flow"]'/>
        <file:read doc:name="Read" doc:id="65f4313d-0763-44c4-be72-025c73fe62f9" config-ref="File_Config" path="input.csv">
            <non-repeatable-stream />
        </file:read>
        <foreach doc:name="For Each" doc:id="6f787bad-a0aa-45bf-8efa-066514a4f8e8" collection="payload" batchSize="1000">
            <ee:transform doc:name="Transform Message" doc:id="2609318a-e0c4-4a77-ae05-583c23a0c329">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
@StreamCapable()
input payload application/csv
output application/json deferred=true
---
payload]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <file:write doc:name="Write" doc:id="4ffd10f8-5e1d-4f9e-8b56-a0154f22b84a" config-ref="File_Config1" path='#["output" ++ uuid() ++ ".json"]' mode="CREATE_NEW" />
        </foreach>
        <logger level="INFO" doc:name="Logger" doc:id="8ca51dcd-e669-41d4-961d-1f27c6754f6c" message='#["end of the flow"]'/>
    </flow>
</mule>

Testing Metrics and Statistics:

Execution Table: Testing Metrics Report

1. Partial Streaming:

Record count: 10 million
Streaming: partial
Heap size: 4 GB
Processing time: 18 minutes
CPU usage: above 25%
Please refer to the statistics below:

Statistics:

2. Full Streaming:

Record count: 10 million
Heap size: 1 GB
Streaming: yes
Processing time: 3 minutes
CPU usage: below 10%
Please refer to the statistics below:

Statistics:

b) Mule 3 Observations:

Mule runtime: 4.3.0
Scenarios: streaming with XML and JSON; with streaming and without streaming

Repeatable Streaming Flow:

Create a prototype application to understand end-to-end streaming with a basic sample flow. Read an input CSV file through the On New or Updated File listener in repeatable streaming mode, since we want to refer to the file concurrently within a Scatter-Gather. Use a Set Payload component to set the MIME type with streaming properties, so the ingested input file is treated as an octet (byte) stream, and add the MIME type as an additional property. Then process the input file concurrently with Scatter-Gather to produce two formats of data, one in XML and one in JSON, and write each to a file on the local file system (a minimal XML sketch follows the list below).
  • Property configuration of the On New or Updated File connector to read the input file in repeatable stream mode.
  • Property configuration of the Set Payload connector.
  • Process XML DWL file.
  • Process JSON DWL file.
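The property screenshots above do not reproduce here, so below is a minimal, hedged XML sketch of the flow just described. The flow name, paths, and the XML mapping are illustrative, and file:read with a repeatable file-store stream stands in for the On New or Updated File listener for brevity:

<flow name="repeatable_streaming_scatter_gather_flow">
    <file:read config-ref="File_Config" path="input.csv">
        <!-- repeatable stream so both Scatter-Gather routes can read the same payload concurrently -->
        <repeatable-file-store-stream/>
    </file:read>
    <!-- force a byte-stream MIME type on the ingested file, as described above -->
    <set-payload value="#[payload]" mimeType="application/octet-stream"/>
    <scatter-gather>
        <route>
            <ee:transform doc:name="To XML">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
input payload application/csv streaming=true
output application/xml deferred=true
---
items: { (payload map (row) -> { item: row }) }]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <file:write config-ref="File_Config1" path="items.xml" mode="OVERWRITE"/>
        </route>
        <route>
            <ee:transform doc:name="To JSON">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
input payload application/csv streaming=true
output application/json deferred=true
---
payload]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <file:write config-ref="File_Config1" path="items.json" mode="OVERWRITE"/>
        </route>
    </scatter-gather>
</flow>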
Output:

Execution time of JSON: 0:00:54 for 1M Item records
Execution time of XML: 0:01:09 for 1M Item records

1. Streaming with Grouping Logic:

Observation:

We tested a few entities in the Adapter that have grouping logic, such as Item, Actual Vehicle Load Outbound, and Production Method. These group the data on a few columns (for example, material number or item number), and the grouping applies to the whole ingested payload. In this case the input payload gets loaded into memory at DWL execution time because of the groupBy function, and only then is the rest of the transformation processed. With a huge payload this can result in back pressure, which may restart the application abruptly or cause higher resource utilization. The testing report is included below, after a short DataWeave sketch of such a grouping transform.
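As an illustration of why grouping defeats streaming, here is a minimal DataWeave sketch; the materialNumber field name is illustrative, not taken from the Adapter entities above:

%dw 2.0
input payload application/csv streaming=true
output application/json deferred=true
---
// groupBy must see the complete input before it can emit the first group,
// so the streamed payload is materialized in memory at this point
payload groupBy ((row) -> row.materialNumber)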

2. Streaming Without Grouping Logic:

Observation:

We also tested entities that do not have grouping logic. We processed one of these entities with millions of records on the Adapter and captured a few test results below:

Streaming behavior for faulty or error records:

In case of an error, the message event is not redirected to the error handler.

MuleSoft product team response:

When we use deferred=true, the stream is passed to the next component without the error being thrown directly on the Transform Message component. This is the downside of using deferred, and it is expected behavior.

In case of an error, the first faulty record and the subsequent records get dropped.

MuleSoft product team response:

All the successful records are processed and the error records are skipped. This is expected behavior from MuleSoft.

Handling error records so that they can be pushed to an API.

MuleSoft product team response:

We tried to pass the failed-record information to another flow using the lookup function, but lookup cannot be used with deferred=true. This is expected behavior from MuleSoft.

Handling exceptions for error records so that the exception information can be logged and stored as per the business use case.

MuleSoft product team response:

We were able to successfully log the erroneous records to the console with a custom error message from the DataWeave code itself (a minimal sketch follows).
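A minimal sketch of that approach, assuming an illustrative quantity column that may fail numeric coercion; the log call prints the custom message and the faulty record to the console while the stream keeps flowing:

%dw 2.0
import try from dw::Runtime
input payload application/csv streaming=true
output application/json deferred=true
---
payload map (row, index) -> do {
    // "quantity" is an illustrative field; adapt it to the real schema
    var parsed = try(() -> row.quantity as Number)
    ---
    if (parsed.success)
        row update { case .quantity -> parsed.result }
    else
        // log the faulty record with a custom error message instead of failing the stream
        log("Invalid quantity at row $(index)", row)
}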

Important links:

Author: Rukhsar Praveen

Dzone:

LinkedIn:

https://www.linkedin.com/in/rukhsar-praveen-9b987310a/

Github:
