Tasted Apache Pinot and we Loved it!

Shounak Kulkarni
13 min read · Jul 15, 2020


Our team at Publicis Sapient was pioneering a personalized experience for our client, and we were on the lookout for a time-series datastore that would fit into our design: one that could hold a high volume of user data over a period of time (in our case, 3 months), ingest it quickly, and respond to real-time queries with very low latency. We evaluated a few products in this space and finally zeroed in on Apache Pinot.

Apache Pinot is a real-time, distributed OLAP datastore designed to serve queries with low latency, and it supports near-real-time event stream ingestion (fast writes). We had put together a design that streams data from the web/mobile experience into Apache Kafka via a microservice. Apache Pinot plugs into Apache Kafka natively, which made it a convenient and ideal fit. Using a microservice, we query Apache Pinot for user insights such as the most popular vehicle in town, the user's most favored vehicles, the user's affinity to each vehicle, and so on.

High-level architecture

As this was a pilot project, we had budget constraints and were conservative with the cloud infrastructure for the initial launch. With these constraints in place, we were very strict about the non-functional requirements we targeted and went through multiple iterations to make sure we performance-tested and tuned all the systems for production.

In this architecture, Apache Pinot is the heart of the whole system. This blog is an attempt to walk through the different tuning we performed on Apache Pinot against the NFRs we laid out for ourselves. You will see sections on resiliency, observability, and improving Apache Pinot's 95th-percentile latency, before concluding with our benchmark results.

Before I dive into the journey, I want to give a big shout-out to the Pinot community. They have been extremely helpful and prompt in guiding us along the way. The Pinot docs have all of this information documented; this blog will mostly help you connect the dots in a better way.

The Quickstart

The Pinot documentation has a seamless quickstart guide. I have tried both the local and the Kubernetes setups. The quickstart gives you all of the pieces required to solve the puzzle, right from the Pinot deployment, Zookeeper, and Kafka, to generating sample data in Kafka and ingesting it into a Pinot table.

The Pinot controller provides a UI to query Pinot, so you can play around with the sample data. It also hosts a Swagger UI for the controller REST APIs, which is useful for operating a cluster.

We worked with the Kubernetes setup, so the rest of this post is framed in that context. We used the official Helm charts available in the Pinot Git repository. The file pinot-realtime-quickstart.yml holds the configuration used to create a table in Pinot: a table config and a schema file. The table config describes the behavior of the table, and the schema file holds the actual table schema.

Getting the right recipe

Once you are comfortable with the basic Pinot setup, you can proceed with the table configuration. The file used for table creation is where we need to put the schema and tune the table configs.

Before moving on to system tuning, getting the schema right and indexing efficiently is very important. There are a few features offered by Pinot that should be considered when creating a table schema:

  1. Multi-valued columns
  2. Metric field columns
  3. Ingestion transform functions
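To make this concrete, here is a minimal, hypothetical schema fragment showing a multi-valued dimension and a metric field, plus an ingestion transform in the table config. The column names and the transform expression are illustrative, not from our actual table, and in recent Pinot versions transform functions live under ingestionConfig in the table config:

Schema file:

  {
    "schemaName": "userActivity",
    "dimensionFieldSpecs": [
      { "name": "userId", "dataType": "STRING" },
      { "name": "viewedVehicles", "dataType": "STRING", "singleValueField": false }
    ],
    "metricFieldSpecs": [
      { "name": "affinityScore", "dataType": "DOUBLE" }
    ],
    "dateTimeFieldSpecs": [
      { "name": "eventTimeMillis", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }
    ]
  }

Table config (ingestion transform deriving a column from the raw event):

  "ingestionConfig": {
    "transformConfigs": [
      { "columnName": "eventTimeMillis", "transformFunction": "fromDateTime(eventTime, 'yyyy-MM-dd HH:mm:ss')" }
    ]
  }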

In Pinot, there are a lot of variables to tune: segment size, retention of segments, table partitions, replicas per partition, number of servers and brokers, resources per server, JVM heap, and so on. But don't worry, Pinot has eased this by providing a real-time provisioning tool, which is the perfect point to start the system tuning process.

Know your data

Before moving any further, the first thing to do is data and traffic estimation; the actual tuning parameters will vary based on these estimates. If you already have insights into the traffic behavior, well and good; if you don't, try to get a realistic view of the traffic distribution over time. The real-time provisioning tool that we explore next is driven primarily by the sample segment we provide: your traffic decides what segment size is required and over what time window it should be created.

Realtime provisioning tool

This tool takes a sample segment and the table config file as input and produces a table from which we can decide the segment size, based on the resources available for Pinot. Its main inputs are:

  • periodSampleSegmentConsumed: the actual time taken to capture the data in the given sample segment.
  • numHosts: a list of the possible numbers of servers we might deploy the table to.
  • numHours: a list of hours over which we want to project the traffic so as to create segments.
  • maxUsableHostMemory: the amount of memory we can assign to each server.
  • retentionHours: the time, measured from segment creation, after which segments are purged from the servers. Provide the same value in the segmentsConfig section of the table config.
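For reference, an invocation of the tool looks roughly like this (paths and numbers are placeholders, and the exact flag set can differ slightly between Pinot versions):

  bin/pinot-admin.sh RealtimeProvisioningHelper \
    -tableConfigFile /path/to/table-config.json \
    -numPartitions 10 \
    -numHosts 2,3 \
    -numHours 12,24 \
    -sampleCompletedSegmentDir /path/to/sample-segment/ \
    -periodSampleSegmentConsumed 6h \
    -maxUsableHostMemory 12G \
    -retentionHours 72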

The output is something like this:

Memory used per host

  numHosts -->      2       |3       |
  numHours
   12 -------->     17.96G  |10.33G  |
   24 -------->     NA      |15.44G  |

Optimal segment size

  numHosts -->      2       |3       |
  numHours
   12 -------->     50.55M  |50.55M  |
   24 -------->     NA      |101.11M |

Consuming memory

  numHosts -->      2       |3       |
  numHours
   12 -------->     3.77G   |2.26G   |
   24 -------->     NA      |4.52G   |

Suppose we have 12 GB of memory available for each server.

  • The "Memory used per host" section tells us that 3 servers, with segment creation after 12 hours of data, will fit within our resource limit.
  • The "Optimal segment size" section tells us that 50.55 MB is the desired segment size.
  • The "Consuming memory" section tells us the amount of memory occupied by the consuming segments (useful for deciding the value of the '-XX:MaxDirectMemorySize' JVM option of the pinot-server); see the sketch after this list.
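As an illustration, with the numbers above, the server's jvmOpts in the Helm values.yaml could be sized along these lines (the heap/direct-memory split here is an assumption, not our production values):

  server:
    jvmOpts: "-Xms1G -Xmx4G -XX:MaxDirectMemorySize=3G"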

Segment creation thresholds

As data in Pinot is distributed across segments, the size and number of segments play an important role in the query performance.

There are 3 ways in which we can impose a threshold for segment creation:

  1. Time bound (realtime.segment.flush.threshold.time)
  2. Row bound (realtime.segment.flush.threshold.size)
  3. Size bound (realtime.segment.flush.desired.size)

Their usage and implementation details are documented here. With the first approach, we faced an issue with an uneven number of entries per segment: as a normal trend, traffic on weekends is higher than on weekdays, which leads to uneven segment sizes. The second approach also created uneven-size segments, but for a different reason: our schema has many columns whose values vary over a wide size range, so the size of each entry varies widely, and every n entries do not necessarily add up to the same amount of data.

The third approach solved the above issues, and Pinot even recommends it, as it is more efficient and auto-tunes itself as data traffic changes over time. To enable this threshold, we are supposed to set realtime.segment.flush.threshold.size=0 (the row bound).

As mentioned before, this method learns the number of rows to consume before creating a segment, so it needs an initial row count to start with. This can be supplied via 'realtime.segment.flush.autotune.initialRows' (the default is 100K). If we have an idea of the number of rows that would fit the desired size, it's better to provide a value somewhat less than that. If not, there may be excess segment creation at the start until the desired size is reached (the row count is incremented by 1.5x until it crosses the desired size).
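Putting these knobs together, the relevant part of streamConfigs in the table config looks roughly like this (the topic name and the concrete values are illustrative; only the property names come from the Pinot docs):

  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "user-events",
    "realtime.segment.flush.threshold.time": "12h",
    "realtime.segment.flush.threshold.size": "0",
    "realtime.segment.flush.desired.size": "50M",
    "realtime.segment.flush.autotune.initialRows": "500000"
  }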

Expectations from Kafka

We now have the segment size and a rough idea of how much time a segment takes to reach completion. Since Kafka is the source of data for Pinot, we cannot afford data loss in Kafka until the data is persisted in Pinot. To explain further: the data in consuming segments resides in volatile memory, so if the pod goes down, that data is lost and needs to be consumed again. To avoid any data loss, Kafka must still have all the data that currently resides in the Pinot servers' consuming segments. To achieve this, the retention time of the Kafka topic must be greater than the time-bound threshold for segment completion.
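For example, with a 12-hour flush-time threshold, the topic retention could be bumped to something comfortably larger. One way to do it (topic name and value are placeholders; older Kafka versions may need --zookeeper instead of --bootstrap-server):

  kafka-configs.sh --bootstrap-server kafka:9092 --alter \
    --entity-type topics --entity-name user-events \
    --add-config retention.ms=172800000   # 48 hours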

Table partitions

Over-partitioning is recommended, as it brings several benefits:

  • The target space for queries narrows down (if partition-aware routing is in place).
  • Parallelism in consuming data from the stream increases.
  • Consuming segments can be spread across many server nodes.

But over-partitioning comes with its drawbacks. It takes more time to create a segment (if the size threshold is followed), which means the retention on the streaming tool needs to be longer. And if the segment size is kept small, too many segments will be generated, which ends up adding latency to queries. So it's a trade-off between the number of partitions and the segment size.

Memory management

Pinot is an in-memory database, and different configurations are available for consuming segments and completed segments to use off-heap memory efficiently. There are primarily two kinds of off-heap configuration: direct memory and MMAP (memory-mapped). Currently, the default for both consuming and completed segments is direct memory (this could change in the future).

The amount of direct memory needed per Pinot server could be determined by using the real-time provisioning tool, as mentioned above.

If you have the luxury of provisioning the required amount of primary memory per Pinot server, you can side with the direct-memory setting. Otherwise, to leverage the benefits of virtual memory mapping and avoid out-of-memory issues, MMAP is best suited and is the recommended approach.

Completed segments of a real-time table use the loadMode setting of the table config, which takes one of two values (HEAP or MMAP). For consuming segments, setting pinot.server.instance.realtime.alloc.offheap to true ensures Pinot uses off-heap memory efficiently via MMAP; to use direct memory for consuming segments instead, pinot.server.instance.realtime.alloc.offheap.direct must also be set to true. You can find/add these configs in the configmap.yaml of the Pinot server.
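A sketch of how this looks in practice (pick either the MMAP or the direct-memory line for consuming segments; loadMode sits under tableIndexConfig in the table config):

  # pinot-server config (via configmap.yaml)
  pinot.server.instance.realtime.alloc.offheap=true
  # to use direct memory for consuming segments instead:
  # pinot.server.instance.realtime.alloc.offheap.direct=true

  # table config
  "tableIndexConfig": {
    "loadMode": "MMAP"
  }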

Ageing of Pinot

We learnt more than a few things over time while figuring out Pinot. Most of them were specific to our use case, but others were more generic and became very important once we started focusing on factors such as traffic management and fault tolerance.

Replica groups

Replica groups are a somewhat involved strategy, but at a high level the idea is to group servers together so that each group in itself holds a complete copy of the data. Any one group can then serve any query with the help of the servers present in that group, which makes sure a query is not unnecessarily scattered over all servers.

To achieve this, the following must be satisfied:

  • no. of replicas per partition > 1
  • no. of servers = replicas per partition * n, where n ∈ N.

Implementation:
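A minimal sketch of the relevant table-config sections, assuming 2 replicas per partition (the exact assignment fields vary a little across Pinot versions):

  "segmentsConfig": {
    "replicasPerPartition": "2",
    "replicaGroupStrategyConfig": {
      "numInstancesPerPartition": 1
    }
  },
  "routing": {
    "instanceSelectorType": "replicaGroup"
  }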

Put the above config in the table config file.

Partition aware routing

Suppose there are 100 users and we are tracking user activities. The Kafka topic we ingest from has 10 partitions, so over time a single user's activities get spread across all 10 partitions, and hence all segments must be queried for insights on that user. Partition-aware routing says that if, while publishing data to Kafka (or any stream), we distribute the data over partitions so that all entries with a specific value of a key always go to the same partition, then it becomes easy to look up those entries. In our case, we now need to query only the segments that belong to a single partition.

Make sure to apply this partitioning technique on keys which

  • are priority predicates in the where clause of queries
  • ensure almost equal distribution of traffic over all partitions
  • have high cardinality

Implementation:
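A sketch of the two pieces needed on the Pinot side, assuming the data is keyed by a hypothetical userId column and published into 10 partitions with Kafka's default murmur-based partitioner:

  "tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "userId": {
          "functionName": "Murmur",
          "numPartitions": 10
        }
      }
    }
  },
  "routing": {
    "segmentPrunerTypes": ["partition"]
  }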

Fault tolerance

If you are going to use Pinot in a production environment, making your Pinot cluster fault tolerant becomes a top priority. Here are a few pointers that can help achieve this goal:

  1. Replicate data (replicas per partition > 1).
  2. Have enough brokers; if possible, enable horizontal pod autoscaling (HPA).
  3. Give enough memory to each server, consulting the output of the real-time provisioning tool, to avoid OOM exceptions.
  4. Apply log rotation on component logs (a log file is generated inside each component and can grow huge, occupying the node storage).
  5. Have 3 or more Zookeeper replicas.
  6. Make sure the retention time on Kafka (or the equivalent stream) is more than the segment flush-time threshold: when a server pod goes down, the data in the consuming segment is lost, so the segment has to consume that data again from Kafka.
  7. Have 3 controllers for redundancy.

3 controllers setup

As mentioned above, 3 controllers are recommended, as a single controller becomes a single point of failure. Currently the charts use a persistent volume provisioner in the StatefulSet, which creates a unique PVC (Persistent Volume Claim) for each replica. So we must create a PVC beforehand and then provide it for the data volume mount.

The statefulset.yaml given below has the required changes: the commented-out code is the persistent volume provisioner, and the lines below it point the volume mount to the PVC created by controller-pvc.yaml. It's helpful to review the Kubernetes documentation on persistent volumes to better understand how to set up PVCs and why they are important.
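A rough sketch of that change in the controller statefulset.yaml (the claim name controller-pvc is assumed to be whatever controller-pvc.yaml creates):

  # under spec.template.spec:
  volumes:
    - name: data
      persistentVolumeClaim:
        claimName: controller-pvc
  # the original volumeClaimTemplates block (the per-replica provisioner) stays commented out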

Tasting Pinot

Observability

Once everything is in place, it's time to test it. While testing, we must be able to monitor the different components of Pinot. For that, we used Prometheus as the time-series database and Grafana for visualization. Pinot provides an exporter jar and a config file to expose Prometheus-compatible metrics, but to wire this up for the Pinot containers in Kubernetes, there are a few steps to follow:

  • Create a Docker image that puts the jar and config file in a directory, say /temp.
  • Create an empty volume and mount it to the component container.
  • Run the created image as an init container for the Pinot components and mount the same volume to the init container's temp directory.
  • Provide that path in the jvmOpts and add pod annotations in values.yaml (sketched below).

This will ensure metrics are exposed for Prometheus to scrape on port 8080.
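For instance, the server section of values.yaml might end up looking something like this (the jar name, paths, and annotation keys depend on your exporter and Prometheus setup, so treat it as a sketch):

  server:
    jvmOpts: "-Xms1G -Xmx4G -javaagent:/temp/jmx_prometheus_javaagent.jar=8080:/temp/pinot.yml"
    podAnnotations:
      prometheus.io/scrape: "true"
      prometheus.io/port: "8080"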

Here are some metrics which are basic and very helpful:

Create a dashboard in Grafana using these metrics. Once everything is in place, we can run a small performance test on Pinot and observe how it behaves through the Grafana dashboard.

Some validations

Here are a few ways to validate the Pinot features we have implemented:

  • Replica group: when we fire a query, some metadata is returned along with the query result. It contains a field numServersQueried, which should be equal to the number of servers present in one replica group.
  • Partition-aware routing: this can be validated by checking the numSegmentsProcessed field in the response metadata. It should be equal to the number of segments created by the table's consumer for the partition that holds the particular key being queried.
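For reference, the broker response metadata looks something like this (the numbers are made up and the result rows are omitted; the field names are what you would check):

  {
    "numServersQueried": 3,
    "numServersResponded": 3,
    "numSegmentsQueried": 48,
    "numSegmentsProcessed": 5,
    "numDocsScanned": 1200,
    "timeUsedMs": 18
  }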

Our Benchmark

We tested the setup at 3000 queries/second, with 5000 entries/second being ingested into Kafka simultaneously, which was our average expected traffic. These queries ran over 200+ million entries spread across 45+ segments in the table.

Cluster setup

Regarding ingestion performance (i.e., consuming entries from Kafka), Pinot consumed 5000 entries per second with ease. It is actually capable of a much higher rate: when we started Pinot in bootstrap mode, we saw an ingestion rate of around 60k-70k entries/second, and even that is not the limit, since segments get created before the ingestion rate can grow any further. While testing large segments, we have seen the ingestion rate cross 120k entries/second on one node, which is insane!

Troubleshooting

  1. Once you run helm install for Pinot, don't worry about the initial restarts of the server, controller, and broker pods. Until Zookeeper is up and running, the other pods will keep failing; and as long as the controller is not up, the servers and brokers will keep failing too.
  2. A time/date-time spec is mandatory in a table schema, so make sure it is provided.
  3. If the servers have an uneven segment distribution, call the rebalance endpoint provided by the controller APIs.
  4. In the controller APIs, the ideal-state endpoint gives the ideal view of the segments, whereas the external-view endpoint gives the actual state of the segments.
  5. Whenever server replicas are increased, triggering the rebalance endpoint is required if we want segments distributed over the new server replicas.
  6. If a segment is missing in one of the replica sets, restarting that server will take care of recreating the segment by downloading it directly from the controller.
  7. Do not use Azure Files for the server persistent volume; use Azure Disk instead.

Conclusion

Apache Pinot being the first OLAP database I have worked with, I may not be able to compare it to similar systems. What I can say is that we used Pinot to solve our business use case and it had everything we were looking for. Pinot has a great open-source community behind it, which makes the onboarding and learning curve relatively painless. Each release brings a lot of cool, value-added features, which shows that this incubating project has a lot more to offer in the future.

We have just explored the tip of the iceberg and are very excited to explore it further!

Useful links

Guiding beacons in this journey:
