Conducto for Data Science

Data Stores

Matt Jachowski
Published in Conducto · 5 min read · Apr 20, 2020


Data science pipelines necessarily generate data, plots, or intermediate results that need to be stored for some amount of time. You cannot simply persist these files on the local filesystem, because each command runs in a container with its own filesystem that disappears when the container exits. And in cloud mode, containers run on different machines, so there is no shared filesystem to mount. So, Conducto supports a few different approaches that work in a containerized world.

Explore our live demo, view the source code for this tutorial, or clone the demo and run it for yourself.

git clone https://github.com/conducto/demo.git
cd demo/data_science
python data_stores.py --local

Alternatively, download the zip archive here.

Your Own Data Store

There are many standard ways to store persistent data: databases, AWS S3, and in-memory caches like redis, just to name a few. An exec node can run any shell command, so it is easy to use any of these approaches. Here is a trivial example that sets AWS credentials and writes to S3 with the AWS CLI.

import conducto as co

image = co.Image("python:3.8-alpine", reqs_py=["awscli"])
env = {
    "AWS_ACCESS_KEY_ID": "my_access_key_id",
    "AWS_SECRET_ACCESS_KEY": "my_secret_key",
}
s3_command = "aws s3 cp my_file s3://my_s3_bucket/"
s3_exec_node = co.Exec(s3_command, image=image, env=env)

Note that in a real pipeline, you would want to store your AWS credentials as secrets.
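For illustration, here is a minimal sketch of the same node with the credentials left out of the pipeline definition, assuming you have already registered AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as Conducto secrets so they reach the container as environment variables (see the Environment Variables and Secrets tutorial for details).

import conducto as co

# Assumes AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are registered as
# Conducto secrets, so they are available in the container's environment
# without being hard-coded in the pipeline definition.
image = co.Image("python:3.8-alpine", reqs_py=["awscli"])
s3_command = "aws s3 cp my_file s3://my_s3_bucket/"
s3_exec_node = co.Exec(s3_command, image=image)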

Use co.data.pipeline / conducto-data-pipeline

co.data.pipeline is a pipeline-local key-value store. This data is only visible to your pipeline and persists until your pipeline is deleted. It is useful for writing data in one pipeline step, and reading it in another. In local mode, pipeline data lives on your local filesystem. In cloud mode, pipeline data lives in AWS S3.

co.data.pipeline has both a Python interface and a command-line interface, conducto-data-pipeline. Here is the condensed interface; our demo prints the command-line usage to show the full interface. A short sketch of the Python calls follows the listing.

usage: conducto-data-pipeline [-h] <method> [< --arg1 val1 --arg2 val2 ...>]

methods:
delete (name)
exists (name)
get (name, file)
gets (name, byte_range:List[int]=None)
list (prefix)
put (name, file)
puts (name)
url (name)
cache-exists (name, checksum)
clear-cache (name, checksum=None)
save-cache (name, checksum, save_dir)
restore-cache (name, checksum, restore_dir)
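As a quick illustration, here is a minimal sketch of the Python interface in action (the key names and values below are placeholders, not part of the demo):

import conducto as co

# Write a small result under a key; keys behave like paths.
co.data.pipeline.puts("demo/results/example", b'{"score": 0.91}')

# Later, possibly in a different node of the same pipeline, read it back.
if co.data.pipeline.exists("demo/results/example"):
    raw = co.data.pipeline.gets("demo/results/example")
    print(raw.decode())

# List everything written under a prefix.
print(co.data.pipeline.list("demo/results"))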

One useful application is performing and summarizing a parameter search. In this example, we try different parameterizations of an algorithm in parallel. Each one stores its results using co.data.pipeline.puts(). Once all of the parallel tasks are done, a summarize step reads the results using co.data.pipeline.gets() and prints a summary.

Here is the pipeline specification.

# Location to store data.
data_dir = "demo/data_science/pipeline_data"

# Image installs python, R, and conducto.
output = co.Serial(image=image)

# Parameter search over 3 parameters in nested for loops.
output["parameter_search"] = ps = co.Parallel()
for window in [25, 50, 100]:
    ps[f"window={window}"] = w = co.Parallel()
    for mean in [.05, .08, .11]:
        w[f"mean={mean}"] = m = co.Parallel()
        for volatility in [.1, .125, .15, .2]:
            m[f"volatility={volatility}"] = co.Exec(
                f"python temp_data.py --window={window} "
                f"--mean={mean} --volatility={volatility} "
                f"--data-dir={data_dir}"
            )

# Summarize parameter search results.
output["summarize"] = co.Exec(f"Rscript temp_data.R {data_dir}")

This results in the following pipeline, where I have drilled down to an arbitrary step of the parameter search.

View of the pipeline pane for the parameter search example pipeline.

Selecting any Exec node shows the command being run for a single step of the parameter search.

The node pane shows the command being run for a single step of the parameter search.

The script being run for each step of the parameter search is temp_data.py and can be viewed here. In particular, this is the code it uses to store results to co.data.pipeline.

# Save result to Conducto's pipeline data store.
path = "{}/mn={:.2f}_vol={:.2f}_win={:03}".format(
    data_dir, mean, volatility, window
)
data = json.dumps(output).encode()
co.data.pipeline.puts(path, data)

In contrast, the summarize step runs temp_data.R, which can be viewed here, and uses the command-line interface conducto-data-pipeline.

# Use the `conducto-data-pipeline list` command to get all the files.
cmd = sprintf("conducto-data-pipeline list --prefix=%s", argv$dir)
files = fromJSON(system(cmd, intern=TRUE))
names(files) <- gsub(".*/", "", files)
datas = lapply(files, function(f) {
    # Call `conducto-data-pipeline gets` to get an individual dataset.
    cmd = sprintf("conducto-data-pipeline gets --name=%s", f)
    fromJSON(system(cmd, intern=TRUE))
})

Use co.data.user / conducto-data-user

co.data.user is a user-scoped persistent key-value store. This is just like co.data.pipeline, but data is visible in all pipelines and persists beyond the lifetime of your pipeline. You are responsible for manually clearing your data when you no longer need it. In local mode, user data lives on your local filesystem. In cloud mode, user data lives in AWS S3.

co.data.user has both a Python interface and a command-line interface, conducto-data-user. Here is the condensed interface; our demo prints the command-line usage to show the full interface. A sketch of the cache methods follows the listing.

usage: conducto-data-user [-h] <method> [< --arg1 val1 --arg2 val2 ...>]

methods:
delete (name)
exists (name)
get (name, file)
gets (name, byte_range:List[int]=None)
list (prefix)
put (name, file)
puts (name)
url (name)
cache-exists (name, checksum)
clear-cache (name, checksum=None)
save-cache (name, checksum, save_dir)
restore-cache (name, checksum, restore_dir)
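The cache methods are useful for saving and restoring whole directories, keyed by a name and a checksum of their inputs. Here is a minimal sketch, assuming the Python methods mirror the command-line names above; the cache name, checksum, directory, and build_features function are hypothetical.

import conducto as co

name = "feature_cache"       # hypothetical cache name
checksum = "a1b2c3"          # e.g. a hash of the inputs that produced it
data_dir = "/tmp/features"   # directory to save or restore

if co.data.user.cache_exists(name, checksum):
    # Inputs unchanged: restore the saved directory instead of recomputing it.
    co.data.user.restore_cache(name, checksum, data_dir)
else:
    build_features(data_dir)  # hypothetical expensive step
    co.data.user.save_cache(name, checksum, data_dir)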

One useful application in data science is storing downloaded data. In this example, we download data from the Bitcoin blockchain. This can be time-consuming, so we want to avoid downloading the same data twice. By storing the data in co.data.user, we pull it once and persist it across pipelines.

# Image installs python and conducto.
with co.Serial(image=image) as out:
    out["download_20-11"] = \
        co.Exec("python btc.py download --start=-20 --end=-11")
    out["download_15-6"] = \
        co.Exec("python btc.py download --start=-15 --end=-6")
    out["download_10-now"] = \
        co.Exec("python btc.py download --start=-10 --end=-1")

Notice that this example contains three “download” nodes with overlapping ranges. They each download their range and skip any blocks that are already downloaded.

The code using co.data.user is in btc.py, which you can view here. This is the relevant section of the download function.

for height in range(start, end + 1):
    path = f"conducto/demo/btc/height={height}"
    # Check if `co.data.user` already has this block.
    if co.data.user.exists(path):
        print(f"Data already exists for block at height {height}")
        data_bytes = co.data.user.gets(path)
        _print_block(height, data_bytes)
        continue
    print(f"Downloading block at height={height}")
    data = _download_block(height)
    # Put the data into `co.data.user`.
    data_bytes = json.dumps(data).encode()
    co.data.user.puts(path, data_bytes)

If you download the demo, you can run this pipeline and see that it takes some time to download the data. But, if you click the Reset button and re-run the pipeline, you will see that it runs much faster. This is expected, because all of the data, aside from any new data generated since the pipeline last ran, is already in user data. Select any of the download nodes and look at the timeline in the node pane to see how long your first and second runs took.

The timeline shows that the first run took 1 minute and 88 MB of memory. The second run took 2.7 seconds and 47 MB of memory because the data was already in co.data.user.
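Since user data persists across pipelines, remember to clean up after yourself when you are done with the demo. Here is a minimal sketch using the methods listed above; the prefix is the one btc.py writes under.

import conducto as co

# List everything the demo stored under its prefix, then delete it.
for name in co.data.user.list("conducto/demo/btc"):
    co.data.user.delete(name)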

That’s it! Now, with the information you learned in Your First Pipeline, Execution Environment, Environment Variables and Secrets, Node Parameters, Easy and Powerful Python Pipelines, and here, you can create arbitrarily complex data science pipelines.
