Streams

iamprovidence
13 min read · Dec 27, 2023

Imagine a typical task: you have an XML file that needs to be converted to JSON and stored in a separate file.

<?xml version="1.0" encoding="utf-8" ?>
<people>
  <person name="John" surname="Doe" age="18"/>
  <person name="Jane" surname="Doe" age="19"/>
</people>

The naive implementation could look like this:

  • load the XML file content
  • deserialize the content to C# objects
  • serialize the data to JSON
  • store it in a new file

// load an XML file
var peopleXml = await File.ReadAllTextAsync("people.xml");

// create objects
var xmlDoc = new XmlDocument();
xmlDoc.LoadXml(peopleXml);

var people = xmlDoc
    .DocumentElement
    .SelectNodes("person")
    .Cast<XmlNode>()
    .Select(node => new
    {
        Name = node.Attributes["name"].Value,
        Surname = node.Attributes["surname"].Value,
        Age = int.Parse(node.Attributes["age"].Value),
    })
    .ToList();

// serialize to JSON
var peopleJson = JsonSerializer.Serialize(people);

// create a new file
await File.WriteAllTextAsync("people.json", peopleJson);

Even though it works, this approach has its flaws. While the XML file in our training example is small, in practice you will often have to deal with 10 GB monsters. The code above loads all 10 GB into memory, creates new objects that add at least another 10 GB, and then serializes everything back into a JSON string that is, again, 10 GB. You end up with an application that allocates 30 GB of memory for no reason!

That is where Streams come into play. They allow you to transfer and process data in small chunks, resulting not only in smaller memory allocation but also in decreased latency.

If you are interested in how to work with large data efficiently, benefit from byte-by-byte processing, and avoid unnecessary allocations, stay with me. We will demystify stream architecture, get practical experience working with streams, decorators, and adapters, and rewrite our pathetic piece of code from the beginning. By the end of the article, you will know how to avoid popular stream mistakes and have a comprehensive understanding of working with them.

So, set aside your morning coffee, we are embarking on a journey to the world of streams in C#⛵️

Stream architecture

Before you start working with streams, you should understand their architecture. There are 3 main concepts to learn:

  • a backing store
  • decorators
  • adapters

A backing store

A backing store is a storage medium. It can be a source from which data is read sequentially, a destination where data is written sequentially, or both at the same time.

Streams provide a low-level API, so the data is processed exclusively in bytes. Here are a few examples of a backing store: a file, a network connection, memory, the console, blob storage, etc.

To start processing data in a backing store you have to open it. That is what streams are for.

There is an abstract Stream class that provides all the needed operations:

abstract class Stream
{
    // Read:
    public abstract bool CanRead { get; }
    public abstract int Read(byte[] buffer, int offset, int count);
    public virtual int ReadByte();

    // Write:
    public abstract bool CanWrite { get; }
    public abstract void Write(byte[] buffer, int offset, int count);
    public virtual void WriteByte(byte value);
    public void CopyTo(Stream destination);

    // Seek:
    public abstract bool CanSeek { get; }
    public abstract long Position { get; set; }
    public abstract void SetLength(long value);
    public abstract long Length { get; }
    public abstract long Seek(long offset, SeekOrigin origin);

    // Close:
    public virtual void Close();
    public void Dispose();
    public abstract void Flush();
}

For each backing store, there is its own stream:

The most popular ones are: FileStream, ConsoleStream, MemoryStream, NetworkStream, etc.

Here is an example of how to create a FileStream and start reading data from it:

await using var fs = new FileStream("people.xml", FileMode.Open);

int firstByte = fs.ReadByte();
Console.WriteLine(fs.Position); // 1

We got our first byte from a backing store. Not very exciting, is it 🙃 Don’t worry. It will get better soon. I promise 😁
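Reading byte-by-byte is rarely what you want in practice. The more typical pattern is to read into a buffer in a loop until Read() returns 0, meaning the end of the stream. Here is a minimal sketch, with a MemoryStream standing in for any backing store:

```csharp
using System;
using System.IO;
using System.Text;

// a MemoryStream stands in for any backing store here
using var stream = new MemoryStream(Encoding.UTF8.GetBytes("Hello, streams!"));

var buffer = new byte[4]; // deliberately tiny, to make the chunking visible
var total = 0;
int bytesRead;

// Read returns how many bytes were actually read; 0 means end of stream
while ((bytesRead = stream.Read(buffer, 0, buffer.Length)) > 0)
{
    total += bytesRead;
    Console.WriteLine($"Read {bytesRead} bytes");
}

Console.WriteLine($"Total: {total}"); // Total: 15
```

Note that Read() may return fewer bytes than requested even before the end of the stream, which is why the loop checks the return value rather than assuming a full buffer.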

Decorators

The next important concept is decorators. Yes, the very same decorator pattern from GoF.

Decorators dynamically extend the behavior of existing streams without changing their implementation.

They also inherit from an abstract Stream class, but don’t access any backing store, just wrap up an existing stream.

Decorators free streams from the need to implement things like compression and encryption on their own.

They can be attached at runtime and also combined in chains (i.e., using multiple decorators on a single stream).

These are the most important decorators: BufferedStream, DeflateStream, GZipStream, CryptoStream, AuthenticatedStream, etc.

Consider this case: you have a large file. Reading the entire file into memory is not an option, so we decided to use a stream. Great! However, reading each byte individually also comes with overhead. Moreover, a single byte on its own is not very useful. A solution would be to read at least 100 bytes on every access to the backing store. This can be achieved with the BufferedStream decorator:

await using var fs = new FileStream("people.xml", FileMode.Open);
await using var bs = new BufferedStream(fs, bufferSize: 100);

int firstByte = bs.ReadByte();
Console.WriteLine(bs.Position); // 1
Console.WriteLine(fs.Position); // 100

int secondByte = bs.ReadByte();
Console.WriteLine(bs.Position); // 2
Console.WriteLine(fs.Position); // 100

In the example above, FileStream is wrapped with the BufferedStream decorator, which has a buffer size of 100 bytes. When bs.ReadByte() is called, the underlying FileStream reads not 1 byte but 100, which are stored in the buffer. For the next 99 calls to bs.ReadByte(), the data is read from the buffer and not from the file.

In practice, there is no need to wrap a FileStream in a BufferedStream, since FileStream already buffers internally.
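To see decorators chained in practice, here is a sketch that compresses data through a GZipStream and decompresses it back. The GZipStream wraps another stream exactly the way BufferedStream does above, and the decorators can themselves be stacked:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

var original = Encoding.UTF8.GetBytes("Hello World, Hello World, Hello World");

// compress: GZipStream decorates the destination MemoryStream
using var compressed = new MemoryStream();
using (var gzip = new GZipStream(compressed, CompressionMode.Compress, leaveOpen: true))
{
    gzip.Write(original, 0, original.Length);
} // disposing the decorator flushes the remaining compressed bytes

// decompress: GZipStream decorates the source stream this time
compressed.Position = 0;
using var decompressedStream = new MemoryStream();
using (var gzip = new GZipStream(compressed, CompressionMode.Decompress))
{
    gzip.CopyTo(decompressedStream);
}

Console.WriteLine(Encoding.UTF8.GetString(decompressedStream.ToArray()));
// Hello World, Hello World, Hello World
```

The leaveOpen flag keeps the inner MemoryStream alive after the decorator is disposed, which matters because, as discussed later, disposing a decorator normally disposes the stream it wraps.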

Adapters

Once again, good old adapter pattern from GoF😌.

Streams work with bytes, which is great for most backing stores. Bytes are flexible, but they are also a low-level concept not many want to work with.

That is what stream adapters are for. When your backing store holds text data, adapters let you convert byte streams into a preferred and more easily manageable format, like strings, XML, JSON, etc.

Stream adapters are not streams themselves. They inherit from a separate set of abstract classes: TextReader and TextWriter.

TextReader, as you may guess, provides basic functionality for reading:

abstract class TextReader
{
    // Read:
    public virtual string ReadLine();
    public virtual string ReadToEnd();

    // Close:
    public virtual void Close();
    public void Dispose();
}

While TextWriter allows you to write text into a stream:

abstract class TextWriter
{
    // Write:
    public virtual void Write(string value);
    public virtual void WriteLine(string value);

    // Close:
    public virtual void Close();
    public void Dispose();
    public virtual void Flush();
}

These are the most important adapters: StreamReader/StreamWriter, XmlReader/XmlWriter, JsonTextReader/JsonTextWriter, etc.

Just for comparison, here is an example of writing data:

  • into a raw stream:

await using FileStream fileStream = File.OpenWrite("text.txt");

byte[] data = Encoding.UTF8.GetBytes("Hello World");
fileStream.Write(data, offset: 0, count: data.Length);

  • into a stream adapter:

await using FileStream fileStream = File.OpenWrite("text.txt");
await using StreamWriter writer = new StreamWriter(fileStream);

writer.WriteLine("Hello World");

Now let’s see the same for a read operation:

  • read text from a raw stream:

await using FileStream fileStream = File.OpenRead("text.txt");

byte[] byteArray = new byte[fileStream.Length];
fileStream.Read(byteArray, offset: 0, count: (int)fileStream.Length);

string fileContent = Encoding.UTF8.GetString(byteArray);

  • read text from a stream adapter:

await using FileStream fileStream = File.OpenRead("text.txt");
using StreamReader reader = new StreamReader(fileStream);

string fileContent = reader.ReadToEnd();

Adapters let developers work with a high-level API and still get the benefits of stream batch processing.
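Under the hood, an adapter still talks to its stream in bytes; it just handles the text encoding for you. A quick round-trip over a MemoryStream shows both halves working together:

```csharp
using System;
using System.IO;

using var stream = new MemoryStream();

// write text through an adapter; the writer encodes it to bytes for us
var writer = new StreamWriter(stream);
writer.WriteLine("Hello World");
writer.Flush(); // push the buffered text into the underlying stream

// rewind and read the same bytes back through a reader adapter
stream.Position = 0;
using var reader = new StreamReader(stream);
var line = reader.ReadLine();
Console.WriteLine(line); // Hello World
```

Flush() is needed here because the writer buffers text internally; we will come back to that in the section on disposing streams.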

Bringing it all together

Back to the example from the very beginning of this article. Now that you have learned everything about streams, decorators, and adapters, it is time to apply all the newly obtained knowledge in practice.

This time we won’t be so naive and load 10 GB in memory, but process the data in smaller parts:

// open a read stream to the XML file
await using var xmlStream = new FileStream("people.xml", FileMode.Open);
using var xmlReader = XmlReader.Create(xmlStream);

// open a write stream to the JSON file
await using var jsonStream = new FileStream("people.json", FileMode.Create);
await using var streamWriter = new StreamWriter(jsonStream);
await using var jsonWriter = new JsonTextWriter(streamWriter); // JsonTextWriter comes from Newtonsoft.Json

// transfer data from one stream to another
jsonWriter.WriteStartArray();
while (xmlReader.Read())
{
    if (xmlReader.NodeType == XmlNodeType.Element && xmlReader.Name == "person")
    {
        var person = new
        {
            Name = xmlReader.GetAttribute("name"),
            Surname = xmlReader.GetAttribute("surname"),
            Age = int.Parse(xmlReader.GetAttribute("age")),
        };

        var personJson = JsonSerializer.Serialize(person);

        // WriteRawValue (unlike WriteRaw) inserts the commas between array items
        await jsonWriter.WriteRawValueAsync(personJson);
    }
}
jsonWriter.WriteEndArray();

I know, I know 😒. The code is much bulkier than what we are used to; however, we have finally achieved our desired goal of reduced memory load.

Still not impressed 😨? Do not let those complicated code structures scare you. With a little refactoring, you can always hide all those grumpy low-level details behind some kind of abstraction. Just turn on your imagination 😉

await foreach (var person in LazyLoadPeopleAsync())
{
    // something meaningful
    Console.WriteLine(person);
}

// reads a stream under the hood,
// but for the calling code it is the same as a regular foreach loop
async IAsyncEnumerable<Person> LazyLoadPeopleAsync()
{
    await using var xmlStream = new FileStream("people.xml", FileMode.Open);
    using var xmlReader = XmlReader.Create(xmlStream);

    while (xmlReader.Read())
    {
        if (xmlReader.NodeType == XmlNodeType.Element && xmlReader.Name == "person")
        {
            var person = new Person
            {
                Name = xmlReader.GetAttribute("name"),
                Surname = xmlReader.GetAttribute("surname"),
                Age = int.Parse(xmlReader.GetAttribute("age")),
            };

            yield return person;
        }
    }
}

If you still think I’m feeding you some kind of nonsense, you can stop reading right here 😒. Honestly, just by mastering all the information above you can confidently call yourself a stream guru🧙‍♂️.

However, I have prepared a few additional sections for you. Those can be handled only by experienced devs. There is no shame if you don’t consider yourself one of those. Whether to read them or not is purely your choice 😉

Disposing of a stream

Streams often work with low-level concepts and unmanaged resources, and therefore should be disposed.

The initial design implied that you would call Close() to free up a stream, but then IDisposable was added, and we ended up with two methods doing the same thing: Close() and Dispose().

Calling Close()/Dispose() multiple times won’t cause any issues.

To avoid polluting your code with stream-closing boilerplate, use the using or await using keywords.

Streams with a backing store release unmanaged resources. Decorators do not hold unmanaged resources themselves, but calling Dispose() on a decorator disposes the underlying stream. The same is true for adapters. In the case of a chain of decorators/adapters, closing the outermost object closes the entire chain.
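When you do not want the outermost adapter to take the rest of the chain down with it, most decorators and adapters accept a leaveOpen flag in their constructors. A small sketch:

```csharp
using System;
using System.IO;

using var stream = new MemoryStream();

// leaveOpen: true tells the adapter not to dispose the underlying stream
using (var writer = new StreamWriter(stream, leaveOpen: true))
{
    writer.Write("part one");
} // the writer is flushed and disposed here...

// ...but the underlying stream is still usable
Console.WriteLine(stream.CanWrite); // True
```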

To improve performance, some streams buffer the data. This means that when you write data to a stream, it does not immediately appear in the backing store. Instead, it first accumulates in a buffer before being written. To push data from the buffer to the backing store immediately, the Flush() method is used. Flush() is called automatically when a stream is closed.
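You can observe this buffering directly: text written through a StreamWriter does not reach the backing store until the buffer is flushed. A sketch over a MemoryStream:

```csharp
using System;
using System.IO;

using var stream = new MemoryStream();
using var writer = new StreamWriter(stream);

writer.Write("Hello");
long before = stream.Length; // 0 — the text is still in the writer's buffer

writer.Flush();
long after = stream.Length;  // 5 — now it is in the backing store

Console.WriteLine($"{before} -> {after}");
```

Disposing the writer would have flushed it as well; Flush() simply lets you do it without closing anything.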

Concurrent reading/writing to stream

Concurrent reading and writing to a stream can be challenging because streams are typically not designed for concurrent access. Surely you can use a Semaphore, a Monitor, or other locking mechanisms, but there is a better way.

The Stream class has a static method, Stream.Synchronized(), that returns a synchronized (thread-safe) wrapper for a given Stream. The wrapper takes a lock for every operation, so you don’t have to.

// create or open a file for writing
await using FileStream fileStream = new FileStream("synchronized.txt", FileMode.OpenOrCreate);
// create a thread-safe wrapper
await using Stream synchronizedStream = Stream.Synchronized(fileStream);

// write data to the synchronized stream from multiple threads
var tasks = Enumerable
    .Range(0, 3)
    .Select(index => Task.Run(() =>
    {
        for (int i = 0; i < 10; i++)
        {
            byte[] data = Encoding.UTF8.GetBytes($"Thread - {index} write {i}.\n");

            // there is a lock inside the .Write() method
            synchronizedStream.Write(data);
        }
    }))
    .ToList();

await Task.WhenAll(tasks);

Output to Null

APIs that work with streams frequently provide other useful methods. If you want to use those but discard all the streamed data, there is the Stream.Null placeholder:

using var inputStream = new FileStream("file.csv", FileMode.Open);
using var outputStream = Stream.Null;

var processor = new CsvProcessor(inputStream, outputStream);
var headers = processor.GetHeaders();

There are similar properties for adapters that let you redirect output to nowhere: TextReader.Null/TextWriter.Null.

Console.SetOut(TextWriter.Null);
Console.WriteLine("This line will not be printed.");

StringReader/StringWriter

StringReader/StringWriter are adapters that inherit from TextReader/TextWriter but have nothing to do with streams. They work purely on strings.

They are useful for generic operations, when you already have your data in memory but want to use the adapter API:

string json = @"
{
    'name': 'John',
    'surname': 'Doe'
}";

TextReader textReader = new StringReader(json);

// reads from a String instead of a Stream
JsonTextReader jsonReader = new JsonTextReader(textReader);

The same is true for StringWriter. It is just a wrapper over a StringBuilder with a TextWriter API.

using TextWriter fileWriter = new StreamWriter("output.txt");
using TextWriter stringWriter = new StringWriter();

// can write to both the Stream and the String
WriteData(fileWriter);
WriteData(stringWriter);

Console.WriteLine(stringWriter.ToString());

void WriteData(TextWriter textWriter)
{
    textWriter.WriteLine("Hello World");
}

They both have a Dispose() that does nothing, but calling it won’t hurt.

MemoryStream

Another impostor among us is MemoryStream. This one is a big deal, though, so let’s discuss it in the closer detail it deserves.

As practice shows, not many know that one of the most beloved streams, MemoryStream, is not a real stream at all. It behaves like a stream and looks like a stream, but it does not truly embody stream characteristics.

MemoryStream is just a wrapper over List<byte> with a stream API

Developers tend to forget that, and therefore the following mistakes usually occur:

  1. Load a whole backing store into memory

Often developers just load a backing store into a MemoryStream, hoping to still get byte-by-byte processing and reduced memory allocation:

using var fileStream = new FileStream("text.txt", FileMode.Open);
using var memoryStream = new MemoryStream();

fileStream.CopyTo(memoryStream);

However, MemoryStream is not a real stream! Everything ends up in a List<byte>. As a result, there is no chunk processing at all! Worse, the entire backing store is now in memory, which brings us back to the very issues we tried to solve at the beginning.

Remember: when you store something in a MemoryStream, you are just storing it in your app’s memory.

2. Wrong return type

You can often find a method that returns a Stream while in practice it is a MemoryStream:

public Stream GetData()
{
    var memoryStream = new MemoryStream();
    . . .

    return memoryStream;
}

When all the data is already loaded, it is preferable to return byte[], so as not to confuse the calling code with the illusion that it is working with a real Stream:

public byte[] GetData() { . . . }

This makes your code cleaner and more understandable. After all, you have all the data anyway. MemoryStream and byte[] are almost like brothers; it is no surprise that many APIs support both Stream and byte[].
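The closeness goes both ways: when an API demands a Stream but you already hold a byte[], MemoryStream has a constructor that wraps the existing array without copying it:

```csharp
using System;
using System.IO;
using System.Text;

byte[] data = Encoding.UTF8.GetBytes("already in memory");

// wraps the array directly — no extra copy of the data is made
using var stream = new MemoryStream(data);

using var reader = new StreamReader(stream);
var text = reader.ReadToEnd();
Console.WriteLine(text); // already in memory
```

Note that a MemoryStream created this way is not resizable: you can read and overwrite within the array's bounds, but you cannot grow it.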

However, MemoryStream is useful when you implement an interface from a third-party library and are forced to return a Stream:

// someone else's code you have no access to
interface IDataProvider
{
    Stream GetData();
}

// your code
class MyDataProvider : IDataProvider
{
    public Stream GetData()
    {
        var memoryStream = new MemoryStream();
        . . .
        return memoryStream;
    }
}

3. Buffer expanding

MemoryStream dynamically grows its buffer as needed.

With a code like this:

var ms = new MemoryStream();
for (int i = 0; i < 10_000; i++)
{
    ms.WriteByte((byte)i);
}

There will be multiple reallocations of the inner array, the same way as with List<byte>. That is why it is preferable to set a capacity up front or to write all the data at once.
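A sketch of both fixes — reserving the capacity up front, or handing over the whole buffer in one call:

```csharp
using System;
using System.IO;

// option 1: reserve the space up front, so the inner array never has to grow
var ms1 = new MemoryStream(capacity: 10_000);
for (int i = 0; i < 10_000; i++)
{
    ms1.WriteByte((byte)i);
}
Console.WriteLine(ms1.Capacity); // 10000 — no reallocation happened

// option 2: write everything in a single call, causing at most one allocation
var data = new byte[10_000];
var ms2 = new MemoryStream();
ms2.Write(data, 0, data.Length);
Console.WriteLine(ms2.Length);   // 10000
```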

4. Working with position

MemoryStream has a property, Position, to get and set the current position in the stream. Every time you write to the stream, the position moves. Reading starts from the current position.

MemoryStream is usually used for both writing and reading. In such a case, when you write something into the stream and then start reading from it, you will be reading past the last element:

using var stream = GetData();
using var reader = new StreamReader(stream);
var firstByte = reader.Read();
Console.WriteLine(firstByte); // -1, reading past the last element

Stream GetData()
{
    var memoryStream = new MemoryStream();
    memoryStream.WriteByte(42);

    Console.WriteLine(memoryStream.Position); // 1

    return memoryStream;
}

So you should reset the Position before returning a stream for reading:

public Stream GetData()
{
    var memoryStream = new MemoryStream();
    . . .

    // allow the calling code to read the stream from the beginning
    memoryStream.Position = 0;
    return memoryStream;
}

5. Retrieve inner buffer

You can access the written bytes of a MemoryStream with the .ToArray() and .GetBuffer() methods.

.ToArray() :

  • returns a new byte array containing a copy of the MemoryStream contents
  • any modifications made to the byte array do not affect the original MemoryStream

.GetBuffer():

  • returns the underlying byte array used by the MemoryStream without creating a new copy
  • the returned array may have extra unused bytes at the end (beyond the length of the data in the stream)
  • any modification made to the byte array will affect the original MemoryStream

var ms = new MemoryStream(capacity: 3);
ms.WriteByte(23);

var arr1 = ms.ToArray(); // [23], a separate copy

var buffer = ms.GetBuffer();
buffer[0] = 19; // [19, 0, 0]

var arr2 = ms.ToArray(); // [19], the original stream has been modified

As always, it is a trade-off between memory allocation and safe usage.

6. Closing a stream

MemoryStream does not work with unmanaged resources, so Dispose() does not free anything. Closing it is not mandatory.

However, I would still suggest doing it, to keep the code consistent. Additionally, it sets CanRead/CanWrite to false.
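A tiny sketch of that behavior:

```csharp
using System;
using System.IO;

var ms = new MemoryStream();
Console.WriteLine(ms.CanRead);  // True
Console.WriteLine(ms.CanWrite); // True

ms.Dispose();
Console.WriteLine(ms.CanRead);  // False
Console.WriteLine(ms.CanWrite); // False

ms.Dispose(); // calling Dispose() again is harmless
```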

Wrapping it up

What an adventure 😮‍💨. You did it😉 Hooray 🎉

I was not expecting it to be so big, but it is what it is 😌

Sorry for manipulating you in the middle. I just really did not want to come back to this topic multiple times 🙃. I hope you got the key ideas I tried to convey:

  • streams are used to transfer and process large objects, loading them not entirely in memory but in parts
  • decorators extend stream behavior with buffering, encryption, and so on without changing the stream’s API
  • adapters provide a simplified API for working with text streams
  • streams should be closed/disposed after use
  • Stream.Synchronized() lets you work with streams concurrently
  • use Stream.Null to discard a stream’s output
  • StringReader/StringWriter provide a way to process strings as if they were streams
  • MemoryStream is nothing but a wrapper over List<byte>

Combine all of those and you will enhance your ability to build scalable and robust applications 😉.

👏 Claps if you enjoyed this article

💬 Let me know what is your experience with streams

☕️ Buy me a coffee with a link below if you want to receive more of those

✅ And don’t forget to follow if you want to deepen your C# knowledge


iamprovidence

👨🏼‍💻 Full Stack Dev writing about software architecture, patterns and other programming stuff https://www.buymeacoffee.com/iamprovidence