Conducto for Data Science

Easy and Powerful Python Pipelines

Matt Jachowski
Conducto
Apr 21, 2020

You can build pipelines out of commands in any language with Conducto, but we have some extra support for Python that allows you to easily glue Python functions together into rich and dynamic pipelines.

This example does a parallel word count over a randomly generated list of words. The algorithm is simple but illustrates a common pattern in data science.

  1. Get the data.
  2. Do parallelized analysis over the data.
  3. Aggregate the results.

Explore our live demo, view the source code for this tutorial, or clone the demo and run it for yourself.

git clone https://github.com/conducto/demo.git
cd demo/data_science
python easy_python.py --local

Alternatively, download the zip archive here.

Pass a Python Function to co.Exec

Conducto can automatically call Python functions from the shell, so you do not have to build your own command-line interface. Instead of calling co.Exec with a shell command, pass it a function and its arguments.

In this example, we want to execute this function in an Exec node.

def gen_data(path: str, count: int):
    words = _get_words(count)
    text = b"\n".join(words) + b"\n"
    co.temp_data.puts(path, text)

So, we pass the gen_data function and its arguments to co.Exec.

co.Exec(gen_data, WORDLIST_PATH, count=50000)

This auto-generates the shell command below for the Exec node. Note that the conducto executable is largely just a wrapper for Python.

conducto easy.py gen_data \
    --path=conducto/demo_data/wordlist --count=50000
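For context, here is a minimal, self-contained sketch of how such a pipeline file might be wired together. The _get_words stub and the reqs_py image option are our assumptions here, not taken verbatim from the demo source:

import random
import string

import conducto as co

WORDLIST_PATH = "conducto/demo_data/wordlist"

def _get_words(count: int):
    # Stand-in for the demo's word generator: random 5-letter "words".
    return [
        "".join(random.choices(string.ascii_lowercase, k=5)).encode()
        for _ in range(count)
    ]

def gen_data(path: str, count: int):
    words = _get_words(count)
    text = b"\n".join(words) + b"\n"
    co.temp_data.puts(path, text)

def main() -> co.Serial:
    # The image must contain this file so that the generated
    # `conducto easy.py gen_data ...` command can find the function.
    img = co.Image(copy_dir=".", reqs_py=["conducto"])
    with co.Serial(image=img) as pipeline:
        pipeline["gen_data"] = co.Exec(gen_data, WORDLIST_PATH, count=50000)
    return pipeline

if __name__ == "__main__":
    co.main(default=main)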

Requirements

Conducto needs to be able to find this function inside the image that the Exec node runs in. Therefore, the Exec node must run with a co.Image that has copy_dir, copy_url, or path_map set. Also:

  • The image must include the file with the function.
  • The function name cannot start with an underscore (_).
  • The image must install conducto.
  • You must set typical node parameters like image, env, doc, etc. outside of the constructor, either in a parent node or by setting the fields directly (see the sketch after this list).
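A sketch of that last point, reusing the gen_data function and image setup from above:

img = co.Image(copy_dir=".", reqs_py=["conducto"])

# Parameters like image and env are inherited from the parent node...
with co.Serial(image=img, env={"LOG_LEVEL": "info"}) as pipeline:
    pipeline["gen_data"] = co.Exec(gen_data, WORDLIST_PATH, count=50000)

# ...or can be set directly on the node's fields after construction.
pipeline["gen_data"].doc = "Generate a random word list."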

Function Arguments

All arguments are serialized to the command line, so only pass parameters and paths. Large amounts of data should be passed via a data store like co.temp_data instead.

Arguments can be basic python types (int, float, etc.), date/time/datetime, or lists thereof. Conducto infers types from the default arguments or from type hints, and deserializes accordingly.
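For illustration, here is a sketch of a function with typed arguments. The names backfill, day, chunk_sizes, and rate are hypothetical, and the exact flag formatting may differ:

import datetime
from typing import List

import conducto as co

def backfill(day: datetime.date, chunk_sizes: List[int], rate: float = 0.5):
    # Conducto deserializes each command-line flag using the type hints
    # (day -> datetime.date, chunk_sizes -> list of int) or, as with
    # rate=0.5, the type of the default argument.
    print(day.isoformat(), chunk_sizes, rate)

# Serialized onto the command line as something like:
#   conducto easy.py backfill --day=... --chunk_sizes=... --rate=0.5
node = co.Exec(backfill, datetime.date(2020, 4, 20), chunk_sizes=[100, 200])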

Lazy Pipeline Definition

Data science pipelines often benefit from dynamically defining the pipeline structure based on properties of the data that only become evident as you begin analyzing it. For example, you may not know the size of your data until you download it, yet that size determines how you want to chunk your parallel analysis for maximum efficiency.

Conducto empowers you to lazily define your pipeline such that new nodes can be defined as the pipeline runs. Simply write a function that returns a Parallel or Serial node that represents a new subtree to add to the pipeline, and call it with co.Lazy.

The parallel_word_count node defines a pipeline to chunk and analyze the input data in parallel. It is built from the parallelize function, whose declaration is below. Importantly, it is type-hinted to return a Parallel node.

def parallelize(
    wordlist_path, result_dir, top: int, chunksize: int
) -> co.Parallel:

The lazy node is generated by assigning the node to the result of co.Lazy:

output["parallel_word_count"] = co.Lazy(
parallelize, WORDLIST_PATH, RESULT_DIR, top=15, chunksize=1000
)

co.Lazy produces two nodes inside the parallel_word_count Serial node.

The Generate and Execute nodes are auto-generated by co.Lazy. Note that the Execute node is an empty parallel node, because the Generate node that populates it has not run yet.

The Generate node, which runs first, is an Exec node that calls the parallelize function and prints out the pipeline that it returns. This is the command it runs:

conducto easy.py parallelize \
    --wordlist_path=conducto/demo_data/wordlist \
    --result_dir=conducto/demo_data/results \
    --top=15 --chunksize=1000

Once the Generate node finishes and returns its new pipeline subtree, the subtree is deserialized into an Execute node, which then runs.

Requirements

co.Lazy has all the same limitations as co.Exec(func) that you saw above. Additionally, the function must be type hinted to return a Parallel or Serial node, as in def func() -> co.Parallel.
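Putting the requirements together, a simplified generator in the spirit of the demo's parallelize might look like the sketch below. The word_count worker and the assumption that co.temp_data.gets mirrors puts are ours; the real logic lives in the demo's easy.py:

import conducto as co

def word_count(wordlist_path: str, result_path: str, start: int, chunksize: int):
    # Hypothetical per-chunk worker: count the words in one slice.
    words = co.temp_data.gets(wordlist_path).split(b"\n")
    chunk = words[start : start + chunksize]
    co.temp_data.puts(result_path, str(len(chunk)).encode())

def parallelize(wordlist_path, result_dir, chunksize: int) -> co.Parallel:
    # Runs inside the Generate node, after the data exists, so it can
    # size the parallelism to the actual input.
    n = len(co.temp_data.gets(wordlist_path).split(b"\n"))
    output = co.Parallel()
    for start in range(0, n, chunksize):
        output[f"chunk_{start}"] = co.Exec(
            word_count, wordlist_path, f"{result_dir}/{start}", start, chunksize
        )
    return output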

When to use it

The demo pipeline uses co.Lazy to dynamically parallelize over input data, but there are many other common uses:

  • Processing streaming data in batches: When processing a new batch, use co.Lazy to filter out data that has already been processed, and only generate nodes for new data. Use the same logic to backfill data (a sketch follows below).
  • Relational mapping: To join relational data, simply use a for loop. When joining datasets A and B, iterate over A at runtime and create Exec nodes that run in parallel. Each node looks up the rows in B that correspond to its A value. You have full control over the parallelism and can debug any failed or incorrect mappings.
  • Time-consuming pipeline generation logic: Sometimes, even figuring out the work to do can take a while. Use co.Lazy to parallelize pipeline creation and get it out of the critical path.

These uses can arise multiple times in the same pipeline. co.Lazy is fully nestable, so you can handle them all and lazily generate as sophisticated a pipeline as you need.
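For instance, here is a hedged sketch of the batch-filtering pattern from the first bullet, with entirely hypothetical names and data layout:

import conducto as co

def process_batch(batch_id: str):
    # Hypothetical worker for a single unprocessed batch.
    ...

def new_batches(all_ids_path: str, done_ids_path: str) -> co.Parallel:
    # Compare incoming batch ids against the already-processed set and
    # generate Exec nodes only for the new ones.
    all_ids = co.temp_data.gets(all_ids_path).decode().split()
    done = set(co.temp_data.gets(done_ids_path).decode().split())
    output = co.Parallel()
    for batch_id in all_ids:
        if batch_id not in done:
            output[batch_id] = co.Exec(process_batch, batch_id)
    return output

# Wired into a pipeline the same way as before:
# node["process_new"] = co.Lazy(new_batches, ALL_IDS_PATH, DONE_IDS_PATH)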

Markdown to Display Rich Output

The goal of data science pipelines is often to produce human-understandable results. While you are always free to send data to external visualization tools, Conducto supports using Markdown to display tables, links, and graphs in your node’s output. Note that this is not specific to Python and works with commands in any language.

Simply print <ConductoMarkdown>...</ConductoMarkdown> to stdout or stderr, and Conducto will render the Markdown between the tags.

The summarize node in the demo summarizes the results of the parallel_word_count step using a graph and a table. This is the relevant output code from the summarize function.

print("<ConductoMarkdown>")
print(f"![img]({url})")
print()
print("rank | word | count")
print("-----|------|------")
for rank, (word, count) in enumerate(summary.most_common(top), 1):
print(f"#{rank} | {word} | {count}")
print("</ConductoMarkdown>")

And this is the output as rendered in the node pane.

Show a graph and a table in stdout using Markdown.
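Because the tags are detected in the output stream, this also works from non-Python commands. A minimal sketch using a plain shell command:

import conducto as co

# Any command whose stdout contains the tags gets rendered as Markdown;
# this node needs no Python at all.
node = co.Exec(
    "echo '<ConductoMarkdown>' && "
    "echo '### Word count complete' && "
    "echo 'All **50000** words counted.' && "
    "echo '</ConductoMarkdown>'"
)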

That’s it! By now you should know how to construct some powerful data science pipelines with Conducto. If you think you missed anything, check out our recommended reading list here.
