Data at Hampshire CI

I have to submit a written report of my work this semester, so I thought I would make it a blog post to give it some more value and so I could get my thoughts out publicly on what I have been up to.

TLDR: I have a setup working with Apache Spark and Parquet to store genetic programming results and analyze them in Jupyter Notebooks. I tried Apache Drill and liked Spark better for my workflow. I also wrote a bunch of code to export Parquet from Clojure and then ended up not using it, letting Spark write the Parquet instead.

This semester I had an independent study with Lee Spector, at Hampshire College. As a part of his lab studying genetic programming, I set to work solving a problem that no one asked to be solved. This was how I framed it:

Statement of Objectives
I want to standardize the way researchers in the Computational Intelligence Laboratory at Hampshire College share their experimental results within the group. The current lack of standards sets a high barrier to entry for new researchers, makes it labor intensive to analyze and share results, and limits cross-collaboration between researchers. Collecting and parsing the results of experiments (especially batch jobs) is often aided by custom tooling (grep/python scripts/graph databases). Understanding and running these tools presents another hurdle for doing research. It is also difficult to understand one's results in the context of past experiments, because their data might not be available; even if it is, the tools to analyze them are not standardized. Sharing our results with the group is an opaque and subjective process. We may not present the original command used to run the experiment, and we summarize the results in our own words. This prevents other group members from re-running the experiments or offering alternative hypotheses about the data.
Planned Activities
I propose automatically (opt-in) uploading experiment results to a centralized data store and providing a GUI frontend for the results. This would make it easier to analyze and share results and would give the group greater transparency. I propose designing, implementing, and deploying three systems: the uploader, the data store, and the frontend. The uploader must be easy to use, performant, and plug into the existing Clojush codebase. The data store must have sufficient space to store all of our runs at a reasonable cost. The frontend must support custom visualizations of the data from any subset of experiments. The whole system must also be accessible only to members of the group, and the data should be stored in a structured and homogeneous format, so that it can be visualized consistently.

I began by floating the idea to the group, and came up with a basic process:

  1. Figure out what questions we wanted to be able to answer with this system.
  2. Determine what data we needed to save in order to address these questions.
  3. Understand what type of system is best for storing and querying this type of data.
  4. Build that system!

Here I will try to document what my process has been and what the current status of the system is (note to self: write this up as I go next time! I probably could have avoided some unneeded work if I had been more aware of my general trajectory throughout the semester).

Initial ideas

Graph of individuals in one run

I wanted this system to supersede all of the reporting/analysis tools we currently use. This includes the graphs that Nic McPhee has been making to visualize the evolution of a run across the generations, with parents and children connected by edges, as well as more traditional plots of different types of metadata across a population.

Age of an individual versus its total error

I felt it was important to be able to run the gamut of these visualizations, so that this tool could serve as a democratizing force within the group. Currently, the knowledge of how to parse the output of these runs and make these sorts of graphs is siloed within the group, and the process is rather labor intensive overall. I hoped that if anyone in the group could run any of these analyses on any data that anyone collected, we would all benefit. It would also let the people in the group who are good at designing these sorts of visualizations multiply their effect, by allowing anyone to replicate and adapt them for their own data.

Protocol Buffer schema for the data generated during a Clojush run

So we had to store at least enough data to be able to reconstruct these types of graphs. In order to understand what this data even looks like, I created a Protocol Buffer schema for all the data generated during a Clojush run that I could possibly think we would ever want. I did this by basically tracing the -main Clojush function.

The first thing I noticed is that this format is very nested. Each run has a number of generations, each of which has a number of individuals, each of which has a number of Plush genes. So far, most of our analysis has been per-generation; we haven't been able to do much per individual (besides things like the graph above) or per gene, but this data is still important. If it were accessible, we would be able to do more intricate analyses.
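To make that shape concrete, here is a rough sketch of the nesting as a Python dictionary. The field names and values are illustrative only, not the exact keys Clojush uses.

# Rough sketch of the nesting; field names are illustrative, not the exact
# keys used in Clojush.
run = {
    "uuid": "run-1",
    "argmap": {"population-size": 1000, "max-generations": 300},
    "generations": [
        {
            "index": 0,
            "individuals": [
                {
                    "total-error": 12.5,
                    "plush-genome": [
                        {"instruction": "integer_add", "uuid": "gene-1"},
                        # ... roughly 200 genes per individual
                    ],
                },
                # ... roughly 1000 individuals per generation
            ],
        },
        # ... roughly 300 generations per run
    ],
}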

This nested structure makes it a bit cumbersome to put in something like PostgreSQL. I imagine it would mean making run, generation, individual, and gene tables that all refer to one another. This doesn't seem very fun, for a couple of reasons. First, it would just be a whole lot of work to add a whole run to the database or to read one back out. Also, changing the schema would require migrating the whole table. However, these two problems are both dwarfed by the real show stopper, which is the size of the data.

One of the reasons no one has done this yet in the lab is that we just collect a lot of data. Let's do some back-of-the-envelope math to figure out how much data, uncompressed, a single run might be, using some parameters we commonly run with (the multiplication is spelled out after the list):

  • 32 bytes per PlushInstructionMap (16 for the UUID and 16 for the instruction)
  • 200 PlushInstructionMaps per individual
  • 1000 individuals per generation
  • 300 generations per run
  • = 1.92GB
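Spelled out, that is just:

bytes_per_run = 32 * 200 * 1000 * 300   # bytes per gene * genes * individuals * generations
print(bytes_per_run / 1e9)              # => 1.92, i.e. about 1.92GB uncompressed per run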

We do maybe 10 of these a day. This just seems like too much data to put in a relational database. We make those pretty graphs above using Neo4J (a graph database), which is really nice for those sorts of visualizations, but again, it would be hard to store this much data there. We have had trouble loading even one large run into Neo4J.

But what about the cloud? I went into this hoping I could offload a lot of the complexity to some hosted service. In particular, I had been looking at Google BigQuery, which is their hosted product to manage “large-scale data analytics.” However, it quickly became clear that this would cost us quite a bit, and we aren't set up to be billed for cloud resources anyway. According to Lee, it is easier to handle the finances of provisioning more physical servers than of paying Google for a service every month.

Open source Google BigQuery?

BigQuery seems to be what I want, because it allows you to just throw nested data at it as-is. I found Apache Drill as an open source alternative that also supports nested data. I tried to listen to every podcast I could find about these types of services, and quickly found myself moving into some weird world of business intelligence. They have some “Magic Quadrant” that apparently everyone cares about? That shows you what products to use? The next hot technology bandwagon I hopped on was Parquet! It’s awesome! It’s like typed JSON, but it stores things much smaller and lets you pull just one column into memory if you need it. More magic! It also seems to be where things are heading, especially since its cousin Apache Arrow, which deals with in-memory representations of data, is being very actively worked on in Python and may underlie Pandas 2.0!
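For example, this is the kind of single-column read I mean, sketched with pyarrow (which I didn't actually use in this project; the path and column name are made up):

# Sketch only: read a single column from a Parquet file without loading the rest.
import pyarrow.parquet as pq

table = pq.read_table("generations.parquet", columns=["total-error"])
errors = table.to_pandas()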

So I started getting excited here, because throwing everything on disk has some nice advantages over a database. First, it is much easier to understand how much space your data is taking up and where it is stored. It is also very easy to distribute your data: just copy it onto an external hard drive!

Great great great, all great. So, looking at the Apache Drill documentation on nested data, I decided to lay it out like this:

  • At the start of each run in Clojure, write config/<uuid>.parquet, storing information about the parameters, the version, and a user-specified label.
  • At the end of each generation, write generation/<uuid>/<index>.parquet, storing the full population and any heuristics we generate.

That way, if a run fails halfway through, we still have the initial data. Also, hopefully we won't have to load every run into memory if we are just looking for a certain UUID, or maybe all runs with a certain label.
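As a sketch of what that layout buys us, picking out one run's files is just a path lookup (the UUID and paths here are made up):

# Hypothetical sketch: with this layout, one run's data can be found by path alone.
import glob

run_uuid = "0b9e4a2c-example"                                     # made-up UUID
config_file = f"config/{run_uuid}.parquet"                        # written at the start of the run
generation_files = glob.glob(f"generation/{run_uuid}/*.parquet")  # one file per generation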

Clojure -> Parquet

Before I could try this out, I had to figure out how to write Parquet from Clojure. One thing that tripped me up a bit about Parquet at first was that you don't just write a raw Parquet file (even though it has nice data types and all). Instead, you supply some Protocol Buffer or Avro schema and write the Parquet file as if you were writing one of those formats. I don't quite get why they have this extra level of abstraction on top, but I guess it's for interoperability? So at first I thought I could use the abracad Clojure library to write Avro into Parquet files, but no.

Well, no big deal, it was probably better not to go through that extra level of abstraction anyway. Why not just write Parquet directly? So I dove straight into the Parquet Java implementation (pull out your IDEA, kids!).

So I built a Parquet Clojure writer that was pretty neat (I thought). It automatically figured out the schema for your Clojure data structure and wrote it into a Parquet file. I also got to play with all of Clojure's Java interop and type hints, and did some profiling so it wasn't too horribly slow.

I started writing some Parquet files from Clojush and reading them with Apache Drill, and I quickly hit some problems. Since I was automatically generating the schema on the fly, it wasn't consistent across runs. Parquet doesn't support heterogeneous data types in an array, so whenever I encountered mixed types I would turn them all into strings. That meant that on one run I might write some array as numbers, and on another run write the same array as strings. Then, when I tried to load both of them into Apache Drill, it would fail with an incompatible schema error.
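To illustrate the failure mode with invented values:

# Invented values, just to show the problem: the same logical field can end up
# with two different element types across runs.
run_a_errors = [1, 2, 3]                           # homogeneous, so it stays numeric
run_b_errors = [1, "timeout", 3]                   # mixed types, so my writer coerced it...
run_b_written = [str(x) for x in run_b_errors]     # ...to ["1", "timeout", "3"]
# Loading Parquet files from both runs together then fails in Drill, since one
# column is numeric and the other is strings.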

Also, I realized I didn't enjoy writing SQL-like queries over nested data. It got very complicated quickly.

Apache Spark?

Since I was sick of SQL-like things, I tried playing around with Apache Spark. It can also load Parquet files and query them like Apache Drill does. Also, I can access it from Python! So I ditched Apache Drill and moved to Apache Spark.

It took me way too long to figure out that auto-generating schemas was just a bad idea when I wanted my data to be in a consistent format. But I finally got the message and rewrote my Clojure implementation to take a manually specified schema and write to that schema. Oh, I also rewrote the whole thing using macros, because I wanted it to be faster: it would use the schema at compile time to generate Clojure code that writes that specific data structure to Parquet. Pretty fun.

Writing Parquet isn’t for me

But… then came the cold hard truth… It was still too slow! Not writing the data, but reading it from Spark. It would take minutes to query just one run of data. Part of the problem was that my Parquet files ended up being only KBs large, which isn't good for Spark; it prefers larger files. I seem to remember reading that around 1GB, or at least a couple hundred MB, per Parquet file is good. Also, Spark writes its own Parquet metadata files so that it doesn't have to read a whole Parquet file to understand the schema. Since I was writing the Parquet files myself, it didn't have those metadata files to speed up the reads.

I tried rewriting the files from Spark and then reading them back, and it was much faster. So I realized that I didn't want to be writing my Parquet files from Clojure, but from Spark instead.
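One minimal way to do that kind of rewrite looks roughly like this (the paths and partition count are made up):

# Minimal sketch of compacting many tiny Parquet files into a few larger ones
# with Spark. Paths and the partition count are made up.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compact-parquet").getOrCreate()

tiny = spark.read.parquet("tiny-parquet-files/")  # the KB-sized files written from Clojure
tiny.coalesce(8).write.mode("overwrite").parquet("compacted-parquet/")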

Side note on the underlying storage

At the same time, I also realized that I didn't need to use Alluxio to store my data. I had been using it instead of HDFS because its docs were prettier, it was newer, and it seemed to be more actively developed. But it also isn't as mature, so I moved to using HDFS directly. I also got this working in Docker Swarm, from a Docker Compose file, using Docker Stacks, so that I can write that one docker-compose.yml file and have it work in development and in production. Getting HDFS and Alluxio working with Docker networking was very painful and I am sure I didn't get it right.

Spark Structured Streaming

Back to the main story: how do I get Spark to write Parquet files when all my data is sitting in a Clojure process? Spark Structured Streaming to the rescue!

With this awesome alpha-level feature, you can read data from a streaming source (socket, files, or a Kafka queue) and process it just as if it were a non-streaming Spark DataFrame.

So instead of Clojure writing Parquet, I just have it write JSON; Spark then reads that in, with a schema I provide, and writes out Parquet files. I first tried the socket streaming source, by writing a server in Go that relayed TCP sockets from Clojure clients that connected to Spark clients that listened. However, I was getting some memory errors with this, and since the docs list the socket source as usable only for testing, I switched to the file source. I wrote another Go server that again listens for TCP connections from Clojure and saves each line sent as a JSON file with a unique name. Then in Clojure I just send each generation and config over that TCP connection as JSON.
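Here is roughly what that streaming job looks like, as a sketch with made-up paths and a schema truncated to two fields:

# Rough sketch of the streaming job: watch a directory for new JSON files,
# apply a schema I provide, and continuously append Parquet output.
# Paths are made up and the schema is truncated.
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("json-to-parquet").getOrCreate()

generation_schema = T.StructType([
    T.StructField("config-uuid", T.StringType()),
    T.StructField("index", T.IntegerType()),
    # ... the rest of the generation fields
])

stream = spark.readStream.schema(generation_schema).json("incoming-json/")

query = (stream.writeStream
         .format("parquet")
         .option("path", "output/generations/")
         .option("checkpointLocation", "checkpoints/generations/")
         .start())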

This worked much better, letting Spark do what it's good at (writing Parquet files) and moving the heavy lifting off of Clojure. I also got to delete all the buggy macro code I spent hours writing!

Since joining two Spark structured streams together isn't supported, I decided to just embed my configuration into every generation and duplicate that data. The configuration is very small compared to each generation, so this doesn't add much space, and it makes later computation much easier, because I no longer have to join configurations with generations.
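In other words, each record that reaches Spark looks something like this (field names and values are illustrative):

# Illustrative only: every generation record carries a full copy of its run's
# configuration, so later analysis never needs a join.
config = {"uuid": "run-1", "argmap": {"population-size": 1000}}               # small
generation = {"config-uuid": config["uuid"], "index": 42, "best-error": 3.5}
record = {**generation, "config": config}                                     # duplicated per generation, but cheap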

I was finally able to do some good work! I can load in all my generations:

generations = spark.read.parquet(output_generations_folder)

I can use Spark’s window function to get the last generation of each run (helpful to know if it has finished or not):

window_spec = pyspark.sql.window.Window \
    .partitionBy(generations['config-uuid']) \
    .orderBy(generations['index'].desc())

generations_w_rank = generations \
    .withColumn("rank", pyspark.sql.functions.rank().over(window_spec))

last_generations = generations_w_rank \
    .filter(generations_w_rank.rank == 1) \
    .drop("rank")

I can then sum over all the runs, grouping by different configurations, and send this to Pandas to see which configurations have more successes:

df = last_generations \
    .select(
        "index",
        pyspark.sql.functions.from_json(
            "config.argmap.genetic-operator-probabilities",
            StructType([
                StructField('genesis', PercentType())
            ])
        ).genesis.alias("genesis"),
        "outcome",
        "best.mean-test-error",
        "config.argmap.age-mediated-parent-selection",
        "config.argmap.age-combining-function",
        "config.argmap.meta-error-categories"
    ) \
    .replace("false", "[0,1]", ["age-mediated-parent-selection"]) \
    .filter(
        (pyspark.sql.functions.col("outcome") != 'continue')
        | (pyspark.sql.functions.col("index") > 164)
    ) \
    .groupby(
        "genesis",
        "meta-error-categories",
        "age-mediated-parent-selection",
        "age-combining-function",
        "outcome"
    ) \
    .agg(
        pyspark.sql.functions.collect_list("index").alias("# generations per run"),
        pyspark.sql.functions.count("outcome").alias("number of runs"),
        pyspark.sql.functions.collect_list("mean-test-error").alias("mean-test-errors")
    ) \
    .sort(
        'genesis',
        'meta-error-categories',
        'age-mediated-parent-selection',
        'age-combining-function'
    ) \
    .toPandas()

Also, I can plot the errors over a variety of configurations:

error_df = generations \
    .select(
        pyspark.sql.functions.col("index").alias("generation"),
        pyspark.sql.functions.from_json(
            "config.argmap.genetic-operator-probabilities",
            StructType([
                StructField('genesis', PercentType())
            ])
        ).genesis.alias("genesis"),
        "best.mean-error",
        "config.argmap.age-mediated-parent-selection",
        "config.argmap.age-combining-function",
        "config.argmap.meta-error-categories",
        "config-uuid"
    ) \
    .replace("false", "[0,1]", ["age-mediated-parent-selection"]) \
    .select(
        "mean-error",
        "config-uuid",
        "generation",
        pyspark.sql.functions.to_json(
            pyspark.sql.functions.struct(
                "age-mediated-parent-selection",
                "age-combining-function",
                "meta-error-categories",
                "genesis"
            )
        ).alias("config")
    ) \
    .toPandas()

Chart(error_df).mark_line().encode(
    x='generation',
    y='mean-error',
    color='config-uuid',
    row='config'
)

Anyway, these particular graphics aren't great. But the point is that I can do all of this from a Jupyter notebook, declaratively within the Spark DataFrame API, move the results into Python, and it isn't too slow!

Also, it uses very little hard drive space. It's at least a 100x reduction compared to JSON, though I haven't run exact numbers yet.

Current problems

There are a few show stoppers preventing me from bothering everyone at the lab to try it out yet. The first is that the JSON files are not garbage collected. After Spark processes them, they just sit on the machine taking up space and quickly fill the 50GB hard drive I provisioned on GCE. I have written a couple of versions of a cleanup script in Go that parses Spark's checkpoint files and deletes the JSON files it has already read, but both versions seem to delete data before Spark reads it, rendering the whole system useless (whoops!). I hope that Spark will eventually resolve this issue for me, but I will likely have to get something working in the short term.

The other thing is that I am currently running Spark inside a Jupyter notebook, but I should run it as its own Docker image, since I want the Spark streaming transformation to be happening all the time. Otherwise, I have to worry about keeping this notebook up and running at all times.

Also, I think it should probably all run on Kubernetes instead of Docker Swarm, since a lot of work is going into making Spark run well on Kubernetes, probably better than what I have working now.

I also want to set up JupyterHub, so that we can all have accounts and our own files.

Also, security! Currently the server is listening on a wide open TCP port. That isn't good. I need to figure out some way to secure it while still being able to access that port from Clojure.