Solving Data Quality in Streaming Data Flows

Vinu Kumar
Mar 8

“Data is the new oil. It’s valuable, but if unrefined it cannot really be used. It has to be changed into gas, plastic, chemicals, etc to create a valuable entity that drives profitable activity; so must data be broken down, analyzed for it to have value.” — Clive Humby (UK Mathematician and architect of Tesco’s Clubcard)

Clive Humby first coined the phrase “data is the new oil” in 2006. Chaos in data has always existed, and the struggle businesses face to extract meaningful information from it is universal. Even though we now have the computing capability to process more data than we can handle, around 90% of the data we receive is unstructured, and only about 0.5% of it is ever analysed and used.

  • How will it fit into the Data Quality Dimensions like accuracy, completeness, consistency, timeliness, availability, and fitness for use?
Figure 1: Data Quality Tracking — Manual
  • Early warning and actionable detection of outliers and anomalies on a stage-by-stage basis throughout the pipeline.
  • Rich in-stream data cleansing and preparation capabilities to ensure timely delivery of consumption-ready data.
  • A large number of built-in integrations with source and destination systems.
Figure 2: Automated Data Quality Check using StreamSets
  1. Apache Griffin runs in Spark, collects the quality metrics and publishes them to Elasticsearch. This process can be extended to email someone when a quality check fails or does not meet the threshold, while the rules are embedded in the Spark cluster as JSON. Griffin supports data profiling, accuracy and anomaly detection.
  2. Kafka is the intermediary broker layer that enables the streaming process. This topic can also be tapped by other consumers that are interested in the raw data; a minimal consumer sketch follows below.
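For example, an independent downstream consumer can subscribe to the same raw topic without affecting Griffin. Here is a minimal sketch using the kafka-python client; the topic name and broker placeholder mirror the configuration shown later in this article:

# A small, independent consumer tapping the raw "source" topic.
# Assumes the kafka-python package; replace the broker placeholder with your Kafka address.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "source",                                 # the raw customer topic
    bootstrap_servers="<Kafka IP>:9092",      # placeholder, as used in the Griffin config below
    group_id="raw-data-tap",                  # separate consumer group, so Griffin's offsets are untouched
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    record = message.value
    print(record.get("id"), record.get("first"), record.get("last"))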
Tip 2: To use a persistentVolume, just create a persistentVolumeClaim. Example:
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ss-data
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
The claim is then referenced from the volumes section of the pod or deployment spec that needs the storage:
volumes:
  - name: data
    persistentVolumeClaim:
      claimName: ss-data
      readOnly: false
Figure 3: StreamSets Data Collector
Figure 4: Kafka Consumer

Step 1 — Prepare configuration files:

Griffin env file:

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://<Elastic Search IP>:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "<Zookeeper IP>:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}
Griffin DQ config file:

{
  "name": "customer_data_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "kafka",
          "version": "1.0",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "<Kafka IP>:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "source",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/source",
        "info.path": "source",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": [
          "-5m",
          "0"
        ],
        "updatable": true
      }
    },
    {
      "name": "tgt",
      "connectors": [
        {
          "type": "hive",
          "version": "2.1",
          "config": {
            "database": "default",
            "table.name": "customers_country"
          }
        }
      ],
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs:///griffin/streaming/dump/target",
        "info.path": "target",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": [
          "-1m",
          "0"
        ]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.id = tgt.id AND src.first = tgt.first AND src.last = tgt.last ",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out": [
          {
            "type": "metric",
            "name": "accu"
          },
          {
            "type": "record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": [
    "CONSOLE",
    "HDFS",
    "ELASTICSEARCH"
  ]
}
"rule": "src.id = tgt.id AND src.first = tgt.first AND src.last = tgt.last "
“dq.type”: “accuracy”
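Conceptually, this accuracy check behaves like an inner join between source and target on the compared fields, with the miss count being the source records that find no match. A small PySpark sketch with made-up rows (in this pipeline the source arrives on the Kafka topic and the target sits in the Hive table):

# Illustrative only: roughly what the accuracy rule computes, using made-up records.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("accuracy_sketch").getOrCreate()

# Hypothetical source records (here they would arrive on the Kafka "source" topic)
src = spark.createDataFrame(
    [("183", "Angelina", "Stiedemann"), ("184", "Jane", "Doe")],
    ["id", "first", "last"],
)

# Hypothetical target records (here they would sit in the Hive table customers_country)
tgt = spark.createDataFrame(
    [("183", "Angelina", "Stiedemann")],
    ["id", "first", "last"],
)

total = src.count()
matched = src.join(tgt, on=["id", "first", "last"], how="inner").count()
print({"total_count": total, "matched_count": matched, "miss_count": total - matched})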
Step 2 — Set up a Dataproc cluster and the Hive target table:

Figure 5: Dataproc Cluster
CREATE EXTERNAL TABLE `customers_country`(
  `id`         STRING,
  `email`      STRING,
  `first`      STRING,
  `last`       STRING,
  `country`    STRING,
  `created_at` STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
LOCATION
  'hdfs:///griffin/data/customers/customers_country';
LOAD DATA LOCAL INPATH 'customers_country.csv' INTO TABLE customers_country;
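To confirm the load, the table can be queried from the cluster. A quick sketch using the PyHive client, assuming HiveServer2 is reachable on the Dataproc master node at the default port 10000 (the host name here is illustrative):

# Sanity check of the loaded target table via HiveServer2.
# Assumes the PyHive package; host and port are assumptions to adapt.
from pyhive import hive

conn = hive.Connection(host="cluster-griffin-m", port=10000, database="default")
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM customers_country")
print("rows loaded:", cursor.fetchone()[0])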

Step 3 — Set up a Spark job:

Figure 6: Spark Job creation through Dataproc Jobs
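The Griffin measure job can also be submitted programmatically instead of through the Dataproc console. Here is a sketch using the google-cloud-dataproc client; the project ID, region, cluster name, GCS paths and main class below are assumptions to adapt to your environment:

# Submit the Griffin measure job to Dataproc from code instead of the console.
# Project, region, cluster and GCS paths below are hypothetical placeholders.
from google.cloud import dataproc_v1

project_id = "my-project"          # hypothetical
region = "australia-southeast1"    # hypothetical
cluster_name = "cluster-griffin"   # hypothetical

job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": cluster_name},
    "spark_job": {
        "main_class": "org.apache.griffin.measure.Application",  # Griffin measure entry point (verify against your Griffin version)
        "jar_file_uris": ["gs://my-bucket/griffin-measure.jar"],  # hypothetical path to the Griffin measure jar
        "args": ["gs://my-bucket/env.json", "gs://my-bucket/dq.json"],  # the two config files prepared in Step 1
    },
}

result = job_client.submit_job(project_id=project_id, region=region, job=job)
print("Submitted job:", result.reference.job_id)

Once the job is running, the Spark driver log output looks like this: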
19/02/19 05:39:45 INFO org.spark_project.jetty.util.log: Logging initialized @5135ms
19/02/19 05:39:45 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
19/02/19 05:39:45 INFO org.spark_project.jetty.server.Server: Started @5239ms
19/02/19 05:39:45 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@5371d1e7{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/02/19 05:39:45 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at cluster-griffin-m/10.152.0.20:8032
19/02/19 05:39:49 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1550554583894_0001
In Hive:
183,verdie12@hotmail.com,Angelina,Stiedemann,2014-12-13T09:16:07.580Z,Samoa
Figure 7: ElasticSearch displaying quality statistics from Apache Griffin
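The metrics Griffin writes to the Elasticsearch sink can also be pulled straight from the index configured in the env file. A small sketch using the requests library; the host placeholder mirrors the sink configuration above:

# Fetch the most recent metric documents Griffin posted to Elasticsearch.
# Replace the placeholder with your Elasticsearch address; the index/type path matches the env config above.
import requests

ES_URL = "http://<Elastic Search IP>:9200/griffin/accuracy/_search"

response = requests.get(ES_URL, params={"size": 5})
response.raise_for_status()

for hit in response.json()["hits"]["hits"]:
    print(hit["_source"])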
Figure 8: DataOps Enabled Platform

HorizonX

We’re a team of passionate, expert and customer-obsessed practitioners, focusing on innovation and invention on our customers’ behalf.

Written by Vinu Kumar
Chief Technologist at HorizonX, Google Cloud Certified Data Engineer, Google Cloud Certified Architect, Consultant
