Building a data pipeline with Apache Beam and Elasticsearch (7.3 and 6.8) on Google Cloud Platform (Part 1)

Anna Epishova
Google Cloud - Community
7 min read · Aug 12, 2019
Photo by Rodion Kutsaev on Unsplash

This is a three-part series that aims to help data engineers start building data pipelines with the Apache Beam Java SDK and Elasticsearch on Google Cloud Platform. It assumes that the reader is somewhat familiar with GCP and the basic principles of creating data pipelines but is a novice Java developer.

The resulting pipeline will consist of Pub/Sub, Dataflow, Elasticsearch, and Kibana. That is, you’ll ingest messages from Pub/Sub, use Dataflow to process those messages and insert them into Elasticsearch, and finally visualize the data in Kibana.

In this first part of the series, you’ll do the following:

  • Install Elasticsearch and Kibana on GCP.
  • Have a look at Elasticsearch 7.3 and 6.8 and some of their differences.

In the second part you will:

  • Create and build an Apache Beam pipeline.
  • Install and configure IntelliJ to work with Beam Java SDK and use Maven and IntelliJ to build and run the pipeline.

Finally, in the third part of the series you’ll examine the pipeline source code and learn how it works.

Install Elasticsearch and Kibana on a VM instance

Although a ready-to-go Elasticsearch solution is available on GCP Marketplace, you will install Elasticsearch and Kibana manually on a virtual machine. At the time of writing, there is a free click-to-deploy Elasticsearch Cluster solution that sets up a Kubernetes cluster and installs Elasticsearch 6.3 on it. However, that is a relatively old version of Elasticsearch, and it requires a correspondingly old version of Kibana, which is less capable than the most recent one.

You’ll begin by creating a new VM instance. Open the Compute Engine tab and click Create Instance. The default settings will work fine for this simple example, but before clicking Create at the bottom of the page, enable HTTP and HTTPS traffic for the new VM. This will add two network tags to the VM: http-server and https-server.

GCP screenshot describing virtual machine settings
Create a new VM instance using the default settings and having HTTP and HTTPS traffic enabled

Next, navigate to VPC Network, choose Firewall Rules, and create two firewall rules to allow ingress TCP traffic to Elasticsearch port 9200 and Kibana port 5601. To define the compute instances to which a rule applies, you can specify target network tags (http-server and https-server in our example). For simplicity, allow incoming traffic from any address by setting Source IP ranges to 0.0.0.0/0. The screenshot below shows how to define the first rule. For the second rule, just change the port number to 5601.

GCP screenshot to create a new firewall rule
Create a firewall rule to allow ingress TCP traffic to Elasticsearch port 9200
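
If you prefer the command line to the console, the same two rules could be created with the gcloud CLI. The sketch below only prints the commands so that you can review them before running anything. The function name and the rule names allow-elasticsearch and allow-kibana are hypothetical placeholders, not names used elsewhere in this post.

```shell
# Print (do not run) a gcloud command that opens one TCP port
# for VMs carrying the http-server / https-server network tags.
# $1 = rule name (hypothetical placeholder), $2 = TCP port
firewall_rule_cmd() {
  printf '%s\n' "gcloud compute firewall-rules create $1 --direction=INGRESS --allow=tcp:$2 --source-ranges=0.0.0.0/0 --target-tags=http-server,https-server"
}

firewall_rule_cmd allow-elasticsearch 9200
firewall_rule_cmd allow-kibana 5601
```

Once the printed commands look right, you can paste them into Cloud Shell. The source range 0.0.0.0/0 mirrors the simplification made above; a narrower range would work the same way.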

Elasticsearch requires Java to be installed on the virtual machine. So, after the VM is provisioned, SSH to it and install Java:

sudo apt-get update
sudo apt-get install default-jre

Now, you are ready to install Elasticsearch. Here is how to install Elasticsearch 7.3:

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
sudo apt-get update && sudo apt-get install elasticsearch

The installation steps for version 6.x are very similar; just replace the third command above with the following:

echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list
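
Since the 7.x and 6.x repository lines differ only in the major version, one way to avoid copy-paste mistakes is to derive both the line and the file name from a single variable. This is a minimal sketch; the function names and the ES_MAJOR variable are made up for illustration.

```shell
# Build the apt repository definition for an Elasticsearch major version.
# $1 = major version, e.g. 7 or 6
elastic_repo_line() {
  printf 'deb https://artifacts.elastic.co/packages/%s.x/apt stable main\n' "$1"
}

# Build the matching sources list path used in the commands above.
elastic_repo_file() {
  printf '/etc/apt/sources.list.d/elastic-%s.x.list\n' "$1"
}

ES_MAJOR=7   # hypothetical variable; set to 6 to follow the 6.8 track
elastic_repo_line "$ES_MAJOR"
elastic_repo_file "$ES_MAJOR"
```

On the VM, the two helpers could then be combined as elastic_repo_line "$ES_MAJOR" | sudo tee -a "$(elastic_repo_file "$ES_MAJOR")".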

Next, configure Elasticsearch:

sudo vi /etc/elasticsearch/elasticsearch.yml

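Uncomment and set network.host: 0.0.0.0. If you installed Elasticsearch 6.x, this change in elasticsearch.yml is sufficient, but version 7.3 requires one more thing: uncomment and set cluster.initial_master_nodes: node-1. The value must match the node’s name, so also uncomment node.name: node-1 if the node name is still left at its default (the hostname).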
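
If you would rather not edit the file by hand, the same settings could be applied with a small script. The sketch below assumes the settings appear in the file as commented-out lines of the form #key: value; the helper name set_yaml_key is hypothetical, and the demo runs on a scratch file rather than on the real configuration.

```shell
# Set "key: value" in a YAML file: replace an existing or commented-out
# line for that key, or append the line if the key is absent.
# $1 = file, $2 = key, $3 = value
set_yaml_key() {
  tmp="$(mktemp)" || return 1
  if grep -Eq "^#? *$2:" "$1"; then
    sed -E "s|^#? *$2:.*|$2: $3|" "$1" > "$tmp"
  else
    cat "$1" > "$tmp"
    printf '%s: %s\n' "$2" "$3" >> "$tmp"
  fi
  cat "$tmp" > "$1"
  rm -f "$tmp"
}

# Demo on a scratch file that mimics commented-out defaults (assumed layout).
demo="$(mktemp)"
printf '%s\n' '#node.name: node-1' '#network.host: 192.168.0.1' \
  '#cluster.initial_master_nodes: ["node-1", "node-2"]' > "$demo"
set_yaml_key "$demo" node.name node-1
set_yaml_key "$demo" network.host 0.0.0.0
set_yaml_key "$demo" cluster.initial_master_nodes node-1   # 7.x only
cat "$demo"
rm -f "$demo"
```

On the VM the target would be /etc/elasticsearch/elasticsearch.yml, which is owned by root, so the script would need to run under sudo.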

Next, restart Elasticsearch:

sudo service elasticsearch restart

Wait a minute and test if Elasticsearch is up and running by sending an HTTP request to it:

curl localhost:9200
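
Rather than waiting a fixed minute, you could poll the endpoint until it answers. This is a minimal sketch; the function name wait_for_es and its default of 30 attempts with a 2-second pause are arbitrary choices, not values from the Elasticsearch documentation.

```shell
# Poll an Elasticsearch URL until it answers or the attempts run out.
# $1 = base URL, $2 = max attempts (default 30), $3 = pause in seconds (default 2)
# Returns 0 as soon as a request succeeds, 1 if every attempt failed.
wait_for_es() {
  url="$1"; attempts="${2:-30}"; delay="${3:-2}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # -s hides the progress meter, -f turns HTTP errors into a non-zero exit
    if curl -sf "$url" >/dev/null 2>&1; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage on the VM:
#   wait_for_es http://localhost:9200 && curl localhost:9200
```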

If the service works, the response should contain “You Know, for Search”. If you don’t see it, you can check whether any process is listening on port 9200:

sudo netstat -a -p | grep 9200

Check if Elasticsearch is running:

ps aux | grep elastic

And look inside the logs:

sudo ls -l /var/log/elasticsearch/

For example, if you forgot to set cluster.initial_master_nodes for 7.3 you would see something like this in /var/log/elasticsearch/elasticsearch.log:

node validation exception
[1] bootstrap checks failed
[1]: the default discovery settings are unsuitable for production use; at least one of [discovery.seed_hosts, discovery.seed_providers, cluster.initial_master_nodes] must be configured
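
To pull this kind of failure out of a long log quickly, you could search for the bootstrap-check marker and print the lines that follow it. The helper below is a sketch with a hypothetical name; the demo feeds it the sample lines shown above instead of a real log.

```shell
# Print bootstrap-check failures found in an Elasticsearch log file,
# together with up to three following lines of detail.
# $1 = log path (on the VM: /var/log/elasticsearch/elasticsearch.log)
# Exits non-zero when the log contains no such failure.
bootstrap_failures() {
  grep -A 3 'bootstrap checks failed' "$1"
}

# Demo with the sample lines from this post.
log="$(mktemp)"
cat > "$log" <<'EOF'
node validation exception
[1] bootstrap checks failed
[1]: the default discovery settings are unsuitable for production use; at least one of [discovery.seed_hosts, discovery.seed_providers, cluster.initial_master_nodes] must be configured
EOF
bootstrap_failures "$log"
rm -f "$log"
```

Reading the real log on the VM may require sudo, depending on the permissions of /var/log/elasticsearch/.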

Now, you are ready to create your first index in Elasticsearch, and populate it with a test document. The easiest way to run the following commands is by using GCP Cloud Shell. First, define the variables:

export ELASTIC_CLUSTER=http://x.x.x.x:9200
export ELASTIC_INDEX=citibikes

For version 6.8 you would need to add one more variable:

export ELASTIC_TYPE=trips

Here x.x.x.x is the external IP address of the newly created VM. Issuing curl $ELASTIC_CLUSTER from GCP Cloud Shell should return “You Know, for Search”.

Note: when you reconnect to GCP Cloud Shell, the environment variables that you set in the expired session are gone and must be defined again. If you expect to spend some time working with the examples in this post, it may be more convenient to install the Google Cloud SDK on your local machine and run all commands from there.

As mentioned at the beginning of this post, the pipeline will store messages in Elasticsearch. To do so, you’ll use an index called $ELASTIC_INDEX. Here is another difference between the versions: starting from version 7.0, mapping types are deprecated. This changes the API calls, as you’ll see later in this post, and it is why $ELASTIC_TYPE was defined only for version 6.x.

In order to visualize data properly in Kibana, you need to define a schema for the index. You do this by defining a mapping in Elasticsearch. Without this step, Elasticsearch infers the field types dynamically, and the result may not be what you need: for example, numeric values sent as strings are mapped as text rather than numbers, and the station locations are not recognized as geo_point. In this example, you’ll ingest messages from a public BigQuery dataset `bigquery-public-data.new_york_citibike.citibike_trips`.
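
Another option is to keep the variable definitions in a file in your home directory, which Cloud Shell preserves between sessions, and load that file whenever you reconnect. This is a sketch; the function name and the file name .elastic_env are hypothetical.

```shell
# Save the variables used in this post to a file.
# $1 = target file (hypothetical example: $HOME/.elastic_env)
save_elastic_env() {
  cat > "$1" <<'EOF'
export ELASTIC_CLUSTER=http://x.x.x.x:9200
export ELASTIC_INDEX=citibikes
export ELASTIC_TYPE=trips
EOF
}

# Usage:
#   save_elastic_env "$HOME/.elastic_env"   # once
#   . "$HOME/.elastic_env"                  # in every new session
```

As above, replace x.x.x.x with the external IP address of the VM; ELASTIC_TYPE is only needed for version 6.8.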

Use the following request to define a mapping for 7.x:

curl -X PUT "$ELASTIC_CLUSTER/$ELASTIC_INDEX" -H 'Content-Type: application/json' -d'
{
"mappings": {
"properties": {
"tripduration": { "type": "integer" },
"starttime": {"type": "date"},
"stoptime": {"type": "date"},
"start_station_id": { "type": "integer" },
"start_station_name": { "type": "text" },
"start_station_location": { "type": "geo_point" },
"end_station_id": { "type": "integer" },
"end_station_name": { "type": "text" },
"end_station_location": { "type": "geo_point" },
"bikeid": { "type": "integer" },
"usertype": { "type": "keyword" },
"birth_year": {"type": "integer"},
"gender": { "type": "keyword" }
}
}
}'

And this one for 6.x:

curl -X PUT "$ELASTIC_CLUSTER/$ELASTIC_INDEX" -H 'Content-Type: application/json' -d'
{
"mappings": {
"'$ELASTIC_TYPE'": {
"properties": {
"tripduration": { "type": "integer" },
"starttime": {"type": "date"},
"stoptime": {"type": "date"},
"start_station_id": { "type": "integer" },
"start_station_name": { "type": "text" },
"start_station_location": { "type": "geo_point" },
"end_station_id": { "type": "integer" },
"end_station_name": { "type": "text" },
"end_station_location": { "type": "geo_point" },
"bikeid": { "type": "integer" },
"usertype": { "type": "keyword" },
"birth_year": {"type": "integer"},
"gender": { "type": "keyword" }
}
}
}
}'

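Fields defined as text will be indexed and available for full-text search. Note the single quotes around the $ELASTIC_TYPE variable: the request body itself is single-quoted, so the extra quotes close and reopen that string and let the shell expand the variable. Without them, the shell would not expand it.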
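
The quoting behavior is easy to check in isolation. The sketch below builds a shortened, illustrative request body three ways (it is not the full mapping) and prints the results so you can compare them.

```shell
ELASTIC_TYPE=trips

# Single quotes closed and reopened around the variable: the shell expands it.
expanded='{"mappings": {"'$ELASTIC_TYPE'": {}}}'

# Variable left inside the single-quoted string: it is kept literally.
literal='{"mappings": {"$ELASTIC_TYPE": {}}}'

# Like the first form, but the variable is double-quoted. This matters when
# the string is passed directly as a command argument, as with curl -d,
# because an unquoted value containing spaces would be split.
safer='{"mappings": {"'"$ELASTIC_TYPE"'": {}}}'

printf '%s\n' "$expanded" "$literal" "$safer"
```

The first and third lines of output contain trips, while the second still contains the text $ELASTIC_TYPE.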

Test Elasticsearch

To check if Elasticsearch works properly you can insert a test message using the following command for 7.x:

curl -X POST "$ELASTIC_CLUSTER/$ELASTIC_INDEX/_doc/" -H 'Content-Type: application/json' -d'{ "tripduration": 202, "starttime": "2015-08-06T10:20:39", "stoptime": "2015-08-06T10:21:41", "start_station_id": 520, "start_station_name": "W 52 St & 5 Ave", "start_station_location": {"lat": 40.85992262, "lon": -73.87648516}, "end_station_id": 520, "end_station_name": "W 52 St & 5 Ave", "end_station_location": {"lat": 40.85992262, "lon": -73.87648516}, "bikeid": 200, "usertype": "Customer", "birth_year": 1990, "gender": "male" }'

For 6.x, you need to provide $ELASTIC_TYPE in place of _doc:

curl -X POST "$ELASTIC_CLUSTER/$ELASTIC_INDEX/$ELASTIC_TYPE/" -H 'Content-Type: application/json' -d'{ "tripduration": 202, "starttime": "2015-08-06T10:20:39", "stoptime": "2015-08-06T10:21:41", "start_station_id": 520, "start_station_name": "W 52 St & 5 Ave", "start_station_location": {"lat": 40.85992262, "lon": -73.87648516}, "end_station_id": 520, "end_station_name": "W 52 St & 5 Ave", "end_station_location": {"lat": 40.85992262, "lon": -73.87648516}, "bikeid": 200, "usertype": "Customer", "birth_year": 1990, "gender": "male" }'

The above commands do not provide a document ID, so Elasticsearch will generate one automatically. If everything worked, the output should contain information about the created document and a "successful":1 entry.

Now that Elasticsearch works, you can install Kibana on the same VM. Note that you already installed the public signing key and apt-transport-https and saved the repository definition as part of installing Elasticsearch. So, the remaining step is to SSH to the VM and run:
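
If you want a script to check the result for you, you could search the response for that entry. The sketch below uses a hypothetical helper name and an abbreviated, illustrative response instead of real cluster output.

```shell
# Succeed if an index response read from stdin reports a successful write.
insert_succeeded() {
  grep -q '"successful":1'
}

# Abbreviated, illustrative response; on a real cluster, pipe the output
# of the curl -X POST command into insert_succeeded instead.
sample='{"result":"created","_shards":{"total":2,"successful":1,"failed":0}}'
if printf '%s\n' "$sample" | insert_succeeded; then
  echo "document indexed"
else
  echo "insert failed"
fi
```

Adding -s to the curl call keeps its progress output out of the way when the response is piped.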

sudo apt-get install kibana

Next, configure Kibana:

sudo vi /etc/kibana/kibana.yml

You need to set server.port: 5601 and server.host: "0.0.0.0" there. Next, start the service:

sudo service kibana start

In your browser, open http://x.x.x.x:5601/, where x.x.x.x is the external IP address of the VM, and you should see the Kibana welcome page.

Next, you can use Kibana to find the test document that was inserted into Elasticsearch in the previous step. To do so, go to the Management tab, then Index Patterns, and click Create Index Pattern. Here, you can specify the indexes that you’d like to explore in Kibana. Create a citibikes* pattern with no time filter.

Screenshot of creating index in Kibana
Create index pattern citibikes* in Kibana

Next, if you go to the Discover page and choose the index pattern, you should see the test message.

Now that you have Elasticsearch and Kibana up and running, you are ready to move to the next part of this series. The second part will:

  • Describe how to create and build an Apache Beam pipeline.
  • Demonstrate how to install and configure IntelliJ to work with Beam Java SDK and use Maven and IntelliJ to build and run your pipeline.
