A Realtime Flink Parquet Data Warehouse

Hao Gao
Published in Hadoop Noob
3 min read · Sep 22, 2017

Recently I have been working on migrating our current pipelines (mostly PySpark) to JVM-based ones. Our plan is to use Spark for batch processing and Flink for real-time processing.

One of our most critical pipelines is the hourly Parquet batch pipeline. It generates the data for our data warehouse, and people use Presto to query it. The original design is simple and robust: a PySpark batch job that consumes Secor logs and produces data on S3. Secor is used to ensure exactly-once delivery. Since the Parquet job has to wait for the Secor batch to finish, it generally adds delay to the data.

So is it possible to make the data available in near real time? I think Flink is probably a perfect fit here. The pipeline itself is simple: I need a Kafka source connected to a Parquet sink. That’s it.

In order to achieve this, I still need to write some custom code:

  1. A decoder for my Kafka messages, which is easy: I just need to implement DeserializationSchema with my generated Java class. The only thing to notice is that I need to register the generated class (and its unmodifiable-collection fields) with Kryo:
import com.twitter.chill.protobuf.ProtobufSerializer;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// protobuf-generated classes contain unmodifiable collections, which stock Kryo cannot handle
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(ProtoSchema.ProtoClass.class, ProtobufSerializer.class);
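For completeness, here is a minimal sketch of such a decoder. ProtoDeserializationSchema is a name I made up for this post, and the sketch assumes a Flink version where DeserializationSchema lives under org.apache.flink.api.common.serialization (older releases keep it under org.apache.flink.streaming.util.serialization); parseFrom is the parser the protobuf compiler generates for every message class.

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class ProtoDeserializationSchema implements DeserializationSchema<ProtoSchema.ProtoClass> {

    @Override
    public ProtoSchema.ProtoClass deserialize(byte[] message) throws IOException {
        // parseFrom is generated by protoc for every message class
        return ProtoSchema.ProtoClass.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(ProtoSchema.ProtoClass nextElement) {
        return false; // a Kafka topic is unbounded
    }

    @Override
    public TypeInformation<ProtoSchema.ProtoClass> getProducedType() {
        return TypeInformation.of(ProtoSchema.ProtoClass.class);
    }
}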

2. A Parquet sink writer. For the sink I choose Flink’s BucketingSink, since we are building a warehouse here and we certainly want partitions. Let’s take a look at the Writer API that BucketingSink expects: we need to implement write, open, getPos, flush, close and duplicate. But if you look at the actual Parquet writer (here we choose AvroParquetWriter), you will notice it has no flush method and no getPos.
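Before getting into the writer itself, here is a rough sketch of how the whole pipeline is wired together. The topic name, Kafka properties, S3 path and bucket format are made up for illustration, FlinkKafkaConsumer010 is simply the connector version I happen to target, and ProtoDeserializationSchema is the decoder sketched above; the setWriter call is exactly the piece the rest of this post is about.

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class RealtimeParquetJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // pending files are finalized when checkpoints complete

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "parquet-warehouse");

        DataStream<ProtoSchema.ProtoClass> events = env.addSource(
                new FlinkKafkaConsumer010<ProtoSchema.ProtoClass>(
                        "events", new ProtoDeserializationSchema(), props));

        BucketingSink<ProtoSchema.ProtoClass> sink =
                new BucketingSink<>("s3://my-warehouse/events");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH")); // hourly partitions
        // sink.setWriter(...) is where a Parquet-capable Writer would go -- the hard part,
        // discussed in the rest of this post. Without it, BucketingSink falls back to a
        // plain StringWriter.
        events.addSink(sink);

        env.execute("realtime-parquet-warehouse");
    }
}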

Then you will probably google around; just try “flink parquet writer”. There are a few people talking about it, and most of them are talking about Flink’s batch API. This makes sense if you calm down and think about it: the actual Parquet writer does not write record by record, it writes row group by row group. Some interesting links:

  1. https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink. This one basically tells you it is not possible. I agree with it; the Parquet writer doesn’t fit streaming well, and micro-batching makes better sense.
  2. http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rolling-sink-parquet-Avro-output-td11123.html. This one tells you it is possible, and they provide a Flink Parquet writer. At first glance this looks like just what I want. But if we take a look at flush and getPos, it uses writer.getDataSize() for both methods. So what exactly is getDataSize? It does nothing but return the amount of data written so far, counting both memory and disk. This implementation does not work when your Flink streaming pipeline fails and tries to restart from the last successful checkpoint; to explain why, I would probably need a separate post.
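To make the problem concrete, here is a minimal sketch of a writer in the spirit of that mailing-list thread (my paraphrase, not their exact code). It assumes parquet-avro’s AvroParquetWriter builder API and Avro GenericRecord as the element type, and the Writer method signatures are as I remember them from the Flink 1.3 filesystem connector, so double-check them against your Flink version.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AvroParquetSinkWriter implements Writer<GenericRecord> {

    // keep the schema as a String: the Writer is serialized and shipped to the task managers
    private final String schemaString;
    private transient ParquetWriter<GenericRecord> writer;

    public AvroParquetSinkWriter(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        writer = AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(new Schema.Parser().parse(schemaString))
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withConf(fs.getConf())
                .build();
    }

    @Override
    public void write(GenericRecord element) throws IOException {
        writer.write(element);
    }

    @Override
    public long flush() throws IOException {
        // getDataSize() counts bytes buffered in memory as well as bytes already written,
        // so this is only an estimate, not a real file offset
        return writer.getDataSize();
    }

    @Override
    public long getPos() throws IOException {
        // same estimate; after a failure, BucketingSink truncates the in-progress file back
        // to the offset recorded at the last checkpoint, and this number does not correspond
        // to a valid position in the Parquet file
        return writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }

    @Override
    public Writer<GenericRecord> duplicate() {
        return new AvroParquetSinkWriter(schemaString);
    }
}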

To conclude:

  1. If you are looking for an existing solution, the answer is no. Maybe just use a traditional batch pipeline; you can probably implement it in five minutes, even with Flink’s batch API. If you want near real time, try Spark, since it is micro-batch. UPDATE: I found this link; it seems Spark Structured Streaming is the solution: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html
  2. If you do not care about record loss when the pipeline fails, you can use the mailing-list writer from link 2 above.
  3. If you want real Flink streaming Parquet (technically it is still micro-batching, but you can utilize Flink’s checkpoint mechanism), you need to customize both the Flink and Parquet code. I will cover how to achieve this later.

I am sure my opinion is biased. If anyone has a better idea or implementation, please share it with me. I would really appreciate it.
