Conducto for Data Science
Easy and Powerful Python Pipelines
You can build pipelines out of commands in any language with Conducto, but we have some extra support for python that allows you to easily glue python functions together into rich and dynamic pipelines.
- Pass a python function to co.Exec.
- Lazily define your pipeline at runtime with co.Lazy.
- Use Markdown to display rich output in an Exec node. (This is not specific to python.)
This example does a parallel word count over a randomly generated list of words. The algorithm is simple but illustrates a common pattern in data science.
- Get the data.
- Do parallelized analysis over the data.
- Aggregate the results.
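In plain python, those three steps can be sketched as follows. This is our own toy version, not the demo's code: the helper names and the tiny vocabulary are illustrative.

```python
import random
from collections import Counter

def get_data(count):
    # Step 1: get the data -- here, a randomly generated word list.
    vocab = ["apple", "banana", "cherry", "date"]
    return [random.choice(vocab) for _ in range(count)]

def count_chunk(chunk):
    # Step 2: each parallel task counts the words in its own chunk.
    return Counter(chunk)

def aggregate(counters):
    # Step 3: merge the per-chunk counts into one summary.
    total = Counter()
    for c in counters:
        total += c
    return total

words = get_data(1000)
chunks = [words[i:i + 250] for i in range(0, len(words), 250)]
summary = aggregate(count_chunk(c) for c in chunks)
```

In the demo, step 2 runs as parallel Exec nodes instead of a local loop, but the data flow is the same.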
Explore our live demo, view the source code for this tutorial, or clone the demo and run it for yourself.
git clone https://github.com/conducto/demo.git
cd demo/data_science
python easy_python.py --local
Alternatively, download the zip archive here.
Pass a Python Function to co.Exec
Conducto can automatically call python functions from the shell so you do not have to build your own command-line interface. Instead of calling co.Exec with a shell command, pass it a function and its arguments.
In this example, we want to execute this function in an Exec node.
def gen_data(path: str, count: int):
    words = _get_words(count)
    text = b"\n".join(words) + b"\n"
    co.temp_data.puts(path, text)
So, we pass the gen_data function and its arguments to co.Exec.
co.Exec(gen_data, WORDLIST_PATH, count=50000)
This auto-generates the shell command below for the Exec node. Note that the conducto executable is largely just a wrapper for python.
conducto easy.py gen_data \
--path=conducto/demo_data/wordlist --count=50000
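To see the idea, here is a hypothetical sketch of how a function call might be serialized into such a command. build_command is our own illustration, not part of the conducto API:

```python
import inspect

def build_command(script, func, *args, **kwargs):
    """Illustrative only: turn a function call into a
    conducto-style shell command by naming each argument
    as a --flag (not the real implementation)."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    flags = " ".join(f"--{name}={value}" for name, value in bound.arguments.items())
    return f"conducto {script} {func.__name__} {flags}"

def gen_data(path: str, count: int):
    ...  # body omitted; only the signature matters here

cmd = build_command("easy.py", gen_data, "conducto/demo_data/wordlist", count=50000)
```

Positional and keyword arguments both end up as named flags, which is why the function's parameter names appear in the generated command.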
Requirements
Conducto needs to be able to find this function in the image that the Exec node runs in. Therefore, the Exec node must run with a co.Image that has copy_dir, copy_url, or path_map set. Also:
- The image must include the file with the function.
- The function name cannot start with an underscore (_).
- The image must install conducto.
- You must set typical node parameters like image, env, doc, etc. outside of the constructor, either in a parent node or by setting the fields directly.
Function Arguments
All arguments are serialized to the command line, so only pass parameters and paths. Large amounts of data should be passed via a data store like co.temp_data instead.
Arguments can be basic python types (int, float, etc.), date/time/datetime, or lists thereof. Conducto infers types from the default arguments or from type hints, and deserializes accordingly.
Lazy Pipeline Definition
Data science pipelines often benefit from dynamically defining the pipeline structure based on properties of the data that only become evident as you begin analyzing it. For example, you may not know the size of your data until you download it, which determines how you want to chunk your parallel analysis for maximum efficiency.
Conducto empowers you to lazily define your pipeline such that new nodes can be defined as the pipeline runs. Simply write a function that returns a Parallel or Serial node that represents a new subtree to add to the pipeline, and call it with co.Lazy.
The parallel_word_count node defines a pipeline to chunk and analyze the input data in parallel. This is the parallelize function declaration. Importantly, it is type-hinted to return a Parallel node.
def parallelize(
    wordlist_path, result_dir, top: int, chunksize: int
) -> co.Parallel:
The lazy node is generated by assigning the result of co.Lazy to the node:
output["parallel_word_count"] = co.Lazy(
    parallelize, WORDLIST_PATH, RESULT_DIR, top=15, chunksize=1000
)
co.Lazy produces two nodes inside the parallel_word_count Serial node.
The first, Generate, is an Exec node that calls the parallelize function and prints out the pipeline that it returns. This is the command it runs:
conducto easy.py parallelize \
--wordlist_path=conducto/demo_data/wordlist \
--result_dir=conducto/demo_data/results \
--top=15 --chunksize=1000
Once the Generate node finishes and returns its new pipeline subtree, the subtree is deserialized into an Execute node, which then runs.
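The chunking a function like parallelize performs can be illustrated without conducto. chunk_bounds is our own name for this sketch; in the demo, each (start, end) range would back one parallel Exec node that counts words in its slice.

```python
def chunk_bounds(total, chunksize):
    # Split [0, total) into half-open ranges with no gaps or
    # overlaps; the last chunk may be smaller than chunksize.
    return [(start, min(start + chunksize, total))
            for start in range(0, total, chunksize)]

# Matches the demo's parameters: 50,000 words in chunks of 1,000.
bounds = chunk_bounds(total=50000, chunksize=1000)
```

Because the Generate node computes these bounds at runtime, the pipeline automatically adapts if the word list turns out larger or smaller than expected.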
Requirements
co.Lazy has all the same limitations as co.Exec(func) that you saw above. Additionally, the function must be type hinted to return a Parallel or Serial node, as in def func() -> co.Parallel.
When to use it
The demo pipeline uses co.Lazy to dynamically parallelize over input data, but there are many other common uses:
- Processing streaming data in batches: When processing a new batch, use co.Lazy to filter out data that has already been processed, and only generate nodes for new data. Use the same logic to backfill data.
- Relational mapping: To join relational data, simply use a for loop. When joining datasets A and B, iterate over A at runtime and create Exec nodes that run in parallel. Each node looks up the rows in B that correspond to its A value. You have full control over the parallelism and can debug any failed or incorrect mappings.
- Time-consuming pipeline generation logic: Sometimes, even figuring out the work to do can take a while. Use co.Lazy to parallelize pipeline creation and get it out of the critical path.
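As a plain-python sketch of the relational-mapping pattern: the datasets below are made up, and in a real pipeline the loop would emit one Exec node per A value instead of computing the join inline.

```python
from collections import defaultdict

# Made-up datasets: A maps country codes to names, B maps codes to cities.
A = [("us", "United States"), ("fr", "France")]
B = [("us", "New York"), ("fr", "Paris"), ("us", "Chicago")]

# Index B once so each per-A task is a cheap lookup.
b_by_key = defaultdict(list)
for key, row in B:
    b_by_key[key].append(row)

# One iteration per A value; in conducto this loop would create
# parallel Exec nodes, each doing its own lookup into B.
joined = {name: b_by_key[key] for key, name in A}
```

Each node's work is independent, so a single failed or incorrect mapping can be inspected and rerun without touching the rest.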
These uses can arise multiple times in the same pipeline. co.Lazy is fully nestable, so you can handle them all and lazily generate as sophisticated a pipeline as you need.
Markdown to Display Rich Output
The goal of data science pipelines is often to produce human-understandable results. While you are always free to send data to external visualization tools, Conducto supports using Markdown to display tables, links, and graphs in your node’s output. Note that this is not specific to python and can be used by any commands.
Simply print <ConductoMarkdown>...</ConductoMarkdown> in your stdout/stderr, and Conducto will render the Markdown between the tags.
The summarize node in the demo summarizes the results of the parallel_word_count step using a graph and a table. This is the relevant output code from the summarize function.
print("<ConductoMarkdown>")
print(f"![img]({url})")
print()
print("rank | word | count")
print("-----|------|------")
for rank, (word, count) in enumerate(summary.most_common(top), 1):
    print(f"#{rank} | {word} | {count}")
print("</ConductoMarkdown>")
And this is the output as rendered in the node pane.
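Here is a self-contained variant of that output code you can run anywhere. The word list is made up, and the image line is omitted since it needs a real URL.

```python
from collections import Counter

# A toy summary standing in for the demo's aggregated word counts.
summary = Counter(["apple", "banana", "apple", "cherry", "apple", "banana"])

# Anything printed between the tags is rendered as Markdown.
lines = ["<ConductoMarkdown>", "rank | word | count", "-----|------|------"]
for rank, (word, count) in enumerate(summary.most_common(3), 1):
    lines.append(f"#{rank} | {word} | {count}")
lines.append("</ConductoMarkdown>")
print("\n".join(lines))
```

Since the tags are plain text on stdout, this works from any language or shell command, not just python.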