Getting Started with Conducto

Introduction to Conducto Pipelines

Matt Jachowski
Published in Conducto
5 min read · Apr 19, 2020

A pipeline is a sequence of commands that must be executed in a specific order. Some steps can happen concurrently, while other steps must happen one after another.

Conducto is a tool for writing, executing, visualizing, and debugging pipelines. At its most basic level, Conducto makes it trivial to chain sequences of shell commands into pipelines using a simple Python interface.

Explore the live demo, view the source code for this tutorial, or clone the demo and run it for yourself.

git clone https://github.com/conducto/demo.git
cd demo
python demo.py islands --local

Alternatively, download the zip archive here.

Boilerplate

In this introduction, we will build a simple pipeline of echo commands. First, create an empty Python file (mine is called demo.py), then add this standard Conducto boilerplate code.

import conducto as co

# We will add more code here.

if __name__ == "__main__":
    co.main()

Nodes

You can conceptualize a pipeline as a sequence of commands that happen in parallel (at the same time) or in serial (one after the other). Conducto exposes three Node classes that map directly onto these ideas: Exec, Parallel, and Serial. Note that the code below is for illustration only and should not be copied into your Python file.

An Exec Node is a shell command.

exec_node = co.Exec("echo hello world")

A Parallel Node holds other nodes that can be executed in parallel.

parallel_node = co.Parallel()
parallel_node["task1"] = co.Exec("echo whistle")
parallel_node["task2"] = co.Exec("echo while you work")

A Serial Node holds other nodes that must be executed in serial.

serial_node = co.Serial()
serial_node["task1"] = co.Exec("echo first do this")
serial_node["task2"] = co.Exec("echo then do that")
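
To make the difference between the two container types concrete, here is a minimal sketch in plain Python. It is not Conducto's implementation: the run function and the tuple-based tree are hypothetical names invented for illustration, and each leaf only records its text instead of running a shell command. Children of a "serial" node run one after another, while children of a "parallel" node are handed to a thread pool and may finish in any order.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Hypothetical toy model, not Conducto's API: a node is either a string
# (a leaf "command") or a tuple (kind, children) with kind "serial" or "parallel".
def run(node, log, lock):
    if isinstance(node, str):
        # Leaf: record the command instead of executing it.
        with lock:
            log.append(node)
        return
    kind, children = node
    if kind == "serial":
        # Each child starts only after the previous one has finished.
        for child in children:
            run(child, log, lock)
    elif kind == "parallel":
        # All children are submitted at once; we wait for every one of them.
        with ThreadPoolExecutor(max_workers=max(1, len(children))) as pool:
            futures = [pool.submit(run, child, log, lock) for child in children]
            for future in futures:
                future.result()
    else:
        raise ValueError(f"unknown node kind: {kind!r}")

tree = ("serial", [
    "first do this",
    ("parallel", ["whistle", "while you work"]),
    "then do that",
])

log = []
run(tree, log, threading.Lock())
print(log)
```

The first and last entries of log always keep their positions. Only the two entries from the parallel group may swap between runs.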

Pipeline Specification

Pipeline Function

A pipeline is specified in a function that returns the root node of a tree that combines Exec, Parallel, and Serial Nodes. Let us go back to our file and define a pipeline function named islands that, for now, returns nothing.

import conducto as co

def islands() -> co.Serial:
    return None

if __name__ == "__main__":
    co.main()

The islands function is annotated with a type hint indicating that it will return a Serial Node. It is ok if you are not familiar with type hints. Just ensure that your pipeline function signature always ends with -> co.[NodeType].
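
If type hints are new to you, the short sketch below may help. It uses a hypothetical stand-in class named Serial rather than the real co.Serial, so it runs without Conducto installed. It shows two things: the hint is stored on the function where tools can read it, and Python itself does not enforce it, which is why a stub that returns None is still valid code.

```python
from typing import get_type_hints

class Serial:
    """Hypothetical stand-in for a pipeline node class (not Conducto's)."""

def islands() -> Serial:
    # The hint is a promise to readers and tools; Python does not check it.
    return None

hints = get_type_hints(islands)
print(hints["return"].__name__)  # name of the annotated return type
print(islands() is None)         # the call succeeds despite the hint
```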

Pipeline Definition

Now we can actually define our pipeline. We are going to define a toy pipeline that prints the nickname of each Hawaiian island, starting with the southernmost island and moving north. Islands in the same county will be grouped into either a Parallel or Serial node. In pseudocode, the pipeline should look like:

hawaii -> echo big island
maui county:
    maui -> echo valley isle
    lanai -> echo pineapple isle
    molokai -> echo friendly isle
    kahoolawe -> echo target isle
oahu -> echo gathering place
kauai county:
    kauai -> echo garden isle
    niihau -> echo forbidden isle

We can easily translate this into Python using Node objects. Note that the choice of Parallel and Serial Nodes for maui_county and kauai_county below is arbitrary.

pipeline = co.Serial()
pipeline["hawaii"] = co.Exec("echo big island")
pipeline["maui_county"] = co.Parallel()
pipeline["maui_county"]["maui"] = co.Exec("echo valley isle")
pipeline["maui_county"]["lanai"] = co.Exec("echo pineapple isle")
pipeline["maui_county"]["molokai"] = co.Exec("echo friendly isle")
pipeline["maui_county"]["kahoolawe"] = co.Exec("echo target isle")
pipeline["oahu"] = co.Exec("echo gathering place")
pipeline["kauai_county"] = co.Serial()
pipeline["kauai_county"]["kauai"] = co.Exec("echo garden isle")
pipeline["kauai_county"]["niihau"] = co.Exec("echo forbidden isle")

This is straightforward, but I believe that the pipeline structure is even clearer when we leverage Python’s with statement. The following code is an equivalent way to express our pipeline.

with co.Serial() as pipeline:
    pipeline["hawaii"] = co.Exec("echo big island")

    with co.Parallel(name="maui_county") as maui_county:
        maui_county["maui"] = co.Exec("echo valley isle")
        maui_county["lanai"] = co.Exec("echo pineapple isle")
        maui_county["molokai"] = co.Exec("echo friendly isle")
        maui_county["kahoolawe"] = co.Exec("echo target isle")

    pipeline["oahu"] = co.Exec("echo gathering place")

    with co.Serial(name="kauai_county") as kauai_county:
        kauai_county["kauai"] = co.Exec("echo garden isle")
        kauai_county["niihau"] = co.Exec("echo forbidden isle")
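
If you are curious how a with block can build a tree at all, here is one way it could work, sketched in plain Python. This is a hypothetical toy and not Conducto's implementation: the Node class, its _open stack, and the paths helper are all invented for illustration, and leaves are plain strings. The idea is that entering a with block pushes the node onto a stack, and any named node created while that block is open attaches itself to the node on top of the stack.

```python
class Node:
    """Hypothetical toy container; not Conducto's implementation."""
    _open = []  # stack of nodes whose `with` block is currently active

    def __init__(self, name=None):
        self.children = {}
        # A named node created inside an open `with` block attaches to it.
        if name is not None and Node._open:
            Node._open[-1][name] = self

    def __setitem__(self, name, child):
        self.children[name] = child

    def __getitem__(self, name):
        return self.children[name]

    def __enter__(self):
        Node._open.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        Node._open.pop()
        return False  # do not swallow exceptions


def paths(node, prefix=""):
    """List every path in the tree, parents before their children."""
    out = []
    for name, child in node.children.items():
        path = f"{prefix}/{name}"
        out.append(path)
        if isinstance(child, Node):
            out.extend(paths(child, path))
    return out


# Explicit style: every child is assigned through its parent.
explicit = Node()
explicit["hawaii"] = "echo big island"
explicit["maui_county"] = Node()
explicit["maui_county"]["maui"] = "echo valley isle"

# `with` style: nesting in the source mirrors nesting in the tree.
with Node() as nested:
    nested["hawaii"] = "echo big island"
    with Node(name="maui_county") as maui_county:
        maui_county["maui"] = "echo valley isle"

print(paths(nested))
print(paths(explicit) == paths(nested))
```

Both styles produce the same three paths, which is the sense in which the two versions of our pipeline are equivalent.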

Now, we can put this code into our islands function from before, return the root pipeline node, and we are done.

import conducto as co

def islands() -> co.Serial:
    with co.Serial() as pipeline:
        pipeline["hawaii"] = co.Exec("echo big island")

        with co.Parallel(name="maui_county") as maui_county:
            maui_county["maui"] = co.Exec("echo valley isle")
            maui_county["lanai"] = co.Exec("echo pineapple isle")
            maui_county["molokai"] = co.Exec("echo friendly isle")
            maui_county["kahoolawe"] = co.Exec("echo target isle")

        pipeline["oahu"] = co.Exec("echo gathering place")

        with co.Serial(name="kauai_county") as kauai_county:
            kauai_county["kauai"] = co.Exec("echo garden isle")
            kauai_county["niihau"] = co.Exec("echo forbidden isle")

    return pipeline

if __name__ == "__main__":
    co.main()

Pipeline Execution

The Python file now contains our full pipeline specification, so we can execute it. First, run the script with the --help option.

python demo.py --help

You will see a message like the one below. You can tell that Conducto recognizes our pipeline function because islands appears in the list of methods that return conducto pipelines.

usage: demo.py [-h] <method> [< --arg1 val1 --arg2 val2 ...>]
               [--cloud] [--local] [--run] [--sleep-when-done]

methods that return conducto pipelines:
  islands () -> Serial

optional arguments:
  -h, --help  show this help message and exit
  --version   show conducto package version
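
One plausible way for a tool to produce a listing like this is to inspect return annotations. The sketch below is a hypothetical illustration in plain Python, not Conducto's actual code: Node, Serial, helper, and find_pipelines are stand-in names, and a real tool would presumably scan the whole script rather than a hand-written dictionary.

```python
import inspect

class Node:
    """Hypothetical base class for pipeline nodes (not Conducto's)."""

class Serial(Node):
    """Hypothetical stand-in for a serial node."""

def islands() -> Serial:
    return Serial()

def helper(x: int) -> int:
    # An ordinary function that should not be listed as a pipeline.
    return x + 1

def find_pipelines(namespace):
    """Return {function name: return type name} for functions annotated
    as returning a Node subclass."""
    found = {}
    for name, obj in namespace.items():
        if not inspect.isfunction(obj):
            continue
        ret = inspect.signature(obj).return_annotation
        if inspect.isclass(ret) and issubclass(ret, Node):
            found[name] = ret.__name__
    return found

pipelines = find_pipelines({"islands": islands, "helper": helper})
for name, ret in pipelines.items():
    print(f"{name} () -> {ret}")
```

This is why the return annotation on a pipeline function matters: without it, a tool using this approach would have no way to tell islands apart from helper.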

Now, execute the script in local mode, which means that the entire pipeline will execute on your local machine. In a future release, you will also be able to execute the same script in cloud mode for immediate scale.

python demo.py islands --local

This should open a new browser window or tab at conducto.com where you can see the pipeline. If this does not happen, copy the printed URL into your browser.

The left-hand side of the screen is called the pipeline pane and has a toolbar with icons at the top. Click the View button to expand the pipeline and see the pipeline tree we have created. Click the Run button to execute the pipeline.

This is the pipeline pane. Click View to expand the pipeline tree and Run to execute the pipeline.

This interactive tree representation gives you a useful visual summary of the pipeline. You can see that Exec, Parallel, and Serial Node types are indicated by unique icons.

Notice how closely the pipeline tree in the web app mirrors our Python specification.

Pipeline specification and visualization mirror each other.

Finally, click on one of the Exec nodes and examine the execution details. It contains useful information like the command, duration, memory used, return code, and stdout.
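
For a feel of where such details come from, here is a rough sketch that collects a similar record for a single command using only the Python standard library. It is an illustration under assumptions, not how Conducto gathers its data: run_command and the dictionary keys are hypothetical names, an echo executable is assumed to be on the PATH (true on Linux and macOS), and memory usage is left out.

```python
import shlex
import subprocess
import time

def run_command(command):
    """Run one command and collect basic execution details (hypothetical helper)."""
    start = time.perf_counter()
    result = subprocess.run(
        shlex.split(command),  # split the string into an argument list
        capture_output=True,   # keep stdout and stderr instead of printing them
        text=True,             # decode output as str rather than bytes
        check=False,           # report a failing return code instead of raising
    )
    duration = time.perf_counter() - start
    return {
        "command": command,
        "duration_s": duration,
        "return_code": result.returncode,
        "stdout": result.stdout,
    }

details = run_command("echo garden isle")
for key, value in details.items():
    print(f"{key}: {value!r}")
```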

Summary

Now you have written and executed a simple pipeline in Conducto. I hope you are already imagining how Conducto can enable you to easily write and execute your own pipelines.

In my previous job, the predecessor to Conducto was the secret sauce that enabled our algorithmic trading team to run an ultra-productive data science and machine learning effort that has run for a decade and driven billions of dollars in revenue. So it stands to reason that Conducto is great for data science.

But pipelines are everywhere, and when we switched our internal CI/CD pipeline from CircleCI to Conducto, we immediately became more productive. Try Conducto for CI/CD if you do not love your current solution.
