Process AVRO files in Azure Synapse Analytics with Integrate Data Flows or ADF
3 min read · Jun 5, 2021
A walkthrough for IoT and other sources that produce AVRO files with changing schemas.
Requirements
- Azure account
- Azure Storage account
- Upload a sample AVRO file, or generate one
- Create a container and upload the AVRO file
- Find the AVRO schema
- Azure Synapse Analytics workspace
- Create an integration runtime
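For reference, Event Hubs / IoT Hub capture typically writes AVRO files with a schema along these lines (the field names match the sample rows below; treat this as an illustrative sketch, not necessarily the exact schema of your file):

```json
{
  "type": "record",
  "name": "EventData",
  "namespace": "Microsoft.ServiceBus.Messaging",
  "fields": [
    { "name": "SequenceNumber", "type": "long" },
    { "name": "Offset", "type": "string" },
    { "name": "EnqueuedTimeUtc", "type": "string" },
    { "name": "SystemProperties", "type": { "type": "map", "values": ["long", "double", "string", "bytes"] } },
    { "name": "Properties", "type": { "type": "map", "values": ["long", "double", "string", "bytes", "null"] } },
    { "name": "Body", "type": ["null", "bytes"] }
  ]
}
```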
Steps
- First create a storage container
- I have uploaded the sample file 22.avro to the data folder in this repo
- A sample of the data (rendered as CSV) is below
SequenceNumber,Offset,EnqueuedTimeUtc,SystemProperties,Properties,Body,BodyNew
507,364464,5/20/2021 11:46:23 PM,,,"{\"applicationId\":\"3a11d300-d923-4a3d-9cd9-75364b23f710\",\"messageSource\":\"telemetry\",\"deviceId\":\"mymxchipbb\",\"schema\":\"default@v1\",\"templateId\":\"urn:6bccexgax:ex1fym5he\",\"enqueuedTime\":\"2021-05-20T23:46:21.84Z\",\"telemetry\":{\"gyroscope\":{\"z\":350,\"x\":1400,\"y\":-3150},\"accelerometer\":{\"x\":-67,\"y\":-735,\"z\":686},\"humidity\":50,\"temperature\":30.299999,\"pressure\":999.015137,\"magnetometer\":{\"y\":270,\"z\":-251,\"x\":185}},\"messageProperties\":{},\"enrichments\":{}}","{\"applicationId\":\"3a11d300-d923-4a3d-9cd9-75364b23f710\",\"messageSource\":\"telemetry\",\"deviceId\":\"mymxchipbb\",\"schema\":\"default@v1\",\"templateId\":\"urn:6bccexgax:ex1fym5he\",\"enqueuedTime\":\"2021-05-20T23:46:21.84Z\",\"telemetry\":{\"gyroscope\":{\"z\":350,\"x\":1400,\"y\":-3150},\"accelerometer\":{\"x\":-67,\"y\":-735,\"z\":686},\"humidity\":50,\"temperature\":30.299999,\"pressure\":999.015137,\"magnetometer\":{\"y\":270,\"z\":-251,\"x\":185}},\"messageProperties\":{},\"enrichments\":{}}"
508,365184,5/20/2021 11:46:32 PM,,,"{\"applicationId\":\"3a11d300-d923-4a3d-9cd9-75364b23f710\",\"messageSource\":\"telemetry\",\"deviceId\":\"mymxchipbb\",\"schema\":\"default@v1\",\"templateId\":\"urn:6bccexgax:ex1fym5he\",\"enqueuedTime\":\"2021-05-20T23:46:31.997Z\",\"telemetry\":{\"accelerometer\":{\"x\":-67,\"y\":-735,\"z\":686},\"humidity\":50,\"temperature\":30.4,\"pressure\":999.022461,\"magnetometer\":{\"x\":188,\"y\":271,\"z\":-251},\"gyroscope\":{\"z\":350,\"x\":1330,\"y\":-3150}},\"messageProperties\":{},\"enrichments\":{}}","{\"applicationId\":\"3a11d300-d923-4a3d-9cd9-75364b23f710\",\"messageSource\":\"telemetry\",\"deviceId\":\"mymxchipbb\",\"schema\":\"default@v1\",\"templateId\":\"urn:6bccexgax:ex1fym5he\",\"enqueuedTime\":\"2021-05-20T23:46:31.997Z\",\"telemetry\":{\"accelerometer\":{\"x\":-67,\"y\":-735,\"z\":686},\"humidity\":50,\"temperature\":30.4,\"pressure\":999.022461,\"magnetometer\":{\"x\":188,\"y\":271,\"z\":-251},\"gyroscope\":{\"z\":350,\"x\":1330,\"y\":-3150}},\"messageProperties\":{},\"enrichments\":{}}"
- Usually, if the Body column is base64-encoded, we need to convert it to a string; here that result is stored as BodyNew
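The Body-to-BodyNew conversion can be sketched locally in plain Python; the payload below is a trimmed, hypothetical version of the sample telemetry:

```python
import base64
import json

# A telemetry payload like the one in the Body column (trimmed for brevity).
payload = '{"deviceId": "mymxchipbb", "telemetry": {"humidity": 50, "temperature": 30.3}}'

# Simulate a base64-encoded Body column, then decode it back to a string,
# which is what the derived column BodyNew holds in the data flow.
body = base64.b64encode(payload.encode("utf-8"))
body_new = base64.b64decode(body).decode("utf-8")

record = json.loads(body_new)
print(record["deviceId"])               # mymxchipbb
print(record["telemetry"]["humidity"])  # 50
```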
Azure Synapse integrate pipeline
- Go to Azure Synapse Analytics Workspace Studio
- Go to manage
- Create a new integration runtime with 16+ cores for Spark processing
- Go to Develop
- Create a new data flow
- Point the source to the storage account container that holds the AVRO file
- Create a new dataset connecting to the data store
- Turn debug on
- Go to Data preview
- Add a Select transformation to choose columns
- Add a Derived Column to convert Body (if base64) to a string, naming the new column BodyNew
- Add a Parse transformation
- Create a new column with format json
- In Expression, select the data column (Body or BodyNew)
- For the output column type, use:
(applicationId as string, messageSource as string, deviceId as string, schema as string, templateId as string, enqueuedTime as string, telemetry as (gyroscope as (z as float, x as float, y as float), accelerometer as (z as float, x as float, y as float), humidity as float, temperature as float, pressure as float, magnetometer as (z as float, x as float, y as float)), messageProperties as (messageProp as string), enrichments as (userSpecifiedKey as string))
- Finally, add a Sink and store the output as Parquet for further processing
- Preview the data and confirm the parsed columns appear
- Expand the telemetry column and see if you can see the sensor details
- Create an Integrate pipeline
- Add a Data flow activity and select the data flow created above
- Click Debug to run the pipeline
- Once it completes, view the run details
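The effect of the Parse step (JSON string in, structured columns out) can be approximated in plain Python. The flattening below mirrors the output column type shown earlier, using made-up sample values and a hypothetical helper `parse_body`:

```python
import json

# A BodyNew-style JSON string, as produced after the base64 conversion
# (values are made up to match the sample rows above).
body_new = json.dumps({
    "deviceId": "mymxchipbb",
    "enqueuedTime": "2021-05-20T23:46:21.84Z",
    "telemetry": {
        "gyroscope": {"x": 1400, "y": -3150, "z": 350},
        "humidity": 50,
        "temperature": 30.299999,
    },
})

def parse_body(raw: str) -> dict:
    """Parse the JSON payload and flatten nested objects into dotted
    column names, similar to expanding the telemetry column."""
    doc = json.loads(raw)
    flat = {}
    for key, value in doc.items():
        if isinstance(value, dict):
            for inner_key, inner_value in value.items():
                if isinstance(inner_value, dict):  # e.g. gyroscope x/y/z
                    for axis, reading in inner_value.items():
                        flat[f"{key}.{inner_key}.{axis}"] = reading
                else:
                    flat[f"{key}.{inner_key}"] = inner_value
        else:
            flat[key] = value
    return flat

row = parse_body(body_new)
print(row["telemetry.gyroscope.x"])  # 1400
print(row["telemetry.humidity"])     # 50
```

A flat record like this is what lands in the Parquet sink, one column per leaf field.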
Original article: Samples2021/avroprocessdf.md at main · balakreshnan/Samples2021 (github.com)