One of Prefect’s earliest and most innovative features is “task mapping”, which refers to the ability for each run of a workflow to spawn new tasks in response to the data produced by an upstream task. Task mapping allows users to build workflows that dynamically fan out to create branches capable of executing in parallel with a single line of code. Moreover, each dynamically generated task is managed by Prefect as a first class task object — they can undergo arbitrary state transitions (including retries) and even have their own page in the Prefect UI. Prefect’s unique ability to express such complex orchestration through our familiar, Pythonic API has unsurprisingly made it one of our most popular features! From simplifying for-loop code to dynamically processing tens of thousands of records, mapping has powered our users’ workflows in all sorts of interesting ways.
This post reviews Prefect mapping and highlights a recent development — depth first execution — that has unlocked the potential of mapping even further.
From the beginning, Prefect has sought to provide the building blocks necessary to make it easy for users to express unique workflows with a simple API that can deploy to execution layers of any scale — whether a single laptop or a distributed cloud environment. When your flow takes advantage of dynamic constructs like Prefect mapping or looping, you can watch a seemingly-simple, syntactically-sweet flow script spawn into a massively parallel and efficient workload as your execution layer scales up.
One of Prefect’s most powerful parallelization building blocks is mapping, an easy one-liner that spawns parallel tasks at runtime against an iterable of inputs. Multi-level maps ー when a map feeds into another map ー can result in deep parallel task trees with only a few lines of code.
Mapping is most convenient when you don’t necessarily know the “width” of your map ahead of time. Perhaps you want to run the same task code against all the files in an S3 bucket, but you don’t know how many files will be there when the flow runs. Your flow code will simply look like this:
If my bucket currently has only 3 files in it, the first task will get 3 keys, and the map on line 16 will result in 3 parallel tasks:
But if my bucket that day happens to have 7 files in it, the map will result in 7 parallel tasks:
All with just the one line of code on line 16! It is worth emphasizing that each of these tasks is a “true” Prefect task — they undergo independent state transitions (including retries), have their own triggers, and, if running against a Prefect backend, have their own dedicated task run pages.
We can take this further and keep processing each S3 file with additional discrete task steps. When we nest tasks called with
.map together, we can construct deep multi-level mapped task trees with little heavy lifting on the flow code side. Take a look at line 24 here:
With the maps chained together, the outputs from one level in the map will be passed down to the next level in the map. As you can see below, this generates a 3x7 grid on the fly during runtime given my 3 levels of map and my 7 files in my imaginary S3 bucket that day.
Mapping is great, and multi-level maps are powerful, but in prior versions of Prefect each mapped “level” would not begin until every task of the previous “level” had entered a Finished state. For heterogeneous workloads, where one task in a map level might take longer than its siblings, this would unnecessarily hold up progress down the other task trees.
Enter “DFE” — depth first execution. Not just a cute acronym, this highly-requested execution model is now automatically available on all Prefect pipelines that execute on Dask. Now, instead of the Prefect TaskRunner waiting on an entire map level’s resolution to complete before even trying to resolve the next one, it only waits long enough at the top of a multi-level map to determine how many branches there should be, and then submits each mapped child ー which could extend down as many levels as you want ー to the Dask executor as a collection of futures. You can check out the PR that refactored it all for the details, which also showcases the change with the following nice example we can walk through right now.
Here is a flow with a deterministic multi-level map. If you look at the nested call on line 20, you can see this one has two levels:
Since it has three inputs (hardcoded in line 8) and two levels, we can imagine it like a 2x3 grid:
The way this flow works is each task tree will ingest a hardcoded integer at the beginning, and pass that along down the multi-level map. Along the way, each task’s “work” is just to sleep for as many seconds as the input integer. Since the first task tree receives the integer 1, each of the tasks in its tree will only sleep for 1 second; since the third task receives the integer 10, its tasks will sleep for a whole 10 seconds each. The first task tree would complete the fastest if it was allowed to execute any downstream mapped tasks as soon as its upstream was complete.
But as you know, without DFE, the first layer of the map has to complete first before the pipeline even tries to decide what to do in the second level. In that case, you would see output like this:
sleeping for 1done sleeping for 1sleeping for 5done sleeping for 5sleeping for 10done sleeping for 10sleeping for 1done sleeping for 1...
If we visualize it with our grid, with dots representing each second ticking by, even if we have parallel execution on the first layer of the map we can’t move on to the second layer until everything in the first layer is done. Clearly the first branch of the task tree could have started sooner — in fact this entire branch would be totally done before the third task tree’s second level even started if we could start its second level immediately!
Compare that to running the same flow with DFE:
sleeping for 1sleeping for 5sleeping for 10done sleeping for 1sleeping for 1 # <--- the fastest task of the second level starts right away!done sleeping for 5sleeping for 5done sleeping for 10...
Now our shorter task trees can proceed as soon as their upstream map is complete.
DFE may sound easy — in fact, it was actually a feature of early versions of Prefect! — but generalizing it across every conceivable edge case our users encountered proved devilishly hard. Over a year of hands-on research went into designing this feature in a robust and scalable way.
Improved Dask payloads, too
DFE isn’t just great on the basis of the feature it allows on its own. Refactoring the mapping code to allow for depth first execution resulted in a quick series of performance improvements.
For example, several users in our Slack community had reported seeing warnings when running flows on Dask similar to
Large object of size __ detected in task graph, and unresponsive hanging times for their flows that followed, particularly for large maps. After some digging, it became apparent that the ease of massively parallelizing tasks using map was exacerbating the under-optimized serialization of Dask arguments by the Prefect pipeline. In one thread in our #prefect-community Slack channel, a back of the napkin analysis estimated that a reasonably large function submitted to Dask by Prefect might be around 1MiB per call, and for a large map ー such as one that parallelizes that same task 4600 times ー that means the Dask scheduler would need to serialize and transmit 4 GiB just to build the graph.
Thanks to the mapping refactor, some of this extra load in the Prefect pipeline was removed, and our beloved Dask expert @jcrist was able to immediately extend memory footprint gains to reduce the payload size Prefect sends to Dask to nearly 15x smaller! (Check out his PR here if you are interested in the details.)
What’s next for mapping: flat maps… and more
Now that DFE has been released with the 0.12.0 version of Prefect that went out last Wednesday, the Prefect Core team is looking towards other mapping feature enhancements that had been effectively blocked until this refactor was complete.
One in particular we want to pursue is the idea of a
flat_map construct to make it easier to chain together multi-level maps whose tasks return not just scalar values, but iterables of values that should be added to the next map level. Today, when a map level’s tasks return iterables instead of scalar values and pass them on to a downstream map level, the input is passed as the iterable itself — not a map of each of the values inside. An extra step is needed if you wish to “flatten” those iterables into one, so it can be consumed by a downstream mapped task to parallelize against all the inputs (and consequently fan out even further). As of now, this flattening step introduced by the user is effectively a
reduce function between the two levels of the map, which would immediately force the first level to abandon DFE and act like “breadth first execution” instead. Moreover, the
reduce has to be performed in-memory, which can be prohibitively expensive.
flat_map might allow a user to specify a map with increasing level widths, but with a simple API that doesn’t need an extra boilerplate task in between. A first-class, DFE capable implementation would still allow the expansion in the next level to proceed as fast as it can, without waiting to determine the entire width of the extended map level first. As a first step, Prefect’s flatten operators will allow users to avoid in-memory reduces by leveraging Prefect’s native integration with execution layers like Dask. (By the way, if you would like to follow along on progress or add to the discussion, the Github issue is here.)
Another example of Prefect’s innovation in mapping is the ability to use conditional and branching logic inside dynamic parallel pipelines. Prefect’s control flow operators provide a lightweight and Pythonic syntax for expressing complex logic, but incorporating it into mapped pipelines was historically difficult. As you can see in this PR, the new generation of mapping tools will allow users to take advantage of the full suite of Prefect operators without rewriting their code, even when used in a completely dynamic context.
flat_map are both critical pieces in our goal of providing the simplest, most powerful API for some of the most complex workloads being designed today. Both were directly asked for by the Prefect community, so we know that they will help make your work easier and reduce your negative engineering burden.
Please continue reaching out to us with your questions and feedback — we appreciate the opportunity to work with all of you!
- join our Slack community for ad-hoc questions
- attend our weekly meetup events for contributors, focused on the internals of Prefect
- follow us on Twitter for updates
- visit us on GitHub to open issues and pull requests
— The Prefect Team