Ultra Fast Indexing, Profiling and Exploration of Unified Data

Ravishankar Nair
Mar 1

We want data. We don’t care whether it comes from ten disparate sources or more. We are not bothered whether it lives in the cloud, on premise or in a hybrid setup. We want the best, cheapest and most sustainable way to run any query on this unified data without hitting the databases every time. We don’t want to worry about whether the tables in the underlying data sources are indexed or whether the data modeling was perfect. We want field-level security. We should be able to plug in additional data sources when new businesses and mergers come along. Read on: we have a powerful solution. Done deal.

Every time I meet technology people who manage Business Intelligence in some form, I am confronted with the valid questions listed below; I hope you are no different. This article helps you try out, or propose, a new architecture to really make better sense of data. One or more data lakes (cloud storage, S3 or HDFS based), databases (Oracle, MongoDB, Hive or similar) or real-time streams (Kafka) are the sources of data. The questions are:

  1. You know that the big players like Cognos and Business Objects work on the concept of a semantic layer, which acts as an intermediary between the data and the final report or dashboard. Wouldn’t it be faster if we got rid of this semantic layer?
  2. Well, we have Tableau, with no semantic-layer business. Very nice dashboards can be created quickly: just build them in Tableau Desktop, deploy to the server and voila! But the cost is high and we need skilled resources to produce something good.
  3. I want to create a dashboard from unified data. For any reporting solution I need to query each source individually, and the joins or unification happen on the server even when the output is small. This is very heavy, especially when multiple users run a report against multiple sources. The concept of a “composite” data source simply copies the entire data from the sources.
  4. Once a query has executed and the data has been brought to the reporting server, there is no way to query or profile that data further unless we create as many reports and dashboard templates as the tool demands.
  5. Agility remains a dream. In this world of ever-growing data, what we need is a tool that works like Google: near real-time refresh, and the ability to project near real-time business scenarios onto big screens so we can see how our clientele is accessing and using our ecosystem.

These are the common business intelligence problems today. By properly combining open source solutions, you can get very useful and appealing results from your multiple disparate data sources. The approach described below removes the need for expensive human resources and costly software bills, and accelerates time to market. The solution is unique in the sense that I have not seen anything like it in the industry, and I have received very good feedback from medium and large sized companies where the proposed solution is producing excellent results.

Overall Architecture

The diagram below captures the essentials.

Simplest Architecture for Fastest Data Analysis

The diagram needs minimal explanation; you get it when you see it. We put the fantastic distributed SQL query engine Presto in place as the fundamental layer, configured to connect to the data sources we want to reach. This is denoted as 1. Even if you are connecting to only a single data source, put Presto in front of it: it makes access faster than going direct, thanks to Presto’s parallelism within and across nodes. (Note that the connection from Presto to a data source is direct and single threaded, not parallel, but once the data is in Presto’s memory it is parallelized and crazy fast.)

Now comes the magic of integration. This is often the piece lacking in the industry and in use-case implementations. You are usually swayed by the marketing tactics of third-party tools that promise more than a hundred different features; soon after you purchase them, you realize your primary goal is still not fully met. To overcome this, start believing in open source solutions. Let smart engineers craft the open source into a customized implementation that solves your primary issue, and then give it back to the open source community. When this loop is iterated by different organizations many times, the original open source product is transformed into highly reliable free software that can be far better than the equivalent commercial offerings. There is no Gartner needed in this modern age of open source revolution. Everything you need is out there; you just need the passion to stitch the pieces together properly, like Lego bricks assembled into a final masterpiece.

A word of caution: if you refer to the architecture diagram above, using Presto to unify data from multiple sources (marked 1) becomes irrelevant if you use a modern cloud storage platform like Minio as your single source of truth (with no additional data sources). The reason is that Minio has a built-in listener which, when configured, inserts data into Elasticsearch automatically; you can see it here. In this article we exclude that possibility and assume you have a requirement to pull data from multiple sources.

1. Presto

The super duper data unification software with a distributed SQL query engine built in. According to its authors, Facebook uses Presto to analyze its 300PB data warehouse, running 30K queries every day. It is used widely across industries; Netflix, Uber and Amazon are just a few of the adopters. You will find detailed information here.

2. Logstash

Logstash was originally developed by Jordan Sissel to handle the streaming of large amounts of data from multiple sources. The development was primarily focused on logs, but was later extended through many plugins.

In a centralized logging system, a data pipeline consists of three main stages: extraction and aggregation, processing, and storage. The first two stages are traditionally the responsibility of Logstash. Obviously, when a large amount of data needs to be processed, the Logstash instance/node becomes a bottleneck.

You have a very similar situation in Apache Spark. For example, if I want to run MLlib on 1 TB of data fetched from multiple data sources, the nodes fill their memory with that data, leaving little or no room for the MLlib processing itself and causing the infamous OOM exception. I had proposed a talk named “Apriori Unification Pattern for Apache Spark” for the 2016 Spark Summit (obviously the Spark team did not select it :) ) to avoid this weird situation, which is prevalent even today and which they are trying to solve with faster serialization methods. You can easily place Presto in front of Spark, so that the heavy lifting of data is handled by Presto and Spark only has to hold the unified output from Presto.

We are going to do exactly the same now: the Apriori Unification Pattern (AUP) for Logstash, essentially placing Presto in front to do the heavy lifting of unification from innumerable sources most effectively. You will basically use the jdbc input plugin within the Logstash configuration. The nice thing is that the plugin comes with a scheduler (the Rufus scheduler). That means that if we know the data and the changing field, we can configure the scheduler to constantly poll the data sources and push the latest changes to Elasticsearch at regular intervals (a kind of micro-batching). That is really wonderful: no manual intervention. The Rufus scheduler does not have a UI, but Logstash lets us configure it with a simple notation, similar to cron jobs on *nix systems.

3. Elasticsearch (ES)

Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents. We can send data in the form of JSON documents to Elasticsearch using the API or ingestion tools like Logstash. Elasticsearch automatically stores the original document and adds a searchable reference to the document in the cluster’s index, and you can then search and retrieve the document using the Elasticsearch API. The result is fully indexed, extremely fast access to your data. The thing I like most about ES is field- and document-level security. Presto does not have a strict security model for field-level restrictions; at most you can set who can access which schema. We eliminate this drawback by exposing the unified data (from Presto) through ES and not allowing anyone to query Presto directly (unless required). That solves an important and frequently asked requirement around security in enterprise implementations.
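For instance, indexing and then searching a single document by hand over the REST API could look like the sketch below. The index name medium is the one we create later via Logstash; the field names and values are made-up examples that mirror our eventual query output.

curl -X POST "localhost:9200/medium/_doc" -H 'Content-Type: application/json' \
     -d '{"cardnumber": "1000000001", "purchasetype": "GROCERY", "cardholder": "Jane Doe"}'

curl -X GET "localhost:9200/medium/_search?q=cardholder:Jane"

In our architecture Logstash does this ingestion for us, so the API calls above are only for illustration.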

4. Kibana

An important component of the Elastic Stack that allows us to explore, visualize and discover data. The package is usually referred to as the “ELK” stack, where E stands for Elasticsearch, L for Logstash and K for Kibana. Kibana allows data querying and analysis. Using various methods, users can search the data indexed in Elasticsearch (that is important: indexing makes access ultra fast, something databases normally achieve only for selected columns) for specific events or strings, for root cause analysis and diagnostics. Based on these queries, users can exploit Kibana’s visualization features to present data in a variety of ways: charts, tables, geographical maps and other visualization types. There is another tool called Grafana, but in this article we will focus on Kibana.

Code in Action

We will consume data from two sources. One is a Kafka topic named “realtime.transactionevent” and the other is a MySQL table, “cardsdetails”. The Kafka topic carries fictitious real-time transactions happening at different shops: each event gives the credit card number and the type of shop where the card was used, along with a timestamp. We are interested in who owns each card and, finally, the number of transactions against each card in near real time. Unfortunately the card ownership information lives in MySQL, hence we need to join the information from Kafka and MySQL.

Accordingly, we have the following two property files in Presto. For installing and using Presto, please refer to the Presto site or my previous article.

Kafka and MySql property files for connection
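Since the screenshot is not reproduced here, the two catalog files might look roughly like the sketch below. The broker address, connection URL and credentials are placeholders; replace them with your own values.

# etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=realtime.transactionevent
kafka.hide-internal-columns=false

# etc/catalog/mysql.properties
connector.name=mysql
connection-url=jdbc:mysql://localhost:3306
connection-user=root
connection-password=yourpassword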

You will need to have Zookeeper installed, unless you are using the default Zookeeper shipped with Kafka. Make sure that you have Zookeeper running:

telnet followed by the stat command shows ZooKeeper running at the default port 2181
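For reference, the check looks like this (host and port are the defaults; adjust if yours differ). After the telnet session opens, typing stat prints the ZooKeeper version, latency figures and connected clients.

telnet localhost 2181
stat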

Before starting Presto, let’s create the topic named “realtime.transactionevent” using the standard Kafka commands. Please make sure that the required components, like ZooKeeper, are started. Refer here if you run into difficulties.

Create topic named realtime.transactionevent in Kafka
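A sketch of the standard command; the exact flags depend on your Kafka version (older releases use --zookeeper localhost:2181 instead of --bootstrap-server), and the replication factor and partition count here are minimal single-node values.

bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 --partitions 1 \
  --topic realtime.transactionevent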

The next step is to start Presto. You do this by executing the launcher script in the bin folder of your Presto installation; wait until you get the “server started” message. Once again, remember to place the relevant property files in the etc/catalog directory of the Presto server. In our case these are the MySQL and Kafka properties.

Presto startup screen
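For reference, the launcher can run in the foreground or as a daemon; the sketch below assumes you are inside the Presto installation directory.

bin/launcher run      # foreground, logs to the console
bin/launcher start    # daemon, logs under var/log/server.log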

Now we need a sample program to generate events and submit them to our topic named realtime.transactionevent. Here is the code snippet.

TransactionEventProducer.java
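The full listing is not reproduced here, so below is a condensed sketch of what such a producer could look like. Only the topic name comes from the article; the message layout (timestamp, card number and purchase type separated by pipes), the card numbers and the shop categories are assumptions chosen to match the pipe-based extraction used in the Presto queries later on.

// Condensed sketch of a producer for the realtime.transactionevent topic.
// Assumed message layout: <timestamp>|<10-digit card number>|<purchase type>.
// The author's hard-coded substr offsets suggest a fixed-width timestamp in the
// original; the later strpos-based query does not depend on that detail.
import java.time.LocalDateTime;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionEventProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String[] cards = {"1000000001", "1000000002", "1000000003"};     // hypothetical card numbers
        String[] shops = {"GROCERY", "FUEL", "RESTAURANT", "PHARMACY"};  // hypothetical purchase types
        Random random = new Random();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                String message = LocalDateTime.now() + "|"
                        + cards[random.nextInt(cards.length)] + "|"
                        + shops[random.nextInt(shops.length)];
                producer.send(new ProducerRecord<>("realtime.transactionevent", message));
                Thread.sleep(60_000); // deliberate one-minute pause so events trickle in slowly
            }
        }
    }
}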

Let’s run the code. I have intentionally added a one-minute delay between events so that they are generated slowly. In real scenarios this can be hundreds of thousands of events.

You can run the following command to see whether our events are being pushed into the topic:

Topic shows that events are being pushed from our code
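The usual way to peek into the topic is the console consumer shipped with Kafka; as before, the broker address is the single-node default.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic realtime.transactionevent --from-beginning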

Now let’s query the same from Presto, just to test it. Run the CLI (Command Line Interface) provided by Presto and query with simple SQL (that’s the power of Presto).

Presto accesses Kafka through simple SQL
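A minimal session might look like the sketch below, assuming the CLI executable has been renamed to presto and the coordinator runs on the default port. _message is an internal column provided by the Kafka connector that holds the raw message payload.

./presto --server localhost:8080 --catalog kafka --schema realtime
presto:realtime> select _message from transactionevent limit 5;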

How can you join this with MySQL? If you look at the Kafka message, the second field (the one after the first pipe “|” symbol) is our credit card number. Using the SQL substr function we can extract it, and then join with MySQL to get the relevant card holder’s name. Here is the query:

select substr(_message, 25, 10) from kafka.realtime.transactionevent;
-- just hard-coded the start position and length to extract the card number

OK, what will our final query be? Again, we can test it in the Presto CLI:

select substr(t._message, strpos(t._message, '|') + 1, 10) as cardnumber,
       substr(t._message, strpos(t._message, '|') + 12, 15) as purchasetype,
       c.cardholder
from kafka.realtime.transactionevent t
join mysql.cards.cardsdetails c
  on substr(t._message, strpos(t._message, '|') + 1, 10) = c.cardnumber;

The output is:

Unified data from MySql and Kafka

Now we have the combined data. Whenever a transaction happens, we know who is behind it. We could also use the data to find each person’s affinity for particular purchase categories. But what we want now is to expose this data to very fast search, whether by category, by card number or by card holder. Note that if we keep executing the SQL query, Presto will hit the underlying data sources every time. So instead we fetch the data, put it into Elasticsearch and use Kibana to slice, dice and query it under different conditions. Perhaps we refresh Elasticsearch every 10 minutes, so that we have near real-time data for our explorations.

The next step is to configure the rest of the components. You may follow the links for Elasticsearch, Kibana and Logstash. This should be quick, as no configuration at all is needed for the defaults to work. The only configuration required is for Logstash: we will configure Logstash to connect to Presto, execute our previous query and index the result directly into Elasticsearch, so that the entire data set gets indexed. First, please follow the instructions at the links above and start Elasticsearch and Kibana.

Two windows showing Elasticsearch and Kibana respectively

You can go to localhost:5601 (if you have used the defaults); if everything is fine, you will see the Kibana home screen below.

Kibana home screen

We are almost done. See my ultrafast.conf, the configuration file for Logstash.

Logstash configuration
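The original file is shown only as a screenshot, so here is a sketch of what ultrafast.conf could look like. The driver path, coordinator URL, user name and index name are assumptions; the statement is the unified query from above, and the schedule uses the jdbc input’s rufus-scheduler notation, where a six-field cron expression gives second-level granularity.

input {
  jdbc {
    jdbc_driver_library => "/path/to/presto-jdbc.jar"
    jdbc_driver_class => "com.facebook.presto.jdbc.PrestoDriver"
    jdbc_connection_string => "jdbc:presto://localhost:8080"
    jdbc_user => "logstash"
    statement => "select substr(t._message, strpos(t._message, '|') + 1, 10) as cardnumber, substr(t._message, strpos(t._message, '|') + 12, 15) as purchasetype, c.cardholder from kafka.realtime.transactionevent t join mysql.cards.cardsdetails c on substr(t._message, strpos(t._message, '|') + 1, 10) = c.cardnumber"
    schedule => "*/20 * * * * *"   # every 20 seconds
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "medium"
  }
}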

As you can see, we are inserting into an index called medium. Please note our query and, most importantly, the schedule (we have set the query to be executed every 20 seconds). When you run this, Elasticsearch will automatically create the index called “medium” and insert all the data. You can then query it using Kibana, after creating an index pattern in Kibana.

Next Steps

You might wonder whether Logstash becomes a bottleneck when querying big data. Maybe, depending on how much you pull. Building an Elasticsearch connector for Presto that can write data directly would solve this. Fortunately we already have such a connector in Presto; unfortunately it has only read capability for now. Please stay tuned until that connector is updated.

One more point: what if, rather than Kibana, you want this fast indexed search from, say, a third-party application? What can we do?

Query Elasticsearch using the Presto Elasticsearch connector I mentioned above.
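Pointing Presto at Elasticsearch is again just a catalog file; the sketch below is indicative only, since the property names vary slightly across Presto versions, and the host, port and schema name are placeholders. With it in place, an application could issue plain SQL such as select * from elasticsearch.default.medium through the Presto JDBC driver.

# etc/catalog/elasticsearch.properties
connector.name=elasticsearch
elasticsearch.host=localhost
elasticsearch.port=9200
elasticsearch.default-schema-name=default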

What you have now is the basic architecture for very useful application functionality in your enterprise. It would cost millions to buy such software unless you rely on open source innovation.

Enjoy! You can connect with me through LinkedIn.

