Conducto for Data Science
Weather Data Pipeline
This tutorial teaches how to make a simple Conducto pipeline to acquire raw data and visualize it.
Upon completion, you will understand how to use the following minimal components of Conducto:
- co.Exec, co.Serial, and co.Parallel node classes
- co.main() to make your pipeline executable
- co.data as an object store, and
- Markdown to display data prettily
View the source code, or clone the demo and run it for yourself.
git clone https://github.com/conducto/demo.git
cd demo/data_science/examples
python weather_data.py --local
Alternatively, download the zip archive here.
Define Your Pipeline
The task is to visualize some data provided by the United States Energy Information Administration.
To start, create an empty Python file, let's call it data_science.py, and import conducto.
import conducto as co
Step 1: Download data with a shell command
We will break down the problem into small steps. Ultimately, each command we want to run will be a Conducto Exec node that runs a shell command.
The first step is to download a dataset and save it to a data store. The shell command is a two-parter: part 1 uses curl to download the file; part 2 unzips the data and saves it to co.data.user, Conducto's object store that is backed by local disk and scoped to your user.
cmd = f"""
curl http://api.eia.gov/bulk/STEO.zip > steo.zip
unzip -cq steo.zip | conducto-data-user puts --name {USER_DATA_PATH}
"""
We wrap this command in an Exec node.
download_node = co.Exec(cmd)
Step 2: Visualize with a Python function
The second step loads the data, applies some transformations, and plots the results. Make a Python function to do this (don't worry about the details, we'll get to them later) and put co.main at the end of your script:
def display(dataset):
    # ...

if __name__ == "__main__":
    co.main()
co.main allows you to call any of your file's functions from the command line, like this:
$ python data_science.py display --dataset='Heating Degree Days'
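To build intuition for what co.main is doing with that command line, here is a toy dispatcher in the same spirit. This is a sketch of the idea only, not Conducto's actual implementation: the first argument names a function, and --key=value arguments become keyword arguments.

```python
import sys

def dispatch(funcs, argv):
    """Toy sketch of co.main-style dispatch (not Conducto's real code):
    the first CLI argument selects a function by name, and each
    --key=value argument is passed to it as a keyword argument."""
    name, *rest = argv
    kwargs = {}
    for arg in rest:
        # "--dataset=Heating" -> key "dataset", value "Heating"
        key, _, value = arg.lstrip("-").partition("=")
        kwargs[key] = value
    return funcs[name](**kwargs)

def display(dataset):
    return f"displaying {dataset}"

if __name__ == "__main__":
    print(dispatch({"display": display}, sys.argv[1:]))
```

Running this file with `display --dataset='Heating Degree Days'` would call display(dataset="Heating Degree Days"). The real co.main also handles type conversion and pipeline launching, which this sketch omits.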
Put it together with Parallel and Serial nodes
A Parallel node has children that it runs at the same time. Use this to run three visualization steps, each of which is an Exec node, in parallel.
display_node = co.Parallel()
for dataset in DATASETS:
    display_node[dataset] = co.Exec(
        f"python data_science.py display --dataset='{dataset}'"
    )
A Serial node has children that run one after the other, stopping on the first error. Use this to first run the download step and then the three visualization steps.
with co.Serial(image=IMG) as output:
    output["Download"] = download_node
    output["Display"] = display_node
You saw earlier how co.main lets you call methods from the command line. It also has special handling for methods that return a pipeline. To leverage this, wrap your pipeline in a function that returns the top-level node.
def run() -> co.Serial:
    with co.Serial(image=IMG) as output:
        # ...
    return output
Conducto requires that you write a type hint to indicate the node return type of the function. Do not worry if type hints are new to you. Simply ensure that the first line of your function includes -> co.[NodeClass], like this:
def run() -> co.Serial:
Execute Your Pipeline
Executing your pipeline is easy. Call run (the name of your function above) from the command line with no additional arguments to pretty-print your pipeline as a sanity check.
$ python data_science.py run
/
├─0 Download curl http://api.eia.gov/...
└─1 Display
├─ Heating python data_science.py display --dataset='Heating'
├─ Cooling python data_science.py display --dataset='Cooling'
└─ Electric python data_science.py display --dataset='Electric'
To execute the pipeline on your local machine, add --local. Local mode is always free, and your code and data never leave your machine. Run this and your browser will open and connect to the newly running pipeline.
python data_science.py run --local
Coming soon, you will be able to effortlessly run the same pipeline in the cloud too.
python data_science.py run --cloud
Interact With Your Pipeline
The script will automatically open its printed URL in your browser, where you can view and interact with your running pipeline.
Implement the Python Function
In step 2 we defined a Python function without implementing it. Let’s dig deeper to learn how to interact with the Conducto object store and display pretty results in the Conducto UI.
The Python function loads text from co.data.user, extracts a few datasets, and plots them. Consult the demo for the full implementation, which uses pandas, numpy, and matplotlib, but here is a summary.
Object Store
First, we read in the data from co.data.user and subset down to a few datasets.
data_text = co.data.user.gets(USER_DATA_PATH)
all_data = [json.loads(line) for line in data_text.splitlines()]
The datasets are time series. We use numpy to take the per-month average, and combine them into a single pandas DataFrame.
We graph the averages on a single plot using matplotlib and save it to local disk.
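The demo does the averaging with numpy and pandas; as a rough pure-Python sketch of the per-month averaging step, assuming the EIA bulk format where each series dict carries a "data" list of [YYYYMM, value] pairs:

```python
from collections import defaultdict

def monthly_averages(series):
    """Average a time series by calendar month.

    Assumes the EIA bulk format: series["data"] is a list of
    [period, value] pairs with periods like "201912" (YYYYMM).
    The real demo uses numpy/pandas; this is a stdlib-only sketch
    of the same logic.
    """
    sums, counts = defaultdict(float), defaultdict(int)
    for period, value in series["data"]:
        month = period[-2:]          # "201912" -> "12"
        sums[month] += value
        counts[month] += 1
    return {m: sums[m] / counts[m] for m in sorted(sums)}

example = {"series_id": "STEO.ZWHD_US.M",
           "data": [["201901", 30.0], ["202001", 34.0], ["201907", 2.0]]}
# monthly_averages(example) -> {"01": 32.0, "07": 2.0}
```

In the demo these per-month averages for each dataset become columns of a single pandas DataFrame, which makes the plotting step a one-liner.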
Then, we upload the image to co.data.pipeline, Conducto’s object store that is backed by local disk and scoped to the current pipeline. The Exec node runs in a Docker container and any local changes disappear when it finishes, so saving the image to co.data.pipeline allows us to view it from the app.
dataname = f"conducto/demo/data_science/{dataset}.png"
co.data.pipeline.put(dataname, filename)
Markdown
Finally, we use markdown to display the image and the DataFrame. Get a link to the image using co.data.pipeline.url(), and use the DataFrame's .to_markdown() to render it. By default, Exec node stdout and stderr display raw text, so wrap this in <ConductoMarkdown> tags for pretty formatting.
print(f"""
<ConductoMarkdown>
![img]({co.data.pipeline.url(dataname)})
{df.to_markdown()}
</ConductoMarkdown>
""")
How Much More Data Do You Need?
This was a simple example, but once you add in Execution Environment, Environment Variables and Secrets, Data Stores, Node Parameters, and Easy and Powerful Python Pipelines, you can easily express the most complex of data science pipelines in Conducto.
In my previous job, the predecessor to Conducto was the secret sauce that enabled our algorithmic trading team to run an ultra-productive data science and machine learning effort that has driven billions of dollars in revenue for a decade. Simply put, Conducto multiplied the impact of each team member by a lot.
How much more data do you need? Get started with Conducto now. Local mode is always free and is only limited by the cpu and memory on your machine. Cloud mode gives you immediate scale. Use the full power of python to write pipelines with ease. And, experience easy and painless error resolution.
Appendix: Full Code
See the full code for this tutorial on GitHub, and here’s a simplified version:
import collections, conducto as co, json, re

USER_DATA_PATH = "conducto/demo/data_science/steo.txt"

DOWNLOAD_COMMAND = f"""
curl http://api.eia.gov/bulk/STEO.zip > steo.zip
unzip -cq steo.zip | conducto-data-user puts --name {USER_DATA_PATH}
""".strip()

DATASETS = {
    "Heating Degree Days"   : r"^STEO.ZWHD_[^_]*\.M$",
    "Cooling Degree Days"   : r"^STEO.ZWCD_[^_]*\.M$",
    "Electricity Generation": r"^STEO.NGEPGEN_[^_]*\.M$",
}

# co.Image is explained in a later article.
IMG = co.Image("python:3.8", copy_dir=".", reqs_py=["conducto", "pandas", "matplotlib", "tabulate"])

def run() -> co.Serial:
    """
    Pipeline that downloads data from the US EIA and visualizes it.
    """
    with co.Serial(image=IMG) as output:
        # First download some data from the US EIA.
        output["Download"] = co.Exec(DOWNLOAD_COMMAND)
        # Then make a few different visualizations of it.
        output["Display"] = co.Parallel()
        for dataset in DATASETS:
            output["Display"][dataset] = co.Exec(
                "python data_science.py display "
                f"--dataset='{dataset}'"
            )
    return output

def display(dataset):
    """
    Read in the downloaded data, extract the specified
    datasets, and plot them.
    """
    data_text = co.data.user.gets(USER_DATA_PATH)
    all_data = [json.loads(line) for line in data_text.splitlines()]
    regex = DATASETS[dataset]
    subset_data = [d for d in all_data if "series_id" in d
                   and re.search(regex, d["series_id"])]

    import matplotlib.pyplot as plt
    # ... (build the DataFrame `df` and plot it; see the full code)

    # Save to disk, and then to co.data.pipeline
    filename = "/tmp/image.png"
    dataname = f"conducto/demo/data_science/{dataset}.png"
    plt.savefig(filename)
    co.data.pipeline.put(dataname, filename)

    # Print out results as markdown
    print(f"""
<ConductoMarkdown>
![img]({co.data.pipeline.url(dataname)})

{df.transpose().round(2).to_markdown()}
</ConductoMarkdown>
""")

if __name__ == "__main__":
    co.main()