Exploring coroutines for HTTP requests, in Kotlin and Python

Carmen Alvarez
17 min read · Jan 2, 2024


Why this article? 🤔

After exploring coroutines for file reading in Kotlin and Python, I thought I’d dive into how to use coroutines for network requests.

Requirements 📋

Consider a program which demonstrates using coroutines to make HTTP requests without blocking the main thread. It:

  • Prints the current time to the console, every few milliseconds, in a continuous loop. This allows us to verify that the main thread remains unblocked by HTTP requests.
  • Makes two HTTP requests sequentially.
  • After the two sequential requests are done, makes four HTTP requests in parallel.
  • When the four parallel HTTP requests have all completed, stops all logging and terminates the program.

Example execution logs

Here’s what the program output could look like, logging timestamps every 100 ms and making requests to httpbin.org, which simply responds with information echoing our request (we don’t really care about the content of the responses for the purposes of this article).

21:19:45.706621
Starting request <https://httpbin.org/get?id=sequential_1>
21:19:45.807538
21:19:45.908119
21:19:46.009530
21:19:46.110828
21:19:46.211967
Finished request <https://httpbin.org/get?id=sequential_1>
Starting request <https://httpbin.org/get?id=sequential_2>
21:19:46.312446
Finished request <https://httpbin.org/get?id=sequential_2>
Starting request <https://httpbin.org/get?id=parallel_a>
Starting request <https://httpbin.org/get?id=parallel_b>
Starting request <https://httpbin.org/get?id=parallel_c>
Starting request <https://httpbin.org/get?id=parallel_d>
21:19:46.413839
Finished request <https://httpbin.org/get?id=parallel_a>
21:19:46.514443
21:19:46.615969
21:19:46.717377
Finished request <https://httpbin.org/get?id=parallel_d>
Finished request <https://httpbin.org/get?id=parallel_c>
21:19:46.818859
21:19:46.920149
Finished request <https://httpbin.org/get?id=parallel_b>
DONE

We can see:

  • Timestamps are logged at a regular interval (every 100 ms, without interruption).
  • We see “Starting request” and corresponding “Finished request” logs, with timestamps in between them, indicating that the main thread wasn’t blocked during the time it took to perform the HTTP request.
  • For the sequential requests, the logs are in order: “Starting”, then “Finished” for the request with “id=sequential_1”, followed by “Starting”, then “Finished” for the request with “id=sequential_2”.
  • For the parallel requests, all “Starting request” logs appear before any “Finished request” log. The “Finished request” logs also don’t arrive in the same order as the “Starting request” logs: we start the requests in the order a, b, c, d, and receive the responses in the order a, d, c, b. A few timestamp logs are also interspersed with the request logs for the parallel requests.

Concepts we’ll explore 💡

In order to create the program per the requirements, we’ll need:

  • an HTTP client library with coroutine support in Kotlin and Python,
  • to configure the HTTP client to log requests and responses,
  • to be able to launch jobs/tasks sequentially, then in parallel,
  • to be able to cancel a job/task: so that we can interrupt our infinite-loop timestamp logging once all HTTP responses have been received.

Let’s build this brick by brick.

Building the script step by step 🏗️

In Kotlin, we can use a .kts script, http_request_multi.main.kts, and in Python, a script, http_request_multi.py.

Step 1 — Logging

To start, we’ll configure logging to include the thread name in each log. We’ll see in our examples that all our logs are on the main thread.

In our Kotlin kts script, we can define a logging function:

/**
 * Print a message to the screen, prefixed by the current thread name.
 */
fun log(message: Any) {
    val logPrefix = Thread.currentThread().name
    println("$logPrefix: $message")
}

In Python, we can use the built-in logging module:

logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
logger = logging.getLogger()

Step 2 — Continuously print the current time to the console

Let’s define a function which continuously prints the current time every few milliseconds.

Kotlin:

/**
* Print the current time in an infinite loop on the main thread
*/
suspend fun printTimestampInfiniteLoop() {
while (true) {
// 1. Log a timestamp in a loop to indicate that the main thread is
// active.
log(LocalTime.now())
// 2. Short delay between time logs to not fill up the terminal too
// quickly.
delay(100)
// 3. Don't need to catch CancellationException to exit:
// https://kotlinlang.org/docs/exception-handling.html#cancellation-and-exceptions
}
}

Python:

async def print_timestamp_infinite_loop():
    """
    Print the current time in an infinite loop on the main thread
    """
    while True:
        # 1. Log a timestamp in a loop to indicate that the main thread is
        # active.
        logger.info(datetime.datetime.now(datetime.UTC).strftime("%H:%M:%S.%f"))
        # 2. Short delay between time logs to not fill up the terminal too
        # quickly.
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # 3. Exit the loop if a task for this coroutine function is
            # cancelled.
            break

The steps for the logging are:

  1. In a while loop, log the current time.
  2. Suspend execution for 100 ms, giving other coroutine code a chance to execute.
  3. In Python only, handle the CancelledError: when a task for this coroutine is cancelled, we catch the CancelledError raised by the asyncio library and stop the infinite loop. If we don’t catch the CancelledError, execution still terminates, but we see a traceback in the console. In Kotlin, explicitly catching the exception isn’t needed to avoid a stack trace: CancellationExceptions are handled as part of job cancellation and don’t result in errors. (A minimal standalone cancellation sketch follows this list.)
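As an illustration of the asyncio cancellation mechanism, here is a minimal, standalone sketch (the names worker/main are hypothetical, not part of our script). The task catches CancelledError, can do cleanup, and conventionally re-raises; our script simply breaks out of its loop instead, which also ends the task without an error:

import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        # Cleanup could happen here. Re-raising is the conventional way to
        # let asyncio mark the task as cancelled.
        print("worker cancelled")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.3)  # let the worker run for a few iterations
    task.cancel()             # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("worker task ended after cancellation")

asyncio.run(main())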

Before we use our timestamp logger, we have to create the bricks to perform HTTP requests.

Step 3 — Configure an HTTP client

Kotlin:

/**
 * Create an http client with request/response logging enabled.
 */
fun createHttpClient(): HttpClient {
    // 1. Define a logging function for just before we send out the request
    fun logRequestBegin(request: HttpRequestBuilder) {
        log("Starting request ${request.url}")
    }

    // 2. Define a logging function for just after we receive the response
    fun logRequestEnd(response: HttpClientCall) {
        log("Finished request ${response.request.url}")
    }

    // 3. Create the client
    val client = HttpClient(CIO)

    // 4. Configure logging on the client
    client.plugin(HttpSend).intercept { request ->
        logRequestBegin(request)
        val response = execute(request)
        logRequestEnd(response)
        response
    }
    return client
}

Python:

def create_http_client() -> AsyncContextManager[aiohttp.ClientSession]:
    """
    Create an http client with request/response logging enabled.
    """

    # 1. Define a logging function for just before we send out the request
    async def log_request_begin(session, context, params: TraceRequestStartParams):
        logger.info(f"Starting request <{params.url}>")

    # 2. Define a logging function for just after we receive the response
    async def log_request_end(session, context, params: TraceRequestEndParams):
        logger.info(f"Finished request <{params.url}>")

    trace_config = aiohttp.TraceConfig()
    trace_config.on_request_start.append(log_request_begin)
    trace_config.on_request_end.append(log_request_end)

    # 3. Create the client
    return aiohttp.ClientSession(
        # 4. Configure logging on the client
        trace_configs=[trace_config],
    )

In Kotlin, we use the Ktor client, and in Python we use AIOHTTP. Both libraries expose APIs for making HTTP requests using coroutines. In our examples, we will be using the APIs to execute GET requests.

We will call the library functions from the main thread, and will receive the responses on the main thread. However, the actual blocking network I/O is not performed on the main thread. We’ll see more about this later.

To create our client, with logging configured:

  1. We define a function to log the url of the request before we execute the request.
  2. We define a function to log the url of the request as soon as we receive the response.
  3. We instantiate the client.
  4. We hook our request/response logging functions to the client.

Note that the logging functions in Python (AIOHTTP) are async, but the logging functions in Kotlin (Ktor) are not suspend.

Step 4 — Function to make a single request

We now create a suspend/async function which uses our client to execute a request, and logs the first few bytes of the response.

Kotlin:

/**
 * Execute an http request and log the beginning of the response
 */
suspend fun executeRequest(
    client: HttpClient,
    url: String,
) {
    // 1. Use our client to execute the request and get the response
    // Note that get() is suspend.
    val response = client.get(url)

    // 2. Extract the response as text.
    // Note that bodyAsText() is suspend.
    val content = response.bodyAsText()

    // 3. Log the beginning of the response.
    log("Got Response: ${content.substring(0..40)}...")
}

Python:

async def execute_request(client: aiohttp.ClientSession, url: str):
    """
    Execute an http request and log the beginning of the response
    """
    # 1. Use our client to execute the request and get the response
    async with client.get(url) as response:
        # 2. Extract the response as text.
        content = await response.text()

        # 3. Log the beginning of the response.
        logger.info(f"Got response: {content[0:40]}...")

The steps for executing a request are very similar in both languages:

  1. We use the get function of the client to make the request. In Kotlin this is a suspend function; in Python, client.get() returns an async context manager that we enter with async with, which performs the request.
  2. We extract the content of the response as text. This too is a suspend/async function.
  3. Finally, we log the first few bytes of the response text.

Although our code reads as if it were continuous/blocking, the use of coroutines via the suspend/async functions (get() and bodyAsText() in Kotlin, get() and text() in Python) allows other code to execute on the main thread while we wait for and read the response from the remote server.
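As a minimal, self-contained illustration of this (independent of the article’s scripts; asyncio.sleep stands in for network latency), the ticker below keeps running on the same thread while slow_operation is suspended:

import asyncio

async def ticker():
    # Keeps printing while other coroutines are suspended.
    for i in range(5):
        print(f"tick {i}")
        await asyncio.sleep(0.1)

async def slow_operation():
    # Stand-in for a slow HTTP request: suspends without blocking the thread.
    await asyncio.sleep(0.3)
    print("slow operation finished")

async def main():
    await asyncio.gather(ticker(), slow_operation())

asyncio.run(main())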

Step 5 — Function to execute a few requests, sequentially then in parallel

Now that we know how to execute one request, let’s combine a few requests: let’s execute two requests sequentially, then four requests in parallel.

Kotlin:

/**
 * Execute two sequential requests, followed by 4 parallel requests.
 */
suspend fun doHttpRequests() {
    // 1. Create our client
    createHttpClient().use { client ->
        coroutineScope {
            // 2. Launch two requests sequentially
            executeRequest(client, "https://httpbin.org/get?id=sequential_1")
            executeRequest(client, "https://httpbin.org/get?id=sequential_2")
            // 3. Launch four requests in parallel, and suspend until they complete
            joinAll(
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_a") },
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_b") },
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_c") },
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_d") },
            )
            log("DONE")
        }
    }
}

Python:

async def do_http_requests():
    """
    Execute two sequential requests, followed by 4 parallel requests.
    """
    # 1. Create our client
    async with create_http_client() as client:
        # 2. Launch two requests sequentially
        await execute_request(client, "https://httpbin.org/get?id=sequential_1")
        await execute_request(client, "https://httpbin.org/get?id=sequential_2")

        # 3. Launch four requests in parallel, and await until they complete
        await asyncio.gather(
            execute_request(client, "https://httpbin.org/get?id=parallel_a"),
            execute_request(client, "https://httpbin.org/get?id=parallel_b"),
            execute_request(client, "https://httpbin.org/get?id=parallel_c"),
            execute_request(client, "https://httpbin.org/get?id=parallel_d"),
        )
        logger.info("DONE")

The flow is similar in both languages:

  1. Create an instance of our client. Note that in Kotlin the client is Closeable, and in Python the client is an async context manager. Both approaches have the same purpose here: to automatically close the client upon exiting the block.
  2. Execute two sequential requests. Remember, in Kotlin our executeRequest() function is suspend, and in Python execute_request() is async. In Kotlin we call this function directly, and in Python we call it with await. In both cases, our function (doHttpRequests/do_http_requests) suspends execution while waiting for the first sequential request to terminate; only then do we start the second one. And while we are suspended waiting for the response, other coroutine code in our program can execute (in particular, our function which logs timestamps; we’ll see how it’s launched later).
  3. Execute four parallel requests. Here the syntax differs a bit between Kotlin and Python, but the concept is similar. In Kotlin, we use launch to start executeRequest() for each of the four parallel requests; launch immediately returns a Job. In Python, the equivalent approach is to call the async function execute_request() without await, which immediately returns a Coroutine instance. Once we have a Job in Kotlin, or a Coroutine in Python, we use a function to wait for all four of these “awaitables” to complete: joinAll in Kotlin, and asyncio.gather in Python. Once all four parallel requests have completed, we log “DONE”. (A minimal timing sketch of sequential vs. parallel execution follows this list.)
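Here is a minimal timing sketch of the difference (not from the article’s scripts; asyncio.sleep stands in for a request). The two sequential calls take about one second in total, while the four gathered calls overlap and take about half a second together:

import asyncio
import time

async def fake_request(name: str):
    # Stand-in for an HTTP request that takes ~0.5 s.
    await asyncio.sleep(0.5)
    print(f"{name} done")

async def main():
    start = time.perf_counter()

    # Sequential: the second call only starts after the first completes.
    await fake_request("sequential_1")
    await fake_request("sequential_2")

    # Parallel: all four coroutines run concurrently on the event loop.
    await asyncio.gather(
        fake_request("parallel_a"),
        fake_request("parallel_b"),
        fake_request("parallel_c"),
        fake_request("parallel_d"),
    )

    print(f"total: {time.perf_counter() - start:.1f}s")  # roughly 1.5 s

asyncio.run(main())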

Step 6 — Execute our requests while we also log on the main thread

We have our function to continuously log timestamps to the main thread. We have our function to perform multiple requests. Let’s combine those two now.

Kotlin:

suspend fun main() =
    // 1. Create a coroutine scope which will wait
    // for all jobs to complete.
    coroutineScope {
        // 2. Launch both the infinite timestamp loop and the requests, as jobs
        val jobPrintTimestampInfiniteLoop: Job = launch { printTimestampInfiniteLoop() }
        val jobDoHttpRequests: Job = launch { doHttpRequests() }

        // 3. Once our requests are done, cancel the infinite loop logging timestamps
        jobDoHttpRequests.invokeOnCompletion {
            jobPrintTimestampInfiniteLoop.cancel()
        }
        // 4. Wait for everything to complete
        // (thanks to the coroutine scope)
    }

Python:

async def main():
    # 1. Create a TaskGroup which will wait
    # for all tasks to complete.
    async with asyncio.TaskGroup() as tg:
        # 2. Launch both the infinite timestamp loop and the requests, as tasks
        task_print_timestamp_infinite_loop = tg.create_task(
            print_timestamp_infinite_loop()
        )
        task_do_http_requests = tg.create_task(do_http_requests())

        # 3. Once our requests are done, cancel the infinite loop logging timestamps
        task_do_http_requests.add_done_callback(
            lambda _: task_print_timestamp_infinite_loop.cancel()
        )

        # 4. Wait for everything to complete
        # (thanks to the TaskGroup async context manager)

The steps in Kotlin and Python are analogous, even though the terminology differs a little bit. In our main function:

  1. Create a coroutine scope (Kotlin) or a TaskGroup (Python). This will wait for all jobs/tasks created inside it to complete. This is an example of structured concurrency.
  2. We create jobs (Kotlin) and tasks (Python) for our two functions: logging timestamps, and doing HTTP requests. We continue to the next step without yet waiting for these tasks to execute.
  3. We add a hook to the end of the “do http requests” task: to cancel the “infinite loop” logging timestamps. Once all HTTP requests have completed, and their responses logged, this hook will be executed to terminate our timestamp logger.
  4. The code waits for the jobs/tasks to complete before exiting the coroutine scope / TaskGroup.

Step 7 — Entry point

We create an entry point in our script to launch our main function. Since our main function is a suspend/async function, we need to use an API that bridges between the blocking and asynchronous worlds. We must use a blocking function as the entry point of our script.

Kotlin:

runBlocking {
    main()
}

Python:

asyncio.run(main())

In Kotlin, this is the runBlocking function, which executes a suspend function and waits for it to finish. In Python, it’s the asyncio.run function, which also executes a coroutine and waits for it to complete.
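One gotcha worth noting on the Python side (a tiny sketch, not from the article’s script): calling an async function by itself only creates a coroutine object; nothing runs until an event loop drives it, which is exactly what asyncio.run does:

import asyncio

async def greet():
    return "hello"

coro = greet()            # calling an async function only creates a coroutine object
print(type(coro))         # <class 'coroutine'>
print(asyncio.run(coro))  # asyncio.run starts an event loop and runs it: prints "hello"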

Full scripts

Let’s put it all together now.

Our Kotlin script, http_request_multi.main.kts, looks like this now:

@file:DependsOn("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
@file:DependsOn("io.ktor:ktor-client-cio-jvm:2.3.7")
@file:DependsOn("io.ktor:ktor-client-core-jvm:2.3.7")

import io.ktor.client.HttpClient
import io.ktor.client.call.HttpClientCall
import io.ktor.client.engine.cio.CIO
import io.ktor.client.plugins.HttpSend
import io.ktor.client.plugins.plugin
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsText
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.time.LocalTime

/**
 * Print a message to the screen, prefixed by the current thread name.
 */
fun log(message: Any) {
    val logPrefix = Thread.currentThread().name
    println("$logPrefix: $message")
}

/**
 * Print the current time in an infinite loop on the main thread
 */
suspend fun printTimestampInfiniteLoop() {
    while (true) {
        // 1. Log a timestamp in a loop to indicate that the main thread is
        // active.
        log(LocalTime.now())
        // 2. Short delay between time logs to not fill up the terminal too
        // quickly.
        delay(100)
        // 3. Don't need to catch CancellationException to exit:
        // https://kotlinlang.org/docs/exception-handling.html#cancellation-and-exceptions
    }
}

/**
 * Create an http client with request/response logging enabled.
 */
fun createHttpClient(): HttpClient {
    // 1. Define a logging function for just before we send out the request
    fun logRequestBegin(request: HttpRequestBuilder) {
        log("Starting request ${request.url}")
    }

    // 2. Define a logging function for just after we receive the response
    fun logRequestEnd(response: HttpClientCall) {
        log("Finished request ${response.request.url}")
    }

    // 3. Create the client
    val client = HttpClient(CIO)

    // 4. Configure logging on the client
    client.plugin(HttpSend).intercept { request ->
        logRequestBegin(request)
        val response = execute(request)
        logRequestEnd(response)
        response
    }
    return client
}

/**
 * Execute an http request and log the beginning of the response
 */
suspend fun executeRequest(
    client: HttpClient,
    url: String,
) {
    // 1. Use our client to execute the request and get the response
    // Note that get() is suspend.
    val response = client.get(url)

    // 2. Extract the response as text.
    // Note that bodyAsText() is suspend.
    val content = response.bodyAsText()

    // 3. Log the beginning of the response.
    log("Got Response: ${content.substring(0..40)}...")
}

/**
 * Execute two sequential requests, followed by 4 parallel requests.
 */
suspend fun doHttpRequests() {
    // 1. Create our client
    createHttpClient().use { client ->
        coroutineScope {
            // 2. Launch two requests sequentially
            executeRequest(client, "https://httpbin.org/get?id=sequential_1")
            executeRequest(client, "https://httpbin.org/get?id=sequential_2")
            // 3. Launch four requests in parallel, and suspend until they complete
            joinAll(
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_a") },
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_b") },
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_c") },
                launch { executeRequest(client, "https://httpbin.org/get?id=parallel_d") },
            )
            log("DONE")
        }
    }
}

suspend fun main() =
    // 1. Create a coroutine scope which will wait
    // for all jobs to complete.
    coroutineScope {
        // 2. Launch both the infinite timestamp loop and the requests, as jobs
        val jobPrintTimestampInfiniteLoop: Job = launch { printTimestampInfiniteLoop() }
        val jobDoHttpRequests: Job = launch { doHttpRequests() }

        // 3. Once our requests are done, cancel the infinite loop logging timestamps
        jobDoHttpRequests.invokeOnCompletion {
            jobPrintTimestampInfiniteLoop.cancel()
        }
        // 4. Wait for everything to complete
        // (thanks to the coroutine scope)
    }

runBlocking {
    main()
}

And our Python script, http_request_multi.py, looks like this:

import asyncio
import datetime
import logging
from typing import AsyncContextManager

import aiohttp
from aiohttp import TraceRequestEndParams, TraceRequestStartParams

logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
logger = logging.getLogger()


async def print_timestamp_infinite_loop():
    """
    Print the current time in an infinite loop on the main thread
    """
    while True:
        # 1. Log a timestamp in a loop to indicate that the main thread is
        # active.
        logger.info(datetime.datetime.now(datetime.UTC).strftime("%H:%M:%S.%f"))
        # 2. Short delay between time logs to not fill up the terminal too
        # quickly.
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # 3. Exit the loop if a task for this coroutine function is
            # cancelled.
            break


def create_http_client() -> AsyncContextManager[aiohttp.ClientSession]:
    """
    Create an http client with request/response logging enabled.
    """

    # 1. Define a logging function for just before we send out the request
    async def log_request_begin(session, context, params: TraceRequestStartParams):
        logger.info(f"Starting request <{params.url}>")

    # 2. Define a logging function for just after we receive the response
    async def log_request_end(session, context, params: TraceRequestEndParams):
        logger.info(f"Finished request <{params.url}>")

    trace_config = aiohttp.TraceConfig()
    trace_config.on_request_start.append(log_request_begin)
    trace_config.on_request_end.append(log_request_end)

    # 3. Create the client
    return aiohttp.ClientSession(
        # 4. Configure logging on the client
        trace_configs=[trace_config],
    )


async def execute_request(client: aiohttp.ClientSession, url: str):
    """
    Execute an http request and log the beginning of the response
    """
    # 1. Use our client to execute the request and get the response
    async with client.get(url) as response:
        # 2. Extract the response as text.
        content = await response.text()

        # 3. Log the beginning of the response.
        logger.info(f"Got response: {content[0:40]}...")


async def do_http_requests():
    """
    Execute two sequential requests, followed by 4 parallel requests.
    """
    # 1. Create our client
    async with create_http_client() as client:
        # 2. Launch two requests sequentially
        await execute_request(client, "https://httpbin.org/get?id=sequential_1")
        await execute_request(client, "https://httpbin.org/get?id=sequential_2")

        # 3. Launch four requests in parallel, and await until they complete
        await asyncio.gather(
            execute_request(client, "https://httpbin.org/get?id=parallel_a"),
            execute_request(client, "https://httpbin.org/get?id=parallel_b"),
            execute_request(client, "https://httpbin.org/get?id=parallel_c"),
            execute_request(client, "https://httpbin.org/get?id=parallel_d"),
        )
        logger.info("DONE")


async def main():
    # 1. Create a TaskGroup which will wait
    # for all tasks to complete.
    async with asyncio.TaskGroup() as tg:
        # 2. Launch both the infinite timestamp loop and the requests, as tasks
        task_print_timestamp_infinite_loop = tg.create_task(
            print_timestamp_infinite_loop()
        )
        task_do_http_requests = tg.create_task(do_http_requests())

        # 3. Once our requests are done, cancel the infinite loop logging timestamps
        task_do_http_requests.add_done_callback(
            lambda _: task_print_timestamp_infinite_loop.cancel()
        )

        # 4. Wait for everything to complete
        # (thanks to the TaskGroup async context manager)


asyncio.run(main())

Running the scripts ⚡️

Let’s see what happens when we run them.

Kotlin:

% kotlin kotlin/http_request_multi.main.kts
main: 23:51:29.436511
main: Starting request https://httpbin.org/get?id=sequential_1
main: 23:51:29.573537
main: 23:51:29.675082
main: 23:51:29.776763
main: 23:51:29.881834
main: 23:51:29.982193
main: 23:51:30.083741
main: 23:51:30.185381
main: Finished request https://httpbin.org/get?id=sequential_1
main: Got Response: {
"args": {
"id": "sequential_1"
...
main: Starting request https://httpbin.org/get?id=sequential_2
main: 23:51:30.470539
main: 23:51:30.575804
main: 23:51:30.682949
main: 23:51:30.785547
main: 23:51:30.887299
main: Finished request https://httpbin.org/get?id=sequential_2
main: Got Response: {
"args": {
"id": "sequential_2"
...
main: Starting request https://httpbin.org/get?id=parallel_a
main: Starting request https://httpbin.org/get?id=parallel_b
main: Starting request https://httpbin.org/get?id=parallel_c
main: Starting request https://httpbin.org/get?id=parallel_d
main: 23:51:30.991328
main: 23:51:31.094226
main: 23:51:31.195293
main: Finished request https://httpbin.org/get?id=parallel_a
main: Got Response: {
"args": {
"id": "parallel_a"
},...
main: 23:51:31.297012
main: Finished request https://httpbin.org/get?id=parallel_d
main: Got Response: {
"args": {
"id": "parallel_d"
},...
main: Finished request https://httpbin.org/get?id=parallel_c
main: Got Response: {
"args": {
"id": "parallel_c"
},...
main: Finished request https://httpbin.org/get?id=parallel_b
main: Got Response: {
"args": {
"id": "parallel_b"
},...
main: DONE

Python:

% python python/http_request_multi.py      
MainThread: 22:53:38.964975
MainThread: Starting request <https://httpbin.org/get?id=sequential_1>
MainThread: 22:53:39.065433
MainThread: 22:53:39.165765
MainThread: 22:53:39.266281
MainThread: 22:53:39.366913
MainThread: 22:53:39.468264
MainThread: Finished request <https://httpbin.org/get?id=sequential_1>
MainThread: Got response: {
"args": {
"id": "sequential_1"
...
MainThread: Starting request <https://httpbin.org/get?id=sequential_2>
MainThread: Finished request <https://httpbin.org/get?id=sequential_2>
MainThread: Got response: {
"args": {
"id": "sequential_2"
...
MainThread: Starting request <https://httpbin.org/get?id=parallel_a>
MainThread: Starting request <https://httpbin.org/get?id=parallel_b>
MainThread: Starting request <https://httpbin.org/get?id=parallel_c>
MainThread: Starting request <https://httpbin.org/get?id=parallel_d>
MainThread: 22:53:39.568565
MainThread: Finished request <https://httpbin.org/get?id=parallel_a>
MainThread: Got response: {
"args": {
"id": "parallel_a"
}...
MainThread: 22:53:39.668997
MainThread: 22:53:39.769671
MainThread: 22:53:39.870366
MainThread: Finished request <https://httpbin.org/get?id=parallel_c>
MainThread: Finished request <https://httpbin.org/get?id=parallel_d>
MainThread: Got response: {
"args": {
"id": "parallel_d"
}...
MainThread: Got response: {
"args": {
"id": "parallel_c"
}...
MainThread: 22:53:39.971809
MainThread: 22:53:40.073280
MainThread: 22:53:40.174812
MainThread: Finished request <https://httpbin.org/get?id=parallel_b>
MainThread: Got response: {
"args": {
"id": "parallel_b"
}...
MainThread: DONE

In both examples, we see that:

✔ all logs are on the main thread.

✔ the timestamps are logged continuously, interspersed with request logging.

✔ sequential requests are logged in order.

✔ parallel requests are started in order, and responses are received in a different order. All responses are received.

Recap 📝

  • We explored examples in Kotlin and Python of using coroutines to make HTTP requests without blocking the main thread.
  • We explored APIs to launch several HTTP requests in parallel.
  • We used jobs in Kotlin, and tasks in Python, to cancel an “infinite loop” logging coroutine at the end of the HTTP request processing.
  • Finally, we saw an example of structured concurrency using a coroutine scope in Kotlin and a TaskGroup in Python.

Cheat sheet 🤓

Here’s a cheat sheet analogy table of Kotlin and Python concepts explored in this article. The concepts may not be identical in the two languages.


+----------------------+-------------------+-----------------------------+
| Kotlin | Python | Concept |
+----------------------+-------------------+-----------------------------+
| delay() | asyncio.sleep() | Coroutine function to sleep |
| | | for a specified duration. |
| | | |
| suspend fun coro() {} | async def coro(): | Identifies a function as a |
| | | coroutine function. |
| | | |
| coro() | await coro() | Call suspend/async |
| | | function coro and suspend |
| | | until it completes. |
| | | |
| launch {coro() } | coro() | Create a Job/Awaitable |
| | | from a suspend/async |
| | | function coro. |
| | | |
| joinAll() | asyncio.gather() | Explicitly wait for |
| | | multiple coroutine |
| | | functions to complete. |
| | | |
| coroutineScope {} | async with | Structured concurrency |
| | asyncio.TaskGroup(): | to wait for jobs/tasks |
| | | to complete. |
| | | |
| invokeOnCompletion | add_done_callback | Execute code on |
| | | job/task completion |
| | | |
| Job.cancel() | Task.cancel() | Cancel a coroutine |
| | | job/task |
| | | |
| Closeable/ | Context manager/ | Does cleanup when |
| use | with | the block exits. |
| | | |
| runBlocking | asyncio.run() | Run coroutine code from a |
| | | synchronous entry point. |
+----------------------+-------------------+-----------------------------+

Further topics to explore 🔍

Other HTTP libraries

httpx is another popular Python library for making HTTP requests using async APIs.
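As a rough sketch (not covered in this article, and assuming httpx is installed), the single-request function could look like this with httpx; AsyncClient plays the role aiohttp.ClientSession plays above:

import asyncio

import httpx

async def execute_request(client: httpx.AsyncClient, url: str):
    # AsyncClient.get() is awaitable; response.text is a plain property.
    response = await client.get(url)
    print(f"Got response: {response.text[0:40]}...")

async def main():
    # AsyncClient is an async context manager, like aiohttp.ClientSession.
    async with httpx.AsyncClient() as client:
        await execute_request(client, "https://httpbin.org/get?id=httpx_demo")

asyncio.run(main())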

Ktor offers other engines for different platforms, including OkHttp, Apache, Curl, …

It could be interesting to compare these different libraries/engines.

Internals of Ktor and AIOHTTP

In the previous post about using coroutines to read files, we saw that aiofiles uses a thread pool executor under the hood. I tried to dig into AIOHTTP and Ktor to understand if they used threads for blocking network I/O. The source code was a bit complex to digest 🤯, and I didn’t find any detailed description in the documentation about how they work under the hood. I’ll try to give an approximate (not guaranteed to be 100% correct! 🫠) explanation based on what I could figure out from debugging.

AIOHTTP:

From what I understand, threads are generally not explicitly used in the AIOHTTP code base, except for some specific tasks like DNS resolution in some cases, some compression, or some types of request payloads (but not JSON or form fields, two types I tested). Instead, AIOHTTP uses the event loop provided by asyncio. In particular, AIOHTTP calls asyncio’s create_connection, registering a ResponseHandler, whose method data_received is called when data is ready. The asyncio documentation says that its default event loop on Unix systems is based on select, and on Windows on IOCP. The asyncio event loop, in each iteration, uses the selector API to check if there is new data to read. When data is available, it is read on the main thread, but in non-blocking mode, because the socket created by create_connection is itself in non-blocking mode. The man page for recv explains how non-blocking reads work (on Unix anyway).
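To make that mechanism concrete, here is a minimal sketch of the asyncio API that AIOHTTP builds on (this is not AIOHTTP’s actual code; the protocol below is a toy stand-in for its ResponseHandler). The event loop invokes data_received whenever the non-blocking socket has data:

import asyncio


class DemoProtocol(asyncio.Protocol):
    def __init__(self, done: asyncio.Future):
        self.done = done

    def connection_made(self, transport):
        # Called once the non-blocking socket is connected.
        transport.write(b"GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n")

    def data_received(self, data):
        # Called by the event loop whenever data is available on the socket.
        print(f"received {len(data)} bytes")

    def connection_lost(self, exc):
        self.done.set_result(True)


async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    # create_connection opens a non-blocking socket and wires it to the protocol.
    await loop.create_connection(lambda: DemoProtocol(done), "httpbin.org", 80)
    await done


asyncio.run(main())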

Ktor:

Our Kotlin examples use the Ktor CIO engine, on the JVM. I had a difficult time trying to understand in detail how Ktor works. With breakpoint debugging, I saw that, similarly to Python, it uses select APIs from the JVM. I could see a couple of differences compared to AIOHTTP:

  • Ktor performs the socket reading/writing on a background thread, using the I/O Dispatcher, while Python does non-blocking socket reading/writing on the main thread.
  • In Python, AIOHTTP relies on the asyncio library to call select in the main event loop, dispatching to registered ResponseHandlers. In Kotlin, from what I could see, the select is called from Ktor itself, for the given HTTP request, and not in the event loop of the Kotlin coroutines library.

Conclusion regarding the internals of these libraries: after reading and debugging their source code, I’m very happy these libraries exist and that we don’t have to write our own coroutine-based network I/O implementations 😅

More reading

Ktor client documentation.

AIOHTTP client documentation.

For Kotlin: Structured Concurrency for Coroutines: Unraveling the Fundamentals, by Rohit Singh
