Coding a batch processing pipeline with Google Dataflow and Apache Beam

Handerson Contreras
Analytics Vidhya
Published in
12 min readMay 4, 2019

Context

Some months ago I had a requirement to process some information coming from different sources and then put everything in Bigquery to enable the bussines take data-driven decisions.

The information was distributed in different databases. so, I had three different instances of the Google Datastore, I had one of CouchDb which was running on Compute Engine and I had one of Firebase.

For people that don’t know how CouchDb works. Well, mainly it is characterized by its API and it works based on map reduce algorithms. So, the only way to consume data from couchdb is through its API using HTTP requests.

So, if you think about it. We are talking about a common use case for dataflow giving a look to the feature image at the beginning of the story. The only difference is that firebase and couchdb would be two more components in the ingest column.

Challenges

To create the pipeline due that the company works with Google Cloud Platform mostly. They decided to use Google Dataflow, and as you may imagine I was going to have some challenges:

  1. First of all, the Authentication through different Google projects.

2. The hardest thing to do it was going to be the connection to CouchDb. Dataflow uses a library with strong support for python called Apache Bean to transform and manipulate the data. So, to build connections to databases Apache Bean already has some Drivers. But unfortunately for the case of CouchDb it does not have a native connector to retrieve the data.

3. It was my first experience working with Dataflow. So, I didn’t know anything about the main concepts of Apache Bean and Dataflow.

Facing Challenges

So, because I had these problems. Today I want to share some peaces of code about what I did to retrieve the data from couchdb using a custom connector built with apache beam, then I will show you how to format the data to the wished format to then write the final data to Bigquery.

1. First Challenge: Enabling Authentication:

For the authentication there was no too much problem, because Google Dataflow supports native authentication through different google projects. The only thing I had to do, it was enable the access for the Dataflow agent from the IAM and Admin menu in the Google cloud console, in the project where the datastore instances are located.

So, for example. Let say I have two google projects. One with projectId 501078922177 where dataflow runs as appengine flex instance and another one where the Datastore is located with projectId 576371170178: So, I want to have access to the Datastore from the other one where Dataflow is located.

What I have to do in the IAM and Admin of the project 576371170178 is grant the Dataflow user as a Cloud Datastore owner. The dataflow user it has a format as you will see below. but, you can find it in the project after you enable the Dataflow service in your Google project.

501078922177-compute@developer.gserviceaccount.com -> Dataflow user

576373870178-compute@developer.gserviceaccount.com -> Datastore user

2. Second Challenge: Working with Dataflow:

Dataflow is one of the biggest services offered by Google to transform and manipulate data with support for stream and batch processing. The main library provided to code on Dataflow is Apache beam which it has support not only for Google Dataflow, but also for another similar services that I will mention later. Apache beam is also available for java, python and Go.

Before starting to share the code, I would suggest you to read about some key terms about Beam and Dataflow: pcollection, inputs, outputs, pipeline and transformations. We will see most of them in the code later but you can read more about here.

Now, Lets get started coding:

Our example will be done using Flask with python to create an http trigger to fire the batch pipeline execution, but you can also use pubsub as a trigger.

Our Folder Structure is like this:

pipelines
__init__.py
api
__init__.py
endpoints
__init__.py
http_trigger.py
tools
pipe_options.py
__init__.py
couch
__init__.py
beam
__init__.py # Beam connector to retrieve data from CouchDb
helper
services
__init__.py
paginator.py
tools
__init__.py
datastore.py
transforms
__init__.py
couch_data.py
main.py
requirements.txt
app.yaml
setup.py
appengine_config.py
README.md

The first step is to create a main.py file:

2.1-> main.py: Basically defines the Flask App

"""main.py"""from __future__ import absolute_importimport os
from flask import Flask
from pipelines.api.endpoints.http_trigger import *
APP = Flask(__name__)if __name__ == '__main__':
print('serving on 0.0.0.0:8000')
APP.run(host='0.0.0.0', port=8000, debug=True

2.2 -> pipelines/api/tools/pipe_options.py:

On this file we will define the pipeline options about how the pipeline is going to be executed; the job name, the number of workers, the runner that can be one of the following:

In our example, we are going to use DataflowRunner due our project is going to be running on Google Dataflow. If you want to dive more in available runners for Apache Beam read more here.

"""pipelines/api/tools/pipe_options.py"""def get_pipeline_options(job_name, runner='DataflowRunner',m
max_num_workers=10):
"""Build pipeline options for profile_api."""

project = 'PROJECT_ID' #replace with your projectId
bucket = 'BUCKET' # replace with your cloudstorage bucket

pipe_options = {
'project': project,
'staging_location': 'gs://%s/dataflow' % bucket,
'runner': runner,
'setup_file': './setup.py',
'job_name': '%s-%s' % (project, job_name),
'max_num_workers': max_num_workers,
'temp_location': 'gs://%s/temp' % bucket
}
return pipe_options

2.3 -> pipelines/api/endpoints/http_trigger.py -> Flask route

"""pipelines/api/endpoints/http_trigger.py"""from __future__ import absolute_importfrom pipelines.api.tools.pipe_options import get_pipe_options
from pipelines.transform_couchdata import transform_couchdata
from main import APP@APP.route('/services/v1/pipeline_trigger')
def migrate_session_data():
pipe_options = get_pipeline_options('transform-couchdata')
transform_couchdata(pipe_options)
return 'Done', 200

See that the parameter transform-couchdata is our job name and migrate_data_fromcouchdb is the function that execute our batch process.

2.4 -> pipelines/couch/services/paginator.py

The paginate decorator you will see bellow, help us to retrieve the data from Couchdb.

To create our connector we are going to take advantage of one feature available on CouchDb, which are the views.

How do the views works on CouchDb?

Basically a view on CouchDb exposes the data stored on Couchdb to be consumed through http requests. Please read more about couchdb views here.

Lets suppose that we have built a view on couchdb, which we are able to consume throughout http requests from http://yourcouchdburl:5984/<database>/_design/<design>/_view/<view_name>, and that exposes the data in the following format:

{
"total_rows": "1000000",
"offset": "",
"rows": [
{
"id": "1344fdfa-f057-460d-906f-3421ddf..0000785e-f057-460d-906f-822ad7d43969..career",
"key": "0000785e-f057-460d-906f-822ad7d43969",
"value": "Software Engineering"
},
...
]
}

As you see a view is basically a web service that exposes data in json format. In the last example the id property has a format client..user_id..doc_name, so we will split it using the double points as separator, to then transform it to the format bellow, to then write it to a table in Bigquery named UserCareer:

{
"client": "1344fdfa-f057-460d-906f-3421ddf",
"user_id": "0000785e-f057-460d-906f-822ad7d43969",
"value": "Software Engineering"
}

How can we take that data to process it into our pipeline and then transform it?

Remember that the main idea of Dataflow is to execute hundreds of parallel process among hundreds of servers to speed up the process. So, imagine that we are going to do hundreds of requests to the couchdb view with different parameters to paginate and everything this, almost at the same time. Then, we are going to merge the different responses using a Beam transformation to put everything in the same pcollection. This process is commonly based in the concept of distributed programming. So, the following python decorator helps to do http requests.

"""pipelines/couch/service/paginator.py"""import logging
import requests
import urllib
def paginate(func):
"""Paginator."""
def func_wrapper(self, *args, **kwargs):
"""
Request a couchdb view.
This decorator can be used to decorate
methods part of the Requester class.
"""
couch_host = 'your couchdb host url or ip'
couch_port = '5984' # default port
database = self.database
design = self.design
view = self.view
limit = kwargs.get('limit', self.default_limit)
skip = kwargs.get('skip', self.default_skip)
params = {
'limit': limit,
'skip': skip
}
view_url = \
'{host}:port}/{database}/_design/{design}/_view/{view}?\
{params}'.format(
host=couch_host, port=couch_port,
database=database, design=design,
view=view, params=urllib.urlencode(params))
try: req = requests.get(view_url) if req.status_code != 200 and req.status_code != 204: raise Exception(
'Couch response with error, %s' % req.status_code)
response = req.json()
total_rows = response.get('total_rows')
offset = response.get('offset')
result = response.get('rows')
more = True

if total_rows == offset:
more = False

setattr(self, 'total_rows', total_rows)
setattr(self, 'offset', offset)
setattr(self, 'result', result)
setattr(self, 'more', more)
return func(self, *args, **kwargs) except ValueError, err:
msg = 'Invalid response in: %s: %s' % (view_url, err)
logging.error(msg)
raise ValueError(msg)
return func_wrapper

2.5 -> pipelines/couch/services/__init__.py

In this file, I’m going to define a class which will be useful to make http requests to couchdb api.

"""pipelines/couch/service/__init__.py"""import logging
import urllib
import requests
from pipelines.couch.services.paginator import paginateclass Requester(object):
"""Implement methods to retrieve documents from CouchDb
using http calls."""
def __init__(self, database, design, view, limit=10, skip=0):
self.database = database
self.design = design
self.view = view
self.default_limit = limit
self.default_skip = skip
@paginate
def request_view(self, *args, **kwargs):
"""Request the user-interests view using limit and skip
parameters"""
result = self.result
total_rows = self.total_rows
more = self.more
logging.debug(total_rows)

return result, total_rows, more
class Couch(object):
"""Implement methods to manage documents from couch"""
def __init__(self):
self.couch_host = 'http: # your ip or dns'
self.couch_port = '5984' # default couchdb port
def get_document(self, database, document):
"""Request a couchdb document"""
document_url = '{host}:{port}/{database}/{document}'.format(
host=self.couch_host,
port=self.couch_port,
database=database,
document=document
)
try: req = requests.get(document_url)
if req.status_code != 200 and req.status_code != 204:
raise Exception(
'Couch response with error, %s' % req.status_code)
response = req.json() return response except ValueError, err:
msg = 'Invalid response in: %s: %s' % (document_url, err)
logging.error(msg)
raise ValueError(msg)

2.6 -> pipelines/transform_couchdata.py

This file contains the pipeline definition. As you will see, the key peace of code is the ReadFromCouchDb class which is defined in the file pipelines/couch/beam/__init__.py that you will see in the item 2.7. Then the other key on this files is the custom transformation defined in the file pipelines/transforms/couch_data.py that you will see in the item 2.8. Basically this last one trasform the data to the format we need to write it in Bigquery. At the end ReadFromCouchDb returns a pcollection which is a set of items with tuples in format key-value. Then with FormatCouchDataFn we convert the data to the format client, user_id, doc_name.

"""pipelines/transform_couchdata.py"""from __future__ import absolute_importimport logging
import time
import apache_beam as beamfrom apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import ParDo
from pipelines.couch import services
from pipelines.couch.beam import ReadFromCouchDb
COUCHDB_DATA_SCHEMA = ( # Bigquery schema
'user_id:STRING, client:STRING, value:STRING')
def transform_couchdata(pipe_options):
pipe = beam.Pipeline(
options=PipelineOptions.from_dictionary(pipe_options))
couchdb_data = (pipe | 'read data from couch' >>
ReadFromCouchDb(services.Requester(
'database', 'design', 'couchdb_view')))

formatted_couchdb_data = (
couchdb_data | 'Format Data' >> ParDo(FormatCouchDataFn()))
output_couchdb_table = '{project}:Dataset.UserCareer'.format(
project=project)
formatted_couchdb_data |
'Write Couchdb data to BigQuery' >> beam.io.WriteToBigQuery(
output_couchdb_table, # table name in bigquery
schema=COUCHDB_DATA_SCHEMA,
create_disposition=\
beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)

2.7 -> pipelines/couch/beam/__init__.py

"""pipelines/couch/beam/__init__.py"""from __future__ import print_function, absolute_importimport loggingfrom apache_beam.io import iobase
from apache_beam.transforms import (
PTransform, Create, ParDo, DoFn, GroupByKey, FlatMap)
from apache_beam.transforms.util import Values
from pipelines.couch import services
__all__ = ['ReadFromCouchDb']class ReadFromCouchDb(PTransform):
"""Read data from couchdb."""
_NUM_QUERY_SPLITS_MAX = 50000
_NUM_QUERY_SPLITS_MIN = 12
def __init__(self, request, batch_size=20):
"""Constructor."""
super(ReadFromCouchDb, self).__init__()
if not request:
ValueError('database cannot be empty')
self._request = request
self._batch_size = batch_size
def expand(self, pcollection):
"""Read From Couch expand"""
requests = (pcollection.pipeline
| 'UserQuery' >> Create(['1'])
| 'CouchRequest' >> ParDo(ReadFromCouchDb.SplitQueryFn(
self._request, self._batch_size)))
shared_requests = (requests
| GroupByKey()
| Values()
| 'Flatten' >> FlatMap(lambda x: x))
documents = (shared_requests
| 'Read' >> ParDo(ReadFromCouchDb.ReadFn()))
return documents
class SplitQueryFn(DoFn):
"""Split query"""
def __init__(self, request, batch_size):
self._request = request
self._batch_size = batch_size
def process(self, element):
"""Splitting Requests"""
result, total, more = self._request.request_view(1, 0)
batch_size = self._batch_size
skip = 0
number_of_splits = total / batch_size
requested_splits = []
logging.debug(
'total: %s and more: %s, number_of_splits:%s',
total, more, number_of_splits)
for split in range(1, number_of_splits):
requested_splits.append(
(split, services.Requester(
self._request.database,
self._request.design,
self._request.view,
limit=batch_size,
skip=skip)))
skip += batch_size return requested_splitsclass ReadFn(DoFn):
"""Make request to couch"""
def __init__(self):
super(ReadFromCouchDb.ReadFn, self).__init__()

def process(self, request):
"""Execute the request for each Requester object."""
results, total, more = request.request_view()
documents = []
if results:
for document in results:
documents.append((document.get('id'), document))
logging.debug('total: %s, more: %s', total, more) return documents

This file is one the most important and where I invest a considerable amount of time. Here we are defining our couchdb connector to retrieve the data from couchdb. What it does is basically request to couchdb several times appending every result into a pcollection that is returned at the end.

2.8 -> pipelines/transforms/couchdb_data.py

A useful class of Beam is the DoFn class, which allows define a transformation to then return a pcollection. So, the following class define a transformation named FormatGoalFn.

"""pipelines/transforms/couchdb_data.py"""import loggingfrom apache_beam.transforms import DoFn
class FormatGoalFn(DoFn):
def process(self, element):
"""Process user goals before write to bigquery."""
doc_id, document = element
splited_doc_id = doc_id.split('..')
client_id = splited_doc_id[0]
user_id = splited_doc_id[1]
document_value = document.get('value')
value = {
'client_id': client_id,
'profile_id': profile_id,
'career_name': document_value
}

yield value # used to add a row to the PCollection

2.9 -> pipelines/app.yaml

service: couchdataflow # the name of our appengine flex service
runtime: python
env: flex # runs on appengine flexible
entrypoint: gunicorn -b :$PORT main:APPruntime_config:
python_version: 2

2.10 -> pipelines/requirements.txt

Flask==0.12.2
apache-beam[gcp]==2.5.0
gunicorn==19.9.0
google-cloud-dataflow==2.5.0
google-cloud-storage==1.10.0
httplib2==0.11.3
requests

2.11 -> pipelines/setup.py

What is this file for? I told you earlier that you may imagine that our pipeline is going to be executed in multiples server. So, you can deduce that our pipeline needs to be a python package.

So, this file defines a python package built using setuptools to be installable in any machine running python 2.

from distutils.command.build import build as _buildimport subprocess
import setuptools
# This class handles the pip install mechanism.class build(_build): # pylint: disable=invalid-name
"""A build command class that will be
invoked during package install. The package built
using the current setup.py will be staged and
later installed in the worker using
`pip install package'. This class will be
instantiated during install for this specific
scenario and will trigger running the
custom commands specified."""
sub_commands = _build.sub_commands + [('CustomCommands', None)] # Some custom command to run during setup.
# The command is not essential for this workflow.
# It is used here as an example.
# Each command will spawn a child process.
# Typically, these commands will include
# steps to install non-Python packages. For instance,
# to install a C++-based library libjpeg62 the following
# two commands will have to be added:
['apt-get', 'update'],
['apt-get', '--assume-yes', install', 'libjpeg62'],
# First, note that there is no need to use the
# sudo command because the setup script runs with
# appropriate access.
# Second, if apt-get tool is used then the first
# command needs to be 'apt-get
# update' so the tool refreshes itself and initializes
# links to download
# repositories. Without this initial step the
# other apt-get install commands
# will fail with package not found errors.
# Note also --assume-yes option which
# shortcuts the interactive confirmation. # The output of custom commands (including failures)
# will be logged in the worker-startup log.
CUSTOM_COMMANDS = [['echo', 'Custom command worked!']]class CustomCommands(setuptools.Command):
"""A setuptools Command class able to run arbitrary commands."""
def initialize_options(self):
pass
def finalize_options(self):
pass
def RunCustomCommand(self, command_list):
print 'Running command: %s' % command_list
p = subprocess.Popen(
command_list,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# Can use communicate(input='y\n'.encode())
# if the command run requires some confirmation.
stdout_data, _ = p.communicate() print 'Command output: %s' % stdout_data if p.returncode != 0:
raise RuntimeError('Command %s failed: exit code: %s' %\
(command_list, p.returncode))
def run(self):
for command in CUSTOM_COMMANDS:
self.RunCustomCommand(command)
REQUIRED_PACKAGES = []setuptools.setup(
name='dataflow_python_pipeline',
version='0.0.1',
description='DataFlow Python Pipeline package.',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages(),
cmdclass={'build': build, 'CustomCommands': CustomCommands},
include_package_data=True,
zip_safe=False
)

3. Third Challenge: It was my first experience working with Dataflow

I’m pretty sure that work with Dataflow and its concepts can be very hard, overall because when I did this project it there was a very small documentation about it.

So, the first thing I did it was to read so much as I could do it. Everything regarding to Google Dataflow and Apache Beam.

But, the main thing I remembered in that moment, it was the importance and value of the unit tests. So, what I did it was to clone the Apache Beam repository, then read and analyze the unit tests again and again. That’s the main thing I suggest for people starting on this.

Thanks so much.

I hope this story being useful for you, please feel free of reach me out to handerson.contreras@gmail.com for questions or comments or even better please comment here.

Conclusions:

No matters which service are you using right now to host you applications, what it matters is the value you add to the product. So, before decide to migrate or upgrade to a new service or new technology you should consider the added value instead of popularity or trends.

As a software developer is pretty nice be in touch with latest technologies, but we have to think in business value before you.

References:

 by the author.

--

--

Handerson Contreras
Analytics Vidhya

Fullstack Software Engineer | GDG Cloud Tegucigalpa co-organizer