Transform JSON to CSV from Google bucket using a Dataflow Python pipeline

Alex Fragotsis · Analytics Vidhya · May 31, 2020

In this article, we will transform a JSON file into a CSV file using Dataflow and Python.

First, we’ll need a service account. Give it the “Dataflow Worker” role and don’t forget to export a JSON key at the end so we can use it later.

After downloading the service account JSON key, we need to set it as an environment variable:

export GOOGLE_APPLICATION_CREDENTIALS="/Users/afragotsis/Downloads/my_project.json"
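To make sure the credentials are picked up, we can run a quick, optional sanity check from Python (this sketch assumes the service account can also read Cloud Storage; depending on your setup you might need to grant it a Storage role as well):

from google.cloud import storage

# Optional sanity check: if the credentials are loaded correctly this will
# print the buckets the service account is allowed to list.
client = storage.Client()
print([bucket.name for bucket in client.list_buckets()])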

Assume the JSON we want to parse has this format:

{
  "product": {
    "id": 1234567890,
    "title": "My Super Awesome Product",
    "vendor": "Vendor Test",
    "product_type": "Type Test",
    "created_at": "2020-05-11T16:07:45-04:00",
    "updated_at": "2020-05-26T14:32:09-04:00",
    "options": [
      {
        "id": 1234567890
      }
    ]
  }
}

Now we’ll write the pipeline. Stage 1 reads the JSON file directly from the GCS bucket; we’re using smart_open for this. The file is read line by line, so each line is expected to hold a complete JSON object like the one above.

import json

import apache_beam as beam
from google.cloud import storage
from smart_open import open


class ReadFile(beam.DoFn):

    def __init__(self, input_path):
        self.input_path = input_path

    def start_bundle(self):
        # Create the GCS client once per bundle instead of per element.
        self.client = storage.Client()

    def process(self, something):
        # `something` is the dummy element from beam.Create([None]); it is ignored.
        clear_data = []
        with open(self.input_path) as fin:
            for line in fin:
                data = json.loads(line)
                product = data.get('product')

                if product and product.get('id'):
                    product_id = str(product.get('id'))
                    vendor = product.get('vendor')
                    product_type = product.get('product_type')
                    updated_at = product.get('updated_at')
                    created_at = product.get('created_at')
                    product_options = product.get('options')

                    option_ids = []
                    if product_options:
                        for option in product_options:
                            option_ids.append(option.get('id'))

                    clear_data.append([product_id, vendor, product_type, updated_at, created_at, option_ids])

        yield clear_data

yield clear_data passes the array we created to the next stage of the pipeline.
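If we want to test ReadFile locally before deploying anything, a minimal sketch with Beam’s TestPipeline looks like this (it assumes a newline-delimited JSON file at the hypothetical path /tmp/input.json; smart_open opens local paths the same way it opens gs:// URIs):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, is_not_empty

# Run ReadFile against a local newline-delimited JSON file and simply
# assert that it produced some output.
with TestPipeline() as p:
    rows = (p
            | beam.Create([None])
            | beam.ParDo(ReadFile('/tmp/input.json')))
    assert_that(rows, is_not_empty())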

Stage 2 takes the cleaned data as input and writes the CSV file to the output path using pandas.

import apache_beam as beam
import pandas as pd
from google.cloud import storage


class WriteCSVFile(beam.DoFn):

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name

    def start_bundle(self):
        self.client = storage.Client()

    def process(self, mylist):
        # Build a DataFrame from the list of rows produced by ReadFile.
        df = pd.DataFrame(mylist, columns=['product_id', 'vendor', 'product_type', 'updated_at', 'created_at', 'option_ids'])

        # Upload the CSV as a single object in the output bucket.
        bucket = self.client.get_bucket(self.bucket_name)
        bucket \
            .blob('csv_exports.csv') \
            .upload_from_string(df.to_csv(index=False), 'text/csv')
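To get an idea of what the uploaded file will contain, we can build the same DataFrame locally from the sample product above and print the CSV string (the values below are just the ones from the example JSON):

import pandas as pd

# The row ReadFile would produce for the sample product above.
sample = [['1234567890', 'Vendor Test', 'Type Test',
           '2020-05-26T14:32:09-04:00', '2020-05-11T16:07:45-04:00',
           [1234567890]]]

df = pd.DataFrame(sample, columns=['product_id', 'vendor', 'product_type',
                                   'updated_at', 'created_at', 'option_ids'])
print(df.to_csv(index=False))
# product_id,vendor,product_type,updated_at,created_at,option_ids
# 1234567890,Vendor Test,Type Test,2020-05-26T14:32:09-04:00,2020-05-11T16:07:45-04:00,[1234567890]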

Now we write the actual Dataflow pipeline and save the file as DataflowTest.py:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataflowOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input_path', type=str, default='gs://alex_dataflow_temp/input.json')
        parser.add_argument('--output_bucket', type=str, default='alex_dataflow_temp')


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    dataflow_options = pipeline_options.view_as(DataflowOptions)

    # ReadFile and WriteCSVFile are the DoFns defined above.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Start' >> beam.Create([None])
         | 'Read JSON' >> beam.ParDo(ReadFile(dataflow_options.input_path))
         | 'Write CSV' >> beam.ParDo(WriteCSVFile(dataflow_options.output_bucket))
         )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Finally, we need to create the requirements file with all the libraries we’ve used in our pipeline.

requirements.txt

google-cloud-storage==1.28.1
pandas==1.0.3
smart-open==2.0.0

Before generating the template file, we need to create the temp and staging locations (gs://alex_dataflow_temp and gs://alex_dataflow_staging) in GCS.
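If they don’t exist yet, one way to create them from Python is the sketch below (the bucket names are the ones used in this article, so adjust them to your own project; a gsutil mb command works just as well):

from google.cloud import storage

# Create the staging, temp and template buckets if they are missing.
client = storage.Client(project='my_project')
for name in ('alex_dataflow_staging', 'alex_dataflow_temp', 'alex_dataflow_template'):
    if client.lookup_bucket(name) is None:
        client.create_bucket(name)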

Now we can generate the Dataflow template by running the following command in the terminal from the folder that contains DataflowTest.py:

python DataflowTest.py \
--runner DataflowRunner \
--project my_project \
--staging_location gs://alex_dataflow_staging/ \
--temp_location gs://alex_dataflow_temp/ \
--template_location gs://alex_dataflow_template/MyTemplate \
--requirements_file requirements.txt \
--save_main_session True

The staging and temp locations are going to be used by Dataflow, while the template location is the bucket where the template is going to be stored.

** If for any reason the above command doesn’t work, we might need to use a setup file instead of the requirements file.

setup.py

import setuptools

REQUIRED_PACKAGES = [
    'google-cloud-storage==1.28.1',
    'pandas==1.0.3',
    'smart-open==2.0.0'
]

PACKAGE_NAME = 'my_package'
PACKAGE_VERSION = '0.0.1'

setuptools.setup(
    name=PACKAGE_NAME,
    version=PACKAGE_VERSION,
    description='My setup file',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)

and the Dataflow template export command will look like this. Note that for the setup_file argument we need to provide the full path on our local machine:

python DataflowTest.py \
--runner DataflowRunner \
--project my_project \
--staging_location gs://alex_dataflow_staging/ \
--temp_location gs://alex_dataflow_temp/ \
--template_location gs://alex_dataflow_template/MyTemplate \
--setup_file /Users/afragotsis/Documents/github/.../setup.py \
--save_main_session True

After running the job we can find the template file in our GCS bucket.

Now we’re ready to run the Dataflow job. Select “Dataflow” in the Google Cloud Console and then “Create Job from Template”, give it the name “my_test_job”, and under “Dataflow Template” select “Custom Template”. Browse to the path where we uploaded the template and, finally, fill in the temp folder URI.
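If we’d rather launch the template programmatically instead of through the Console, a rough sketch with the Google API Python client looks like this (it assumes the google-api-python-client package and the same bucket names as above; note that because our options use plain add_argument, their defaults were baked into the template when it was generated):

from googleapiclient.discovery import build

# Launch the classic template we exported to gs://alex_dataflow_template/MyTemplate.
dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId='my_project',
    gcsPath='gs://alex_dataflow_template/MyTemplate',
    body={
        'jobName': 'my_test_job',
        'environment': {'tempLocation': 'gs://alex_dataflow_temp/'},
    },
)
# The response contains the metadata of the Dataflow job that was created.
response = request.execute()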

After running the job we should see the execution graph in the Dataflow console, and finally the CSV file in the temp folder.

See the full code here: https://github.com/aFrag/PythonDataflow/blob/master/JsonToCsv.py

After you’ve created your templates, see how you can schedule them to run automatically with Airflow here.
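For reference, a minimal sketch of what that Airflow scheduling can look like, assuming a recent apache-airflow-providers-google package (the DAG id, schedule and region below are illustrative, not from this article):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

# Run the exported template once a day.
with DAG('json_to_csv_template', start_date=datetime(2020, 5, 31),
         schedule_interval='@daily', catchup=False) as dag:
    run_template = DataflowTemplatedJobStartOperator(
        task_id='run_json_to_csv',
        template='gs://alex_dataflow_template/MyTemplate',
        project_id='my_project',
        location='us-central1',
    )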
