Transform JSON to CSV from Google bucket using a Dataflow Python pipeline
In this article, we'll transform a JSON file into a CSV file using Dataflow and Python.
First, we'll need a service account. Give it the "Dataflow Worker" role, and don't forget to export it as a JSON key at the end so we can use it later.
After downloading the service account key, we need to set it as an environment variable:
export GOOGLE_APPLICATION_CREDENTIALS="/Users/afragotsis/Downloads/my_project.json"
Assuming the JSON we want to parse has this format:
{
  "product": {
    "id": 1234567890,
    "title": "My Super Awesome Product",
    "vendor": "Vendor Test",
    "product_type": "Type Test",
    "created_at": "2020-05-11T16:07:45-04:00",
    "updated_at": "2020-05-26T14:32:09-04:00",
    "options": [
      {
        "id": 1234567890
      }
    ]
  }
}
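Before wiring this into a pipeline, it helps to see the flattening in isolation. Here is a minimal sketch (using a single hypothetical line in the format above) of how one JSON record becomes one flat row:

```python
import json

# One line of input in the format shown above (hypothetical values)
line = ('{"product": {"id": 1234567890, "title": "My Super Awesome Product", '
        '"vendor": "Vendor Test", "product_type": "Type Test", '
        '"created_at": "2020-05-11T16:07:45-04:00", '
        '"updated_at": "2020-05-26T14:32:09-04:00", '
        '"options": [{"id": 1234567890}]}}')

product = json.loads(line).get('product')

# Flatten the nested object into a single row of scalar fields,
# collecting the nested option ids into one list column.
row = [
    str(product.get('id')),
    product.get('vendor'),
    product.get('product_type'),
    product.get('updated_at'),
    product.get('created_at'),
    [option.get('id') for option in product.get('options', [])],
]
print(row)
```

This is exactly the row shape the pipeline below builds for every line of the input file.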
Now we'll write the pipeline. Stage 1 reads the JSON file directly from the GCS bucket; we're using smart_open for this.
import json

import apache_beam as beam
from google.cloud import storage
from smart_open import open


class ReadFile(beam.DoFn):

    def __init__(self, input_path):
        self.input_path = input_path

    def start_bundle(self):
        self.client = storage.Client()

    def process(self, something):
        clear_data = []
        with open(self.input_path) as fin:
            for line in fin:
                data = json.loads(line)
                product = data.get('product')

                if product and product.get('id'):
                    product_id = str(product.get('id'))
                    vendor = product.get('vendor')
                    product_type = product.get('product_type')
                    updated_at = product.get('updated_at')
                    created_at = product.get('created_at')

                    product_options = product.get('options')
                    option_ids = []
                    if product_options:
                        for option in product_options:
                            option_ids.append(option.get('id'))

                    clear_data.append([product_id, vendor, product_type,
                                       updated_at, created_at, option_ids])

        yield clear_data
yield clear_data passes the list we've built to the next stage of the pipeline.
Stage 2 takes the cleaned data as input and writes the CSV file to the output bucket using pandas.
import pandas as pd

import apache_beam as beam
from google.cloud import storage


class WriteCSVFIle(beam.DoFn):

    def __init__(self, bucket_name):
        self.bucket_name = bucket_name

    def start_bundle(self):
        self.client = storage.Client()

    def process(self, mylist):
        df = pd.DataFrame(mylist,
                          columns=['product_id', 'vendor', 'product_type',
                                   'updated_at', 'created_at', 'option_ids'])

        bucket = self.client.get_bucket(self.bucket_name)
        bucket \
            .blob('csv_exports.csv') \
            .upload_from_string(df.to_csv(index=False), 'text/csv')
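To see what this stage actually uploads, here is a small sketch of the pandas part alone, run on one hypothetical row in the shape that Stage 1 emits:

```python
import pandas as pd

# One row in the shape Stage 1 yields (hypothetical values)
rows = [['1234567890', 'Vendor Test', 'Type Test',
         '2020-05-26T14:32:09-04:00', '2020-05-11T16:07:45-04:00',
         [1234567890]]]

# Column names are positional labels for the row fields
df = pd.DataFrame(rows, columns=['product_id', 'vendor', 'product_type',
                                 'updated_at', 'created_at', 'option_ids'])

# This is the exact string that gets uploaded to the bucket
csv_text = df.to_csv(index=False)
print(csv_text)
```

Note that `columns` takes a list of names: it labels the positions of each row, it does not cast the values, so everything keeps the type it already had.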
Finally, write the actual Dataflow pipeline and save the file as DataflowTest.py.
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataflowOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input_path', type=str, default='gs://alex_dataflow_temp/input.json')
        parser.add_argument('--output_bucket', type=str, default='alex_dataflow_temp')


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    dataflow_options = pipeline_options.view_as(DataflowOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Start' >> beam.Create([None])
         | 'Read JSON' >> beam.ParDo(ReadFile(dataflow_options.input_path))
         | 'Write CSV' >> beam.ParDo(WriteCSVFIle(dataflow_options.output_bucket))
         )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Next, we need to create the requirements file with all the libraries we've used in our pipeline.
requirements.txt
google-cloud-storage==1.28.1
pandas==1.0.3
smart-open==2.0.0
Before generating the template file, we need to create the temp and staging folders in our GCS bucket.
Now we can generate the Dataflow template by running the following command in a terminal from the folder containing DataflowTest.py.
python DataflowTest.py \
--runner DataflowRunner \
--project my_project \
--staging_location gs://alex_dataflow_staging/ \
--temp_location gs://alex_dataflow_temp/ \
--template_location gs://alex_dataflow_template/MyTemplate \
--requirements_file requirements.txt \
--save_main_session True
The staging and temp locations will be used by Dataflow; the template location is the bucket where the template will be stored.
** If for any reason the above command doesn't work, we may need to switch from the requirements file to a setup file.
setup.py
import setuptools

REQUIRED_PACKAGES = [
    'google-cloud-storage==1.28.1',
    'pandas==1.0.3',
    'smart-open==2.0.0'
]

PACKAGE_NAME = 'my_package'
PACKAGE_VERSION = '0.0.1'

setuptools.setup(
    name=PACKAGE_NAME,
    version=PACKAGE_VERSION,
    description='My setup file',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)
and the Dataflow template export command will look like this. Note that in the setup_file argument we need to put the full path on our local machine.
python DataflowTest.py \
--runner DataflowRunner \
--project my_project \
--staging_location gs://alex_dataflow_staging/ \
--temp_location gs://alex_dataflow_temp/ \
--template_location gs://alex_dataflow_template/MyTemplate \
--setup_file /Users/afragotsis/Documents/github/.../setup.py \
--save_main_session True
After running the job we can find the template file in our GCS bucket.
Now we're ready to run the Dataflow job. Select "Dataflow" in the Google Cloud Console and then "Create Job from Template". Give it the name "my_test_job", and under "Dataflow Template" select "Custom Template". Browse to the path where we uploaded the template and select it; finally, enter the temp folder's URI.
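As an alternative to the console, the same classic template can be launched from the command line with the gcloud CLI. This is a sketch assuming the bucket paths used above and a us-central1 region; adjust both to your project:

```shell
# Launch the exported template as a Dataflow job (paths and region are assumptions)
gcloud dataflow jobs run my_test_job \
  --gcs-location gs://alex_dataflow_template/MyTemplate \
  --region us-central1 \
  --staging-location gs://alex_dataflow_temp/
```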
After running, we should see the job graph in the Dataflow execution console, and finally the CSV file in the temp folder.
See the full code here: https://github.com/aFrag/PythonDataflow/blob/master/JsonToCsv.py
After you've created your templates, see how you can schedule them to run automatically with Airflow here.