Reading a file in a Japanese character set encoding using Apache Beam

Darshan Mehta
Oct 18, 2019

Audience: This post is aimed at anyone who runs into technical problems like the ones I hit as a software engineer working with data pipelines.

Tech Stack: Java, Apache Beam, Redis, Kafka, Google Cloud Platform (GCP) Storage, and Google Cloud Platform (GCP) Datastore.

Objective: Read a file from Google Cloud Platform Storage, build a data pipeline that transforms the data, then store the results in Google Cloud Platform Datastore. The file sits in a specific GCP Storage folder and contains several hundred thousand records.

Journey to the solution:

I referred to the Pipeline example here and used its code snippet to read the file into a PCollection of strings.

PCollection<String> readCollection = pipeline.apply("read " + file, TextIO.read().from(path + fileName));

Well, it turned out this was not enough! The snippet did not read the data correctly because the file content was Japanese text in the “Shift_JIS” character set encoding, while Beam's Java TextIO decodes lines as UTF-8. I looked through the Beam v2.15.0 API documentation as well as other online resources, but none of them led me to a working solution. The Beam Python TextIO API accepts a character encoding (credit to this post from Khushboo), but the Java API has no such option.
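To see what goes wrong at the byte level, the stdlib-only sketch below decodes the same Shift_JIS bytes twice. The first pass uses UTF-8, which Beam's Java TextIO uses for each line. The second pass uses Shift_JIS. The class and method names (`MojibakeDemo`, `decodeAsUtf8`, `decodeAsShiftJis`) are illustrative only. The sample text is written as Unicode escapes so the source compiles under any default encoding.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class MojibakeDemo {
    // Shift_JIS has no StandardCharsets constant, so it is looked up by name.
    static final Charset SHIFT_JIS = Charset.forName("Shift_JIS");

    // What a UTF-8-only reader produces: new String(...) replaces every invalid
    // UTF-8 sequence with U+FFFD instead of throwing.
    static String decodeAsUtf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // The same bytes decoded with the charset they were actually written in.
    static String decodeAsShiftJis(byte[] bytes) {
        return new String(bytes, SHIFT_JIS);
    }

    public static void main(String[] args) {
        // "Nihongo" (Japanese language) in kanji, written as Unicode escapes.
        String original = "\u65E5\u672C\u8A9E";
        byte[] sjisBytes = original.getBytes(SHIFT_JIS);

        System.out.println("as UTF-8 (garbled): " + decodeAsUtf8(sjisBytes));
        System.out.println("as Shift_JIS:       " + decodeAsShiftJis(sjisBytes));
        System.out.println("round trip intact:  " + original.equals(decodeAsShiftJis(sjisBytes)));
    }
}
```

The UTF-8 pass yields a garbled string containing U+FFFD replacement characters, and the Shift_JIS pass recovers the original text. How either line renders depends on the console's own encoding.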

Then I came across this post, where @f.loris suggested adding line numbers to the file. I tried this approach, and it worked for UTF-8 encoding but failed for Shift_JIS.

This worked:
//BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "UTF-8"))
This failed:
//BufferedReader br = new BufferedReader(Channels.newReader(f.open(), "Shift_JIS"))
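The post does not say why the Shift_JIS variant failed. One plausible explanation, which is an assumption and was not verified against the original file, is decoder strictness.

- `Channels.newReader(channel, "Shift_JIS")` builds its decoder with `Charset.forName(...).newDecoder()`. That decoder *reports* malformed or unmappable bytes by throwing.
- `InputStreamReader` instead *replaces* such bytes with U+FFFD.

If the file contained even one byte sequence outside Java's Shift_JIS table, the strict reader would fail while the lenient one would not. For example, files written by Windows software are often really Windows-31J (MS932), which adds vendor characters to Shift_JIS. The sketch below shows the difference on a deliberately malformed input. `DecoderStrictnessDemo` and its methods are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.CharacterCodingException;

class DecoderStrictnessDemo {
    // Stands in for the channel that FileIO.ReadableFile.open() would return.
    static ReadableByteChannel channelOf(byte[] bytes) {
        return Channels.newChannel(new ByteArrayInputStream(bytes));
    }

    // Channels.newReader(ch, csName) uses Charset.forName(csName).newDecoder(), whose
    // default action for malformed or unmappable input is REPORT: bad bytes throw.
    static String readStrict(byte[] bytes) throws IOException {
        try (BufferedReader br = new BufferedReader(
                Channels.newReader(channelOf(bytes), "Shift_JIS"))) {
            return br.readLine();
        }
    }

    // InputStreamReader configures its decoder to REPLACE bad bytes with U+FFFD.
    static String readLenient(byte[] bytes) throws IOException {
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(Channels.newInputStream(channelOf(bytes)), "Shift_JIS"))) {
            return br.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        // 'A', then Shift_JIS lead byte 0x81 with an invalid trail byte (0x20), then 'B'.
        byte[] bytes = {0x41, (byte) 0x81, 0x20, 0x42};
        try {
            System.out.println("strict reader: " + readStrict(bytes));
        } catch (CharacterCodingException e) {
            System.out.println("strict reader failed: " + e);
        }
        System.out.println("lenient reader: " + readLenient(bytes));
    }
}
```

If this explanation holds, the lenient reader "works" by silently substituting U+FFFD for the problem bytes. When exact text matters, it is worth scanning the output for that character.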

Finally, after a lot of trial and error, I figured out a working solution:

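import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.PCollection;

PCollection<String> readCollection = pipeline
    .apply(FileIO.match().filepattern(path + fileName))
    .apply(FileIO.readMatches())
    .apply(FlatMapElements
        .into(strings())
        .via((FileIO.ReadableFile f) -> {
            List<String> result = new ArrayList<>();
            // Open the matched file through Beam's f.open() (works for gs:// paths) rather than
            // new FileInputStream(path + fileName), which can only see the local file system.
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(Channels.newInputStream(f.open()), "Shift_JIS"))) {
                String line = br.readLine();
                while (line != null) {
                    result.add(line);
                    line = br.readLine();
                }
            } catch (IOException e) {
                throw new RuntimeException("Error while reading", e);
            }
            return result;
        }));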
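The body of the lambda is plain `java.io` code, so its decoding logic can be checked without running a pipeline. Below is a minimal sketch that pulls that logic into a helper taking a `ReadableByteChannel`, the type `FileIO.ReadableFile.open()` returns. The names `ChannelLineReader` and `readLines` are hypothetical, and a `ByteArrayInputStream` stands in for the file in GCS.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

class ChannelLineReader {
    // Hypothetical helper: reads every line from a channel using the given charset.
    // In the pipeline, the channel would come from FileIO.ReadableFile.open().
    static List<String> readLines(ReadableByteChannel channel, Charset charset) throws IOException {
        List<String> result = new ArrayList<>();
        // InputStreamReader replaces undecodable bytes with U+FFFD instead of throwing.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(Channels.newInputStream(channel), charset))) {
            String line;
            while ((line = br.readLine()) != null) {
                result.add(line);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Charset sjis = Charset.forName("Shift_JIS");
        // Two lines of Japanese text (the words for "Japanese" and "Tokyo"), as Unicode escapes.
        byte[] bytes = "\u65E5\u672C\u8A9E\n\u6771\u4EAC\n".getBytes(sjis);
        List<String> lines = readLines(Channels.newChannel(new ByteArrayInputStream(bytes)), sjis);
        System.out.println(lines.size() + " lines read"); // prints "2 lines read"
    }
}
```

Passing the charset as a parameter, rather than hard-coding "Shift_JIS", keeps the helper usable if another input file turns out to use a different encoding.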

I hope this helps. Feel free to share this solution; I welcome any requests, recommendations, or corrections!
