Speed up Kafka queries on Presto

Hao Gao
Hadoop Noob
2 min read · Jun 2, 2017

After adding Protobuf and Avro decoders to Presto, I can now query my Kafka cluster through Presto. This has saved me a lot of time debugging data issues in my data pipelines: if the data isn't in Kafka, I don't need to debug the downstream pipelines at all.

But I did notice one thing about the current Kafka connector: there is no predicate pushdown, so Presto tries to scan the whole Kafka topic. That works fine when the topic is small, but with billions of records daily it won't. I know predicate pushdown is not easy, especially when the underlying layer is Kafka, but I think partitioning may be a good starting point for speeding up queries.

Imagine if we had partitioning based on offsets and timestamps for Kafka. Something like this:

_partition_id | _offset_start | _offset_end | _timestamp
--------------+---------------+-------------+------------
            0 |       7684749 |     7878638 | 1496447327
            1 |       7691636 |     7885237 | 1496447327
            2 |       7684691 |     7878592 | 1496447327

Then when we have the following query:

select id, id_str, user_name from kafka_local.default.protobuf_message where _timestamp > to_unixtime(timestamp '2017-05-31 00:00 UTC')

Presto could then scan only the data after '2017-05-31 00:00 UTC'. Basically, the less you scan, the faster the query.
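To make the pruning concrete, here is a minimal sketch of what the connector-side work could look like (all class and field names are my own assumptions, not Presto code; the real pushdown would go through Presto's TableLayout machinery, which is not shown): take the lower bound on `_timestamp` from the WHERE clause and map it to a per-partition start offset, skipping every metadata window that ended before the bound.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaPredicatePushdown {
    // One metadata row, mirroring the table above: the offsets a partition
    // received during the window starting at `timestamp` (names assumed).
    public static final class WindowRow {
        final int partitionId;
        final long offsetStart, offsetEnd;
        final long timestamp;
        public WindowRow(int p, long start, long end, long ts) {
            partitionId = p; offsetStart = start; offsetEnd = end; timestamp = ts;
        }
    }

    // Given the lower bound extracted from `WHERE _timestamp > :bound`,
    // return the first offset each partition's split should start from.
    // Windows entirely before the bound are skipped -- that is the speedup.
    public static Map<Integer, Long> startOffsets(List<WindowRow> rows, long bound) {
        Map<Integer, Long> starts = new HashMap<>();
        for (WindowRow r : rows) {
            if (r.timestamp >= bound) {
                starts.merge(r.partitionId, r.offsetStart, Math::min);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // The three rows from the table above.
        List<WindowRow> meta = List.of(
            new WindowRow(0, 7_684_749L, 7_878_638L, 1_496_447_327L),
            new WindowRow(1, 7_691_636L, 7_885_237L, 1_496_447_327L),
            new WindowRow(2, 7_684_691L, 7_878_592L, 1_496_447_327L));
        System.out.println(startOffsets(meta, 1_496_447_327L));
    }
}
```

Each split would then read only `[start, _offset_end)` for its partition instead of the whole partition.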

To achieve this, we need to implement a metastore (the idea comes from the Hive metastore) and push the predicate down to the Kafka connector (through TableLayout).
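As for populating such a metastore, here is one possible sketch (again, all names are assumptions, not from the post): a small scheduled job records each partition's latest offset together with the current time; the highest end offset recorded at or before a query's timestamp bound then becomes the offset the scan can start from.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical snapshot store: one row per (partition, timestamp) sample.
public class OffsetSnapshots {
    public static final class Snapshot {
        public final int partitionId;
        public final long endOffset;   // latest offset at sample time
        public final long timestamp;   // unix seconds at sample time
        public Snapshot(int partitionId, long endOffset, long timestamp) {
            this.partitionId = partitionId;
            this.endOffset = endOffset;
            this.timestamp = timestamp;
        }
    }

    private final List<Snapshot> rows = new ArrayList<>();

    // Called on a schedule (e.g. every few minutes) for every partition.
    public void record(int partitionId, long endOffset, long timestamp) {
        rows.add(new Snapshot(partitionId, endOffset, timestamp));
    }

    // For `WHERE _timestamp > minTimestamp`: every record before the last
    // snapshot taken at or before `minTimestamp` can be skipped.
    public long startOffsetFor(int partitionId, long minTimestamp) {
        long start = 0L;  // fall back to a full scan if nothing matches
        for (Snapshot s : rows) {
            if (s.partitionId == partitionId && s.timestamp <= minTimestamp) {
                start = Math.max(start, s.endOffset);
            }
        }
        return start;
    }
}
```

In a real deployment the sampled offsets would come from the Kafka consumer's endOffsets() call; here they are passed in so the sketch stays self-contained.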

Everything is still a work in progress. Check out the code here:

https://github.com/hadoop-noob/presto/tree/0.178_noob

If anyone has any ideas, feel free to reach out to me.
