Data lineage tracking using Spline on Atlas via Event Hub

Introduction

Data lineage tracking is one of the critical requirements that organizations in highly regulated industries face. As a regulatory requirement, these organizations need a detailed view of how data flows through their systems. To process that data, organizations need fast, big-data technologies, and Apache Spark is one of the most popular. It is a unified analytics engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. While there are several products that cater to various aspects of governance, Apache Atlas is a scalable and extensible set of core foundational governance services, enabling enterprises to effectively and efficiently meet their compliance requirements within Hadoop while also integrating with the wider enterprise data ecosystem. A connector is required to track Spark SQL/DataFrame transformations and push metadata changes to Apache Atlas. The Spark Atlas Connector provides basic job information; if we need to capture attribute-level transformation information within the jobs, then Spline is another option. Spline is a data lineage tracking and visualization tool for Apache Spark. It captures and stores lineage information from internal Spark execution plans in a lightweight, unobtrusive and easy-to-use manner.

Overview

Users can manage metadata in Atlas using two methods: a REST API or messaging. For the Atlas integration with Spline described in this post, we chose the messaging interface, which is based on Kafka. The messaging interface is particularly useful if one wishes a more loosely coupled integration with Atlas that allows for better scalability, reliability, etc. Atlas uses a Kafka-based messaging service as a notification server for communication between hooks and downstream consumers of metadata notification events. Events are written by the hooks and by Atlas to different Kafka topics. Azure Event Hubs provides a Kafka endpoint that can be used by your existing Kafka-based applications as an alternative to running your own Kafka cluster. Event Hubs supports Apache Kafka protocol 1.0 and later, and works with your existing Kafka applications, including MirrorMaker.

Fig 1: Atlas Spline integration with Event hub architecture diagram

This post uses Spline from within Azure Databricks and persists the lineage information to Apache Atlas via a Kafka-enabled Azure Event Hub. To implement this, the steps below are required:

1. Create a Kafka-enabled Event Hub
2. Configure Apache Atlas to use Event Hub
3. Upload Spline TypeDefs
4. Install Spline libraries within Azure Databricks
5. Spark Code Changes
6. Using Event Hub to check message flow
7. Using Atlas UI to check Lineage

Create a Kafka-enabled Event Hub

As a first step, create a Kafka-enabled Event Hub namespace by following the Azure documentation. For the Atlas Spline integration only the Event Hub namespace needs to be created, not the event hub itself: the Atlas Kafka plugin reads messages from the ATLAS_HOOK topic, and that topic is created by the Spline API at run time. We will need the connection string in step 2 (Configure Apache Atlas to use Event Hub) and step 5 (Spark Code Changes). Once the Event Hub namespace is created, open it, go to Settings -> Shared access policies -> RootManageSharedAccessKey and copy "Connection string–primary key", as shown below:

Fig: Copy connection string from Event Hub namespace
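The copied value is the namespace-level connection string; it should look roughly like the placeholder below (the namespace name and key will of course differ in your environment):

Endpoint=sb://<<eventhub namespace name>>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<<primary key>>

The entire string, including the Endpoint= prefix, is what will be used as the Kafka password in step 2 and step 5.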

Configure Apache Atlas to use Event Hub

Apache Atlas configuration is saved in a Java-properties-style configuration file. The main configuration file is atlas-application.properties, which lives in the conf directory of the deployed location. To add the Event Hub configuration to Apache Atlas, we need to modify the following sections of the atlas-application.properties file:

· Notification Configs

· JAAS Configuration

Notification Configs

The Kafka-enabled Azure Event Hub lives outside Atlas, so set atlas.notification.embedded to false. To pull messages from the Event Hub, Atlas needs the Event Hub Kafka bootstrap server name, so set atlas.kafka.bootstrap.servers to <<eventhub namespace name>>.servicebus.windows.net:9093.

After these modifications, the Notification Configs section will look like this:

#########  Notification Configs  #########
atlas.notification.embedded=false
atlas.kafka.bootstrap.servers=<<eventhub namespace name>>.servicebus.windows.net:9093
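For example, with a hypothetical namespace named contoso-atlas-ns, the bootstrap server entry would read:

atlas.kafka.bootstrap.servers=contoso-atlas-ns.servicebus.windows.net:9093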

JAAS Configuration

The Atlas hook uses the JAAS configuration section named "KafkaClient" to authenticate with the Kafka broker. In a typical Kafka-enabled Event Hub deployment this section is set to use a username and password, where the username is $ConnectionString and the password is the connection string copied in step 1. Event Hub Kafka uses SASL_SSL as the protocol and PLAIN as the mechanism; these values also need to be set in the JAAS configuration section.

To enable secure communication between the Event Hub and Atlas, add/update the following in atlas-application.properties:

#########  JAAS Configuration ########
atlas.jaas.KafkaClient.loginModuleName=org.apache.kafka.common.security.plain.PlainLoginModule
atlas.jaas.KafkaClient.loginModuleControlFlag=required
atlas.jaas.KafkaClient.option.username=$ConnectionString
atlas.jaas.KafkaClient.option.password=<<Eventhub Namespace connection string copied from step 1>>
atlas.jaas.KafkaClient.option.mechanism=PLAIN
atlas.jaas.KafkaClient.option.protocol=SASL_SSL

Upload Spline TypeDefs

Before we start harvesting Spark lineage information into Atlas, the Spline meta model must be uploaded into the Atlas environment using the REST API v2. To interact with the Atlas REST v2 endpoint, use either curl or a tool like Postman. First, download spline-meta-model.json from GitHub. Below is a sample interaction that POSTs the Spline type definitions to Atlas:

ATLAS_BASE_URL=https://atlas-servername:port/api/atlas/v2
curl --negotiate -u reenu -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' "$ATLAS_BASE_URL/types/typedefs" -d "@./spline-meta-model.json"

Note: My Atlas instance was Kerberos protected, therefore the negotiate flag was used. The request JSON is stored in spline-meta-model.json.

On success, you can see the definitions in the response, as shown below:

Fig: Spline metadata upload output
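If you prefer to upload the typedefs programmatically rather than via curl or Postman, the same POST can be issued from Scala using the JDK 11+ HTTP client. The sketch below is only an illustration under assumptions: the Atlas host, port and basic-auth credentials are hypothetical placeholders, and a Kerberos-protected instance like the one above would need SPNEGO negotiation instead of basic auth.

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.nio.file.Paths
import java.util.Base64

object UploadSplineTypedefs {
  def main(args: Array[String]): Unit = {
    // Hypothetical Atlas endpoint and credentials; replace with your own values.
    val atlasBaseUrl = "https://my-atlas-host:21000/api/atlas/v2"
    val basicAuth    = Base64.getEncoder.encodeToString("admin:admin".getBytes("UTF-8"))

    // POST the downloaded Spline meta model to the typedefs endpoint.
    val request = HttpRequest.newBuilder(URI.create(s"$atlasBaseUrl/types/typedefs"))
      .header("Content-Type", "application/json")
      .header("Accept", "application/json")
      .header("Authorization", s"Basic $basicAuth")
      .POST(HttpRequest.BodyPublishers.ofFile(Paths.get("spline-meta-model.json")))
      .build()

    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println(s"HTTP ${response.statusCode()}")
    println(response.body())
  }
}

As with the curl call, a 200 response containing the created type definitions indicates the upload succeeded.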

Install Spline libraries within Azure Databricks

To make the Spline libraries available to Databricks notebooks and jobs running on your clusters, install the Spline core libraries. We need to use their Maven coordinates and install them into Azure Databricks as Maven libraries. Assuming Spark 2.4, only the libraries below are required for the Spline Atlas integration:

za.co.absa.spline:spline-core:0.3.6
za.co.absa.spline:spline-core-spark-adapter-2.4:0.3.6
za.co.absa.spline:spline-persistence-atlas:0.3.9

To add just these libraries, you need to specify “exclusions” when adding these libraries in the Databricks UI. The exclusions that we have to add are:

org.apache.spark:spark-sql-kafka-0-10_2.11:${spark.version},org.json4s:json4s-native_2.11:${json4s.version}

Spark Code Changes

Now it's time to set up the Spark session configuration items needed to connect to the Kafka-enabled Event Hub endpoint.

// Spline lineage mode and the Atlas persistence factory
System.setProperty("spline.mode", "REQUIRED")
System.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory")

// Kafka-enabled Event Hub endpoint and credentials used by the Atlas hook
System.setProperty("atlas.kafka.bootstrap.servers", "<<Eventhub NameSpace Name>>.servicebus.windows.net:9093")
System.setProperty("atlas.kafka.hook.group.id", "atlas")
System.setProperty("atlas.kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<<Eventhub Namespace connection string copied from step 1>>\";")
System.setProperty("atlas.kafka.sasl.mechanism", "PLAIN")
System.setProperty("atlas.kafka.security.protocol", "SASL_SSL")
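Hard-coding the connection string in a notebook is usually undesirable. A minimal hardening sketch, assuming a hypothetical Databricks secret scope named atlas with a key eventhub-connection-string, reads the secret via dbutils and builds the JAAS line from it; the other System.setProperty calls stay exactly as above.

// Assumes a secret scope "atlas" with key "eventhub-connection-string" was created beforehand (hypothetical names).
val ehConnectionString = dbutils.secrets.get(scope = "atlas", key = "eventhub-connection-string")

// $$ in an interpolated string emits a literal $, so the username stays "$ConnectionString".
System.setProperty(
  "atlas.kafka.sasl.jaas.config",
  s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$ehConnectionString";"""
)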

The next step is to enable lineage tracking for that Spark session:

import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()

Then we run a sample job that reads a csv file into a DataFrame and later writes two csv files from the same DataFrame.

// Read the source csv into a DataFrame
val emp = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/emp13.csv")
display(emp)

// Write the same DataFrame out twice as csv
emp.write.format("com.databricks.spark.csv").save("/FileStore/tables/emp100.csv")
emp.write.format("com.databricks.spark.csv").save("/FileStore/tables/emp101.csv")

Note: For the code snippet above, I had uploaded one csv file, and emp13.csv is itself the output of another Databricks Spark job.

After the notebook executes successfully, you can find the two new csv files (emp100.csv and emp101.csv) in the Data section of Azure Databricks.

Fig : Spark Job output

Using Event Hub to check message flow

Open the Azure portal and open the Event Hub namespace that was created in step 1. Go to the Entities -> Event Hubs section. There you will find a new event hub named atlas_hook. It was created by the Spline APIs while processing the Databricks job, and it is used as the Kafka topic from which the Atlas Kafka hook pulls data. In the Overview tab of atlas_hook you can see 2 incoming and 2 outgoing messages, as shown below. This indicates that the Atlas hook successfully retrieved the messages.

Fig: Event Hub message flow
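If you want to inspect the actual payloads rather than just the counters, the topic can be read with a plain Kafka consumer. This is a rough sketch, assuming kafka-clients is on the classpath, the placeholders are replaced with your namespace and connection string, and a separate consumer group is used so the Atlas hook's own "atlas" group is not disturbed:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object PeekAtlasHook {
  def main(args: Array[String]): Unit = {
    // Placeholders: replace with your Event Hub namespace and connection string.
    val props = new Properties()
    props.put("bootstrap.servers", "<<eventhub namespace name>>.servicebus.windows.net:9093")
    props.put("group.id", "lineage-debug") // debug-only group; does not affect the Atlas hook's offsets
    props.put("auto.offset.reset", "earliest")
    props.put("security.protocol", "SASL_SSL")
    props.put("sasl.mechanism", "PLAIN")
    props.put("sasl.jaas.config",
      "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"$ConnectionString\" password=\"<<Eventhub Namespace connection string copied from step 1>>\";")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    // ATLAS_HOOK is the topic the Spline Atlas persistence layer writes to (shown as atlas_hook in the portal).
    consumer.subscribe(Collections.singletonList("ATLAS_HOOK"))
    consumer.poll(Duration.ofSeconds(10)).asScala.foreach(record => println(record.value()))
    consumer.close()
  }
}

The payloads are the Atlas notification JSON messages produced by Spline, so seeing them here confirms the Databricks side of the pipeline independently of Atlas.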

Using Atlas UI to check lineage

Now it's time to check the lineage information for the data we processed in the previous step. Every time a Spark job runs from Databricks, it creates new lineage information of type spark_job. Open the Apache Atlas UI, go to Search, and select "spark_job" in the "Search By Type" drop-down. In the search results there is a new entry named "Databricks Shell". Open the "Databricks Shell" job and you can see its properties, as shown below.

Fig : Spark Job lineage
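The same entities can also be located programmatically through Atlas' basic search REST endpoint, which is handy for automated checks. A minimal sketch, reusing the hypothetical host and basic-auth credentials from the typedef upload example:

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.util.Base64

object FindSparkJobEntities {
  def main(args: Array[String]): Unit = {
    // Hypothetical Atlas endpoint and credentials; replace with your own values.
    val atlasBaseUrl = "https://my-atlas-host:21000/api/atlas/v2"
    val basicAuth    = Base64.getEncoder.encodeToString("admin:admin".getBytes("UTF-8"))

    // Basic search filtered by type name; returns the spark_job entities created by Spline.
    val request = HttpRequest.newBuilder(URI.create(s"$atlasBaseUrl/search/basic?typeName=spark_job"))
      .header("Accept", "application/json")
      .header("Authorization", s"Basic $basicAuth")
      .GET()
      .build()

    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println(response.body())
  }
}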

To get the details of the lineage information, go to the Lineage tab. It provides the details shown below. You can see multiple Databricks Shell nodes in this diagram because emp13.csv was the output of multiple Spark jobs.

Fig : Spark Job lineage

If you want more details about the emp101.csv file, click on dbfs:/FileStore/tables/emp101.csv in the lineage graph above, or search for hdfs_path in the "Search By Type" drop-down.

Fig : hdfs path lineage

For a multi-step Spark job, the output will look slightly different, as shown below.

Fig: Complex spark job lineage

In a more complex scenario, where one file is processed by multiple Databricks Shell jobs, the output will contain the details of all the Spark jobs, hdfs paths and related operations.

Recap

To capture attribute-level lineage information for a Spark job, we used the Spline plugin to push lineage information from the Databricks shell to Apache Atlas. We leveraged a Kafka-enabled Event Hub as the messaging platform between Spline and Atlas. Finally, we used the Azure portal to track the flow of messages through the Event Hub, and the Atlas UI to view the lineage information.