Pachyderm for data scientists

Finally, version control for your data.

Pachyderm regards itself as a language-agnostic data pipeline.

Pachyderm is the Git for your data. It tracks data revisions and associated transformations. It clarifies the data lineage and transformation dependencies. It actually runs the transformations, ensuring all data sets are always up to date.

Working towards reproducibility & traceability

Reproducibility is an important aspect of data science. You need to be able to reproduce every number or prediction you produce. After proving really good performance to your business clients, you need to be able to reproduce it the day after.

Traceability is another important aspect of data science. To explain the causes of predictions to the business, the full trace from raw data to final prediction should be clear. The data pipeline prepares the data for modeling, so you had better know what happens there. For example, when performance drops, you want to know which parts were changed.

Reproducibility and traceability can be a challenge, though. There are quite a few moving parts: the data sets, your code, the environment, and the model (including model type, parameters and meta-parameters). Getting everything right can be a challenge on your own, let alone in a team.

So how can you get reproducibility and traceability without too much effort?


Welcome Pachyderm!

Pachyderm delivers version control for data, using language-agnostic data pipelines. It doesn’t just version the data; it also manages the transformations and ensures all data is always up to date. This gives a clear data lineage: you can trace any data set back to its sources, including the date and version of your transformations. According to Pachyderm, the main reason to use it is:

“Data scientists should be able to focus on data science, not infrastructure” — Pachyderm

So in which situation would you need Pachyderm?

When working with data sets, one is actually creating a network of transformations. One data set can be used by multiple transformers, and each transformer can use multiple data sets.

Now, let’s assume those data sets are stored in a central repository. When a data set is updated, ideally all depending data sets are automatically updated. Thus all transformers should also run in that central environment.

Next, assume different colleagues all contribute to that network of data transformations, so updates come from many places at once.

And to not break existing networks, you not only want to have feature branches of your code, but also feature branches of the resulting data sets.

How would one manage that?

That is the situation Pachyderm targets:

  • It holds all your data in a centrally accessible location
  • It updates all depending data sets when data is added to or changed in a data set
  • It can run any transformation, as long as it runs in a Docker container, reads files as input and writes files as output
  • It versions all your data
  • It handles both modified data and newly added fractions of data
  • It can keep branches of your data sets when you are testing new transformation pipelines
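The following Python code is a toy illustration of the second point, updating all dependent data sets when data changes. It is not how Pachyderm works internally. It models data sets as nodes, and each derived data set declares its inputs and a transform function. Updating a source then recomputes everything downstream, inputs before outputs. The class and method names are hypothetical, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, List, Set


class Network:
    """Toy network of data sets; derived sets are recomputed when inputs change."""

    def __init__(self) -> None:
        self.data: Dict[str, object] = {}
        self.inputs: Dict[str, List[str]] = {}  # data set -> data sets it is built from
        self.funcs: Dict[str, Callable[..., object]] = {}

    def add_source(self, name: str, value: object) -> None:
        self.data[name] = value
        self.inputs[name] = []

    def add_transform(self, name: str, inputs: List[str], func: Callable[..., object]) -> None:
        self.inputs[name] = list(inputs)
        self.funcs[name] = func
        self._run(name)  # compute the derived data set right away

    def _run(self, name: str) -> None:
        self.data[name] = self.funcs[name](*(self.data[i] for i in self.inputs[name]))

    def downstream(self, name: str) -> Set[str]:
        """All data sets that depend on `name`, directly or transitively."""
        found: Set[str] = set()
        frontier = [name]
        while frontier:
            current = frontier.pop()
            for node, deps in self.inputs.items():
                if current in deps and node not in found:
                    found.add(node)
                    frontier.append(node)
        return found

    def update(self, name: str, value: object) -> List[str]:
        """Replace a data set and recompute its dependents, inputs before outputs."""
        self.data[name] = value
        stale = self.downstream(name)
        order = TopologicalSorter(self.inputs).static_order()
        rerun = [n for n in order if n in stale]
        for n in rerun:
            self._run(n)
        return rerun


if __name__ == "__main__":
    net = Network()
    net.add_source("raw", [1, 2, 3])
    net.add_transform("clean", ["raw"], lambda xs: [x * 10 for x in xs])
    net.add_transform("total", ["clean"], sum)
    print(net.update("raw", [4, 5]))  # ['clean', 'total']
    print(net.data["total"])          # 90
```

One data set can feed several transformers, and one transformer can read several data sets. A topological order guarantees that a transformer only runs after all of its inputs are refreshed.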

In the data science process from raw data source to serving model, Pachyderm focuses on data transformations and model serving. Here, model serving means always having a scored version of your data (so not the typical REST endpoint).

Pachyderm in the data science process: mainly relevant to run your data transformations and to score your data in batches.

The remainder of this post consists of two parts.

  • First we will put Pachyderm in action.
  • Next, we’ll evaluate Pachyderm’s functionality, with some conclusions about which expectations are met.

Pachyderm in action

Let’s set up a local Pachyderm cluster. The example here uses macOS with Homebrew; for other operating systems, refer to the Pachyderm documentation.

We will now put Pachyderm in action by:

  • Installing the prerequisites
  • Installing Pachyderm
  • Putting data in Pachyderm
  • Creating a pipeline in Pachyderm
  • Processing new data with the Pipeline
  • Updating the pipeline

Prerequisites

We start with MiniKube, a local Kubernetes cluster:

$ brew cask install minikube
==> Satisfying dependencies
All Formula dependencies satisfied.
==> Downloading https://storage.googleapis.com/minikube/releases/v0.28.2/minikube-darwin-amd64
######################################################################## 100.0%
==> Verifying SHA-256 checksum for Cask 'minikube'.
==> Installing Cask minikube
==> Linking Binary 'minikube-darwin-amd64' to '/usr/local/bin/minikube'.
🍺 minikube was successfully installed!

Then Kubernetes’ command line interface:

$ brew install kubectl

After which we can start the MiniKube cluster:

$ minikube start
Starting local Kubernetes v1.10.0 cluster...
Starting VM...
Downloading Minikube ISO
160.27 MB / 160.27 MB [============================================] 100.00% 0s
Getting VM IP address...
Moving files into cluster...
Downloading kubeadm v1.10.0
Downloading kubelet v1.10.0
Finished Downloading kubelet v1.10.0
Finished Downloading kubeadm v1.10.0
Setting up certs...
Connecting to cluster...
Setting up kubeconfig...
Starting cluster components...
Kubectl is now configured to use the cluster.
Loading cached images from config file.

Is our cluster running?

$ kubectl api-versions
admissionregistration.k8s.io/v1beta1
apiextensions.k8s.io/v1beta1
apiregistration.k8s.io/v1
apiregistration.k8s.io/v1beta1
apps/v1
apps/v1beta1
apps/v1beta2
authentication.k8s.io/v1
authentication.k8s.io/v1beta1
authorization.k8s.io/v1
authorization.k8s.io/v1beta1
autoscaling/v1
autoscaling/v2beta1
batch/v1
batch/v1beta1
certificates.k8s.io/v1beta1
events.k8s.io/v1beta1
extensions/v1beta1
networking.k8s.io/v1
policy/v1beta1
rbac.authorization.k8s.io/v1
rbac.authorization.k8s.io/v1beta1
storage.k8s.io/v1
storage.k8s.io/v1beta1
v1

Yes it is, so let’s continue.

Installing Pachyderm

With a running Kubernetes cluster we’re ready to deploy the Pachyderm services in our cluster.

First add the Homebrew tap:

$ brew tap pachyderm/tap
==> Tapping pachyderm/tap
Cloning into '/usr/local/Homebrew/Library/Taps/pachyderm/homebrew-tap'...
remote: Counting objects: 11, done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 11 (delta 5), reused 3 (delta 0), pack-reused 0
Unpacking objects: 100% (11/11), done.
Tapped 5 formulae (42 files, 29.2KB).

Then we can install the components locally:

$ brew install pachyderm/tap/pachctl@1.7
==> Installing pachctl@1.7 from pachyderm/tap
==> Downloading https://github.com/pachyderm/pachyderm/releases/download/v1.7.7/pachctl_1.7.7_darwin_amd64.zip
==> Downloading from https://github-production-release-asset-2e65be.s3.amazonaws.com/23653453/615d2200-b1c4-11e8-812f-222bad0b9676?X-Amz
######################################################################## 100.0%
🍺 /usr/local/Cellar/pachctl@1.7/v1.7.7: 3 files, 50.0MB, built in 10 seconds

And subsequently deploy it to our Kubernetes cluster:

$ pachctl deploy local
No config detected. Generating new config...
No UserID present in config. Generating new UserID and updating config at /Users/gerben/.pachyderm/config.json
serviceaccount/pachyderm created
clusterrole.rbac.authorization.k8s.io/pachyderm created
clusterrolebinding.rbac.authorization.k8s.io/pachyderm created
deployment.apps/etcd created
service/etcd created
service/pachd created
deployment.apps/pachd created
service/dash created
deployment.apps/dash created
secret/pachyderm-storage-secret created

Pachyderm is launching. Check its status with "kubectl get all"
Once launched, access the dashboard by running "pachctl port-forward"

Is it running?

$ kubectl get all
NAME                         READY     STATUS    RESTARTS   AGE
pod/dash-5d974d8668-f9nc8    2/2       Running   0          1m
pod/etcd-66858555cd-hcr4w    1/1       Running   0          1m
pod/pachd-5d5c8759b8-8fg2k   1/1       Running   0          1m

NAME                 TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                                   AGE
service/dash         NodePort    10.111.248.26   <none>        8080:30080/TCP,8081:30081/TCP                             1m
service/etcd         NodePort    10.97.89.252    <none>        2379:32379/TCP                                            1m
service/kubernetes   ClusterIP   10.96.0.1       <none>        443/TCP                                                   9m
service/pachd        NodePort    10.111.26.206   <none>        650:30650/TCP,651:30651/TCP,652:30652/TCP,999:30999/TCP   1m

NAME                    DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/dash    1         1         1            1           1m
deployment.apps/etcd    1         1         1            1           1m
deployment.apps/pachd   1         1         1            1           1m

NAME                               DESIRED   CURRENT   READY     AGE
replicaset.apps/dash-5d974d8668    1         1         1         1m
replicaset.apps/etcd-66858555cd    1         1         1         1m
replicaset.apps/pachd-5d5c8759b8   1         1         1         1m

Yes it is (if AVAILABLE and READY are not yet 1, you have to wait a little longer).
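A small Python helper can poll for readiness instead of you rerunning kubectl by hand. This is a sketch under a few assumptions:

* `kubectl` is on your PATH.
* `kubectl get pods` prints the NAME, READY and STATUS columns in the order shown above.
* The function names are hypothetical.

It calls `kubectl get pods` rather than `kubectl get all`, because only the pod lines carry a READY n/n column and a STATUS column.

```python
import subprocess
import time


def all_pods_ready(kubectl_output: str) -> bool:
    """True if every pod line shows READY n/n and STATUS Running.

    Expects the plain-text table of `kubectl get pods`, with or without header.
    """
    pods = 0
    for line in kubectl_output.strip().splitlines():
        fields = line.split()
        if not fields or fields[0] == "NAME":
            continue  # skip blank lines and the header row
        ready, status = fields[1], fields[2]
        up, total = ready.split("/")
        if up != total or status != "Running":
            return False
        pods += 1
    return pods > 0  # no pods at all does not count as ready


def wait_for_pods(timeout_s: float = 300, poll_s: float = 10) -> bool:
    """Poll `kubectl get pods` until all pods are ready or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        result = subprocess.run(
            ["kubectl", "get", "pods", "--no-headers"],
            capture_output=True, text=True, check=False,
        )
        if result.returncode == 0 and all_pods_ready(result.stdout):
            return True
        time.sleep(poll_s)
    return False
```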

Now we can forward the ports of the Pachyderm daemon and dashboard to our machine:

$ pachctl port-forward
CTRL-C to exit
NOTE: kubernetes port-forward often outputs benign error messages, these should be ignored unless they seem to be impacting your ability to connect over the forwarded port.
Forwarding the dash (Pachyderm dashboard) UI port to http://localhost:30080 ...
Forwarding the pachd (Pachyderm daemon) port...

If you then open the URL mentioned, you’ll see a dashboard showing the status of all your processes (currently none):

Pachyderm dashboard

Using Pachyderm — Create a repository

As we now have a working Pachyderm cluster, we can start using it.

As a test case we’ll use the Iris dataset, divided into 3 parts. Just as code in Git, a file in Pachyderm lives in a repo. So we first create a repository:

$ pachctl create-repo iris

Next we add our file (the first part of the Iris dataset) to the iris repo, on the master branch:

$ pachctl put-file iris master /raw/iris_1.csv -f data/raw/iris_1.csv
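The post doesn’t show how the Iris CSV was split into three parts. The Python sketch below is one possible way. It assumes a single data/raw/iris.csv with a header row, and that file name is hypothetical. The classic Iris file is sorted by species, so a sequential split puts roughly one species in each part. Pass a seed to shuffle the rows first if you want mixed parts.

```python
import csv
import random
from pathlib import Path
from typing import List, Optional


def split_csv(source: Path, out_dir: Path, parts: int = 3,
              prefix: str = "iris", seed: Optional[int] = None) -> List[Path]:
    """Split a CSV with a header row into `parts` files of (nearly) equal size.

    Each output file repeats the header, so every part is a valid CSV on its own.
    """
    with source.open(newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)
    if seed is not None:
        random.Random(seed).shuffle(rows)  # reproducible shuffle
    out_dir.mkdir(parents=True, exist_ok=True)
    size = -(-len(rows) // parts)  # ceiling division
    paths = []
    for i in range(parts):
        path = out_dir / f"{prefix}_{i + 1}.csv"
        with path.open("w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows[i * size:(i + 1) * size])
        paths.append(path)
    return paths
```

For example, `split_csv(Path("data/raw/iris.csv"), Path("data/raw"), seed=42)` would produce data/raw/iris_1.csv through data/raw/iris_3.csv.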

Now let’s see what we’ve created. First, an overview of all repos:

$ pachctl list-repo
NAME   CREATED         SIZE
iris   2 minutes ago   4.492KiB

And we can retrieve the commits made to it:

$ pachctl list-commit iris
REPO   ID                                 PARENT   STARTED          DURATION             SIZE
iris   d804137d942a4916864b803fe3f0be0f   <none>   26 seconds ago   Less than a second   4.492KiB

And we can list the contents of a certain branch:

$ pachctl list-file iris master
NAME   TYPE   SIZE
raw    dir    4.492KiB

The Pachdash dashboard also shows statistics about the repo. The cloud button allows you to manually upload data (it supports gcs://, as://, s3://, gs://, wasb:// or http://).

Using Pachyderm — create a pipeline

In Pachyderm there are repositories (holding the data) and pipelines (holding the transformations). Pipelines read from any repository, and have their own associated repository to hold their output.

A pipeline can be any program, as long as it reads from files and writes to files. It does have to be packaged as a Docker image. Pachyderm mounts the source and destination repositories into the running container, so you can interact with them as regular files. Pachyderm retrieves the input files from the source repository and makes sure the output files end up in the pipeline’s output repository.

As our pipeline we’ll use a Python command line script that uses a pickled scikit-learn pipeline to create predictions. Pachyderm makes new or updated files available under the /pfs mount; for our input repository iris, that is /pfs/iris. Output is expected to be written to /pfs/out. Knowing that, we can create a simple Python application, which we store in dsprod/models/train_model.py.
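The script itself isn’t reproduced here. The Python below is a rough, hypothetical sketch of the same file-in/file-out contract. It reads every CSV under /pfs/iris, calls the unpickled model’s `predict`, and writes a file with the same relative path under /pfs/out. It assumes two things:

* the model has a scikit-learn style `predict` method;
* each input CSV holds a header row plus numeric feature columns only.

The function names are illustrative. The log format only mimics the job logs shown further down.

```python
import csv
import logging
import pickle
from pathlib import Path
from typing import Any, List

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

INPUT_DIR = Path("/pfs/iris")  # input repo, mounted by Pachyderm
OUTPUT_DIR = Path("/pfs/out")  # whatever is written here ends up in the output repo
MODEL_PATH = Path("models/model.p")


def load_model(path: Path) -> Any:
    logger.info("Loading the model from %s", path)
    with path.open("rb") as f:
        return pickle.load(f)


def predict_file(model: Any, src: Path, dst: Path) -> None:
    """Score one CSV file (header row + numeric feature columns)."""
    logger.info("Predicting on %s", src)
    with src.open(newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows: List[List[float]] = [[float(v) for v in row] for row in reader if row]
    predictions = model.predict(rows)
    dst.parent.mkdir(parents=True, exist_ok=True)
    with dst.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header + ["prediction"])
        for row, pred in zip(rows, predictions):
            writer.writerow(row + [pred])


def run(model: Any, input_dir: Path = INPUT_DIR, output_dir: Path = OUTPUT_DIR) -> List[Path]:
    """Mirror the input tree, e.g. /pfs/iris/raw/x.csv -> /pfs/out/raw/x.csv."""
    written = []
    for src in sorted(input_dir.rglob("*.csv")):
        dst = output_dir / src.relative_to(input_dir)
        predict_file(model, src, dst)
        written.append(dst)
    return written


if __name__ == "__main__":
    run(load_model(MODEL_PATH))
```

Passing the model and directories into `run` keeps the scoring logic testable outside the container. Only the `__main__` block depends on the Pachyderm mount points.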

This runs in a Docker image that also contains the conda environment (created from an environment.yml) and the persisted model models/model.p. The full source code is too verbose to paste here; it can be found on GitHub, together with the Dockerfile that defines the Docker image.

We want to run our Python file from within the conda environment, so a small runit.sh script serves as a simple entry point.

Now we build the Docker image and push it to Docker Hub, so that Pachyderm can download it:

$ docker build . --tag gerbeno/iris:`date +%s`
...
Successfully tagged gerbeno/iris:1538732911
$ docker login
...
$ docker push gerbeno/iris:1538732911

Using Pachyderm — deploy pipeline

To deploy the pipeline to Pachyderm, we need to tell Pachyderm which Docker image to use and which repository to monitor for new source data. Output always goes to the repository associated with the pipeline, which has the same name as the pipeline. The pipeline spec is very flexible and allows a lot of configuration, but in our case a simple definition in iris.json suffices.
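The Python below is a sketch that writes a minimal iris.json. The field layout follows the Pachyderm 1.7-era pipeline spec as far as I know: `pipeline.name`, `transform.image`, `transform.cmd`, and an `atom` input with `repo` and `glob` (later versions call that input `pfs`). Two values are assumptions rather than the author’s settings:

* the `cmd` value;
* the `/raw/*` glob, which makes each file under /raw its own unit of work.

```python
import json
from pathlib import Path
from typing import Any, Dict


def pipeline_spec(name: str, image: str, input_repo: str, glob: str = "/raw/*") -> Dict[str, Any]:
    """Build a minimal pipeline spec dict (1.7-style `atom` input)."""
    return {
        "pipeline": {"name": name},  # also the name of the output repo
        "transform": {
            "image": image,                    # explicit tag, not `latest`
            "cmd": ["/bin/bash", "runit.sh"],  # assumed entry point path
        },
        "input": {
            # Assumption: in Pachyderm 1.8+ this key is "pfs" instead of "atom".
            "atom": {"repo": input_repo, "glob": glob},
        },
    }


def write_spec(spec: Dict[str, Any], path: Path) -> None:
    path.write_text(json.dumps(spec, indent=2) + "\n")


if __name__ == "__main__":
    write_spec(
        pipeline_spec("iris_classifier", "gerbeno/iris:1538732911", "iris"),
        Path("iris.json"),
    )
```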

The Docker tag is included so that we can specify exactly which image is to be used. Using latest is possible, but then we can’t force Pachyderm to retrieve a new image. We would also lose reproducibility, as the actual image would depend on the moment of deployment. Therefore we use an explicit tag. Now we can finally deploy our transformation:

$ pachctl create-pipeline -f iris.json

It doesn’t give any feedback, so let’s ask Kubernetes whether our Docker container is running. The output shows that we now have one pod running our pipeline:

$ kubectl get all
NAME                                    READY     STATUS    RESTARTS   AGE
pod/dash-5d974d8668-f9nc8               2/2       Running   6          26d
pod/etcd-66858555cd-hcr4w               1/1       Running   3          26d
pod/pachd-5d5c8759b8-8fg2k              1/1       Running   3          26d
pod/pipeline-iris-classifier-v2-rjm6b   2/2       Running   0          6m

NAME                                                DESIRED   CURRENT   READY     AGE
replicationcontroller/pipeline-iris-classifier-v2   1         1         1         6m

The Pachdash dashboard also shows the changed pipeline graph. It changed in 3 ways:

  • The iris repo shows that it’s an input to the iris_classifier.
  • A new arrow is created, representing the iris_classifier
  • There is a companion repository named iris_classifier holding the output of our pipeline.

The pipeline runs jobs when data is either added or modified. As we already have some input data in place, which hasn’t been processed by this pipeline yet, deploying a dependent pipeline immediately results in a job:

$ pachctl list-job
ID          OUTPUT COMMIT               STARTED          DURATION    RESTART   PROGRESS     DL         UL         STATE
a1c...388   iris_classifier/849...f22   34 seconds ago   6 seconds   0         1 + 0 / 1    4.492KiB   1.268KiB   success

To get the logs from the job itself, we can simply ask Pachyderm, and it will give the stdout:

$ pachctl get-logs --job a1c352e12fb749e1ba3e6d9ab7c8b388
... - __main__ - INFO - Loading the model from models/model.p
... - __main__ - INFO - Predicting on /pfs/iris/raw/iris_1.csv

It’s also possible to get logging information from the pod using Kubernetes:

$ kubectl logs pod/pipeline-iris-classifier-v2-rjm6b user
{"pipelineName":"iris_classifier","workerId":"pipeline-iris-classifier-v2-rjm6b","master":true,"ts":"2018-10-08T19:51:52.807797579Z","message":"Launching worker master process"}
{"pipelineName":"iris_classifier","workerId":"pipeline-iris-classifier-v2-rjm6b","master":true,"ts":"2018-10-08T19:51:52.868557066Z","message":"waitJob: a1c352e12fb749e1ba3e6d9ab7c8b388"}

Using Pachyderm — new data

With the pipeline in place, any data that is added to or modified in the repository is processed immediately. Note that the unit of processing is the complete file: Pachyderm won’t diff the file contents. After adding a new file, list-job will show 2 runs.

$ pachctl put-file iris master /raw/iris_2.csv  -f data/raw/iris_2.csv
$ pachctl list-job
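The conceptual Python sketch below shows what whole-file granularity means. It compares per-file content hashes between two snapshots of a directory. Any file that is new, or whose bytes differ, is handed over in full, however small the edit. This is an illustration only, not Pachyderm’s internal mechanism. `snapshot` and `changed_files` are hypothetical names, and deletions are ignored for brevity.

```python
import hashlib
from pathlib import Path
from typing import Dict, List


def snapshot(root: Path) -> Dict[str, str]:
    """Map each file path (relative to root) to the SHA-256 of its content."""
    return {
        p.relative_to(root).as_posix(): hashlib.sha256(p.read_bytes()).hexdigest()
        for p in sorted(root.rglob("*"))
        if p.is_file()
    }


def changed_files(before: Dict[str, str], after: Dict[str, str]) -> List[str]:
    """Paths that are new or whose content differs; each would be reprocessed whole."""
    return sorted(path for path, digest in after.items() if before.get(path) != digest)
```

Adding raw/iris_2.csv yields only that file. Appending a single line to raw/iris_1.csv later yields raw/iris_1.csv in its entirety.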

Using Pachyderm — updating a pipeline

As the pipeline holds your latest and greatest transformations, it’s bound to change. It’s also possible that a previous pipeline version has errors (like forgetting to chmod +x *.sh).

In that case you update the Docker image and push it to Pachyderm as a replacement. One issue is the explicit Docker tag reference in iris.json: with a new image, you also need to update that reference. According to the Pachyderm documentation, the --push-images flag takes care of that automatically. So let’s update our pipeline accordingly:

$ pachctl update-pipeline -f iris.json --push-images
error parsing auth: Failed to read authentication from dockercfg, try running `docker login`
$ docker login
Authenticating with existing credentials...
Login Succeeded
$ pachctl update-pipeline -f iris.json --push-images
error parsing auth: Failed to read authentication from dockercfg, try running `docker login`

Unfortunately that doesn’t work as expected, at least not on my machine. Let’s do it ourselves:

$ docker build . --tag gerbeno/iris:`date +%s`
Successfully tagged gerbeno/iris:[tag]

Now we have to update our pipeline configuration iris.json to refer to the new Docker tag. Note that you can’t use a tag like latest, as Pachyderm will only download it once. Pachyderm downloads the image from Docker Hub, so let’s push it there and then update the pipeline:

$ docker login
...
$ docker push gerbeno/iris:[tag]
$ pachctl update-pipeline -f iris.json
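The manual edit of iris.json can be scripted. The following is a minimal sketch. It assumes iris.json keeps the image under `transform.image`, as Pachyderm pipeline specs do, and `bump_image_tag` is a hypothetical helper. You would still build and push the image under the returned name before running pachctl update-pipeline.

```python
import json
import time
from pathlib import Path
from typing import Optional


def bump_image_tag(spec_path: Path, tag: Optional[str] = None) -> str:
    """Point transform.image at a new tag and return the new image name.

    Defaults to a Unix timestamp, like the `date +%s` tags used in this post.
    """
    spec = json.loads(spec_path.read_text())
    image = spec["transform"]["image"]
    # Drop an existing tag, but keep a registry port such as host:5000/repo.
    name = image.rsplit(":", 1)[0] if ":" in image.rsplit("/", 1)[-1] else image
    new_image = f"{name}:{tag if tag is not None else int(time.time())}"
    spec["transform"]["image"] = new_image
    spec_path.write_text(json.dumps(spec, indent=2) + "\n")
    return new_image
```

A release would then be: call `bump_image_tag(Path("iris.json"))`, run `docker build . --tag` with the returned name, `docker push` it, and finish with `pachctl update-pipeline -f iris.json`.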

The new pipeline is then in place and will run for any updates or additions in the source repository. If the previous version was incorrect (or simply failed), the existing files have already been processed and won’t be picked up again. In that case you can reprocess all existing data as follows:

$ pachctl update-pipeline -f iris.json --reprocess

Review of Pachyderm

The hands-on part above showed the basic usage of Pachyderm: we created data repositories, added data to them, and (re)deployed pipelines to process the data. Let me summarize my personal findings about Pachyderm.

+It’s great how similar the data repositories are to Git repositories. One can also work with branches of data, which is a great way to support different team members working with and on the same data sets.

-Less practical is deploying a new version of your pipeline to the cluster. It lengthens the development cycle, and it cannot be fully automated. The steps are:

  • Build docker image, with new explicit tag
  • Push docker image to DockerHub
  • Insert new docker tag into pipeline definition
  • Push pipeline definition to Pachyderm cluster
  • Verify it is up and running

Especially the manual handling of the Docker tag limits how easily you can put all of this into a CI/CD pipeline.

+/-Working with Pachyderm requires some effort from data scientists. Just as a data scientist needs to be able to work with Git, they now also need to know how to work with this Git for data. While these kinds of tools are common and accepted as productivity enhancers in the data engineering world, they are (unfortunately) often less accepted by mainstream data scientists. This will restrict adoption among data scientists without strong engineering skills.

+It is really nice how Pachyderm simplifies the DevOps operations related to data wrangling. This allows a data science team to be fully self-sufficient, which is key to flexibility and productivity.

As for the future of Pachyderm, I would really like to see it integrated into data science platforms like Dataiku, Azure ML, or SageMaker. While we all like modelling, a lot of time is spent on getting the data sets right. Instead of managing our own new data warehouse, it would be great to run and deploy data pipelines in those environments. Enriched with versioning and branching, that would really accelerate data science teams.

In short:

Currently Pachyderm is great for data science teams with an engineering background cooperating on complex data pipelines across projects.

About the author

Gerben Oostra is our lead data scientist & engineer at BigData Republic, a data science consultancy company in the Netherlands. We hire the best of the best in Data Science and Data Engineering. If you are interested in using machine learning techniques on practical use cases, feel free to contact us at info@bigdatarepublic.nl.


Addendum: some tricks

When running minikube on a laptop that hibernates once in a while, you can end up with Pachyderm in an error state. To get a completely clean slate, run the following:

$ minikube delete && minikube start && pachctl deploy local && sleep 60 && kubectl get all
$ kubectl get all # repeat until everything is ready, then:
$ pachctl port-forward

If you get context deadline exceeded errors, check whether your port-forward is running. The pachctl commands need it to work.
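One quick way to check is to test whether the forwarded ports accept connections. The sketch below makes two assumptions:

* port-forward exposes pachd on localhost:30650, the pachd NodePort in the service listing earlier;
* it exposes the dashboard on localhost:30080, as printed by pachctl port-forward.

Adjust the ports if your setup differs. `port_open` is a hypothetical helper name.

```python
import socket


def port_open(host: str = "localhost", port: int = 30650, timeout_s: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:  # refused, unreachable or timed out
        return False


if __name__ == "__main__":
    for name, port in [("pachd", 30650), ("dash", 30080)]:
        state = "reachable" if port_open(port=port) else "NOT reachable, is port-forward running?"
        print(f"{name} on localhost:{port}: {state}")
```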