Conducto for Data Science

Weather Data Pipeline

Jonathan Marcus
Conducto
Published in
6 min read · Apr 9, 2020


This tutorial teaches how to make a simple Conducto pipeline to acquire raw data and visualize it.

Upon completion, you will understand how to use a minimal set of Conducto components: Exec, Parallel, and Serial nodes, co.main, and the co.data object stores.

View the source code, or clone the demo and run it for yourself.

git clone https://github.com/conducto/demo.git
cd demo/data_science/examples
python weather_data.py --local

Alternatively, download the zip archive here.

Define Your Pipeline

The task is to visualize some data provided by the United States Energy Information Administration.

To start, create an empty Python file, say data_science.py, and import conducto.

import conducto as co

Step 1: Download data with a shell command

We will break down the problem into small steps. Ultimately, each command we want to run will be a Conducto Exec node that runs a shell command.

The first step is to download a dataset and save it to a data store. The shell command is a two-parter: part 1 uses curl to download the file; part 2 unzips and saves it to co.data.user, Conducto’s object store that is backed by local disk and scoped to your user.

cmd = f"""
curl http://api.eia.gov/bulk/STEO.zip > steo.zip
unzip -cq steo.zip | conducto-data-user puts --name {USER_DATA_PATH}
"""

We wrap this command in an Exec node.

download_node = co.Exec(cmd)

Step 2: Visualize with a Python function

The second step loads the data, applies some transformations, and plots the results. Make a Python function to do this — don’t worry about the details, we’ll get to them later — and put co.main at the end of your script:

def display(dataset):
    # ...

if __name__ == "__main__":
    co.main()

co.main allows you to call any of your file’s functions from the command line, like this:

$ python data_science.py display --dataset='Heating Degree Days'
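To build intuition, here is a rough, hypothetical sketch of the dispatch idea behind co.main. Conducto’s real implementation does far more (argument parsing, type conversion, pipeline handling); this toy `dispatch` helper exists only for illustration:

```python
def display(dataset):
    # Stand-in for the tutorial's display function.
    return f"displaying {dataset}"

def dispatch(argv):
    # Toy dispatcher: treat argv[0] as a function name and each
    # remaining "--key=value" argument as a keyword argument.
    func = globals()[argv[0]]
    kwargs = {}
    for arg in argv[1:]:
        key, _, value = arg.lstrip("-").partition("=")
        kwargs[key] = value
    return func(**kwargs)

print(dispatch(["display", "--dataset=Heating Degree Days"]))
# → displaying Heating Degree Days
```

In the real thing, `sys.argv[1:]` would supply the arguments; the point is simply that a function name on the command line selects which Python function runs.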

Put it together with Parallel and Serial nodes

A Parallel node has children that it runs at the same time. Use this to run three visualization steps, each of which is an Exec node, in parallel.

display_node = co.Parallel()
for dataset in DATASETS:
    display_node[dataset] = co.Exec(
        f"python data_science.py display --dataset='{dataset}'"
    )

A Serial node has children that run one after the other, stopping on the first error. Use this to first run the download step and then the three visualization steps.

with co.Serial(image=IMG) as output:
    output["Download"] = download_node
    output["Display"] = display_node

You saw earlier how co.main let you call methods from the command line. It also has special handling for methods that return a pipeline. To leverage this, wrap your pipeline in a function that returns the top-level node.

def run() -> co.Serial:
    with co.Serial(image=IMG) as output:
        # ...
    return output

Conducto requires that you write a type hint to indicate the node return type of the function. Do not worry if type hints are new to you. Simply ensure that the first line of your function includes -> co.[NodeClass], like this:

def run() -> co.Serial:

Execute Your Pipeline

Executing your pipeline is easy. Call run (the name of your function above) from the command line with no additional arguments to pretty print your pipeline as a sanity check.

$ python data_science.py run
/
├─0 Download curl http://api.eia.gov/...
└─1 Display
   ├─ Heating   python data_science.py display --dataset='Heating'
   ├─ Cooling   python data_science.py display --dataset='Cooling'
   └─ Electric  python data_science.py display --dataset='Electric'

To execute the pipeline on your local machine, add --local. Local mode is always free, and your code and data never leave your machine. Run this, and your browser will open and connect to the newly running pipeline.

python data_science.py run --local

Coming soon, you will be able to effortlessly run the same pipeline in the cloud too.

python data_science.py run --cloud

Interact With Your Pipeline

The script automatically opens the printed URL in your browser, where you can view and run your pipeline:

Explore the nodes in the left-hand pipeline pane. Your pipeline begins in the Pending state, so click the Run button (boxed in red) to begin executing the nodes.
View output for any Exec node in the right-hand node pane.

Implement the Python Function

In step 2 we defined a Python function without implementing it. Let’s dig deeper to learn how to interact with the Conducto object store and display pretty results in the Conducto UI.

The Python function loads text from co.data.user, extracts a few datasets, and plots them. Consult the demo for the full implementation, which uses pandas, numpy, and matplotlib, but here is a summary.

Object Store

First, we read in the data from co.data.user and subset down to a few datasets.

data_text = co.data.user.gets(USER_DATA_PATH)
all_data = [json.loads(line) for line in data_text.splitlines()]

The datasets are time series. We use numpy to take the per-month average, and combine them into a single pandas DataFrame.

We graph the averages on a single plot using matplotlib and save it to local disk.
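The averaging step can be sketched as follows. Only the record shape follows the EIA bulk feed (each series is a JSON object with a "series_id" and a "data" list of [period, value] pairs, with periods formatted "YYYYMM"); the sample series and values are invented for illustration:

```python
import collections

import numpy as np
import pandas as pd

# Hypothetical records in the shape of the EIA bulk feed.
subset_data = [
    {"series_id": "STEO.ZWHD_US.M",
     "data": [["201001", 900.0], ["201002", 750.0], ["201101", 880.0]]},
]

columns = {}
for series in subset_data:
    # Group values by calendar month (the "MM" part of the period).
    by_month = collections.defaultdict(list)
    for period, value in series["data"]:
        by_month[int(period[4:6])].append(value)
    # Per-month average across all years, via numpy.
    columns[series["series_id"]] = {m: np.mean(v) for m, v in by_month.items()}

# One column per series, indexed by calendar month (1-12).
df = pd.DataFrame(columns).sort_index()
```

From here, the demo hands `df` to matplotlib (e.g. `df.plot()` followed by `plt.savefig(...)`) to produce the image that gets uploaded in the next step.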

Then, we upload the image to co.data.pipeline, Conducto’s object store that is backed by local disk and scoped to the current pipeline. The Exec node runs in a Docker container and any local changes disappear when it finishes, so saving the image to co.data.pipeline allows us to view it from the app.

dataname = f"conducto/demo/data_science/{dataset}.png"
co.data.pipeline.put(dataname, filename)

Markdown

Finally, we use markdown to display the image and the DataFrame. Get a link to the image using co.data.pipeline.url(), and use the DataFrame’s .to_markdown() to render it. By default, Exec node stdout and stderr display raw text, so wrap this in <ConductoMarkdown> tags for pretty formatting.

print(f"""
<ConductoMarkdown>
![img]({co.data.pipeline.url(dataname)})

{df.to_markdown()}
</ConductoMarkdown>
""")

Voilà: a graph and a data table, all within the Conducto UI.

How Much More Data Do You Need?

This was a simple example, but once you add in Execution Environment, Environment Variables and Secrets, Data Stores, Node Parameters, and Easy and Powerful Python Pipelines, you can easily express the most complex of data science pipelines in Conducto.

In my previous job, the predecessor to Conducto was the secret sauce that enabled our algorithmic trading team to run an ultra-productive data science and machine learning effort that has driven billions of dollars in revenue for a decade. Simply put, Conducto multiplied the impact of each team member by a lot.

How much more data do you need? Get started with Conducto now. Local mode is always free and is only limited by the cpu and memory on your machine. Cloud mode gives you immediate scale. Use the full power of python to write pipelines with ease. And, experience easy and painless error resolution.

Appendix: Full Code

See the full code for this tutorial on GitHub, and here’s a simplified version:

import collections
import json
import re

import conducto as co

USER_DATA_PATH = "conducto/demo/data_science/steo.txt"
DOWNLOAD_COMMAND = f"""
curl http://api.eia.gov/bulk/STEO.zip > steo.zip
unzip -cq steo.zip | conducto-data-user puts --name {USER_DATA_PATH}
""".strip()

DATASETS = {
    "Heating Degree Days":    r"^STEO.ZWHD_[^_]*\.M$",
    "Cooling Degree Days":    r"^STEO.ZWCD_[^_]*\.M$",
    "Electricity Generation": r"^STEO.NGEPGEN_[^_]*\.M$",
}
# co.Image is explained in a later article.
IMG = co.Image(
    "python:3.8",
    copy_dir=".",
    reqs_py=["conducto", "pandas", "matplotlib", "tabulate"],
)


def run() -> co.Serial:
    """
    Pipeline that downloads data from the US EIA and visualizes it.
    """
    with co.Serial(image=IMG) as output:
        # First download some data from the US EIA.
        output["Download"] = co.Exec(DOWNLOAD_COMMAND)

        # Then make a few different visualizations of it.
        output["Display"] = co.Parallel()
        for dataset in DATASETS.keys():
            output["Display"][dataset] = co.Exec(
                "python data_science.py display "
                f"--dataset='{dataset}'"
            )
    return output


def display(dataset):
    """
    Read in the downloaded data, extract the specified
    datasets, and plot them.
    """
    data_text = co.data.user.gets(USER_DATA_PATH)
    all_data = [json.loads(line) for line in data_text.splitlines()]

    regex = DATASETS[dataset]
    subset_data = [
        d for d in all_data
        if "series_id" in d and re.search(regex, d["series_id"])
    ]

    import matplotlib.pyplot as plt
    # ... build the per-month averages DataFrame `df` and plot it ...

    # Save to disk, and then to co.data.pipeline
    filename = "/tmp/image.png"
    dataname = f"conducto/demo/data_science/{dataset}.png"
    plt.savefig(filename)
    co.data.pipeline.put(dataname, filename)

    # Print out results as markdown
    print(f"""
<ConductoMarkdown>
![img]({co.data.pipeline.url(dataname)})

{df.transpose().round(2).to_markdown()}
</ConductoMarkdown>
""")


if __name__ == "__main__":
    co.main()

Jonathan Marcus
Conducto

CEO and co-founder at Conducto. Former quant developer @JumpTrading. Likes board games, data science, and HPC infrastructure.