Navigating data lakes using Atlas

Lineage of data in Apache Atlas

Nowadays almost every company wants to have their own Big Data system to analyse client behaviour and optimise operating costs. One of the most popular solutions for implementing such systems is a Data Lake based on the Hadoop ecosystem. If you don’t know what exactly a Data Lake is, you can read about it in another of our posts, Diving in the data lake.
When you already have such a system, you put more and more data into it and the lake grows and becomes deeper. At some depths, there is data that no one has ever looked at. Or, even worse, no one knows what this data is or where it comes from. Imagine you wanted to dive into such a lake: how helpful would a simple map or a shiny Atlas be?

TL;DR

To get data lineage in Atlas for the sample Spark application, follow these steps:

  • clone the repo:
git clone https://github.com/VirtusLab/data_lake_navigation_atlas.git
  • run the HDP sandbox
  • configure Atlas
  • sbt run --create
  • log into Atlas (the default credentials for the HDP sandbox are ‘admin’/’admin’)
  • in the search box, look for the phrase ‘rfc7540’
  • choose one of the links and you should get a lineage diagram for the source file

What is Atlas

Apache Atlas shows you where your data comes from, how it can be transformed, and what the artefacts of those transformations are. It is a metadata management service created for the Hadoop ecosystem that can create a lineage diagram for each piece of data in your lake. Atlas is a relatively young and rapidly developing project. Unfortunately, it supports only a small part of the Hadoop ecosystem. Support for other services will be added incrementally, but don’t worry; you can use the REST API or the Kafka API to write your own integration with Atlas. In this post I will try to show you how to integrate Atlas with a Spark application via the REST API.

What is Spark

Apache Spark is a general-purpose cluster computing system for large-scale data processing that can be run as part of the Hadoop platform or completely separately. When it’s run on Hadoop, we get integration with other tools such as Hive, which has its own metadata store that Atlas can use to provide information about data sources and build lineage, even if the SQL queries are run via Spark. However, when we want to build a plain Spark application, the framework itself doesn’t provide Atlas with enough information. One of the biggest improvements in Spark over traditional MapReduce is the Directed Acyclic Graph (DAG), which Spark builds to manage and optimise each job. The DAG is effectively a lineage in itself, and we will use it as a source of information for Atlas.
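To make the rest of the post concrete, here is a minimal sketch of the kind of Spark job whose lineage we want to capture. The object name, input and output paths are placeholders (the rfc7540 file only echoes the sample mentioned in the TL;DR); the actual application lives in the repository.

    import org.apache.spark.sql.SparkSession

    object WordCountJob {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("wordcount").getOrCreate()
        import spark.implicits._

        // Read a text file from HDFS, count the words and store the result.
        // The input and output paths below are placeholders.
        val lines  = spark.read.textFile("hdfs:///data/input/rfc7540.txt")
        val counts = lines
          .flatMap(_.split("\\s+"))
          .groupByKey(identity)
          .count()
        counts.write.parquet("hdfs:///data/output/rfc7540-wordcount")

        spark.stop()
      }
    }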

Prerequisites

To reproduce each step on your local machine you have to have access to Atlas. If you don’t have a running instance, you can compile it from source, although this is time-consuming and complex. I recommend downloading and running a virtual machine with the HDP Sandbox; even though Atlas comes pre-installed, it requires some additional setup.

Atlas REST API

Atlas provides a REST API which is used by its web user interface. It can also be used to integrate with other services. Version 0.8-incubating introduced a new API, which will be used in our example. If you want to play around with the API, it provides a Swagger interface. Unfortunately, there’s no proper description of objects and endpoints. Better documentation can be found in the Hortonworks Data Platform documentation.
However, even there some parts are missing, e.g. authorization.

Authorization

Authorization is quite problematic. I couldn’t find information anywhere on how to correctly authenticate and authorize with the Atlas API, and the documentation doesn’t mention anything about it. I debugged the web application’s login form and reproduced it in my application, but this is only a workaround. Maybe a more secure way to do this will be provided in the future. I should probably use Apache Knox, but that would be a more complex solution. If you know a better way, please let me know.

I use the user admin with the password admin, as provided with the HDP sandbox.

To obtain a session you need to log in via the following request:
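A minimal sketch of that request, reproduced from what the web UI’s login form sends. The endpoint and form field names come from debugging that form in the sandbox and may differ between Atlas versions; the host and port assume the default Atlas web port 21000.

    POST /j_spring_security_check HTTP/1.1
    Host: localhost:21000
    Content-Type: application/x-www-form-urlencoded

    j_username=admin&j_password=admin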

The session is returned in a cookie with the name ATLASSESSIONID. You need to save it somewhere as it will be needed for each request. I use Postman to play around with the API and intercept the cookie from the browser.
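For example, a subsequent call just has to carry the cookie back (a sketch, with a placeholder value):

    GET /api/atlas/v2/types/typedefs HTTP/1.1
    Host: localhost:21000
    Cookie: ATLASSESSIONID=<value returned by the login call>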

Spark metatype model

Before we start, we need to create a model of metatypes which will describe the Spark application. I will only describe our spark_application type. For more information, take a look at the HDP2 documentation or check the repository, where you can find definitions of metatypes provided with Atlas.

The important thing is that the metatype model has changed since version 0.8. The official documentation doesn’t mention it! However, you can find better documentation on the Hortonworks Data Platform web page.

Generally, metatypes are similar to classes in object-oriented programs. There are several kinds of types in Atlas: enum, structure, classification (also called tag), and entity. I think that enum and tag are self-explanatory. Structure and entity are very similar; the main difference is that an entity may exist as an independent type to which attributes can refer, whereas a structure must be accompanied by another metatype and is more like a private property defined in a narrow scope.

The important thing is that a tag in the web application is the same as a classification in the API.

We need to define the type for the Spark application as the main instance of the working process. So, we define our metatype as an entity. Each metatype has to have a unique name. We start each name with a prefix formed from the name of the service it’s related to. Our entity will be called spark_application.

Like every class, a metatype can inherit attributes from super types. There are no restrictions on how many types are mixed in. We use Process, which is one of the predefined Atlas metatypes. It gives us the inputs and outputs attributes as well as name, qualifiedName and a few other attributes.

The last important thing is the definition of attributes. I will add start and end time as required fields to keep information about execution time. An attribute requires at least a name and a field type. You could use any other type here.

A definition of the spark_application metatype written in JSON looks like this:
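A sketch of such a definition in the v2 typedef format; the attribute names (startTime, endTime) and the description are illustrative, and the exact definition used by the sample application is in the repository.

    {
      "category": "ENTITY",
      "name": "spark_application",
      "description": "Spark application executed on the cluster",
      "typeVersion": "1.0",
      "superTypes": ["Process"],
      "attributeDefs": [
        {
          "name": "startTime",
          "typeName": "date",
          "isOptional": false,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        },
        {
          "name": "endTime",
          "typeName": "date",
          "isOptional": false,
          "cardinality": "SINGLE",
          "isUnique": false,
          "isIndexable": false
        }
      ]
    }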

Create the metatype in Atlas

Now we need to put the definition of the metatype into Atlas. To do this we need to make a POST request to the /api/atlas/v2/types/typedefs endpoint with our defined metatype wrapped in an additional object.
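That wrapper is the typedefs envelope of the v2 API. A sketch of its shape is below; in practice the single element of entityDefs is the full spark_application definition from the previous listing (abbreviated here to its name and super type).

    {
      "enumDefs": [],
      "structDefs": [],
      "classificationDefs": [],
      "entityDefs": [
        {
          "category": "ENTITY",
          "name": "spark_application",
          "superTypes": ["Process"]
        }
      ]
    }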

If you are using Postman or another similar tool, you might need to set an additional header for POSTs (see this).

Great! Our first metatype is created!
If you want to double-check, make a GET call to /api/atlas/v2/types/typedef/name/spark_application.

If your type definition is wrong or you want to improve it, make a call with your updated type definition, or drop it and create it again. To drop your metatypes you need to make a DELETE call to /api/atlas/v2/types/typedefs with a body like this:
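The body uses the same typedefs envelope, naming the definitions to remove. A minimal sketch; some Atlas versions may expect the full definition here rather than just the name.

    {
      "entityDefs": [
        { "name": "spark_application" }
      ]
    }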

Create an instance of the Spark application

To create an instance of the Spark application in Atlas, we need to collect all the required data. To do this we create an Atlas listener which listens for all Spark events and forwards them to the aggregator actor.
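A minimal sketch of such a listener, assuming an Akka actor as the aggregator; the class name is illustrative and the real implementation is in the repository.

    import akka.actor.ActorRef
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

    // Forwards the Spark events we are interested in to the aggregator actor.
    class AtlasSparkListener(aggregator: ActorRef) extends SparkListener {

      override def onApplicationStart(start: SparkListenerApplicationStart): Unit =
        aggregator ! start

      override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
        aggregator ! end
    }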

We need to register our listener in the context of Spark.
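A sketch of the registration, using the listener above and the aggregator actor sketched a little further below; the names are again illustrative.

    import akka.actor.{ActorSystem, Props}
    import org.apache.spark.sql.SparkSession

    val system     = ActorSystem("atlas-integration")
    val aggregator = system.actorOf(Props(new AtlasAggregator), "atlas-aggregator")

    val spark = SparkSession.builder().appName("wordcount").getOrCreate()
    spark.sparkContext.addSparkListener(new AtlasSparkListener(aggregator))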

Spark events don’t contain full information about the sources and outputs of the data. I added two custom events which I send from my application to the aggregator: one for inputs and another to cover information about outputs. Thanks to this, I can build the full lineage in Atlas.
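The two extra events can be as simple as case classes carrying the paths; the names and paths below are illustrative, the repo defines its own equivalents.

    // Hypothetical events describing the data read and written by the job.
    case class InputsRegistered(paths: Seq[String])
    case class OutputsRegistered(paths: Seq[String])

    // Sent from the driver code, where the concrete paths are known.
    def registerLineage(aggregator: akka.actor.ActorRef): Unit = {
      aggregator ! InputsRegistered(Seq("hdfs:///data/input/rfc7540.txt"))
      aggregator ! OutputsRegistered(Seq("hdfs:///data/output/rfc7540-wordcount"))
    }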

The aggregator actor preserves the current state of the running SparkApplication and builds a view of it. When it receives the SparkListenerApplicationEnd event, it makes several REST calls to the Atlas server with the collected data.

This simple version handles two Spark events, the two additional custom events and the response from the server.
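A minimal sketch of such an actor, assuming Akka and the illustrative events above; AtlasResponse stands in for whatever the REST layer returns, and sendToAtlas hides the three calls described below.

    import akka.actor.Actor
    import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart}

    // Placeholder for the reply coming back from the Atlas server.
    case class AtlasResponse(status: Int, body: String)

    class AtlasAggregator extends Actor {
      private var startTime: Long      = 0L
      private var inputs: Seq[String]  = Nil
      private var outputs: Seq[String] = Nil

      def receive: Receive = {
        case start: SparkListenerApplicationStart =>
          startTime = start.time
        case InputsRegistered(paths) =>
          inputs = paths
        case OutputsRegistered(paths) =>
          outputs = paths
        case end: SparkListenerApplicationEnd =>
          // Log in, create/update the sources, then create the spark_application entity.
          sendToAtlas(startTime, end.time, inputs, outputs)
        case AtlasResponse(status, body) =>
          context.system.log.info(s"Atlas replied with $status: $body")
      }

      // The three REST calls described below; left unimplemented in this sketch.
      private def sendToAtlas(start: Long, end: Long, in: Seq[String], out: Seq[String]): Unit = ()
    }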

There are more Spark events which we didn’t handle in our sample. If you listened for them, you could build a view of the entire Spark process, describing transformations at the job, stage or RDD level. If you collected this data, you could build a lineage better suited to your needs.

The first request is the authorization call to obtain a session token. With this token we make a second request to create the sources (or update them if they already exist). We need to create them first because we want to reference them in the next call. To do this we POST to the /api/atlas/v2/entity/bulk endpoint, which bulk-creates or updates entities, with a JSON body similar to this:
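A sketch of such a body, registering the input and output as hdfs_path entities (one of the types shipped with Atlas); the qualifiedName values, paths and the Sandbox cluster suffix are placeholders.

    {
      "entities": [
        {
          "typeName": "hdfs_path",
          "attributes": {
            "qualifiedName": "hdfs:///data/input/rfc7540.txt@Sandbox",
            "name": "rfc7540.txt",
            "path": "hdfs:///data/input/rfc7540.txt"
          }
        },
        {
          "typeName": "hdfs_path",
          "attributes": {
            "qualifiedName": "hdfs:///data/output/rfc7540-wordcount@Sandbox",
            "name": "rfc7540-wordcount",
            "path": "hdfs:///data/output/rfc7540-wordcount"
          }
        }
      ]
    }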

In the response we should get information about the updated and created objects. We will also get a unique guid which can be used to search for the entity.
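The response has roughly this shape (trimmed to the interesting parts; the exact fields vary between Atlas versions, and the guid is a placeholder):

    {
      "mutatedEntities": {
        "CREATE": [
          { "typeName": "hdfs_path", "guid": "<guid assigned by Atlas>" }
        ]
      },
      "guidAssignments": { }
    }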

The third and last call creates an instance of the Spark application entity. To do this we use the API for creating a single entity. We also need to pass inputs and outputs as attributes. Since we created them in the previous step, we can reference them by the guid from the response or by a unique attribute like qualifiedName.

Make a POST call to the /api/atlas/v2/entity endpoint to create or update the entity with a JSON body like this:
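A sketch of the body; here the inputs and outputs reference the hdfs_path entities created in the previous call by their unique qualifiedName (guids would work too), and the names, qualified names and timestamps are placeholders.

    {
      "entity": {
        "typeName": "spark_application",
        "attributes": {
          "qualifiedName": "spark_application.wordcount@Sandbox",
          "name": "wordcount",
          "startTime": 1514764800000,
          "endTime": 1514764860000,
          "inputs": [
            {
              "typeName": "hdfs_path",
              "uniqueAttributes": { "qualifiedName": "hdfs:///data/input/rfc7540.txt@Sandbox" }
            }
          ],
          "outputs": [
            {
              "typeName": "hdfs_path",
              "uniqueAttributes": { "qualifiedName": "hdfs:///data/output/rfc7540-wordcount@Sandbox" }
            }
          ]
        }
      }
    }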

In the response we get confirmation that our application was created, along with its guid.

Now, when we log in to the Atlas web application and search for the source of our data, we should see a lineage diagram for this data.

The lineage diagram shows the input data, how it was transformed and where the results are stored.
By applying this approach you can track each piece of data in the data lake.
Thanks to that, your data scientists can identify the source of truth for each dataset.
Also, when you want to change something in the data lake, e.g. modify a transformation or remove some data, you can estimate the impact of the change, find the owners of the affected dependencies and notify them.

Conclusion

Atlas seems like a product that fills a gap in the Hadoop ecosystem. However, the lack of good documentation makes it hard to use, and problems with naming consistency introduce additional complexity. The API should provide a way to authorize machine-to-machine, and all actions should be registered as actions taken on behalf of the user who runs a process, e.g. a Spark application. I think it needs more time to become a mature product. Despite this, it is a great extensible tool to which you can add your own integrations. I couldn’t imagine a modern data lake platform without it. It’s definitely worth using.

All the source code is available in the repository.