Empowering Data-Driven Organizations Part 2: A game-changing integration between OpenLineage and OpenMetadata

Natalie Zeller
Published in NI Tech Blog
7 min read · Jun 27, 2023

This is the second post in a two-part series.

In the preceding blog post, we explored the concept of data observability and showcased how Natural Intelligence uses OpenMetadata for discovery and observability of metadata in the organization.

In this post, we will reveal how we integrated OpenLineage and customized it to transmit lineage from our Spark jobs to OpenMetadata automatically. Additionally, we will explore our strategy for extracting lineage from queries executed directly via JDBC, effectively bridging the gap for queries running outside of Spark.

Discover how these combined efforts allowed us to gain increased transparency and complete visibility of the data path.

OpenLineage Meets OpenMetadata

As mentioned in the previous post, we were searching for an automated way to extract the lineage information from our Spark jobs, instead of specifying it explicitly.

Fortunately, there are tools that simplify this task. We used OpenLineage, an open-source project that collects and analyzes data lineage from jobs at runtime. OpenLineage reports input and output datasets to a compatible backend for further analysis. However, OpenMetadata does not comply with the OpenLineage standard event format.

To overcome this limitation, we extended OpenLineage by adding a custom transport type that transmits lineage information from OpenLineage to OpenMetadata, conforming to the OpenMetadata format. The capability to create custom transport types, a contribution from Natural Intelligence, is available starting from OpenLineage version 0.24.0.

Leveraging the integration between OpenLineage and OpenMetadata for our Spark jobs allows us to automatically generate lineage information, improving the accuracy and maintainability of our data lineage records.

Integrating OpenLineage with OpenMetadata in Two Easy Steps

Using the ‘openMetadata’ transport type with your Spark applications is straightforward. It involves two simple steps to enable your applications to write lineage to your OpenMetadata server:

  1. First, add the openlineage-spark and openlineage-openmetadata-transporter jars to your classpath.
  2. Second, when submitting your application, include the following Spark configuration parameters:
spark.openlineage.transport.type=openMetadata
spark.openlineage.transport.auth.type=api_key
spark.openlineage.transport.auth.apiKey=yourToken
spark.openlineage.transport.url=http://your-openMetadata-host
spark.openlineage.transport.pipelineName=yourPipelineName
spark.openlineage.transport.pipelineServiceUrl=http://your-airflow-host

That’s it!
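If the job is launched from code rather than by hand, the same parameters can be assembled programmatically. The following is only a sketch: the class and method names are hypothetical, the values are the placeholders from the list above, and the only thing it does is expand the settings into `--conf key=value` arguments for spark-submit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: turns the transport settings into spark-submit arguments. */
final class OpenLineageSubmitArgs {

    /** Collects the six settings listed above; all values are caller-supplied placeholders. */
    static Map<String, String> transportConf(String apiKey, String openMetadataUrl,
                                             String pipelineName, String pipelineServiceUrl) {
        Map<String, String> conf = new LinkedHashMap<>(); // keeps insertion order for readable output
        conf.put("spark.openlineage.transport.type", "openMetadata");
        conf.put("spark.openlineage.transport.auth.type", "api_key");
        conf.put("spark.openlineage.transport.auth.apiKey", apiKey);
        conf.put("spark.openlineage.transport.url", openMetadataUrl);
        conf.put("spark.openlineage.transport.pipelineName", pipelineName);
        conf.put("spark.openlineage.transport.pipelineServiceUrl", pipelineServiceUrl);
        return conf;
    }

    /** Expands every entry into a "--conf" argument followed by "key=value". */
    static List<String> toSubmitFlags(Map<String, String> conf) {
        List<String> flags = new ArrayList<>();
        conf.forEach((key, value) -> {
            flags.add("--conf");
            flags.add(key + "=" + value);
        });
        return flags;
    }

    public static void main(String[] args) {
        Map<String, String> conf = transportConf(
                "yourToken", "http://your-openMetadata-host",
                "yourPipelineName", "http://your-airflow-host");
        System.out.println(String.join(" ", toSubmitFlags(conf)));
    }
}
```

Keeping the settings in one map makes it easier to share them between jobs that report to the same OpenMetadata server.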

Thanks to the integration of OpenLineage with OpenMetadata, we’ve gained valuable insights into the flow of data within our Spark pipelines.

Pipeline lineage graph in OpenMetadata, based on OpenLineage data

Additionally, the ‘openMetadata’ transporter reports the last modification time of output tables, which can be viewed in the ‘custom properties’ tab.

lastUpdateTime of output table

Diving into the ‘openMetadata’ transporter implementation

Interested to know what’s going on behind the scenes? Here are the details. You can also find the implementation on GitHub.

The ‘openMetadata’ transport type reports the lineage through OpenMetadata APIs in a step-by-step process:

Initially, the pipeline service is created or updated, followed by the pipeline itself, and finally, the lineage edge is created or updated, connecting the pipeline to the input or output data asset.

Additionally, the last update time of output tables is reported, using a custom table property in OpenMetadata.
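As a rough illustration of that ordering, the sketch below lists the calls made for a single table. The class name is hypothetical, and the endpoint paths are the ones that appear in the code excerpts in this post. The lookup of the table ID is left out because its endpoint is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the order in which the transport calls OpenMetadata. */
final class LineageCallPlan {

    enum LineageType { INLET, OUTLET }

    /** Returns the calls for one table, in the order described above. */
    static List<String> plan(LineageType lineageType, String tableId) {
        List<String> calls = new ArrayList<>();
        calls.add("PUT /api/v1/services/pipelineServices"); // create or update the pipeline service
        calls.add("PUT /api/v1/pipelines");                 // create or update the pipeline
        calls.add("PUT /api/v1/lineage");                   // create or update the lineage edge
        if (lineageType == LineageType.OUTLET) {
            // Only output tables get their lastUpdateTime custom property refreshed.
            calls.add("PATCH /api/v1/tables/" + tableId);
        }
        return calls;
    }

    public static void main(String[] args) {
        plan(LineageType.OUTLET, "some-table-id").forEach(System.out::println);
    }
}
```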

Let’s delve deeper into the code:

@Override
public void emit(@NonNull OpenLineage.RunEvent runEvent) {
    try {
        // On START, report the job's input tables.
        if (OpenLineage.RunEvent.EventType.START.equals(runEvent.getEventType())) {
            Set<String> inputTableNames = getTableNames(runEvent.getInputs(), LineageType.INLET);
            inputTableNames.forEach(tableName -> sendToOpenMetadata(tableName, LineageType.INLET));
        }

        // On COMPLETE, report the job's output tables.
        if (OpenLineage.RunEvent.EventType.COMPLETE.equals(runEvent.getEventType())) {
            Set<String> outputTableNames = getTableNames(runEvent.getOutputs(), LineageType.OUTLET);
            outputTableNames.forEach(tableName -> sendToOpenMetadata(tableName, LineageType.OUTLET));
        }
    } catch (Exception e) {
        // Never let a reporting failure break the job itself.
        log.error("failed to emit event to OpenMetadata: {}", e.getMessage(), e);
    }
}

The emit method gets the RunEvent generated by OpenLineage, extracts the required information and builds events to be sent to OpenMetadata.

Table names are extracted from the event, and a cache ensures that each table is processed only once. We then retrieve each table’s ID from OpenMetadata, which is needed to identify the table and create the lineage. After that, we send a PUT request to create or update the pipeline service in OpenMetadata, assuming Airflow as the pipeline service type.
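The "process each table only once" idea can be sketched with a concurrent set. This is a minimal illustration, not the transporter's actual cache: the class name is hypothetical, and keying the entries by lineage direction as well as table name is an assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a cache that lets each table through only once. */
final class ProcessedTableCache {

    // Thread-safe set; Spark listeners may emit events from more than one thread.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true only the first time a given direction/table pair is offered. */
    boolean markIfNew(String tableName, String lineageType) {
        // Assumption: a table that is both read and written is reported once per direction.
        return seen.add(lineageType + ":" + tableName);
    }

    public static void main(String[] args) {
        ProcessedTableCache cache = new ProcessedTableCache();
        System.out.println(cache.markIfNew("db.orders", "INLET")); // first time
        System.out.println(cache.markIfNew("db.orders", "INLET")); // already seen
    }
}
```

Because `Set.add` reports whether the element was newly inserted, the check and the update happen in a single atomic step.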

Next, we send another create/update request, this time for the pipeline. The builders for both requests look like this:

public HttpPut createPipelineServiceRequest() throws Exception {
    Map<String, Object> requestMap = new HashMap<>();
    requestMap.put("name", pipelineServiceName);
    requestMap.put("serviceType", "Airflow");

    // Connection details of the Airflow instance that runs the pipeline.
    Map<String, String> config = new HashMap<>();
    config.put("type", "Airflow");
    config.put("hostPort", pipelineServiceUrl);

    Map<String, Object> connectionConfig = new HashMap<>();
    connectionConfig.put("config", config);
    requestMap.put("connection", connectionConfig);

    String jsonRequest = toJsonString(requestMap);
    return createPutRequest("/api/v1/services/pipelineServices", jsonRequest);
}

public HttpPut createPipelineRequest() throws Exception {
    Map<String, Object> requestMap = new HashMap<>();
    requestMap.put("name", pipelineName);
    requestMap.put("pipelineUrl", pipelineUrl);

    // The description is optional.
    if (pipelineDescription != null && !pipelineDescription.isEmpty()) {
        requestMap.put("description", pipelineDescription);
    }
    requestMap.put("service", pipelineServiceName);

    String jsonRequest = toJsonString(requestMap);
    return createPutRequest("/api/v1/pipelines", jsonRequest);
}

After getting the pipeline ID from the pipeline request, a lineage edge request is generated, connecting the pipeline ID to the table ID.

Lastly, a custom property named “lastUpdateTime” is updated for output tables. If the custom property doesn’t exist for the table entity, it will be created.

public HttpPut createLineageRequest(String pipelineId, String tableId, LineageType lineageType) throws Exception {
    Map<String, Object> edgeMap = new HashMap<>();
    if (lineageType == LineageType.OUTLET) {
        // The pipeline writes the table: pipeline -> table.
        edgeMap.put("fromEntity", createEntityMap("pipeline", pipelineId));
        edgeMap.put("toEntity", createEntityMap("table", tableId));
    } else {
        // The pipeline reads the table: table -> pipeline.
        edgeMap.put("toEntity", createEntityMap("pipeline", pipelineId));
        edgeMap.put("fromEntity", createEntityMap("table", tableId));
    }

    Map<String, Object> requestMap = new HashMap<>();
    requestMap.put("edge", edgeMap);

    String jsonRequest = toJsonString(requestMap);
    return createPutRequest("/api/v1/lineage", jsonRequest);
}

private void updateTableLastUpdateTime(String tableId, String tableName) {
    try {
        String url = this.uri + "/api/v1/tables/" + tableId;
        // Take the timestamp in UTC so that it matches the " UTC" suffix
        // (requires java.time.ZoneOffset).
        String currentTime = LocalDateTime.now(ZoneOffset.UTC) + " UTC";
        // JSON Patch "add" on /extension sets the whole extension object.
        String jsonPatchPayload = "[" +
            " {" +
            " \"op\": \"add\"," +
            " \"path\": \"/extension\"," +
            " \"value\": {" +
            " \"lastUpdateTime\": \"" + currentTime + "\"" +
            " }" +
            " }" +
            "]";
        createPatchRequest(url, jsonPatchPayload);
    } catch (Exception e) {
        log.error("Failed to update last update time in OpenMetadata for table {} due to error: {}", tableName, e.getMessage(), e);
    }
}
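To make the direction of the edge concrete, here is a small sketch of the JSON body such a lineage request might carry. It is an illustration under assumptions: the class name is hypothetical, `createEntityMap` is not shown in this post, so the `id`/`type` shape of each entity reference is assumed, and the IDs are assumed to be plain UUID strings that need no JSON escaping.

```java
/** Hypothetical sketch of the lineage edge payload; not the transporter's own code. */
final class LineageEdgeJson {

    enum LineageType { INLET, OUTLET }

    /** Assumed shape of an entity reference: an id plus an entity type. */
    static String entity(String type, String id) {
        return String.format("{\"id\":\"%s\",\"type\":\"%s\"}", id, type);
    }

    /** OUTLET: pipeline -> table. INLET: table -> pipeline. */
    static String edgeRequest(String pipelineId, String tableId, LineageType lineageType) {
        String pipeline = entity("pipeline", pipelineId);
        String table = entity("table", tableId);
        String from = lineageType == LineageType.OUTLET ? pipeline : table;
        String to = lineageType == LineageType.OUTLET ? table : pipeline;
        return String.format("{\"edge\":{\"fromEntity\":%s,\"toEntity\":%s}}", from, to);
    }

    public static void main(String[] args) {
        System.out.println(edgeRequest("pipeline-id", "table-id", LineageType.INLET));
    }
}
```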

Extract lineage from JDBC queries using Java instrumentation

Although the solution described above handles Spark executions, it doesn’t capture direct table updates via a JDBC connection, such as when we use Spark ETLs to transfer data from the data lake to MySQL/Redshift through JDBC-executed prepared statements.

To handle this use case, we’ve created a Java agent that intercepts JDBC queries as they are executed and parses them using a third-party library, wrapped by OpenLineage.

After the query is parsed and analyzed, input and output tables are identified and the lineage is sent to OpenMetadata using the custom transport code described above.

Getting this agent set up with your applications is a two-step process:

  1. Include the following jars in your classpath: openlineage-sql-java and openlineage-openmetadata-transporter.
  2. Specify the openlineage-openmetadata-transporter jar file as a Java agent, with the relevant transport configuration added as follows:

-javaagent:/path/to/openlineage-openmetadata-transporter.jar=transport.pipelineServiceUrl=http://my-airflow-host,transport.auth.apiKey=myJwtToken,transport.pipelineName=my-pipeline,transport.url=http://my-openMetadata-host

The configuration parameters will be used for invoking the ‘openMetadata’ transporter.
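The JVM hands everything after the first `=` of the `-javaagent` option to the agent as a single string. One way such a string could be split into parameters is sketched below. The class name is hypothetical and this is not necessarily how the agent parses its arguments. Note that a value containing a comma would break this simple format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: parses "key=value,key=value" agent arguments. */
final class AgentArgsParser {

    static Map<String, String> parse(String agentArgs) {
        Map<String, String> params = new LinkedHashMap<>();
        if (agentArgs == null || agentArgs.trim().isEmpty()) {
            return params; // the agent was attached without arguments
        }
        for (String pair : agentArgs.split(",")) {
            // Split on the first '=' only, so values may contain '=' themselves.
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip entries without a key
            }
            params.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(parse("transport.pipelineName=my-pipeline,transport.url=http://my-openMetadata-host"));
    }
}
```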

After running the application, you should be able to see the lineage for tables processed through a JDBC connection as well.

How is it done?

Let’s explore the code that implements this functionality.

Within our agent code, the premain method creates an OpenMetadataTransport instance from the specified parameters and attaches two interceptors: one for MySQL prepared statements and one for Redshift prepared statements.

private static OpenMetadataTransport openMetadataTransport;
private static final String MYSQL_CLASS_NAME = "com.mysql.cj.jdbc.ClientPreparedStatement";
private static final String REDSHIFT_CLASS_NAME = "com.amazon.jdbc.common.SPreparedStatement";

public static void premain(String agentArgs, Instrumentation inst) {
    try {
        // Build the transport from the agent arguments, then instrument both drivers.
        generateOpenMetadataTransport(agentArgs);
        attachJdbcLineageAdvice(inst, MYSQL_CLASS_NAME);
        attachJdbcLineageAdvice(inst, REDSHIFT_CLASS_NAME);
    } catch (Throwable e) {
        // Report the cause, but never prevent the application from starting.
        System.out.println("Failed to create jdbc query transformer: " + e.getMessage());
    }
}

Using ByteBuddy, advice is applied on the relevant PreparedStatement implementations.

This advice is configured to run upon invocation of the executeUpdate, executeQuery and execute methods of the relevant classes.

private static void attachJdbcLineageAdvice(Instrumentation inst, String className) {
    new AgentBuilder.Default()
        .type(named(className))
        .transform((builder, typeDescription, classLoader, javaModule, protectionDomain) ->
            builder
                .method(named("executeUpdate"))
                .intercept(Advice.to(OpenMetadataJdbcLineageAdvice.class))
                .method(named("executeQuery"))
                .intercept(Advice.to(OpenMetadataJdbcLineageAdvice.class))
                .method(named("execute"))
                .intercept(Advice.to(OpenMetadataJdbcLineageAdvice.class))
        ).installOn(inst);
}

The OpenMetadataJdbcLineageAdvice class is designed to parse JDBC queries running on Redshift or MySQL, extract the input and output tables, and send the lineage information to OpenMetadata.

The method annotated with @Advice.OnMethodExit is invoked whenever one of the intercepted methods has finished executing.

private static final String REDSHIFT = "redshift";
private static final String MYSQL = "mysql";

@Advice.OnMethodExit
public static void onExit(@Advice.This PreparedStatement statement) {
    try {
        String dialect = extractDialect(statement);
        if (dialect == null) {
            return;
        }

        String sql = extractSql(dialect, statement);
        if (sql == null) {
            return;
        }

        Optional<SqlMeta> sqlMetaOpt = OpenLineageSql.parse(Collections.singletonList(sql), dialect);
        if (sqlMetaOpt.isPresent()) {
            String url = extractUrl(statement);
            System.out.println(String.format("Extracting lineage from %s jdbc query: %s", dialect, sql));
            handleLineage(sqlMetaOpt.get(), url);
        }
    } catch (Throwable ex) {
        System.out.println("Error occurred when trying to extract lineage from JDBC query: " + ex.getMessage());
    }
}

The SQL query is extracted from the prepared statement. Each dialect is handled differently, according to its implementing class.

After we get the SQL query, OpenLineageSql.parse is invoked to parse the query and get the input and output tables. OpenLineageSql is a class provided by OpenLineage, which wraps the external Rust library that parses SQL queries.
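The dialect lookup mentioned above could be as simple as matching on the statement's class name. The sketch below is hypothetical and is not the agent's actual `extractDialect`. It reuses the two class names registered in premain and returns null for anything else, mirroring the null check in the advice.

```java
/** Hypothetical sketch: maps a PreparedStatement implementation to a SQL dialect. */
final class DialectResolver {

    static final String MYSQL_CLASS_NAME = "com.mysql.cj.jdbc.ClientPreparedStatement";
    static final String REDSHIFT_CLASS_NAME = "com.amazon.jdbc.common.SPreparedStatement";

    /** Returns "mysql", "redshift", or null when the class is not one we instrument. */
    static String dialectFor(String statementClassName) {
        if (MYSQL_CLASS_NAME.equals(statementClassName)) {
            return "mysql";
        }
        if (REDSHIFT_CLASS_NAME.equals(statementClassName)) {
            return "redshift";
        }
        return null; // unknown driver: the caller skips lineage extraction
    }

    public static void main(String[] args) {
        System.out.println(dialectFor(MYSQL_CLASS_NAME));
    }
}
```

In the advice itself, the class name would come from something like `statement.getClass().getName()`. Subclasses of the instrumented types would need extra handling that this sketch leaves out.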

public static void handleLineage(SqlMeta sqlMeta, String url) {
    OpenMetadataTransport openMetadataTransport = OpenMetdataJdbcAgent.getOpenMetadataTransport();
    String dbName = openMetadataTransport.extractDbNameFromUrl(url.replace("jdbc:", ""));

    if (sqlMeta.inTables() != null && !sqlMeta.inTables().isEmpty()) {
        transportLineageToOpenMetadata(OpenMetadataTransport.LineageType.INLET, sqlMeta.inTables(), dbName, openMetadataTransport);
    }

    if (sqlMeta.outTables() != null && !sqlMeta.outTables().isEmpty()) {
        transportLineageToOpenMetadata(OpenMetadataTransport.LineageType.OUTLET, sqlMeta.outTables(), dbName, openMetadataTransport);
    }
}

public static void transportLineageToOpenMetadata(OpenMetadataTransport.LineageType lineageType, List<DbTableMeta> tables,
        String dbName, OpenMetadataTransport openMetadataTransport) {
    tables.forEach(table -> {
        String fullTableName = Optional.ofNullable(table.database())
            .map(db -> db + ".")
            .orElseGet(() -> Optional.ofNullable(dbName)
                .map(db -> db + ".")
                .orElse("")) + table.name();
        openMetadataTransport.sendToOpenMetadata(fullTableName, lineageType);
    });
}
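The name resolution in the code above follows a simple rule: prefer the database named in the query, fall back to the one in the JDBC URL, and otherwise use the bare table name. A standalone sketch of that rule follows. The class and method names are hypothetical, and reading the database name from the URL path is an assumption about what `extractDbNameFromUrl` does.

```java
import java.net.URI;

/** Hypothetical sketch of how a table name is qualified with a database name. */
final class TableNameResolver {

    /** Assumed behavior: the database is the path of a URL such as jdbc:mysql://host:3306/sales. */
    static String dbNameFromJdbcUrl(String jdbcUrl) {
        URI uri = URI.create(jdbcUrl.replace("jdbc:", ""));
        String path = uri.getPath();
        if (path == null || path.length() <= 1) {
            return null; // no database in the URL
        }
        return path.substring(1); // drop the leading '/'
    }

    /** Same fallback order as the Optional chain in transportLineageToOpenMetadata. */
    static String qualify(String tableDatabase, String tableName, String defaultDb) {
        String db = tableDatabase != null ? tableDatabase : defaultDb;
        return db == null ? tableName : db + "." + tableName;
    }

    public static void main(String[] args) {
        String db = dbNameFromJdbcUrl("jdbc:mysql://db-host:3306/sales?useSSL=false");
        System.out.println(qualify(null, "users", db));
    }
}
```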

Now that we have the input and output tables, we can create a lineage request that connects them to the relevant pipeline in OpenMetadata, using OpenMetadataTransport.

The Java agent implementation can also be found on GitHub.

Data lineage unlocked

By enhancing OpenLineage capabilities, we can now see the full picture in OpenMetadata. Customizing the transport type allows us to utilize the analysis done by OpenLineage, enriching OpenMetadata with vital data concerning pipelines’ input and output tables.

Using instrumentation to intercept JDBC queries addresses the use case of queries running outside of Spark, offering a comprehensive solution.

With this approach, the dependencies and relationships between pipelines and tables become readily visible. This clarity allows us to not only confidently determine which pipeline produces a table and when it was last modified, but also accurately identify the inputs of each pipeline.

The combination of OpenMetadata and OpenLineage empowers us with comprehensive insights into our data path, facilitating informed decision-making.

We invite you to explore the project on GitHub and give it a try!
