New kid on the block, DAP meets streaming

Marek J
inganalytics.com/inganalytics
6 min read · Dec 17, 2020


The Data Analytics Platform (DAP) — developed internally by ING WBAA — is a cloud-native platform for data democratization, created to provide any ING employee working with data a modern, reliable and feature-rich environment for experimentation. It features horizontal scalability, the latest open source technologies, and end-to-end security.

In this article, we explore the streaming ingestion capabilities of DAP, ING's go-to data platform, and cover the basics of the process along the way.

Nowadays businesses receive huge volumes of data continuously and need to act and make decisions fast. Stream processing has been on the rise in recent years, often replacing more traditional, batch-based ETLs or serving as the second leg of the infamous Lambda architecture. In many scenarios a batch approach, though often easier, is less favourable because having to wait becomes costly. In addition, new architectures such as the Kappa architecture, where everything is a stream, are gaining popularity.

DAP plays a slightly different role and has different goals: it is not an operational or transactional system, and its primary purpose is data exploration, experimentation and model training. Still, data needs to find its way to DAP in the first place. The option to ingest data from streaming sources makes it easier to onboard new data sources which

  • favour or already support streaming channels (e.g. they emit events to event buses)
  • need a lower latency
  • don’t fit well into a batch approach

Streaming ingestion could also enable real-time¹ analytics. DAP now supports streaming ingestion!

Before diving deep into that, let’s take a step back to cover some basic ideas behind data ingestion, understand the relevant technical vocabulary, and find out what the difference is between batch processing and stream processing.

Data ingestion

Data ingestion is the transport of data from its sources to a storage medium where it can be accessed, used and analyzed, which is the essence of DAP. It comes in a variety of forms, with batch processing being the most common. However, stream processing is gaining momentum.

Batch vs Streaming

So, what is batch processing? A batch is a collection of records, events or data points grouped together within a time interval, for example all transactions for a given day. Batches are collected periodically (usually on a schedule, e.g. every morning) and fed into analytic systems. Batch processing works well when real-time analytics and data freshness are not primary requirements and a significant delay, e.g. 24 hours, is acceptable. So far, most of the data sources on DAP are ingested in this manner.

Streaming processing, on the other hand, deals with continuous data and feeds analytic systems with data as they arrive². The latency is typically much lower and near real-time analytics is possible.

Note: to make the terminology a little more confusing, many stream processors, including Spark Structured Streaming, call their processing engines micro-batch, which reflects their internal implementation: incoming data is processed as a series of small batches.
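
To make the micro-batch idea concrete, here is a minimal plain-Python sketch (not Spark's actual implementation) that cuts a continuous stream of events into small batches. The function name `micro_batches` and the size-based cut-off are assumptions made for illustration; Spark's micro-batch engine is normally driven by triggers rather than by a fixed record count.

```python
from itertools import islice


def micro_batches(events, max_batch_size):
    """Yield lists of at most `max_batch_size` events from any iterable.

    `events` may be an endless generator; each yielded list plays the role
    of one micro-batch handed to the downstream processing step.
    """
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, max_batch_size))
        if not batch:
            return  # the source is exhausted (never happens for an endless stream)
        yield batch


if __name__ == "__main__":
    for batch in micro_batches(range(5), max_batch_size=2):
        print(batch)
```

A classic batch job would instead wait for the whole day's data before processing anything; here each small batch can be processed as soon as it is full.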

Both approaches have their use cases and they often co-exist.

Why Spark Structured Streaming?

There are many streaming engines available on the market to choose from, most notably Apache Flink (a “true” stream processing engine), Kafka Streams (technically a library) and Spark Structured Streaming (micro-batch). They offer high throughput, low latency and fault tolerance. For DAP we’ve decided to adopt Spark Structured Streaming. This decision was backed by many factors, including

  • Apache Spark is ubiquitous in DAP — it’s our daily bread and butter
  • We can leverage the existing Spark/YARN infrastructure and know-how
  • For our use-cases, very low data latency, measured in milliseconds, is not critical

Note: Spark actually offers two ways to handle streaming: an older implementation, Spark Streaming, powered by Spark RDDs and the DStream API, and the newer Spark Structured Streaming with the Dataset/DataFrame API. Spark Streaming provides a flexible but low-level API, whereas Spark Structured Streaming is built on top of Spark SQL and provides the familiar Spark SQL functions and the Dataset/DataFrame API known from batch processing. To make it even more confusing, Spark Structured Streaming also introduced an experimental low-latency processing mode called Continuous Processing (not applicable in our use cases).

Input sources

Streaming ingestion needs a source to read from. We currently support two primary input sources:

File source — reads files as a stream of data. All Spark built-in file formats are supported, i.e. CSV, JSON, ORC, Parquet.
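
As a rough illustration of what a file source looks like in Spark Structured Streaming, here is a minimal PySpark sketch. It is not DAP's ingestion code: the helper name `read_file_stream`, the `SUPPORTED_FILE_FORMATS` tuple and all argument values are assumptions, and `spark` is expected to be an existing SparkSession.

```python
# Formats named in this article; the tuple itself is an illustrative assumption.
SUPPORTED_FILE_FORMATS = ("csv", "json", "orc", "parquet")


def read_file_stream(spark, file_format, path, schema_ddl):
    """Return a streaming DataFrame over files that appear under `path`.

    `spark` is an existing SparkSession; the other arguments are placeholders.
    """
    if file_format not in SUPPORTED_FILE_FORMATS:
        raise ValueError(f"unsupported file format: {file_format}")
    # Streaming file sources expect an explicit schema by default;
    # a DDL string such as "id INT, amount DOUBLE" is accepted.
    return spark.readStream.format(file_format).schema(schema_ddl).load(path)
```

A call could look like `read_file_stream(spark, "parquet", "/data/incoming/events", "id INT, amount DOUBLE")`, where the path and the schema are hypothetical.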

Kafka source — reads data from Kafka. By default, messages received from Kafka are just opaque byte arrays. Our streaming engine can deserialize these messages from the following popular data formats:

We also provide an option to leave messages intact, in a serialized form, and automatic message decryption, if needed.

For all formats, we store message schemas in a schema registry. Schemas can include additional metadata, which can be presented in our data discovery service³.
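
For readers who have not used Spark's Kafka source before, the following PySpark sketch shows the general shape of reading a topic as a stream. It is only an illustration under assumptions: the helper names, broker address and topic are hypothetical, `spark` is an existing SparkSession with the Spark Kafka connector available, and the plain string cast stands in for real deserialization against a registry schema.

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Build options for Spark's built-in Kafka source (values are placeholders)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def read_kafka_stream(spark, options):
    """Return a streaming DataFrame with the payload and some Kafka metadata."""
    raw = spark.readStream.format("kafka").options(**options).load()
    # Kafka delivers `value` as an opaque byte array. Casting it to a string is
    # the simplest possible "deserialization"; decoding a real data format
    # would replace this cast.
    return raw.select(
        raw["value"].cast("string").alias("value"),
        "topic",      # Kafka-specific columns exposed by the source
        "partition",
        "offset",
    )
```

For example, `read_kafka_stream(spark, kafka_source_options("broker:9092", "transactions"))` would subscribe to a hypothetical `transactions` topic.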

Sinks

Having a source is just the first step. Data needs to be persisted somewhere. Streaming ingestion on DAP saves data from sources to table sinks, with a twist: these are not boring Hive tables anymore! Along with streaming ingestion, we’ve introduced some new, modern table formats that are better suited for stream processing (but are also great for huge analytics datasets). They are expected to work better with cloud storage in a post-Hadoop world and offer some more advanced features. These new formats are Apache Iceberg and Delta Lake. Although we support both the Iceberg and Delta table formats, our streaming processes use Iceberg for seamless integration with DAP’s other offerings, like Superset, Presto and Amundsen (our Data Discovery Service).

Worried about the new table formats? No need! We still register them in Hive Metastore, so you can maintain the same workflow as with regular Hive tables⁴ and you can query them using the familiar Superset/Presto duo.
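
The sketch below illustrates, under assumptions, what such a setup can look like in PySpark. The session properties are the ones the Apache Iceberg documentation lists for a Hive-backed session catalog; whether DAP uses exactly these is not stated here. The helper names, table name and checkpoint location are hypothetical, and `toTable` requires Spark 3.1 or later.

```python
# Session properties from the Apache Iceberg documentation for a Hive
# Metastore-backed session catalog (assumed, not confirmed as DAP's settings).
ICEBERG_SESSION_PROPERTIES = {
    "spark.sql.extensions":
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog":
        "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
}


def configure_builder(builder, properties=ICEBERG_SESSION_PROPERTIES):
    """Apply the properties to a SparkSession builder and return it."""
    for key, value in properties.items():
        builder = builder.config(key, value)
    return builder


def start_iceberg_sink(stream_df, table, checkpoint_location):
    """Start appending a streaming DataFrame to an Iceberg table."""
    return (
        stream_df.writeStream
        .format("iceberg")
        .outputMode("append")
        # The checkpoint lets the query resume where it left off after a restart.
        .option("checkpointLocation", checkpoint_location)
        .toTable(table)
    )
```

With a session created via `configure_builder(SparkSession.builder).getOrCreate()`, the resulting table can then be queried like any other table registered in the metastore.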

Additional features

Ingesting data is not necessarily just moving data from one location or medium to another. If this is all you need, go for it, but you can also get added value from this process.

Streaming ingestion on DAP offers some additional features for all supported input sources:

  • optional meta columns: for the File source this is just an additional file name column; for Kafka, we can extend your tables with Kafka-specific data like offset or topic
  • typecasting (converting one data type into another): many Kafka messages (and resulting files) at ING carry all-string types in transit. Such data is cumbersome to work with because you have to cast the strings to proper types yourself. Provided a valid schema, our streaming ingestion can do it for you automatically
  • transformations: during the ingestion we can transform the source data, e.g. perform some non-standard or more complicated typecasting, or apply some Spark SQL functions
  • calculated columns: we can derive new columns from existing ones³, e.g. add new columns you want to partition by
  • skipped columns: we can remove redundant columns from the target table³ ⁵
  • partition discovery: for File sources, we can detect the partitioning scheme in file paths⁶ and extend your target tables with these partitioning columns
  • discovery: the resulting tables are searchable in Amundsen. The more metadata you provide in schemas, the more useful and searchable they get³
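
Two of the features above, typecasting and partition discovery, are easy to illustrate outside Spark. The plain-Python sketch below shows the idea only; it is not how DAP implements them, and the `SCHEMA` mapping, field names and helper names are hypothetical.

```python
from datetime import date

# Hypothetical schema: field name -> function that parses the string form.
SCHEMA = {
    "account_id": int,
    "amount": float,
    "booking_date": date.fromisoformat,
}


def cast_record(record, schema):
    """Cast the all-string values of one record to the types in `schema`.

    Fields that are missing from the schema are kept as strings.
    """
    return {name: schema.get(name, str)(value) for name, value in record.items()}


def discover_partitions(path):
    """Extract key=value directory segments from a file path.

    Values are returned as strings; no type inference is attempted here.
    """
    partitions = {}
    for segment in path.split("/")[:-1]:  # the last segment is the file name
        key, separator, value = segment.partition("=")
        if separator and key:
            partitions[key] = value
    return partitions


if __name__ == "__main__":
    raw = {"account_id": "42", "amount": "19.99", "booking_date": "2020-12-10"}
    print(cast_record(raw, SCHEMA))
    print(discover_partitions("/year=2020/month=12/day=10/file.parquet"))
```

In an actual Spark job the same effect would typically be achieved with column casts and the file source's built-in handling of `key=value` directories, rather than with per-record Python code.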

The streaming ingestion capabilities of DAP are built and maintained by our Data Assets team, which is responsible for onboarding new data sources and for developing and maintaining a generic ingestion pipeline. None of this work would have been accomplished without them, so I would like to end this blog with a shout-out to my team.

If you wish to learn more about ING WBAA, DAP, Data Assets, feel free to send an email here.

[1] Throughout this article, I use the term real-time not in the strict computer science sense, but as a loose, everyday term

[2] It does not necessarily mean that it must act on every incoming event immediately

[3] Currently limited to Avro schemas only

[4] Only a few additional properties in the Spark session are needed

[5] Applicable to formats where fields are positional, e.g. CSV

[6] For example /year=2020/month=12/day=10/file.parquet
