Exploring coroutines for file reading, in Kotlin and Python

Carmen Alvarez
11 min read · Jan 2, 2024


Why this article? 🤔

Coroutines can be difficult to wrap your head around. To understand them better, I dissected some examples of using them to read a file. Kotlin used to be my primary language, but I have been using Python for a couple of years now. I thought it might help my understanding (and hopefully yours!) to play around with some simple examples in both. I initially planned to write an article comparing coroutines in these two languages in general, with file reading as just one small example. Well, it turns out this one small example was enough for an entire article. 😅

What we’ll explore here 💡

Suppose we want to read a large file, without blocking the main thread. How can we use coroutines for this, and how do they behave differently in Kotlin versus Python?

Before jumping in, here’s a quick definition of a coroutine from Brett Cannon, a core Python developer. It is enough for the purposes of this article:

functions whose execution you can pause.

We’ll see examples below of how a function’s execution is paused.
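To make “functions whose execution you can pause” concrete, here is a minimal Python sketch. The names worker and demo are illustrative and are not part of the scripts below. Each worker records a step and then awaits asyncio.sleep(0), which pauses it and lets the event loop resume the other worker, so their steps interleave:

```python
import asyncio


async def worker(name: str, steps: int, trace: list[str]) -> None:
    for step in range(steps):
        trace.append(f"{name}{step}")
        # Pause here: control returns to the event loop, which resumes
        # the other worker before coming back to this one.
        await asyncio.sleep(0)


async def demo() -> list[str]:
    trace: list[str] = []
    await asyncio.gather(worker("a", 3, trace), worker("b", 3, trace))
    return trace


trace = asyncio.run(demo())
print(trace)  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

Without the await, each worker would run all of its steps before the other one started.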

Requirements 📋

Consider a program which demonstrates reading a file without blocking the main thread. It:

  • In a continuous loop, prints the current time to the console, every few milliseconds.
  • Reads a large file, chunk by chunk. It prints a message to the console before starting to read the file, and upon completion, prints a message showing the number of bytes read.
  • To keep the examples as simple as possible, we won’t do any error handling.

We would like the loop that prints the current time to keep executing while the large file is read. This demonstrates that two tasks can run “simultaneously”. (On a single-core system they may not truly run at the same time, but the point is that we keep seeing the time being logged while the file is being read.)

Since the program prints the current time in an infinite loop, we keep things simple and terminate it with a ctrl-c keyboard interrupt.

Here’s what the program output could look like:

19:49:20.580397
Before reading file
19:49:20.631647
19:49:20.681933
19:49:20.732191
19:49:20.782460
19:49:20.832737
19:49:20.882948
19:49:20.933211
Done reading file, 560000000 bytes
19:49:20.984126
19:49:21.039549
19:49:21.094326
19:49:21.148235
^C

Let’s see how this can be done in Kotlin and in Python.

Building the script step by step 🏗️

In Kotlin, we’ll write a script file, read_file.main.kts, and in Python, a script named read_file.py.

Step 1 — Logging

To start, we need to configure our scripts to include the current thread’s name when printing logs to the console. This will help us understand how the threads switch during the program execution.

In our Kotlin kts script, we can define a logging function:

/**
 * Print a message to the screen, prefixed by the current thread name.
 */
fun log(message: Any) {
    val logPrefix = Thread.currentThread().name
    println("$logPrefix: $message")
}

In Python, we can use the standard library’s logging module, with a format that includes the thread name:

import logging

logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
logger = logging.getLogger()
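As a quick check of what %(threadName)s produces, this sketch logs once from the main thread and once from a second thread. It collects the formatted records in a small in-memory handler so the output can be inspected. ListHandler, the logger name "thread_name_demo" and the thread name "worker-1" are made up for this example. With the basicConfig setup above, the same lines would go to the console instead.

```python
import logging
import threading


class ListHandler(logging.Handler):
    """Collects formatted log lines in memory instead of printing them."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


handler = ListHandler()
handler.setFormatter(logging.Formatter("%(threadName)s: %(message)s"))

demo_logger = logging.getLogger("thread_name_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo output away from the root logger

demo_logger.info("hello from the main thread")

# Log the same way from a second, explicitly named thread
worker = threading.Thread(target=demo_logger.info, args=("hello from a worker",), name="worker-1")
worker.start()
worker.join()

for line in handler.lines:
    print(line)
```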

Step 2 — Continuously print the current time to the console

Let’s define a function which continuously prints the current time every few milliseconds.

Kotlin:

import java.time.LocalTime
import kotlinx.coroutines.delay

/**
 * Print the current time in an infinite loop on the main thread
 */
suspend fun printTimestampInfiniteLoop() {
    while (true) {
        log(LocalTime.now())
        // Short delay between time logs to not fill up the terminal too quickly
        delay(50)
    }
}

Python:

import asyncio
import datetime


async def print_timestamp_infinite_loop():
    """
    Print the current time in an infinite loop on the main thread
    """
    while True:
        # datetime.UTC requires Python 3.11 or later
        logger.info(datetime.datetime.now(datetime.UTC).strftime("%H:%M:%S.%f"))
        # Short delay between time logs to not fill up the terminal too quickly
        await asyncio.sleep(0.05)

Note that both functions are coroutine functions: in Kotlin, the function is declared with suspend, and in Python with async def. When our code calls the suspending delay function in Kotlin, or awaits asyncio.sleep() in Python, it pauses its execution. This doesn’t block the current thread: the coroutine suspends and hands control back, so other coroutines can run on the same thread during the 50 ms wait. After 50 ms, execution returns to our function, and we continue with the next iteration of the while loop.
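A small experiment makes the difference between suspending and blocking visible. In this Python sketch, a ticker coroutine records a tick about every 10 ms. We count how many ticks it gets while another coroutine waits 0.1 s, once with the blocking time.sleep and once with await asyncio.sleep. The names ticker, blocking_wait, suspending_wait and count_ticks_during are illustrative.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def ticker(ticks: list[float], stop: asyncio.Event) -> None:
    # Record a tick roughly every 10 ms until asked to stop
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_wait() -> None:
    time.sleep(0.1)  # blocks the event loop's thread: no other coroutine can run


async def suspending_wait() -> None:
    await asyncio.sleep(0.1)  # suspends only this coroutine: the loop keeps running others


async def count_ticks_during(wait: Callable[[], Awaitable[None]]) -> int:
    ticks: list[float] = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)  # let the ticker take its first tick
    before = len(ticks)
    await wait()
    after = len(ticks)
    stop.set()
    await task
    return after - before


blocked = asyncio.run(count_ticks_during(blocking_wait))
suspended = asyncio.run(count_ticks_during(suspending_wait))
print(f"ticks during time.sleep: {blocked}, during asyncio.sleep: {suspended}")
```

With time.sleep, the ticker gets no ticks at all, because the event loop’s only thread is stuck. With asyncio.sleep, it keeps ticking. The same reasoning applies to Kotlin’s delay versus Thread.sleep.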

Step 3 — Read a file

Now let’s read a file chunk by chunk.

In Kotlin, we’ll do this using the IO dispatcher, Dispatchers.IO. This means that the code reading the file executes on a background worker thread, not on the main thread.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.File

/**
 * Read a potentially huge file. Process it progressively (chunk by chunk).
 */
suspend fun readFile() =
    withContext(Dispatchers.IO) {
        log("Before reading file")
        // Long rather than Int, so files over Int.MAX_VALUE bytes (~2 GB) don't overflow the count
        var totalBytes = 0L
        // In a .main.kts script, args holds the command-line arguments; args[0] is the file path
        File(args[0]).forEachBlock { _, bytesRead ->
            totalBytes += bytesRead
        }
        log("Done reading file, $totalBytes bytes")
    }

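Note that forEachBlock is an ordinary, non-suspending function from the Kotlin standard library: it performs blocking I/O until it has read the entire file. That is why we wrap the whole read in withContext(Dispatchers.IO), so that a worker thread is blocked instead of the main thread.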
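For readers coming from Python, forEachBlock behaves roughly like the following blocking helper. for_each_block is a hypothetical name for this sketch, not a Python library function. The 4096-byte default mirrors the default block size of Kotlin’s forEachBlock. One difference: Kotlin reuses a single buffer, while Python’s read returns a new bytes object each time.

```python
import os
import tempfile
from typing import Callable


def for_each_block(path: str, action: Callable[[bytes, int], None], block_size: int = 4096) -> None:
    """Call action(chunk, bytes_read) for each block of the file; blocks the calling thread."""
    with open(path, "rb") as file:
        while chunk := file.read(block_size):
            action(chunk, len(chunk))


# Usage: record the block sizes of a 10,000-byte temporary file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"x" * 10_000)

block_sizes: list[int] = []
for_each_block(tmp.name, lambda chunk, bytes_read: block_sizes.append(bytes_read))
os.remove(tmp.name)

print(block_sizes)  # [4096, 4096, 1808]
print(sum(block_sizes))  # 10000
```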

In Python, we can use the third-party aiofiles package (pip install aiofiles) to read the file with the asyncio library:

import sys

import aiofiles


async def read_file():
    """
    Read the contents of a potentially huge file, chunk by chunk.
    """
    logger.info("Before reading file")
    async with aiofiles.open(sys.argv[1], mode="rb") as file:
        total_bytes = 0
        # Each read is awaited: the coroutine suspends until the chunk is available
        while chunk := await file.read(100000):
            total_bytes += len(chunk)
    logger.info(f"Done reading file, {total_bytes} bytes")

We await each chunk that we read. This lets our coroutine pause, and thus relinquish control to other tasks, while it waits for the bytes to be read from disk. Note also that we don’t specify anywhere which thread our code should run on. It runs on the main thread, but it doesn’t block the main thread. We’ll dig more into this later.

Step 4 — Launch the two functions “simultaneously”

Let’s define a main function that launches these two units of work (printing the time in a loop, and reading a file) concurrently rather than one after the other, and waits for both to finish. In Kotlin, we start each suspend function with async, which returns a Deferred, and use awaitAll to wait for both Deferred instances. In Python, we use asyncio.gather to wait for the two coroutine objects returned by calling our async functions.

Kotlin:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll

suspend fun main(scope: CoroutineScope) {
    val deferredPrintTimestampInfiniteLoop: Deferred<Unit> = scope.async { printTimestampInfiniteLoop() }
    val deferredReadFile: Deferred<Unit> = scope.async { readFile() }
    awaitAll(
        deferredPrintTimestampInfiniteLoop,
        deferredReadFile,
    )
}

Python:

import asyncio


async def main():
    coro_print_timestamp_infinite_loop = print_timestamp_infinite_loop()
    coro_read_file = read_file()
    await asyncio.gather(
        coro_print_timestamp_infinite_loop,
        coro_read_file,
    )

Note: we’ll see in the next article, with examples using HTTP requests, how to use structured concurrency instead of awaitAll/asyncio.gather.
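One detail worth knowing about asyncio.gather: it runs the awaitables concurrently, but returns their results in the order they were passed, not the order in which they finished. A small illustrative sketch, where work and demo are made-up names:

```python
import asyncio


async def work(name: str, seconds: float, finished: list[str]) -> str:
    await asyncio.sleep(seconds)
    finished.append(name)  # completion order
    return name


async def demo() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    # "slow" is passed first but finishes last
    results = await asyncio.gather(work("slow", 0.05, finished), work("fast", 0.01, finished))
    return results, finished


results, finished = asyncio.run(demo())
print(results)  # ['slow', 'fast']  (argument order)
print(finished)  # ['fast', 'slow']  (completion order)
```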

Finally, we need an entry point in our scripts to launch our main function. In neither Kotlin nor Python can we run a coroutine directly from the top level of a script, so we need a bridge between the async and sync worlds. In Kotlin, this is the runBlocking function, which starts a coroutine and blocks the current thread until it completes. In Python, it’s the asyncio.run function, which starts an event loop, runs a coroutine in it until it completes, and then closes the loop.

Kotlin:

import kotlinx.coroutines.runBlocking

runBlocking {
    main(scope = this)
}

Python:

import asyncio

asyncio.run(main())
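The sketch below illustrates the Python side of this bridge. Calling an async function only creates a coroutine object, without running any of its body. asyncio.run is what actually drives it to completion and hands back its return value. The function answer and the started list are illustrative names.

```python
import asyncio

started: list[str] = []


async def answer() -> int:
    started.append("answer")  # records that the body actually ran
    await asyncio.sleep(0)
    return 42


coro = answer()  # creates a coroutine object; the body has not run yet
print(asyncio.iscoroutine(coro), started)  # True []

result = asyncio.run(coro)  # starts an event loop, runs coro to completion, closes the loop
print(result, started)  # 42 ['answer']
```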

Full scripts

Let’s put it all together now.

Our Kotlin script, read_file.main.kts, now looks like this:

@file:DependsOn("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.File
import java.time.LocalTime

/**
 * Print a message to the screen, prefixed by the current thread name.
 */
fun log(message: Any) {
    val logPrefix = Thread.currentThread().name
    println("$logPrefix: $message")
}

/**
 * Print the current time in an infinite loop on the main thread
 */
suspend fun printTimestampInfiniteLoop() {
    while (true) {
        log(LocalTime.now())
        // Short delay between time logs to not fill up the terminal too quickly
        delay(50)
    }
}

/**
 * Read a potentially huge file. Process it progressively (chunk by chunk).
 */
suspend fun readFile() =
    withContext(Dispatchers.IO) {
        log("Before reading file")
        // Long rather than Int, so files over Int.MAX_VALUE bytes (~2 GB) don't overflow the count
        var totalBytes = 0L
        File(args[0]).forEachBlock { _, bytesRead ->
            totalBytes += bytesRead
        }
        log("Done reading file, $totalBytes bytes")
    }

suspend fun main(scope: CoroutineScope) {
    val deferredPrintTimestampInfiniteLoop: Deferred<Unit> = scope.async { printTimestampInfiniteLoop() }
    val deferredReadFile: Deferred<Unit> = scope.async { readFile() }
    awaitAll(
        deferredPrintTimestampInfiniteLoop,
        deferredReadFile,
    )
}

runBlocking {
    main(scope = this)
}

And our Python script, read_file.py, is as follows:

import asyncio
import datetime
import logging
import sys

import aiofiles

logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
logger = logging.getLogger()


async def print_timestamp_infinite_loop():
    """
    Print the current time in an infinite loop on the main thread
    """
    while True:
        # datetime.UTC requires Python 3.11 or later
        logger.info(datetime.datetime.now(datetime.UTC).strftime("%H:%M:%S.%f"))
        # Short delay between time logs to not fill up the terminal too quickly
        await asyncio.sleep(0.05)


async def read_file():
    """
    Read the contents of a potentially huge file, chunk by chunk.
    """
    logger.info("Before reading file")
    async with aiofiles.open(sys.argv[1], mode="rb") as file:
        total_bytes = 0
        while chunk := await file.read(100000):
            total_bytes += len(chunk)
    logger.info(f"Done reading file, {total_bytes} bytes")


async def main():
    coro_print_timestamp_infinite_loop = print_timestamp_infinite_loop()
    coro_read_file = read_file()
    await asyncio.gather(
        coro_print_timestamp_infinite_loop,
        coro_read_file,
    )


asyncio.run(main())

Running the scripts ⚡️

Let’s see what happens when we run them.

% kotlin read_file.main.kts data/huge_file.txt
main: 19:08:22.334194
DefaultDispatcher-worker-1: Before reading file
main: 19:08:22.386223
main: 19:08:22.437327
main: 19:08:22.488366
main: 19:08:22.540473
main: 19:08:22.591919
DefaultDispatcher-worker-1: Done reading file, 560000000 bytes
main: 19:08:22.642569
main: 19:08:22.693449
^C
% python read_file.py data/huge_file.txt
MainThread: 18:10:13.240763
MainThread: Before reading file
MainThread: 18:10:13.291492
MainThread: 18:10:13.341917
MainThread: 18:10:13.392638
MainThread: Done reading file, 560000000 bytes
MainThread: 18:10:13.442893
MainThread: 18:10:13.494154
^C

In both cases, the infinite loop continued to print the timestamp while the file was being read, which is exactly what we want.

We can notice one difference, however. In the Python example, all logs indicate execution on the main thread, whereas in the Kotlin example, the file is read on a separate worker thread. The reason we read the file on a worker thread in Kotlin is that the code reading each chunk never pauses execution: forEachBlock is a blocking API, not a suspending one. Run on the main thread, it would stall the timestamp loop until the whole file had been read. In Python, on the other hand, each chunk read is an awaited call to a coroutine function from the aiofiles module, which pauses execution while it waits for data.

Emulating the Kotlin behavior in Python

We can make a blocking version of our read_file coroutine by removing the async and await keywords, and by using Python’s built-in open function instead of aiofiles.open. Let’s call it read_file_blocking:

def read_file_blocking():
    """
    Read the contents of a potentially huge file, chunk by chunk.
    """
    logger.info("Before reading file")
    with open(sys.argv[1], mode="rb") as file:
        total_bytes = 0
        while chunk := file.read(4096):
            total_bytes += len(chunk)
    logger.info(f"Done reading file, {total_bytes} bytes")

To mimic execution on Kotlin’s Dispatchers.IO, we can adapt our read_file function to submit read_file_blocking to a thread pool executor:

import concurrent.futures


async def read_file():
    loop = asyncio.get_running_loop()
    # Threads in this pool are named IO_0, IO_1, ...
    with concurrent.futures.ThreadPoolExecutor(thread_name_prefix="IO") as pool:
        return await loop.run_in_executor(pool, read_file_blocking)

Note that this function uses the async/await keywords and the asyncio event loop. It bridges the asyncio and threading APIs: run_in_executor submits the function to run in the thread pool executor, and returns an awaitable that we can await from inside our coroutine. While we await it, our coroutine is suspended, and the event loop is free to run other tasks.
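To see where the work actually runs, this sketch submits a tiny function to a ThreadPoolExecutor with the same thread_name_prefix. It then reports the thread names on both sides of the await. which_thread and demo are illustrative names.

```python
import asyncio
import concurrent.futures
import threading


def which_thread() -> str:
    # Runs inside the pool: report the worker thread's name
    return threading.current_thread().name


async def demo() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(thread_name_prefix="IO") as pool:
        worker_name = await loop.run_in_executor(pool, which_thread)
    # Back in the coroutine, on the event loop's thread
    return threading.current_thread().name, worker_name


loop_thread, worker_thread = asyncio.run(demo())
print(loop_thread, worker_thread)  # MainThread IO_0 (when run as a script)
```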

The rest of the Python script is the same as before. We can execute it now:

% python read_file.py data/huge_file.txt
MainThread: 19:23:57.909288
IO_0: Before reading file
MainThread: 19:23:57.960380
MainThread: 19:23:58.010814
MainThread: 19:23:58.061164
MainThread: 19:23:58.111472
MainThread: 19:23:58.161957
MainThread: 19:23:58.213249
MainThread: 19:23:58.264203
IO_0: Done reading file, 560000000 bytes
MainThread: 19:23:58.315637
MainThread: 19:23:58.366420
MainThread: 19:23:58.417175
^C

We see that the file reading is done on a second thread, whose name is IO_0. The “IO_” part of the thread name comes from the thread_name_prefix we provided when creating the ThreadPoolExecutor.

If we don’t care which thread pool our blocking function runs on, we can simplify read_file with asyncio.to_thread (Python 3.9+). It runs the function in the event loop’s default thread pool executor:

async def read_file():
    return await asyncio.to_thread(read_file_blocking)

The asyncio library provides a variety of APIs for bridging to threads.

In fact, the aiofiles library, which we used in the first version of our Python script, itself uses a thread pool under the hood. So in the end, the file reading is not so different from our Kotlin version after all: both perform the actual disk reads on a separate thread. The main difference is that in the Python example, execution pauses once per chunk read, whereas in our initial Kotlin version, it pauses only once, for the entire file read.
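Here is a rough, standard-library-only approximation of that pattern. It assumes, as a simplification of what aiofiles does, that each read is simply handed to a worker thread and awaited. read_file_per_chunk is a hypothetical name. It also records which threads performed the reads, to show that only the reads leave the event loop’s thread.

```python
import asyncio
import os
import tempfile
import threading


async def read_file_per_chunk(path: str, chunk_size: int = 100_000) -> tuple[int, set[str]]:
    """Read a file chunk by chunk, awaiting each read() in a worker thread."""
    reader_threads: set[str] = set()

    def read_chunk(file) -> bytes:
        # Runs in the loop's default thread pool executor, not on the event loop thread
        reader_threads.add(threading.current_thread().name)
        return file.read(chunk_size)

    total_bytes = 0
    with open(path, "rb") as file:
        # Each await suspends this coroutine, so other tasks can run while a chunk is read
        while chunk := await asyncio.to_thread(read_chunk, file):
            total_bytes += len(chunk)  # counted back on the event loop thread
    return total_bytes, reader_threads


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"x" * 250_000)

total, threads = asyncio.run(read_file_per_chunk(tmp.name))
os.remove(tmp.name)
print(total, threads)
```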

Trying to emulate the Python behavior in Kotlin

We can adapt our Kotlin script to emulate the behavior of the Python script: it can read the file chunks in the IO Dispatcher, pausing execution for each chunk.

To do this, we can look at how the forEachBlock function is implemented in the Kotlin standard library (in the kotlin.io package), and take inspiration from it to adapt our readFile() function.

/**
 * Read a potentially huge file. Process it progressively (chunk by chunk).
 */
suspend fun readFile() {
    val arr = ByteArray(100000)
    // Long rather than Int, so files over Int.MAX_VALUE bytes (~2 GB) don't overflow the count
    var totalBytes = 0L
    File(args[0]).inputStream().use { input ->
        log("Before reading file")
        do {
            // Read the chunk in another thread; this coroutine suspends meanwhile
            val size = withContext(Dispatchers.IO) {
                input.read(arr)
            }
            // read() returns -1 at end of file
            if (size <= 0)
                break
            totalBytes += size
        } while (true)
        log("Done reading file, $totalBytes bytes")
    }
}

We now use the IO dispatcher to read each chunk, rather than around the whole file read. Each withContext(Dispatchers.IO) call suspends readFile while a worker thread reads one chunk. In the meantime, the main thread is free to run any other coroutine that is ready to execute: in our case, the time logger. Our byte counting, and the logging before and after the file read, now run on the main thread.

We can test our modification:

% kotlin read_file.main.kts data/huge_file.txt
main: 21:38:32.960539
main: Before reading file
main: 21:38:33.011651
main: 21:38:33.061834
main: 21:38:33.112022
main: 21:38:33.162178
main: 21:38:33.212363
main: 21:38:33.262520
main: Done reading file, 560000000 bytes
main: 21:38:33.316462
main: 21:38:33.371806
main: 21:38:33.424305
main: 21:38:33.475333
main: 21:38:33.528290
main: 21:38:33.582270
main: 21:38:33.636989
^C

We see that our file-reading logs and the time logs are both written from the main thread, and that the time logging continues while the file is being read.

Performance impact of chunk size on thread switching 🏃‍♀️

When the entire file is read continuously in a separate thread, the chunk size does not seem to impact the time it takes to read the file. However, when we pause, and switch execution control to another thread after each chunk, we see a direct impact of the chunk size on the time it takes to read the entire file.

The smaller the chunk size, the more often the program switches threads, and the longer it takes to read the file.
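To put numbers on “more often”: each chunk is a separate read and, when pausing per chunk, a separate round trip to a worker thread. Here is a quick calculation for the 560,000,000-byte file used above and the two chunk sizes in the table below. chunk_count is an illustrative helper.

```python
FILE_SIZE = 560_000_000  # bytes, as reported by the scripts above


def chunk_count(file_size: int, chunk_size: int) -> int:
    # Number of non-empty reads (ceiling division); one extra read returning
    # no data then signals end of file.
    return (file_size + chunk_size - 1) // chunk_size


for chunk_size in (4096, 1_000_000):
    print(f"{chunk_size:>9} bytes per chunk -> {chunk_count(FILE_SIZE, chunk_size):,} chunks")
```

That is 136,719 chunks at 4096 bytes versus 560 at 1,000,000 bytes, roughly 244 times as many thread round trips.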

For example, here are some read times for a 560 MB file in different scenarios, on a machine with 64 GB of RAM and an Intel Core i9 processor. Each time shown is the time taken to read the entire file.

+-------------------+--------------------+--------+--------+
| Pause execution   | Chunk size (bytes) | Python | Kotlin |
+-------------------+--------------------+--------+--------+
| After entire file | 4096               | <1s    | <1s    |
| After entire file | 1000000            | <1s    | <1s    |
| With each chunk   | 4096               | 22s    | 3s     |
| With each chunk   | 1000000            | <1s    | <1s    |
+-------------------+--------------------+--------+--------+
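If you want to try a similar comparison on your own machine, here is a rough Python sketch. It is not the script behind the numbers above, and timings will vary with hardware, OS file caching, and Python version. It compares reading the whole file in one worker thread against making a worker-thread round trip for every chunk. read_whole_file, read_pausing_per_chunk and benchmark are hypothetical names.

```python
import asyncio
import os
import tempfile
import time


def read_whole_file(path: str, chunk_size: int) -> int:
    """Blocking read of the entire file; meant to run in a single worker thread."""
    total = 0
    with open(path, "rb") as file:
        while chunk := file.read(chunk_size):
            total += len(chunk)
    return total


async def read_pausing_per_chunk(path: str, chunk_size: int) -> int:
    """Await a separate worker-thread round trip for every chunk."""
    total = 0
    with open(path, "rb") as file:
        while chunk := await asyncio.to_thread(file.read, chunk_size):
            total += len(chunk)
    return total


async def benchmark(path: str, chunk_sizes: tuple[int, ...] = (4096, 1_000_000)) -> dict[tuple[str, int], float]:
    timings: dict[tuple[str, int], float] = {}
    for chunk_size in chunk_sizes:
        start = time.perf_counter()
        await asyncio.to_thread(read_whole_file, path, chunk_size)
        timings[("after entire file", chunk_size)] = time.perf_counter() - start

        start = time.perf_counter()
        await read_pausing_per_chunk(path, chunk_size)
        timings[("with each chunk", chunk_size)] = time.perf_counter() - start
    return timings


# Quick check on a small temporary file. For a meaningful comparison, call
# asyncio.run(benchmark("data/huge_file.txt")) with your own large file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"y" * 50_000)
timings = asyncio.run(benchmark(tmp.name))
os.remove(tmp.name)

for (mode, chunk_size), seconds in timings.items():
    print(f"{mode:>17} | {chunk_size:>7} bytes | {seconds:.4f}s")
```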

What about coroutines not based on threads?

For reading files, while aiofiles uses threads under the hood, aiofile (without the s at the end!) takes advantage of libaio, which provides kernel-based asynchronous I/O on Linux. However, even aiofile falls back to threads where libaio isn’t available.

Recap 📝

We explored examples in Kotlin and Python of using coroutines to read a file without blocking the main thread. The pre- and post-processing of the file was done on either the main thread or a background thread, and the file-reading function paused either once per chunk read or only once after reading the whole file.

Cheat sheet 🤓

Here’s a cheat sheet analogy table of Kotlin and Python coroutine concepts explored in this article. The concepts may not be identical in the two languages.

+------------------+------------------+---------------------------------+
| Kotlin | Python | Concept |
+------------------+------------------+---------------------------------+
| suspend | async | Identifies a function as a |
| | | coroutine function. |
| | | |
| (no keyword;     | await            | Used when calling a coroutine   |
| call it          |                  | function from within another    |
| directly)        |                  | coroutine function.             |
|                  |                  |                                 |
| awaitAll()       | asyncio.gather() | Wait for several concurrently   |
|                  |                  | running coroutines to finish.   |
|                  |                  |                                 |
| runBlocking      | asyncio.run()    | Run coroutine code from a       |
|                  |                  | synchronous entry point.        |
|                  |                  |                                 |
| delay()          | asyncio.sleep()  | Coroutine function to sleep for |
|                  |                  | a specified duration.           |
|                  |                  |                                 |
| withContext      | await            | Run code on another thread and  |
| (Dispatchers.IO) | asyncio.         | suspend until it finishes.      |
|                  | to_thread(func)  |                                 |
+------------------+------------------+---------------------------------+

More reading

Kotlin coroutines documentation.

Python asyncio documentation.
