Entity Resolution using Google Cloud Dataflow


Summary

This article illustrates how we modernized a client’s data platform by implementing an entity resolution pipeline. We tied disparate data sets together and performed customer matching to create a Golden Customer Record using Google Cloud Dataflow. The solution enabled their Marketing/Analytics teams to derive valuable insights about their customers, make better-informed decisions for marketing campaigns, and explore new ways to improve customer experience/retention.

Preface

Entity resolution (ER), otherwise referred to as record linkage or data matching, is the process of disambiguating — identifying, matching, and merging — different manifestations of the same real-world entity across disparate data sources.

For instance, a business that maintains customer information may store multiple data records referring to the same customer across several systems. These records can contain different full names (legal name vs. preferred or maiden name), email addresses (personal vs. business), or phone numbers (primary vs. secondary). Determining how to group each of these individual records that correspond to the same individual is the challenge that entity resolution solves.

Technology has revolutionized the way businesses operate. With continuous feeds of data, including online transactions, customer profiles, IoT devices, subscriptions, and many more, enterprises have the ability to capitalize on this abundance of information to gain insights and increase their profitability. These siloed sets of information are often stored in separate data stores, making it difficult to generate connections and obtain a holistic 360° view. Entity resolution is the means through which the disambiguation of this data is achieved.

Real-world scenario: Rick’s Coffee Shop

Let’s use Rick’s coffee shop to illustrate an example. The coffee shop begins as a single brick-and-mortar store where Rick sells his blend of coffee. As the shop gains popularity and repeat customers, Rick implements a loyalty rewards program that lets customers collect points for discounts and other promotions. Customers sign up for the loyalty program by providing their name, email, and phone number in-store. This information, along with subsequent transactions, is logged and stored within the loyalty database.

Soon after, the popularity of Rick’s special blend of coffee justifies a packaged offering that customers can purchase online and enjoy from the comfort of their homes. This results in an online store that keeps e-commerce user profiles and payment information in the web platform database.

The importance of social media prompts the shop to strengthen its online presence across various platforms, which also provides an easy way to connect with customers. The user information and activity associated with the coffee shop’s social media accounts are stored in each platform’s databases and are accessible through their APIs.

Digital Manifestations vs. Real-world Entity

In this common example, we can extract the following records that correspond to a single entity/customer:

  1. Payment information from in-store purchases
  2. Loyalty account information
  3. E-commerce customer information
  4. Social media following

The value of joining these records together to create a “single pane of glass” view can transform a business and increase its longevity. For example, analysts can use insights into consumers’ spending behaviour and patterns to segment the customers that drive marketing campaigns. While this matching process may seem trivial to humans, it is not for machines.

Silos

Silos are one of the key challenges of entity resolution. Typically, systems are designed and developed over many years. With newer and ever-changing technologies, the possibility of having many disparate systems is very high. As a result, it is cumbersome to join all of the data across these sources to generate a unified view. However, this may be simplified by the existence of a data lake or warehouse, which acts as a centralized repository.

Definition of “a match”

Assuming you can aggregate the data across disparate systems into a pipeline, determining which fields to join is non-trivial. If a globally unique entity identifier exists across all data sources, the matching process is as simple as joining on that identifier. However, it is improbable that this unique identifier exists in every single source. In the absence of entity identifiers to link the same entity across the disparate systems, it is up to the business to determine which attributes are suitable to match against. Every enterprise has its own definition of what is considered a “match”.

Data Quality

The quality of data can make or break the integrity of matching pipelines. There is no guarantee that an attribute in one data source will appear exactly the same in another due to incomplete information, duplicated entries, or a lack of standardization across source systems. This typically results from data entry errors, missing values, inconsistent formatting, a lack of data validation, or changing data. Inaccurate data can yield missed matches (false negatives) or incorrect matches (false positives).

Scalability

As the amount of data a business holds grows, so does the work the matching pipeline must perform. Entity resolution solutions must utilize parallel processing frameworks, such as MapReduce, to conduct the matching process efficiently. It is also paramount to provision enough workers to handle the load based on the amount of data being processed.

Enter Google Cloud Dataflow

Apache Beam + Google Cloud Dataflow

Google Cloud Dataflow addresses the challenges of processing data for entity resolution pipelines. It is a fully managed service for running data pipelines, including provisioning, management, and horizontal autoscaling of resources. It is built on top of the Apache Beam SDK, a unified data processing framework for batch and streaming workloads.

  1. Silos: The framework offers various I/O transforms which allow pipelines to connect to file-based, database, streaming (PubSub), and other data sources.
  2. Definition of “a match”: Since the pipeline is developed using the SDK, data engineers can express the business logic for identifying matches in a configuration file passed in at runtime, which is used to build the Dataflow pipeline dynamically.
  3. Data Quality: The pipeline can perform parallel pre-processing steps to address the data quality before matching.
  4. Scalability: One of the key offerings of the Dataflow platform is its autoscaling capability. The pipeline intelligently scales based on throughput (or other characteristics), dynamically increasing or decreasing the number of worker instances to match the workload.

The Ask

We engaged with a retail client to build a cost-effective and scalable customer matching pipeline to empower their analytics team, who initially focused on designing and implementing new marketing campaigns. The client had a strong retail presence through their stores and was growing their online presence. Some of the information stored across their systems included loyalty information, e-commerce user profiles, email subscription information, and transactions. The goal was to create a unified “Golden Customer Record” containing all relevant information for each customer. Analysts would use it to understand spending behaviours, preferences, and other valuable characteristics for targeted marketing campaigns.

Solution

High-Level Architecture

The client had started their cloud adoption journey on Google Cloud Platform by extracting data from source systems into Google Cloud Storage (GCS) which was then transformed and loaded into BigQuery. To remain within the GCP ecosystem and take advantage of its benefits, our solution was the implementation of a Google Cloud Dataflow pipeline that consisted of five components:

  1. Extract
  2. Pre-process
  3. Partition
  4. Entity Resolution
  5. Load

Using “Alice Coots” as our customer, we’ll illustrate the components of the pipeline via an example with three source systems:

  1. Loyalty Membership Information: Stores loyalty information when signing up in-store. The following fields are stored: first name, last name, address, email, and phone number.
  2. E-commerce User Profiles: Stores user profile information when purchasing through the online store. The following fields are stored: full name, address line 1, address line 2, city, province, postal code, email, phone number.
  3. Email Subscriptions: Stores email addresses when customers sign up for newsletters, coupons, and promotions. The following fields are stored: full name, email.

Extract

The first step in the pipeline was to extract various representations of real-world entities from each data source. The Apache Beam SDK provides several connectors that can be used to connect and extract data from various sources. BigQuery and GCS were the primary sources used in this solution.

Note: A prerequisite to this step is schema matching, which is the exercise of identifying all of the database tables and attributes from each source that refer to the same information, so that Apache Beam can use them in the matching process.

The pipeline was dynamically generated based on a configuration file passed in at runtime. The contents of the file included the information/queries from the schema matching process, generating the PCollections for each data source dynamically.
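To make the config-driven approach concrete, here is a minimal sketch of what such a runtime configuration could look like and how it would be consumed. The JSON shape, source names, and queries are assumptions for illustration; the actual format the client used is not shown in this article. In the real pipeline, each entry would become a Beam read transform (e.g., `beam.io.ReadFromBigQuery`), producing one PCollection per data source.

```python
import json

# Hypothetical runtime configuration holding the queries produced by the
# schema-matching exercise; one entry per data source.
CONFIG = """
{
  "sources": [
    {"name": "loyalty_members",
     "query": "SELECT first_name, last_name, address, email, phone FROM loyalty.members"},
    {"name": "ecommerce_profiles",
     "query": "SELECT full_name, address_line_1, city, province, postal_code, email, phone FROM web.profiles"},
    {"name": "email_subscriptions",
     "query": "SELECT full_name, email FROM marketing.subscriptions"}
  ]
}
"""

config = json.loads(CONFIG)

# Each source entry would be expanded into a read transform, e.g.:
#   p | source["name"] >> beam.io.ReadFromBigQuery(query=source["query"])
source_names = [source["name"] for source in config["sources"]]
print(source_names)
```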

Example:

Extract

In this example, data is extracted from three sources into separate PCollections, one for each data source: Loyalty Members, E-commerce User Profiles, and Email Subscriptions. There are a total of six customer records for Alice and Bob.

Pre-process

Once the pipeline extracted data from each source, the next step was to clean and standardize that data. This was to ensure that the matching attributes in each of the sources have the same structure and format. This improved the quality of data to be matched.

Steps:

  1. Remove unwanted characters/words: whitespaces, commas, colons, semicolons, periods, hashes, quotes, brackets, and stop words (words that don’t contain any relevant information for the matching process).
  • This was applied to phone number fields, in which some phone numbers contained brackets, hyphens, or country codes, while others did not.

  2. Standardize to lowercase: All strings were standardized to lowercase to avoid mismatches due to case sensitivity.

  3. Parse values into granular components: This step involved segmenting larger column values into several smaller attributes.
  • Address columns were separated into Street Number, Street Name, Street Type, Direction, Apartment/Unit Number, City, State/Province, ZIP Code/Postal Code.
  • Name columns were separated into First Name, Middle Name, Last Name. This was done using the HumanName parser from the nameparser Python library.
  • Telephone numbers were separated into Area Code, Number, Extension.

  4. Validate correctness and extract missing information: The pipeline performed this step to ensure that the values contained within columns are, in fact, correct. For instance, validating that a given address is valid within the specified region using a database containing all known addresses in a country. This can be extended to look up the ZIP code/postal code for a given address, which can then be used to determine matches.
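The cleansing steps above can be sketched in plain Python. This is the kind of logic that would live inside a Beam Map/ParDo step; the field names, regex rules, and the naive name splitter are illustrative assumptions (the real pipeline used the HumanName parser for names).

```python
import re

def normalize_phone(raw):
    """Strip brackets, hyphens, and spaces; drop a leading North American country code."""
    digits = re.sub(r"\D", "", raw)          # keep digits only
    if len(digits) == 11 and digits.startswith("1"):
        digits = digits[1:]                  # drop the country code
    return digits

def normalize_text(raw):
    """Lowercase and remove unwanted punctuation and extra whitespace."""
    cleaned = re.sub(r"[,;:\.#\"'\(\)\[\]]", " ", raw.lower())
    return " ".join(cleaned.split())

def split_full_name(full_name):
    """Naive split of a full name into first/last components.
    (A real pipeline should use a proper parser such as HumanName.)"""
    parts = normalize_text(full_name).split()
    return {"first_name": parts[0], "last_name": parts[-1]} if parts else {}

print(normalize_phone("(105) 245-5088"))
print(split_full_name("Alice Coots"))
```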

Example:

Pre-process

In this example, the records are cleansed and standardized since their representations differ between the source systems. For example, the loyalty membership records store the first name and last name in separate columns. However, the E-commerce user profile records contain a full name in a single column. The standardization process parses the values into granular components to address this difference.

Partition

The next step was to partition the pre-processed data (a step otherwise known as indexing or blocking). The objective of partitioning was to group records based on a field to reduce the load when creating a graph via the networkx library (the usage of the library is explained further in the Entity Resolution section below). Our pipeline partitioned the records based on the first letter of the customer’s first name.

Note: The implication of partitioning on the first letter of the customer’s name is that the pipeline will not match any records that are not in the same partition. If this does not suffice, other blocking techniques using phonetic encoding algorithms, such as Soundex or Phonex, can be used.
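A partition function of this kind could look like the sketch below. The function mirrors the `fn(element, num_partitions)` signature that Beam’s `beam.Partition` transform expects; the record shape and the fallback bucket are assumptions for illustration.

```python
import string

NUM_PARTITIONS = 26  # one partition per letter of the alphabet

def partition_fn(record, num_partitions):
    """Route a record to a partition by the first letter of its first name."""
    first = record.get("first_name", "")
    if first and first[0] in string.ascii_lowercase:
        return string.ascii_lowercase.index(first[0])
    return 0  # fallback bucket for records with no usable name

# In the real pipeline this would be applied as:
#   partitions = pcoll | beam.Partition(partition_fn, NUM_PARTITIONS)
records = [{"first_name": "alice"}, {"first_name": "bob"}]
buckets = [partition_fn(r, NUM_PARTITIONS) for r in records]
print(buckets)  # alice -> partition 0 ('a'), bob -> partition 1 ('b')
```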

Example:

Partition

In this example, the PCollection containing the pre-processed records is split into separate PCollections based on the first letter of the first name (i.e., Alice’s records are contained within the PCollection for records where the initial of the first name is ‘A’).

Entity Resolution

Within each partition, the records were put through a deterministic matching process — using personally identifiable information (PII), such as name, phone number, and email.

Note: If the pre-processing step cannot guarantee high-quality data, deterministic matching may not be sufficient. In this case, probabilistic matching techniques that utilize statistical approaches to measure the similarity of two records are preferred. However, probabilistic matching can lead to false positives, where two records are matched even though they correspond to two different entities. The client decided to prioritize accuracy in the solution by performing deterministic matching exclusively.

The entity resolution process consisted of the following operations for each partition:

Step 1 — Restructure the data into key-value pairs to prepare for performing a hard match on the key of a record

Apply a FlatMap to output records with the following:

  • key: The field(s) within the record to match (Example: Name + Phone Number).
  • value: The full record.
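Step 1 can be sketched as a plain-Python generator with the shape of a Beam FlatMap callable: each record is emitted once per matching rule, keyed by the concatenated match fields. The record fields and rule set are illustrative assumptions.

```python
def emit_match_keys(record):
    """Emit one (key, record) pair per matching rule that applies."""
    # Rule 1: Name & Phone Number
    if record.get("first_name") and record.get("phone"):
        yield (record["first_name"] + record["last_name"] + record["phone"], record)
    # Rule 2: Email
    if record.get("email"):
        yield (record["email"], record)

# In the real pipeline: pcoll | beam.FlatMap(emit_match_keys)
record = {"id": "L1", "first_name": "alice", "last_name": "coots",
          "phone": "1052455088", "email": "alice@answers.com"}
pairs = list(emit_match_keys(record))
print([key for key, _ in pairs])
```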

Example:

Alice’s records are transformed into key-value pairs for the following matching rules:

  • Name & Phone Number — “alicecoots1052455088” is the concatenation of the first name, last name, and phone number.
  • Email — alice@answers.com and alice@shared.com are the email addresses used by Alice.

Step 2 — Perform hard matches in parallel

Apply a GroupByKey against the PCollections from the previous step, which will produce the matches for each matching rule.
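Conceptually, GroupByKey gathers every record that shares a matching key; any key with more than one record is a match under that rule. The sketch below emulates this with a dictionary; the record ids follow the Alice example, though which of her two email addresses links W1 and E1 is an assumption.

```python
from collections import defaultdict

# (key, record_id) pairs as produced by the FlatMap in Step 1.
pairs = [
    ("alicecoots1052455088", "L1"),   # name+phone rule
    ("alicecoots1052455088", "W1"),
    ("alice@shared.com", "W1"),       # email rule
    ("alice@shared.com", "E1"),
]

groups = defaultdict(list)
for key, record_id in pairs:
    groups[key].append(record_id)

# Keys shared by two or more records are matches under that rule.
matches = [ids for ids in groups.values() if len(ids) > 1]
print(matches)  # [['L1', 'W1'], ['W1', 'E1']]
```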

Example:

Alice’s records are grouped based on the matching rules.

Step 3 — Output the group of matches using connected components

Apply a GroupByKey transform to aggregate all matches onto a single worker per partition, from which a graph was generated using the networkx library. A node in the graph corresponds to a record, and an edge represents a match between two records. With the generated graph, we used the connected_components function to output each group of matches.

Example:

Alice’s records have been grouped as a result of the connected components API.

What was the purpose of the networkx library?

We used the networkx library to resolve transitive matches. In our example with Alice, she has three records: loyalty profile (L1), e-commerce user profile (W1), and email subscription (E1). L1 and W1 match on Name & Phone Number. W1 and E1 match on Email. However, there is no match between L1 and E1. Therefore, we need a step to consolidate these matches as depicted in the following diagram:

The networkx library provides functions for creating graphs from an iterable, such as an array of edges. Once a graph is created, the connected_components function returns the groups of connected nodes in that graph. Therefore, we aggregated the matches onto a single worker (for each partition) to yield the connected components. Partitioning is foundational for this step in the pipeline, since all of a partition’s records are consolidated onto a single worker to perform the connected_components operation. In the absence of these partitions, we would have to resolve transitive matches for all records in one large graph on a single worker, rather than creating multiple smaller graphs based on the first letter of the first name, which can result in hot keys.
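The transitive resolution for Alice’s records can be reproduced in a few lines of networkx. Each matched pair becomes an edge; the record ids follow the example in the text.

```python
import networkx as nx

# Matched pairs from the GroupByKey step: L1/W1 matched on name+phone,
# W1/E1 matched on email. L1 and E1 never matched directly.
matched_pairs = [("L1", "W1"), ("W1", "E1")]

graph = nx.Graph()
graph.add_edges_from(matched_pairs)

# connected_components resolves the transitive match: L1, W1, and E1
# collapse into a single group representing one customer.
groups = [sorted(component) for component in nx.connected_components(graph)]
print(groups)  # [['E1', 'L1', 'W1']]
```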

Load

The final step in the pipeline was to write the Golden Customer Records into BigQuery. This was done using Apache Beam’s BigQuery I/O connector. As a result, the consolidated view of each customer was available in BigQuery for analysis and reporting.
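Before a row can be written, each group of matched records has to be collapsed into a single Golden Customer Record. The sketch below shows one possible consolidation step; the “first non-empty value wins” survivorship rule and the field list are assumptions for illustration, and the resulting row dict is what would be handed to Beam’s WriteToBigQuery connector.

```python
def build_golden_record(records):
    """Merge a group of matched records into one golden row (illustrative
    survivorship rule: first non-empty value wins per field)."""
    golden = {"source_ids": [r["id"] for r in records]}
    for field in ("first_name", "last_name", "email", "phone", "address"):
        golden[field] = next((r[field] for r in records if r.get(field)), None)
    return golden

matched = [
    {"id": "L1", "first_name": "alice", "last_name": "coots",
     "phone": "1052455088", "email": "alice@answers.com", "address": "12 main st"},
    {"id": "E1", "first_name": "alice", "last_name": "coots",
     "email": "alice@shared.com"},
]
golden = build_golden_record(matched)
print(golden["source_ids"], golden["email"])
```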

Load

Conclusion

In summary, this article showcased how we leveraged Google Cloud Dataflow to build a scalable entity resolution pipeline that advanced a retail client’s data analytics platform. This pipeline utilized Apache Beam’s I/O connectors to extract data from each of the client’s disparate sources. It then used Apache Beam’s transforms to pre-process and partition that data before performing data matching to generate a Golden Customer Record for each individual customer. With this, the client gained a holistic view of a customer, including all of their contact information, transaction history, and product preferences. This allowed them to analyze their customers’ spending habits, segment these customers based on common characteristics, and strategize/execute personalized marketing campaigns to improve their customer experience and increase their revenue.

Slalom Technology
