Conducto for Data Science

Your First Data Science Pipeline

Matt Jachowski
Published in Conducto · 5 min read · Apr 16, 2020


In this tutorial, you will learn how to define, execute, and interact with a simple Conducto pipeline.

Upon completion, you will understand how to use a minimal API: co.Exec, co.Serial, co.Parallel, co.Image, and co.main.

Explore our live demo, view the source code for this tutorial, or clone the demo and run it for yourself.

git clone https://github.com/conducto/demo.git
cd demo/data_science
python first_pipeline.py --local

Alternatively, download the zip archive here.

Define Your Pipeline

In Conducto, you express your pipeline as a series of commands that need to be executed in serial and/or parallel. Our python API exposes a minimal set of Node classes to get this done quickly and painlessly. Then, you have the full power of python to nest these nodes for arbitrarily complex pipelines.

First, you need to import conducto.

import conducto as co

Then, you start building your pipeline with nodes.

Exec Node

An exec node simply wraps a shell command.

plot = co.Exec("python plot.py --dataset heating")

Serial Node

A serial node specifies that a series of sub-nodes must run one after another. If one of the sub-nodes fails, execution stops and the entire serial node is marked as failed.

steps = co.Serial()
steps["download"] = co.Exec(download_command)
steps["plot"] = co.Exec("python plot.py --dataset heating")

Note that the definition of download_command is omitted for clarity. See the source code in the demo for the full details.
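For illustration only, a hypothetical stand-in might look like the following. The real command, with the actual dataset URL, lives in the demo repository.

# Hypothetical stand-in for the demo's real download_command.
# "set -ex" echoes each shell command and aborts on the first failure.
download_command = """
set -ex
curl http://example.com/data.csv -o data.csv
""".strip()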

Parallel Node

A parallel node specifies that a series of sub-nodes can occur in parallel. All nodes are executed, and if any nodes fail, the entire parallel node is marked as failed.

plot = co.Parallel()
plot["heating"] = co.Exec("python plot.py --dataset heating")
plot["cooling"] = co.Exec("python plot.py --dataset cooling")

Nesting

Serial and parallel nodes may contain any node type, not just exec nodes. This allows the creation of non-trivial pipelines.

pl = co.Serial()
pl["download"] = co.Exec(download_command)
pl["plot"] = co.Serial()
pl["plot"]["heating"] = co.Exec("python plot.py --dataset heating")
pl["plot"]["cooling"] = co.Exec("python plot.py --dataset cooling")

Easy to do, but perhaps more verbose than you would like. We can use python's with statement to make it nicer.

with co.Serial() as pl:
    co.Exec(download_command, name="download")
    with co.Serial(name="plot"):
        co.Exec("python plot.py --dataset heating", name="heating")
        co.Exec("python plot.py --dataset cooling", name="cooling")

Image

Of course, your commands will only be able to run in an execution environment with:

  • your software dependencies installed,
  • a copy of your own code present, and
  • any necessary environment variables set

Conducto achieves this by running each of your exec commands inside a Docker container, which is defined by an image that you configure. Read the full details in the Execution Environment and Environment Variables and Secrets tutorials. For now, we will skip over these details and just provide an appropriate image for our example. This particular image includes python and some packages for manipulating data, and it copies in your local ./code directory. Note that the . is relative to the location of the pipeline script.

dockerfile = "./docker/Dockerfile.first"
image = co.Image(dockerfile=dockerfile, copy_dir="./code")
with co.Serial(image=image) as pipeline:
    # ...
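If you would rather start from a stock image than a Dockerfile, co.Image can also be built from a base image plus pip requirements. This is a sketch based on the Execution Environment tutorial; the parameter names are assumptions here, and the package list is purely illustrative.

# Sketch: build the image from a stock python base plus pip packages
# (image= and reqs_py=, per the Execution Environment tutorial),
# rather than from a Dockerfile. Package list is illustrative.
image = co.Image(
    image="python:3.8",
    reqs_py=["numpy", "pandas", "matplotlib"],
    copy_dir="./code",
)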

Main

Now that you have a pipeline specified, make it executable. First, wrap your pipeline in a function that returns the top-level node.

def download_and_plot() -> co.Serial:
    dockerfile = "./docker/Dockerfile.first"
    image = co.Image(dockerfile=dockerfile, copy_dir="./code")
    with co.Serial(image=image) as pipeline:
        co.Exec(download_command, name="download")
        with co.Parallel(name="plot"):
            # ...
    return pipeline

Conducto requires that you write a type hint to indicate the node return type of the function. Do not worry if type hints are new to you. Simply ensure that the first line of your function includes -> co.[NodeClass], like this:

def download_and_plot() -> co.Serial:

Finally, define the main function of your python script.

def download_and_plot() -> co.Serial:
    dockerfile = "./docker/Dockerfile.first"
    image = co.Image(dockerfile=dockerfile, copy_dir="./code")
    with co.Serial(image=image) as pipeline:
        co.Exec(download_command, name="download")
        with co.Parallel(name="plot"):
            # ...
    return pipeline

if __name__ == "__main__":
    co.main(default=download_and_plot)

Execute Your Pipeline

Executing your pipeline is easy. First, if you want to spot-check your pipeline, run your script with no arguments.

python first_pipeline.py

You will see a pipeline serialization like this.

/
├─0 download   set -ex\ncurl http://...
└─1 plot
   ├─ heating   python plot.py --dataset heating
   └─ cooling   python plot.py --dataset cooling

To execute the pipeline on your local machine, which is always free, run this. Note that in local mode, your code never leaves your machine.

python first_pipeline.py --local

Coming soon, you will be able to run the same pipeline effortlessly in the cloud, too.

python first_pipeline.py --cloud

Interact With Your Pipeline

The script will print a URL and pop it open in your browser. You can view your pipeline,

The pipeline summary is the row at the top, the pipeline pane is on the left, and the node pane is on the right. The pipeline pane shows your pipeline, with parallel, serial, and exec nodes getting unique icons.

run it and quickly identify pipeline status,

Press the Run button in the upper left of the pipeline pane. See the execution status of each node: Pending, Queued, Running, Done, Errored, and Killed.

examine the output of any exec node,

View the command, execution params, stdout, and stderr of a node in the right hand node pane. Stdout can even include plots!

and rapidly and painlessly debug errors. Collaborate with anyone else in your org by sharing the URL.

Put your pipeline to sleep when you are finished with it. Its state, logs, and data are stored for 7 days. During this period you can wake it up. After 7 days, it is deleted.

The “zzz” icon in the pipeline summary puts the pipeline to sleep. When no pipeline is selected, you see a list of available ones. Click the “alarm clock” button on a sleeping pipeline to get a wakeup command to run in a local shell.

How Much More Data Do You Need?

This was a simple example, but once you add in Environment Variables and Secrets, Data Stores, Node Parameters, and Easy and Powerful Python Pipelines, you can easily express the most complex of data science pipelines in Conducto.

In my previous job, the predecessor to Conducto was the secret sauce that enabled our algorithmic trading team to run an ultra-productive data science and machine learning effort, one that has driven billions of dollars in revenue for a decade. Simply put, Conducto multiplied the impact of every team member.

How much more data do you need? Get started with Conducto now. Local mode is always free and is only limited by the CPU and memory on your machine. Cloud mode gives you immediate scale. Use the full power of python to write pipelines with ease. And, experience painless debugging and easy error resolution.
