Flink Parquet Writer

Hao Gao
Hadoop Noob
Published in
4 min readNov 8, 2017

From last post, we learned if we want to have a streaming ETL in parquet format, we need to implement a flink parquet writer. So Let’s implement the Writer Interface.

Writer V1:

public class FlinkAvroParquetWriterV1<T> implements Writer<T>
{
private transient ParquetWriter<T> writer;
private String schema;
private CompressionCodecName compressionCodecName = CompressionCodecName.GZIP;

public FlinkAvroParquetWriterV1(String schema) {
this.schema = schema;
}
@Override
public void open(FileSystem fs, Path path) throws IOException
{
Configuration conf = new Configuration();
conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
writer = AvroParquetWriter
.<T>builder(path)
.withSchema(new Schema.Parser().parse(schema))
.withCompressionCodec(compressionCodecName)
.withPageSize(8 * 1024)
.withConf(conf)
.build();
}

@Override
public long flush() throws IOException
{
return writer.getDataSize();
}

@Override
public long getPos() throws IOException
{
return writer.getDataSize();
}

@Override
public void close() throws IOException
{
writer.close();
}

@Override
public void write(T element) throws IOException
{
writer.write(element);
}

@Override
public FlinkAvroParquetWriterV1<T> duplicate()
{
return new FlinkAvroParquetWriterV1<>(schema);
}

We return getDataSize in both getPos and flush because there is no getPos and flush method in AvroParquetWriter. So is this legit? The answer is NO. Let’s take a closer look.

What exactly does getDataSize return?

public long getDataSize() {
return this.lastRowGroupEndPos + this.columnStore.getBufferedSize();
}

What exactly is buffer size?

approximate size of the buffered encoded binary data

So now we learned:

  • getDataSize returns approximate size
  • getDataSize returns the size of the mix of on disk & in memory data.

What does getPos do?

Remember we want to use BucketingSink in the last post? Let’s look at where getPos get called in BucketingSink. It controls when the sink closes the file. It doesn’t matter much since who cares if the file is slightly bigger or smaller than the batch size.

What does flush do?

We know Flink uses Checkpointing to ensure exact one. So Let’s take a look at the snapshotState function. We Learned BucketingSink stores the current file length, so when it handleRestoredBucketState, it truncates the invalid file to a valid position. So when the flush method get called, it should return the exact position of the DataStream on disk

What’s the potential solutions:

  1. Can we just uses lastRowGroupEndPos as the DataStream position to restore the file? No. We will lose the data buffered in memory
  2. Can we implement the flush function for AvroParquetWriter? Yes. But it is still not enough. We will cover this later.
  3. I am not familiar with parquet. I don’t want to touch parquet-mr. We all know when the writer is closed. All the data in memory will be flushed to disk. So we can simply call close() in flush(). Then how about the restore part? We can simply just do not restore. Because if the flink pipeline fails in the middle, the file will remain in-progress or pending state. The file in-progress or pending will have an underscore “_” prefix which can be used as a pattern for ignored files.

flush() Implementation

We want to implement flush because the #3 solution works but not perfect. Imaging if your checkpoint interval is small, e.g. 10s, you will end up having tons of small files. We all know we don’t like small files in “big data” world.

In InternalParquetRecordWriter

public long flush() throws IOException {
long memSize = this.columnStore.getBufferedSize();
LOG.info(String.format("Flushing %,d records to disk, mem size %,d.", new Object[]{Long.valueOf(this.recordCount), Long.valueOf(memSize)}));
this.flushRowGroupToStore();
this.initStore();
this.recordCountForNextMemCheck = Math.min(Math.max(100L, this.recordCount / 2L), 10000L);
this.lastRowGroupEndPos = this.parquetFileWriter.getPos();
return this.lastRowGroupEndPos;
}

In ParquetWriter

public void sync() throws IOException {
this.out.sync();
}

Writer V2

public class FlinkAvroParquetWriterV2<T> implements Writer<T>
{
private transient ParquetWriter<T> writer;
private String schema;
private CompressionCodecName compressionCodecName = CompressionCodecName.GZIP;

public FlinkAvroParquetWriterV2(String schema) {
this.schema = schema;
}
@Override
public void open(FileSystem fs, Path path) throws IOException
{
Configuration conf = new Configuration();
conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
writer = AvroParquetWriter
.<T>builder(path)
.withSchema(new Schema.Parser().parse(schema))
.withCompressionCodec(compressionCodecName)
.withPageSize(8 * 1024)
.withConf(conf)
.build();
}

@Override
public long flush() throws IOException
{
return writer.flush();
}

public static void appendFooter(FileSystem fs, Path path, byte[] footer) throws IOException
{
FSDataOutputStream outStream = fs.append(path);
outStream.write(footer);
outStream.hsync();
outStream.close();
}

public byte[] getFooter() throws IOException
{
return writer.getFooter();
}

@Override
public long getPos() throws IOException
{
return writer.getPos();
}

@Override
public void close() throws IOException
{
writer.close();
}

@Override
public void write(T element) throws IOException
{
writer.write(element);
}

@Override
public FlinkAvroParquetWriterV2<T> duplicate()
{
return new FlinkAvroParquetWriterV2<>(schema);
}
}

You probably noticed, two more functions getFooter & appendFooter. What’s that and why we need that?

We all know parquet file stores its metadata in its footer. When we snapshot the current state, we also need to snapshot the footer because when we restore the state, we also need to put the footer at end of the parquet file.

That’s all we need to implement a Parquet Writer in Flink. In order to implement it, you need to understand how Flink checkpoiting and how parquet works. I hope I covered all the why & how. If I didn’t make it clear enough, please comment down below.

--

--