Streaming in Mule

Rohit Singh
Another Integration Blog
12 min readJun 12, 2023

This tutorial will help you in understanding the concepts of the Streaming in Mule Apps and how to use it to stream huge volumes of data with different Streaming Strategies and how we can use streaming in DataWeave.

What is Streaming in Mule Apps?

Streaming supports efficient processing of large data objects such as files, documents, and records by streaming the data through Mule rather than reading the whole thing into memory.

Streaming provides the following advantages:

  • Allows flows to consume very large messages in an efficient way
  • Message payloads are not read into memory
  • Simple routing rules based on message metadata are still possible
  • You can combine streaming and non-streaming endpoints.

Data Streams Consumption

To understand how data streams were consumed in traditional way (Mule 3), review the following points:

  • Data streams cannot be consumed more than once
  • In the following example, the flow shows the HTTP Listener source that receives a POST method with a body payload to write to the files.
  • The flow writes the first file correctly, while the second file is created with empty content because each component that consumes a stream expects to receive a new stream.
  • After the first File Write operation consumes the stream, the second File Write operation receives an empty stream.
  • Thereby, the second operation has no content to write to a file.
  • Data streams cannot be consumed at the same time
  • In the following example, the flow uses a Scatter-Gather router to split a data stream and simultaneously log and write the payload to a file. The application get some parts of the stream in the file and the rest on the log because different processor chains can not process the data stream content simultaneously.

Streaming Transformers and Filters

Many transformers and filters can read input streams, process the contents, and send them on. However, most of these do not process the stream in real time; instead, they read the stream, load it into memory, process it, and then send it on. Therefore, transformers and filters can become a bottleneck in your application if you regularly stream large files.

The following transformers and filters do support true streaming and process the data as streams without loading them into memory first:

  • XSLT Transformer
  • XmlToXMLStreamReader Transformer
  • DomToOutputHandler transformer (if the incoming XML format is a SAXSource or XMLStreamReader)
  • SXC filter

Streams vs Iterable

Most of the time people gets confused with these terms .To understand the concept of Streaming we first need to understand the difference between these two.

Database and Salesforce Connectors content are considered as iterable because we can consider record(database) and object(salesforce) as Stream .While in case of File, FTP, HTTP, Sockets we cannot divide the data with such units so for them we give them as buffer unit, each buffer act as 1 stream transmitted .

Repeatable Streams

Mule 4 introduces repeatable streams as its default framework for handling streams. Repeatable streams enable you to:

  • Read a stream more than once.
  • Have concurrent access to the stream.

As a component consumes the stream, Mule saves its content into a temporary buffer. The runtime then feeds the component from the temporary buffer, ensuring that each component receives the full stream, regardless of how much of the stream was already consumed by any prior component. This happens automatically and requires no special configuration by you, which prevents the need to find workarounds to save the stream elsewhere so you can access it again. This configuration automatically fixes the first two Mule 3 examples outlined above.

All repeatable streams support parallel access, which means that you don’t need to worry about whether two components are trying to read the same stream when each component is running on a different thread. Mule automatically ensures that when component A reads the stream it doesn’t generate any side effects in component B.

Streaming Strategies

There are three types of streaming strategies available in MuleSoft:

  • Repeatable file store stream (by default, this strategy is selected).
  • Non repeatable stream.
  • Repeatable in-memory stream.

File-Stored Repeatable Stream

File storage is the default streaming strategy in Mule 4.

This strategy initially uses an in-memory buffer size of 512 KB. For larger streams, the strategy creates a temporary file to the disk to store the contents, without overflowing your memory.

If you need to handle large or small files, you can change the buffer size (inMemorySize) to optimize performance:

  • Configuring a larger buffer size increases performance by avoiding the number of times the runtime needs to write the buffer to your disk, but it also limits the number of concurrent requests your application can process.
  • Configuring a smaller buffer size saves memory load.

You can also set the buffer’s unit of measurement (bufferUnit).

In-Memory Repeatable Stream

The in-memory strategy is the default configuration for the Mule Kernel (formerly called Mule Runtime Community Edition).

This strategy defaults to a buffer size of 512 KB. For larger streams, the buffer is expanded by a default increment size of 512 KB until it reaches the configured maximum buffer size. If the stream exceeds this limit, the application fails.

You can customize this behavior by setting the initial size of the buffer (initialBufferSize), the rate at which the buffer increases (bufferSizeIncrement), the maximum buffer size (maxinMemorySize), and the unit of measurement for the buffer size value (bufferUnit).

Every component in Mule 4 that returns an InputStream or a Streamable collection supports repeatable streams. These components include:

  • File connector
  • FTP connector
  • Database connector
  • HTTP connector
  • Sockets connector
  • Salesforce connector

Repeatable File Store (Iterable)

This configuration is the default for Mule Enterprise Edition. This strategy uses a default configured in-memory buffer of 500 objects. If your query returns more results than the buffer size, Mule serializes those objects and writes them to your disk. You can configure the number of objects Mule stores in the in-memory buffer. The more objects you save in-memory, the better performance you get from avoiding writes to disk.

For example, you can set a buffer size of 100 objects in-memory for a query from the Salesforce Connector:

Repeatable File Store (Iterable):

<sfdc:query query=”dsql:…”>
<ee:repeatable-file-store-iterable inMemoryObjects=”100"/>
</sfdc:query>

Repeatable In-Memory (Iterable)

This configuration, which is the default for the Mule Kernel, configures a default buffer size of 500 Objects. If the query result is larger than that, the buffer expands to a default increment size of 100 objects until it reaches the configured maximum buffer size. If the stream exceeds this limit, the app fails. You can customize the initial size of the buffer (initialBufferSize), the rate at which the buffer increases (bufferSizeIncrement), and the maximum buffer size (maxBufferSize).

For example, this configuration sets an in-memory buffer of 100 objects that increments at 100 objects per increment and allows a maximum buffer size of 500 objects.

Repeatable In-Memory (Iterable):

<sfdc:query query=”dsql:…”>
<repeatable-in-memory-iterable initialBufferSize=”100" bufferSizeIncrement=”100" maxBufferSize=”500" />
</sfdc:query>

Disabling Repeatable Streaming

You can disable repeatable streaming through the non-repeatable-stream and non-repeatable-iterable strategies. The strategy to use depends on the type of stream.

Use this option only if you are certain that there is no need to consume the stream several times and only if you need a very tight optimization for performance and resource consumption.

Streaming in DataWeave

DataWeave supports end-to-end streaming through a flow in a Mule application. Streaming speeds the processing of large documents without overloading memory.

  • DataWeave processes streamed data as its bytes arrive instead of scanning the entire document to index it.
  • When in deferred mode, DataWeave can also pass streamed output data directly to a message processor without saving it to the disk.
  • This behavior enables DataWeave and Mule to process data faster and consume fewer resources than the default processes for reading and writing data.

To stream successfully, it is important to understand the following:

  • The basic unit of the stream is specific to the data format.
  • The unit is a record in a CSV document, an element of an array in a JSON document, or a collection in an XML document.

Streaming accesses each unit of the stream sequentially. Streaming does not support random access to a document.

Enabling Streaming

Streaming is not enabled by default. You can use two configuration properties to stream data in a supported data format:

Streaming property, for reading source data as a stream deferred writer property, for passing an output stream directly to the next message processor in a flow.

To enable streaming when reading source data, you must set the streaming reader property to true on the data source. You can enable streaming on a data source through the MIME Type setting outputMimeType. You can set the property in a Mule component that accepts source data, such as an HTTP listener or an On New or Updated File operation.

To pass the streamed output to the next message processor, you can use the output directive in a DataWeave script, for example: output application/json deferred=true

Streaming CSV

CSV is simplest format for streaming because of its structure. Each row below the CSV header is a streamable record. The following CSV example consists of records that contain name, lastName, and age values:

name,lastName,age mariano,achaval,37 leandro,shokida,30

To stream this CSV example, the following script selects values from each record. It uses the map function to iterate over each record in the document.

payload map (record) ->
{
FullName: record.lastName ++ “,” ++ record.name, Age: record.age
}

Although streaming does not support random access to the entire document, a DataWeave script can access data randomly within each record because each record is loaded into memory. For example, the expression record.lastName “,” record.name, can access a lastName value before it accesses a name value even though the order of values is reversed in the input.

However, streaming does not work in the following script. The script requires random access to the entire document to return the elements in a different order than they are given in the input.

[payload[-2], payload[-1], payload[3]]

Streaming JSON

The unit of a JSON stream is each element in an array. Configuration :

Note that DataWeave 2.2.0 support for JSON streaming for Mule 4.2 was limited by the requirement that the root be an array. DataWeave support in Mule 4.3 includes streaming to arrays that are not at the root of the input.

{
“name” : “Mariano”, “lastName”: “Achaval”, “family”: [
{“name”: “Sara”, “age”: 2},
{“name”: “Pedro”, “age”: 4},
{“name”: “Matias”, “age”: 8}
],
“age”: 37
}

In this example, DataWeave can stream payload.family and perform random access within each element of that array. However,

DataWeave cannot randomly access the container object. For example, it is not possible to stream { a: payload.age , b: payload.family} because age follows family, and DataWeave cannot go backwards.

Streaming XML

XML is more complicated than JSON because there are no arrays in the document.

To enable XML streaming, DataWeave provides the following reader property to define the location in the document to stream:

collectionPath

For example, assume the following XML input:

<order>
<header>
<date>Wed Nov 15 13:45:28 EST 2006</date>
<customer number=”123123">Joe</customer>
</header>
<order-items>
<order-item id=”31">
<product>111</product>
<quantity>2</quantity>
<price>8.90</price>
</order-item>
<order-item id=”31">
<product>222</product>
<quantity>7</quantity>
<price>5.20</price>
</order-item>
</order-items>
</order>

Given this XML source data, you can set the unit of the stream to <order-item/> by setting collectionPath=order.order-items in the outputMimeType value:

<flow name=”dw-streaming-example” >

<http:listener doc:name=”Listener”

outputMimeType=”application/xml; collectionpath=order.order-items; streaming=true” config-ref=”HTTP_Listener_config” path=”/input”/>

</flow>

Note that you need to set both streaming=true and the collectionPath value. If either is missing, DataWeave will not stream the content. The following DataWeave script streams the XML input using each <order-items/> element as the streamable unit.

%dw 2.0

output application/xml

— -

{

salesorder: {

itemList: payload.order.”order-items”.*”order-item” map { (“i_” ++ $$) : {

id: $.@id,

productId: $.product,

quantity: $.quantity, price: $.price

}

}

}

}

Validate a Script for Streamed Data (Experimental Feature)

To check that your code can process an input stream successfully, DataWeave provides the following advanced, experimental annotation and a related directive:

@StreamCapable()

Use this annotation to validate whether the script can sequentially access a variable (typically the payload variable).

input directive:

The @StreamCapable() annotation requires the use of an input directive in the DataWeave script that identifies the MIME type of the data source, for example, input payload application/xml.

The DataWeave validator (which is triggered by the @StreamCapable annotation in the script) checks a script against the following criteria:

  • The variable is referenced only once.
  • No index selector is set for negative access, such as [-1].
  • No reference to the variable is found in a nested lambda.

If all criteria are met, the selected data is streamable.

The following example validates successfully. The script is designed to act on the JSON input from the JSON streaming section:

%dw 2.0 @StreamCapable()

input payload application/json output application/json

— -

payload.family filter (member) -> member.age > 3

The script successfully validates and returns the following output:

[

{

“name”: “Pedro”, “age”: 4

},

{

“name”: “Matias”, “age”: 8

}

]

Validation Failures

If any of the criteria that the validator checks is false, the validation fails.

Before proceeding, note that validation can fail in some cases when streaming works. If you write a script in a way that sequentially accesses the input variable in a given data source, streaming works, but that script might not work in all cases. For example, JSON does not place a restriction on the order of the keys in an object. If the keys in some JSON documents arrive in a different order than the script expects, streaming will fail in those cases. The annotation processor follows the rules of the format and cannot assume that the keys always arrive in the same order.

Error: Variable Is Referenced More Than Once

Validation fails if a script attempts to reference the same variable more than once.

The following script is designed to act on the JSON input from the JSON streaming section. Validation fails because the script attempts to reference the payload variable more than once:

Error: Wrong Scope Reference

Validation fails if a script attempts to reference a variable from a scope that is different from the scope in which the variable is defined.

The following script fails because the payload variable is referenced from within the lambda expression [1,2,3] map ((item, index) → payload).

Even if the expression is [1] map ((item, index) → payload, streaming fails because payload is in the wrong scope.

Negative index selector is not allowed

The above script fails because the payload variable is referenced with negative index such as [-1].

Streaming Output

After processing streamed data, you can stream the output directly to another message processor. To facilitate this behavior, use the deferred writer property in the output directive of the DataWeave script, for example, output application/json deferred=true.

NOTE

Exceptions are not handled when you set deferred = true. For example, you can see this behavior in Studio when a flow throws an exception. If you are running an application in Studio debug mode and an exception occurs in a Transform Message component when deferred = true, the console logs the error, but the flow does not stop at the Trasnform Message component.

Building on the example in JSON Streaming, the following flow uses a DataWeave script in transform connector to convert json data to xml and write that into the file .

When we don’t give root tags in transform message then it will give the error on the console but keep the flow working and it writes empty content in the file and return 200 status code to the user.

--

--