Creating an Elasticsearch to BigQuery Data Pipeline

Zohar Fischer
Published in Inside Bizzabo
5 min read · Sep 26, 2019

TL;DR: Creating an Elasticsearch => Dataflow => BigQuery data pipeline with Airflow in Kotlin and Python is simultaneously simple and extremely difficult. Here’s the code: https://github.com/bizzabo/elasticsearch_to_bigquery_data_pipeline

Why?

Good question. I was unable to find any description of an Elasticsearch to BigQuery pipeline online, only of the opposite direction.
The purpose of the pipeline is to enable data exploration and drill-down using tools that require the data to be stored in relational database (RDB) form.

Networking

AWS
The Elasticsearch cluster storing the data I needed is hosted on AWS EC2, whereas BigQuery is a GCP product. That proved to be the first of many complications.
I wanted to reach the Elasticsearch cluster from outside the network so I used Route53 (AWS’s DNS service) to create a Record Set to bind a domain name to the underlying Elasticsearch cluster.

Creating a new record set in Route 53

But that wasn’t enough: the Elasticsearch cluster sits behind a firewall and is inaccessible from outside the network. Hello there, second complication.
To tackle this, I needed to know which IP addresses were going to access the Elasticsearch cluster. Let’s head over to GCP to see what we can do there.

GCP
Dataflow workers are created and run on ad-hoc VMs and cannot be bound to pre-specified VMs. This poses a problem because it is impossible to control which IP addresses the worker VMs will use to access the Elasticsearch cluster. Third complication of the bunch.
The solution I found was to create a new GCP VPC network dedicated solely to the pipeline.

Creating a new GCP VPC

The created VPC needed to be linked to a NAT gateway (Cloud NAT). A new NAT gateway was created and configured to use a number of static IP addresses for any egress traffic.

Creating a new NAT gateway. After creating a Cloud Router, static IP addresses may be allocated to the NAT

Linking the VPC with the NAT gateway would ensure any traffic originating from the VPC would be routed through the NAT gateway and out to the Internet through the NAT’s static IP addresses. This was done via Cloud Router.

Creating a new Cloud Router

Now that I knew which IP addresses Dataflow would use to connect to Elasticsearch, I needed to add them to the correct security group in AWS.
Back in EC2, I added a new Security group to the relevant Elasticsearch instances. The security group was configured to accept inbound TCP traffic from the static IP addresses used by Dataflow.

Creating a new rule in a Security group

To recap:
1. Dataflow workers run on ad-hoc VMs.
2. Running Dataflow on a VPC guarantees that the workers and their VMs are created inside that VPC.
3. With the VPC linked to the NAT gateway, all of the workers’ outgoing connections are routed through your static IPs.
Steps:
1. Create a VPC for Dataflow
2. Create a NAT gateway with static IP addresses
3. Link the NAT to the VPC via Cloud Router
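
The effect of these steps can be pictured as an allowlist check on the AWS side: a connection is accepted only if it is TCP, targets the Elasticsearch port and comes from one of the NAT gateway's static IPs. Here is a minimal Python sketch of that logic. The addresses (from the 203.0.113.0/24 documentation range), port 9200 (Elasticsearch's default HTTP port) and the function names are placeholders for illustration. A real security group is configured in AWS, not in application code.

```python
import ipaddress

# Hypothetical static egress IPs reserved for the Cloud NAT gateway
# (203.0.113.0/24 is a range reserved for documentation).
NAT_STATIC_IPS = ["203.0.113.10", "203.0.113.11"]

# Assumed port: 9200 is Elasticsearch's default HTTP port.
ELASTICSEARCH_PORT = 9200


def build_inbound_rules(static_ips, port):
    """Model one inbound TCP rule per static IP, each as a /32 network."""
    return [
        {"protocol": "tcp", "port": port, "source": ipaddress.ip_network(f"{ip}/32")}
        for ip in static_ips
    ]


def is_allowed(rules, source_ip, port, protocol="tcp"):
    """Return True if any rule accepts this source address, port and protocol."""
    address = ipaddress.ip_address(source_ip)
    return any(
        rule["protocol"] == protocol
        and rule["port"] == port
        and address in rule["source"]
        for rule in rules
    )


RULES = build_inbound_rules(NAT_STATIC_IPS, ELASTICSEARCH_PORT)
```

Under these assumptions, a Dataflow worker leaving through the NAT gateway at 203.0.113.10 is accepted on port 9200, while any other source address or port is rejected.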

Pipeline Overview

I wrote the pipeline in Kotlin using the Google BigQuery and Apache Beam libraries.
The code basically does three things:
1. Reads documents from Elasticsearch
2. Transforms the Elasticsearch documents to BigQuery rows
3. Writes the transformed documents to BigQuery
The rest (argument loading and mapping, query writing and logging) is fluff.

Reading from Elasticsearch

After creating the pipeline, fetching documents from Elasticsearch and injecting them into the pipeline is quite straightforward.
Reading from Elasticsearch is done using ElasticsearchIO (part of the Apache Beam SDK), which requires a number of properties (source, index, type, connection timeout, socket and retry timeout, batch size) and finally a query.
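
The query itself is an ordinary Elasticsearch query body serialized as JSON. As a rough illustration, here is a Python sketch that builds a range query for one day of documents. The pipeline in the repo is written in Kotlin, so this is not its code, and the field name `created_at` and the dates are hypothetical.

```python
import json


def build_range_query(field, start, end):
    """Build an Elasticsearch query body matching documents whose
    `field` falls in the half-open interval [start, end)."""
    return {"query": {"range": {field: {"gte": start, "lt": end}}}}


# Hypothetical field name and dates, for illustration only.
query = build_range_query("created_at", "2019-09-01", "2019-09-02")

# Serialize to the JSON string form that would be handed to the reader.
query_json = json.dumps(query, sort_keys=True)
```

Using a half-open interval (`gte` on the start, `lt` on the end) means consecutive scheduled runs can cover adjacent windows without reading the same document twice.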

Applying Transformations to the Data

The next step is to convert the retrieved Elasticsearch documents, which are in JSON format, into something that resembles a DB row.
This is achieved by mapping the returned elements according to the required schema. The JSON is parsed using Gson and any relevant keys and their values are extracted and set in the returned TableRow object.
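
To make the mapping step concrete, here is a minimal Python sketch of the same idea: parse the JSON, pull out only the keys the schema needs, and return a flat row. The real pipeline does this in Kotlin with Gson and a TableRow. The column names and document layout below are hypothetical.

```python
import json

# Hypothetical mapping from BigQuery column name to the path of keys
# inside the Elasticsearch document's JSON.
FIELD_PATHS = {
    "id": ("id",),
    "event_name": ("event", "name"),
    "attendee_count": ("stats", "attendees"),
}


def extract(document, path):
    """Walk nested dictionaries along `path`; return None if a key is missing."""
    value = document
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def to_row(document_json):
    """Parse one JSON document and flatten it into a row-like dict."""
    document = json.loads(document_json)
    return {column: extract(document, path) for column, path in FIELD_PATHS.items()}


sample = '{"id": "42", "event": {"name": "Launch"}, "stats": {"attendees": 120}, "extra": true}'
row = to_row(sample)
```

Keys that are not listed in the mapping (such as `extra` above) are dropped, and keys missing from a document come back as `None`, which would map to NULL in a nullable column.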

Writing Data to BigQuery

Writing to BigQuery is the last step in this pipeline. The TableRow object created in the previous step must match the schema of the table we will be writing to.
Writing to BigQuery is done using BigQueryIO (also part of the Apache Beam SDK), which requires the table schema (fields).
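
Because a row that does not match the table schema will fail at write time, it can help to check rows against the schema before they reach BigQuery. Here is a small Python sketch of such a check. It is not part of the pipeline, and the schema, column names and helper name are hypothetical. Only the STRING and INTEGER types and the REQUIRED and NULLABLE modes are covered.

```python
# Hypothetical table schema: (column name, BigQuery type, mode).
SCHEMA = [
    ("id", "STRING", "REQUIRED"),
    ("event_name", "STRING", "NULLABLE"),
    ("attendee_count", "INTEGER", "NULLABLE"),
]

# Python types accepted for each BigQuery type used above.
PYTHON_TYPES = {"STRING": str, "INTEGER": int}


def schema_errors(row, schema=SCHEMA):
    """Return a list of human-readable mismatches between row and schema."""
    errors = []
    known = {name for name, _, _ in schema}
    for name in row:
        if name not in known:
            errors.append(f"unknown column: {name}")
    for name, bq_type, mode in schema:
        value = row.get(name)
        if value is None:
            if mode == "REQUIRED":
                errors.append(f"missing required column: {name}")
        # bool is a subclass of int in Python, so reject it explicitly.
        elif not isinstance(value, PYTHON_TYPES[bq_type]) or isinstance(value, bool):
            errors.append(f"wrong type for {name}: expected {bq_type}")
    return errors
```

An empty list means the row lines up with the schema. Anything else names the offending column, which is easier to act on than a failed insert.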

Dataflow job row

Dataflow

Each time the pipeline is run, a job is added to Dataflow. Clicking on a job reveals a treasure trove of information. Every aspect of the job is visible: a short summary, autoscaling, metrics, pipeline options and logs.

Airflow (Bonus Round)

Now that the pipeline is working, you might want to run it at regular intervals. The way I chose to do this was Apache Airflow (I won’t go into the steps required to set Airflow up, as instructions are readily available online).
The Airflow DAG requires the same arguments that would otherwise be provided directly to the pipeline, as well as the location of the jar that should be run.
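
One way to keep those arguments in a single place is a plain dictionary that is rendered into `--name=value` pipeline arguments. The Python sketch below shows only that part and deliberately does not import Airflow, so it runs without an Airflow installation. The Elasticsearch and BigQuery option names, the values and the jar path are placeholders, not the ones used in the repo.

```python
def to_cli_args(options):
    """Render an options dict as `--name=value` arguments, sorted by name."""
    return [f"--{name}={value}" for name, value in sorted(options.items())]


# Hypothetical values; the Elasticsearch and BigQuery option names
# are placeholders, not the ones used in the repo.
PIPELINE_OPTIONS = {
    "runner": "DataflowRunner",
    "project": "my-gcp-project",
    "network": "dataflow-vpc",
    "elasticsearchIndex": "events",
    "bigQueryTable": "my_dataset.events",
}

JAR_PATH = "/path/to/pipeline.jar"  # assumed location of the built jar

# The command line an equivalent manual run would use.
command = ["java", "-jar", JAR_PATH] + to_cli_args(PIPELINE_OPTIONS)
```

In a DAG, the same dictionary and jar path would be handed to whichever operator launches the Dataflow job, so a manual run and a scheduled run share one set of arguments.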

Recommendations

I am not a data engineer and I fell down this rabbit hole with limited knowledge of GCP and absolutely zero knowledge of Apache Beam or Apache Airflow.
If you are new to this, like I was, I recommend starting small and working your way up. Try creating a simple pipeline, one that reads from a file and writes directly to BigQuery (without a transformation). Once that works flawlessly, start adding complexity to the pipeline.
Also, GCP is finicky, and trying to read and write data across regions and networks causes plenty of problems. If you can, I would recommend simplifying your network as much as possible before writing your first line of code.

I’ve uploaded all of my code (Dataflow and Airflow) to a public GitHub repo with a comprehensive readme, which goes into much more detail than this post does: https://github.com/bizzabo/elasticsearch_to_bigquery_data_pipeline
You are welcome to use and build upon it.
Good luck!

Zohar Fischer
Hands-on team leader experienced in leading multi-disciplinary teams & delivering high-performance, scalable systems. Father to two Vizslas, Starbuck and Boomer.