Elixir/Flow — Local MapReduce

Build simple concurrent workflows with Elixir and Flow

In day-to-day development, we often run into small tasks that need to parse and structure a hefty amount of data. One programming pattern we can use is MapReduce, which breaks a large dataset into pieces that can be processed concurrently. As the name suggests, MapReduce has two main steps: a map, where the data is divided into distinct parts (and sometimes transformed), and a reduce, where those parts are combined into the desired output (such as a sum or a list). MapReduce programs are often written for huge amounts of data and run across clusters of processors.
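To make the two steps concrete before bringing in Flow, here is a minimal single-process sketch using only Enum. The module name WordCount and the sample input are made up for illustration; nothing here runs concurrently yet.

```elixir
defmodule WordCount do
  # Hypothetical helper for illustration only.
  def run(lines) do
    lines
    # Map: split every line into words and tag each word with a count of 1.
    |> Enum.flat_map(&String.split/1)
    |> Enum.map(fn word -> {word, 1} end)
    # Reduce: fold the pairs into a single map of word => total.
    |> Enum.reduce(%{}, fn {word, n}, acc ->
      Map.update(acc, word, n, &(&1 + n))
    end)
  end
end

IO.inspect(WordCount.run(["a b a", "b c", "a"]))
```

Flow keeps the same map and reduce shape but spreads the work across several processes.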

MapReduce and concurrency may seem daunting for a small program or script, but the Elixir language and the Flow library make setting up a concurrent workflow clear and simple. Let’s go through a small script I wrote for a recent task.

Example: Parsing Message Queue Logs

Note: This article uses some terms from Elixir, like “producer” and “consumer”. Definitions and more info can be found in the GenStage documentation and at Elixir School.

For this task, I needed to document every type of message one of our apps sends to our message queue, along with an example of each type. The app sends a lot of messages, and between the large number of message constructors, various eras of code style, and varied sources, there was no easy way to take stock of where all the messages were created. After looking for alternatives, I was pointed to the application’s log files, where we keep around a week’s worth of logged info, including the messages created and sent to our queue. These log files amounted to about 6GB of data. The messages looked like this:

I, [2018-09-05T22:27:25.721307 #12432] INFO -- : publishing <{"event_type":"CompleteEvent","user_id":0000,"occurred_at":1536186445,"event_data":{"avatar_url":AVATAR_URL,"user_url":USER_URL,"content_title":"CONTENT_TITLE","user_display_name":"John Doe","username":"j_doe"}}> to [Event.CompleteEvent]
I, [2018-09-05T22:27:25.726777 #12432] INFO -- : publishing <{"uids":[0000],"data":{"submission_id":0000,"event":"signature","source":APP_NAME,"type":"submission","build_type":"none","message":"You marked this complete"}}> to [pushstream_publisher]
I, [2018-09-05T22:27:25.728935 #12432] INFO -- : publishing <{"user_id":0000,"type":"signature","status":"done"}> to [user_activity.event]
I, [2018-09-05T22:27:25.888908 #18163] INFO -- : publishing <{"service":"RecordUserIsOnline","payload":{"user_id":0000}}> to [low_priority]

While thinking of tools for this task, Elixir and Flow came to mind quickly. The word count example from the Flow documentation is a wonderful base to build a concurrent workflow to fit the task.

Building the script

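The source code for the complete MapReduce script can be found in this gist: https://gist.github.com/cryic/4854510fc1b5d69cec540b03e106a498. We’ll start with the main start_flow/0 function; the line numbers mentioned below refer to that function in the gist.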
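For readers without the gist open, here is a rough sketch of what such a pipeline could look like, pieced together from the walkthrough that follows rather than copied from the author’s code, so its line numbers will not match the ones cited. Several things are assumptions: the module name LogSamples, the dir argument (added so the sketch can run against any folder), the String.contains?/2 filter, the {:elem, 0} partition key, and above all the simplified get_key_and_payload/1, which uses the queue name as the message type instead of the article’s cond-based helpers.

```elixir
# Assumes the Flow package from Hex; Mix.install/1 fetches it for a standalone script.
Mix.install([{:flow, "~> 1.0"}])

defmodule LogSamples do
  # Sketch only: reconstructed from the walkthrough, not the gist itself.
  def start_flow(dir \\ "/logs") do
    streams(dir)
    |> Flow.from_enumerables()
    # Keep only the lines that log a published message.
    |> Flow.filter(&String.contains?(&1, "publishing"))
    # Map: turn each line into {atom_key, string_key, payload}.
    |> Flow.map(&get_key_and_payload/1)
    # Route every tuple with the same first element to the same stage.
    |> Flow.partition(key: {:elem, 0})
    # Reduce: keep the first payload seen for each message type.
    |> Flow.reduce(fn -> %{} end, fn {_atom, key, payload}, acc ->
      Map.put_new(acc, key, payload)
    end)
    |> Enum.to_list()
  end

  defp streams(dir) do
    for file <- File.ls!(dir) do
      File.stream!(Path.join(dir, file), read_ahead: 100_000)
    end
  end

  # Simplified stand-in for the article's helper: the key is the queue name
  # in the trailing "to [queue]" part of the line.
  defp get_key_and_payload(line) do
    [_, payload, key] = Regex.run(~r/publishing <(.*)> to \[(.+)\]/, line)
    {String.to_atom(key), key, payload}
  end
end
```

Calling LogSamples.start_flow("/logs") should return a list of {key, payload} tuples, one per message type. Which sample is kept for a type can vary between runs, because the stages race each other.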

Let’s walk through this pipeline.

Inside start_flow/0, we start our pipeline by getting the data we need. The streams/0 implementation is from the “Avoid Single Sources” section of the Flow documentation:

defp streams() do
  for file <- File.ls!("/logs") do
    File.stream!("/logs/#{file}", read_ahead: 100_000)
  end
end

This function sets up the whole directory of logs as a data source: one stream per file. File.stream!/3 returns a lazy Enumerable, so lines are only read as the pipeline pulls them.

Moving down to line 4, Flow.filter makes sure only the lines for which the passed-in filter function returns true are passed on to the rest of the pipeline. In our case, each message was prefixed with the string “publishing”, so checking for the presence of that string was an easy way to determine whether a line was a message. At this point, there are multiple consumers working on our data concurrently.
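The check itself can be tried outside Flow. This small sketch assumes the predicate is String.contains?/2 (the article does not show the exact function) and uses one line in the shape of the logs above plus one invented non-message line.

```elixir
# One real-looking message line and one made-up line that is not a message.
lines = [
  ~s(I, [2018-09-05T22:27:25.728935 #12432] INFO -- : publishing <{"user_id":0000,"type":"signature","status":"done"}> to [user_activity.event]),
  "I, [2018-09-05T22:27:26.000000 #12432] INFO -- : Completed 200 OK"
]

# Assumed predicate: a line is a message if it contains "publishing".
is_message = &String.contains?(&1, "publishing")

# Enum.filter/2 stands in for Flow.filter/2 here; both take a one-argument predicate.
messages = Enum.filter(lines, is_message)
IO.inspect(length(messages))
```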

Next on line 6, you’ll see Flow.map corresponds to the map from MapReduce. This allows us to take each line and transform the data into a desired format. For this task, each raw message needed to be passed through as the sample, and I needed to parse out the message type so I could use it as a key for the message. The transformer function get_key_and_payload uses a cond statement and regex to determine the type of message:

defp get_key_and_payload(line) do
  key = get_key(line)
  payload = get_payload(line)
  {String.to_atom(key), key, payload}
end

defp get_key(line) do
  cond do
    service?(line) ->
      get_service_key(line)

    event?(line) ->
      get_event_key(line)

    stream_event?(line) ->
      get_stream_event_key(line)

    true ->
      get_other_key(line)
  end
end

defp service?(line) do
  Regex.match?(~r/<\{\"service\"/, line)
end

# see source for helper functions omitted here
# https://gist.github.com/cryic/4854510fc1b5d69cec540b03e106a498

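This function returns a tuple with three elements: the message type as an Atom, the message type as a String, and the full message. I included the Atom because I understood the next step, Flow.partition, to accept only Integers or Atoms as its hashing key. In fact, Flow hashes whatever term the key option resolves to, so the String alone would also work as the partition key, and skipping String.to_atom/1 avoids creating atoms from arbitrary log input.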
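Since the helper functions are omitted above, here is a hedged guess at how two of them might be written. The names get_payload/1 and get_service_key/1 come from the article, but the bodies, the regular expressions, and the KeySketch module are hypothetical; the real versions are in the gist.

```elixir
defmodule KeySketch do
  # Hypothetical bodies for two helpers the article omits.

  # Treat everything between "publishing <" and "> to [" as the payload.
  def get_payload(line) do
    case Regex.run(~r/publishing <(.*)> to \[/, line) do
      [_, payload] -> payload
      nil -> nil
    end
  end

  # For service messages, use the service name as the message type.
  def get_service_key(line) do
    case Regex.run(~r/<\{"service":"([^"]+)"/, line) do
      [_, name] -> name
      nil -> nil
    end
  end
end

line =
  ~s(I, [2018-09-05T22:27:25.888908 #18163] INFO -- : publishing <{"service":"RecordUserIsOnline","payload":{"user_id":0000}}> to [low_priority])

IO.inspect({KeySketch.get_service_key(line), KeySketch.get_payload(line)})
```

Returning nil for lines that do not match is a choice made for this sketch; the original helpers may behave differently.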

Moving down to line 7, Flow.partition is used to direct every message of the same type to the same GenStage consumer. By default, Flow distributes data across consumers as efficiently as it can, without regard to content. By passing a key to Flow.partition/2, you tell Flow to hash that key and send all data with the same key to the same consumer. Keeping each type on one consumer matters because the consumers reduce independently: if a type were spread across several of them, each would keep its own sample and the final output would contain duplicates.
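The routing idea can be illustrated without Flow. The sketch below assumes four partitions and uses :erlang.phash2/2 as the hash; as far as I know Flow’s default partitioner is built on the same function, but treat that as an assumption rather than a statement about Flow’s internals.

```elixir
# Assumption: four partitions. Flow defaults to one stage per scheduler (core).
stages = 4

# Route a key the way a hash partitioner does: the same key always
# produces the same partition index.
route = fn key -> :erlang.phash2(key, stages) end

keys = [:RecordUserIsOnline, :CompleteEvent, :RecordUserIsOnline, :signature]

Enum.each(keys, fn key ->
  IO.puts("#{key} -> partition #{route.(key)}")
end)
```

Different keys may share a partition, which is fine: the guarantee that matters for the reduce step is only that one key never lands on two partitions.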

Last in the pipeline, on line 8, Flow.reduce combines the data into our desired output. In this case, I only needed a single sample per type, so I used Map.put_new, which only adds an entry to the Map when it sees a message type that isn’t there yet. Continuing to process messages once we already have a sample seems wasteful, but I had no way of knowing in advance how many unique message types there were.
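The “first sample wins” behaviour of Map.put_new/3 is easy to see with Enum.reduce/3 standing in for Flow.reduce/3. The tuples below are invented examples in the {atom, string, payload} shape described earlier.

```elixir
# Hypothetical {atom_key, string_key, payload} tuples, as produced by the map step.
events = [
  {:signature, "signature", "first signature payload"},
  {:CompleteEvent, "CompleteEvent", "first complete payload"},
  {:signature, "signature", "second signature payload"}
]

samples =
  Enum.reduce(events, %{}, fn {_atom, key, payload}, acc ->
    # put_new leaves an existing entry untouched, so the first sample wins.
    Map.put_new(acc, key, payload)
  end)

IO.inspect(samples)
```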

After mapping and reducing, we can use Enum.to_list to bring all the data back together from the separate consumers and convert it into a List.

Results

Using my 2017 MacBook Pro, filtering through the 6GB of log files took 2 minutes 4 seconds, utilizing all 8 processor cores. As a comparison, a version based on Stream took about 4 minutes 10 seconds. There are definitely ways to clean up and tune this Flow pipeline to cut time off of processing. However, given that the time of development was short and the code is simple, this Elixir + Flow solution seems reasonable, and I’m happy with the result.
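For a sense of what a Stream-based comparison might look like, here is a single-process sketch. It is a guess at the general shape, not the version that was timed: the StreamSamples module, the dir argument, and the simplified key (the queue name) are all assumptions.

```elixir
defmodule StreamSamples do
  # Single-process baseline; every line goes through one lazy pipeline.
  def run(dir \\ "/logs") do
    dir
    |> File.ls!()
    # Lazily concatenate the lines of every file in the directory.
    |> Stream.flat_map(fn file ->
      File.stream!(Path.join(dir, file), read_ahead: 100_000)
    end)
    |> Stream.filter(&String.contains?(&1, "publishing"))
    |> Stream.map(&key_and_payload/1)
    # Keep the first payload seen for each key.
    |> Enum.reduce(%{}, fn {key, payload}, acc ->
      Map.put_new(acc, key, payload)
    end)
  end

  # Simplified key: the queue name in the trailing "to [queue]" part.
  defp key_and_payload(line) do
    [_, payload, key] = Regex.run(~r/publishing <(.*)> to \[(.+)\]/, line)
    {key, payload}
  end
end
```

The steps are the same as in the Flow pipeline, minus the partitioning; the difference is that nothing here runs in parallel.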

MapReduce is a powerful model for data processing, and Elixir with Flow offers a simple but expressive way to use it. In the few lines of my workflow and helper functions, Flow builds a multi-stage pipeline that processes my data quickly and efficiently. I’d encourage you to build a MapReduce workflow with Elixir and Flow to see how it can help you clarify and speed up your own tasks.

