Outputting headers on CSV files using Google Cloud Dataflow

If you need to combine several processed PCollections into one file, for example to write XML or CSV output, you first have to understand how Dataflow currently behaves when writing files with TextIO.Write. The following code writes the PCollections in whatever order they arrive:

Pipeline p = Pipeline.create(options);
PCollection<String> header = p.apply(Create.of("Beginning"));
PCollection<String> stuff = p.apply(Create.of("Body 1", "Body 2"));
PCollection<String> end = p.apply(Create.of("End"));
PCollectionList<String> pcs = PCollectionList.of(header).and(stuff).and(end);
PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());
merged.apply(TextIO.Write.to("/Users/main/Desktop/test").withNumShards(1).withSuffix(".txt"));
p.run();

The result is a single file with no guaranteed ordering:

Body 1
Body 2
End
Beginning

While the following, which applies the same write transform to each PCollection in turn, overwrites the output each time and keeps only the last write:

Pipeline p = Pipeline.create(options);
PCollection<String> header = p.apply(Create.of("Beginning"));
PCollection<String> stuff = p.apply(Create.of("Body 1", "Body 2"));
PCollection<String> end = p.apply(Create.of("End"));
TextIO.Write.Bound<String> one_csv = TextIO.Write.to("/Users/main/Desktop/test").withNumShards(1).withSuffix(".txt");
header.apply(one_csv);
stuff.apply(one_csv);
end.apply(one_csv);
p.run();

Resulting in:

End

Now for the trick that actually gets this to work today: build up the CSV body inside the DoFn that generates it, and have that same DoFn hold the header, like this:

PCollection<String> output = data.apply(ParDo.of(new DoFn<String, String>() {
    private final String new_line = System.getProperty("line.separator");
    private final String csv_header = "id, stuff_1, stuff_2";
    private transient StringBuilder csv_body;

    @Override
    public void startBundle(Context c) {
        // Start each bundle with a fresh buffer holding the header line.
        csv_body = new StringBuilder().append(csv_header).append(new_line);
    }

    @Override
    public void processElement(ProcessContext c) {
        csv_body.append(c.element()).append(new_line);
    }

    @Override
    public void finishBundle(Context c) throws Exception {
        // Emit the whole CSV (header plus body) as a single element.
        c.output(csv_body.toString());
    }
})).apply(TextIO.Write.named("WriteData").to(options.getOutput()));
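Stripped of the Dataflow plumbing, the pattern above is just accumulating rows behind a header in a StringBuilder. A minimal, framework-free sketch you can run locally (the header and row values here are made up):

```java
public final class CsvAssembly {
    // Builds one CSV string: the header line followed by each row,
    // each terminated with the platform line separator.
    public static String assemble(String header, String[] rows) {
        String newLine = System.getProperty("line.separator");
        StringBuilder csv = new StringBuilder(header).append(newLine);
        for (String row : rows) {
            csv.append(row).append(newLine);
        }
        return csv.toString();
    }

    public static void main(String[] args) {
        // Prints the header followed by the two rows.
        System.out.print(assemble("id, stuff_1, stuff_2",
                new String[] {"1, a, b", "2, c, d"}));
    }
}
```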

There’s one gotcha: the entire CSV body must fit in memory. If it doesn’t, you’ll have to partition your PCollection so the processing is split up, and you’ll end up with several CSV files. Also note that finishBundle runs once per bundle, so if the runner splits your input into multiple bundles, each bundle emits its own header and body.
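If you do need to partition, the function you hand to Dataflow’s Partition transform just has to map each element to a shard number. A minimal, framework-free sketch of such a function (the comma-separated layout and the id-as-first-field assumption are mine, not from the original pipeline):

```java
public final class CsvPartitioner {
    // Assigns a CSV line to one of numPartitions shards by hashing the
    // first field (assumed here to be the id). Lines with the same id
    // always land in the same shard.
    public static int partitionFor(String csvLine, int numPartitions) {
        String id = csvLine.split(",", 2)[0].trim();
        // Math.floorMod keeps the result in [0, numPartitions)
        // even when hashCode() is negative.
        return Math.floorMod(id.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Same id, different payload: both lines map to the same shard.
        System.out.println(partitionFor("42, foo, bar", 3));
        System.out.println(partitionFor("42, baz, qux", 3));
    }
}
```

Each partition would then be fed through its own copy of the header-plus-body DoFn above, giving one self-contained CSV file per shard.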