Apache Beam Bites

Mohamed Esmat
Nov 7 · 2 min read

If you are aiming to read CSV files in Apache Beam, validate them syntactically, split them into good records and bad records, parse the good records, apply some transformations, and finally write the output to Parquet or Avro, then I have some delicious Java code bites for you, ready to be consumed.

This article assumes some background about Apache Beam.

Image credit: Business Insider

Bite 1 — Read multiple CSV files in one PCollection

If your input dataset consists of multiple physical CSV files in HDFS or some cloud storage, how would you read them all into one PCollection object for downstream processing?

Read CSV files into one PCollection object
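The embedded gist is not shown here, but a minimal sketch looks like the following. The file pattern and pipeline name are assumptions; the key point is that `TextIO.read().from(...)` accepts a glob, so every matching file is flattened into a single PCollection of lines.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadCsvBite {
  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // TextIO accepts a glob pattern: every file that matches is read and
    // flattened into a single PCollection of text lines.
    // "hdfs://namenode/data/*.csv" is a placeholder path.
    PCollection<String> csvLines =
        pipeline.apply("ReadCsvFiles",
            TextIO.read().from("hdfs://namenode/data/*.csv"));

    pipeline.run().waitUntilFinish();
  }
}
```

The same call works for local files or cloud storage (e.g. a `gs://` path) as long as the corresponding filesystem module is on the classpath.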

Bite 2 — Split bad records and good records

If the CSV data is likely to have syntax issues or bad records, then it is worth validating the records against the expected schema and separating the bad records from the good ones.

Create TupleTag objects and apply transformation to create PCollectionTuple
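Since the gist is missing, here is a sketch of that pattern. The column count and the "good record" check are assumptions; swap in whatever syntactic validation your schema requires.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class SplitRecordsBite {
  // Anonymous subclasses preserve the generic type at runtime.
  static final TupleTag<String> GOOD = new TupleTag<String>() {};
  static final TupleTag<String> BAD = new TupleTag<String>() {};
  static final int EXPECTED_COLUMNS = 5; // assumption: width of the expected schema

  static PCollectionTuple split(PCollection<String> lines) {
    return lines.apply("SplitGoodAndBad",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String line, MultiOutputReceiver out) {
            // A record is "good" here if it has the expected number of
            // fields; plug in any stricter check you need.
            if (line.split(",", -1).length == EXPECTED_COLUMNS) {
              out.get(GOOD).output(line);
            } else {
              out.get(BAD).output(line);
            }
          }
        }).withOutputTags(GOOD, TupleTagList.of(BAD)));
  }
}
```

Calling `split(lines)` returns a PCollectionTuple; `results.get(GOOD)` and `results.get(BAD)` each give you back a PCollection&lt;String&gt; to process independently.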

Bite 3 — Build avro schema from kv pair of column name and type

If you are after writing the CSV records to Parquet format using ParquetIO, you will need a PCollection of GenericRecord objects. GenericRecord in turn requires an Avro schema; this code bite gives an example of how to create an Avro schema from a kv map of column name and type.

Create an avro schema from a kv pair of column name and type
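With the gist unavailable, here is a sketch using Avro's SchemaBuilder. The record name and namespace are hypothetical; a LinkedHashMap is used so the field order matches the CSV column order.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class AvroSchemaBite {
  static Schema fromColumns(Map<String, Schema.Type> columns) {
    SchemaBuilder.FieldAssembler<Schema> assembler =
        SchemaBuilder.record("CsvRecord")        // record name is arbitrary
            .namespace("com.example.beambites")  // hypothetical namespace
            .fields();
    for (Map.Entry<String, Schema.Type> column : columns.entrySet()) {
      assembler = assembler
          .name(column.getKey())
          .type(Schema.create(column.getValue()))
          .noDefault();
    }
    return assembler.endRecord();
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps insertion order, so fields line up with CSV columns.
    Map<String, Schema.Type> columns = new LinkedHashMap<>();
    columns.put("id", Schema.Type.LONG);
    columns.put("name", Schema.Type.STRING);
    System.out.println(fromColumns(columns).toString(true));
  }
}
```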

Bite 4 — ParDo implementation to parse and validate csv lines

A fourth code bite here to help you write your own ParDo implementation that validates the CSV lines syntactically (and logically, if you want) before converting them into GenericRecord objects following the Avro schema you created in Bite 3. The ParDo implementation also side-parks the bad records into a separate PCollection of String objects, to be handled as appropriate in your application. It uses the MultiOutputReceiver concept, which allows producing multiple PCollection objects encapsulated in a PCollectionTuple; each PCollection in the PCollectionTuple is tagged with a meaningful TupleTag.

ParDo implementation to parse csv lines, validate, and split into bad and good records
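As the gist is missing, here is one way this DoFn could look. It assumes plain comma-separated lines with no quoting or escaping, and covers only a couple of Avro types; the schema is shipped as a JSON string because passing it directly would require it to be serializable.

```java
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

public class ParseCsvFn extends DoFn<String, GenericRecord> {
  static final TupleTag<GenericRecord> GOOD = new TupleTag<GenericRecord>() {};
  static final TupleTag<String> BAD = new TupleTag<String>() {};

  private final String schemaJson; // serialized form of the Bite 3 schema
  private transient Schema schema;

  ParseCsvFn(Schema schema) {
    this.schemaJson = schema.toString();
  }

  @Setup
  public void setup() {
    schema = new Schema.Parser().parse(schemaJson);
  }

  @ProcessElement
  public void processElement(@Element String line, MultiOutputReceiver out) {
    String[] values = line.split(",", -1);
    List<Schema.Field> fields = schema.getFields();
    if (values.length != fields.size()) {
      out.get(BAD).output(line); // syntactically bad: wrong column count
      return;
    }
    try {
      GenericRecord record = new GenericData.Record(schema);
      for (int i = 0; i < fields.size(); i++) {
        record.put(fields.get(i).name(),
            convert(values[i], fields.get(i).schema()));
      }
      out.get(GOOD).output(record);
    } catch (NumberFormatException e) {
      out.get(BAD).output(line); // logically bad: value does not fit its type
    }
  }

  // Minimal type coercion; extend for the types your schema actually uses.
  private static Object convert(String raw, Schema fieldSchema) {
    switch (fieldSchema.getType()) {
      case LONG:   return Long.parseLong(raw);
      case DOUBLE: return Double.parseDouble(raw);
      default:     return raw;
    }
  }
}
```

Apply it the same way as in Bite 2: `ParDo.of(new ParseCsvFn(schema)).withOutputTags(ParseCsvFn.GOOD, TupleTagList.of(ParseCsvFn.BAD))`, then pull the good and bad PCollections out of the resulting PCollectionTuple.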

Conclusion

Using the four code bites, you can write an application that reads in CSV lines, validates them against the expected schema, side-parks the bad records into a separate PCollection for later handling, and converts the valid CSV lines into GenericRecord objects. Add your logical data transformations on top as needed, and then write the final output to Parquet using ParquetIO:

pipeline
    .apply(...) // PCollection<GenericRecord>
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(SCHEMA))
        .to("destination/path"));
