Advent of Code 2019 in Apache Beam (Days 1–3)

Concepts: Pipeline, Map, Combine, Create, FlatMap, CoJoinByKey, Top

Lak Lakshmanan
Dec 7, 2019 · 6 min read

Inspired by Felipe Hoffa who is doing Advent of Code in BigQuery, I decided to do it in Apache Beam. This blog explains my solutions, which you can find on GitHub. If you are learning Apache Beam, I encourage you to look over my solutions, but then try to solve the problems yourself.

Day 1a: Fuel computation (Concepts: Pipeline, Map, Combine)

The problem for Day 1 consists of some simple math operations applied to a large number of modules. We can parallelize the operations over the modules using Beam. I started by writing the fuel computation:

def compute_fuel(mass):
return np.floor(mass/3) - 2

Then, verified that the code works as desired:

for input, output in [(12,2), (14,2), (1969,654), (100756,33583)]:
assert compute_fuel(input) == output

The Beam pipeline doesn’t use any command-line args, and so is:

opts = beam.pipeline.PipelineOptions(flags=[])   p = beam.Pipeline(runner, options=opts)
...
job = p.run()
if runner == 'DirectRunner':
job.wait_until_finish()

The steps read the input consisting of each module’s mass, convert the mass to a floating point number, compute the fuel for the module, sums up the fuel, and writes out the total fuel:

(p
| 'read' >> beam.io.textio.ReadFromText(options.input)
| 'tofloat' >> beam.Map(lambda s: float(s))
| 'fuel' >> beam.Map(compute_fuel)
| 'total' >> beam.CombineGlobally(sum)
| 'output' >> beam.io.textio.WriteToText(options.output)
)

The Map applies the lambda function to each module, and the CombineGlobally does an aggregation of the fuel for all the modules.

Beam will automatically parallelize each of the Map steps. Had we done a CombinePerKey, Beam would do the necessary Map-Reduce to parallelize the combine steps.

Day 1b: Fuel for fuel

The second part of the problem simply involves changing the fuel computation. The rest of the code remains the same:

def compute_fuel(mass):
fuel = np.floor(mass/3) - 2
if fuel > 0:
return (fuel + compute_fuel(fuel))
else:
return 0

Day 2a: Intcode

On Day 2, the problem is pretty much parsing an “intcode”. Unfortunately, the intcode parser is a state machine, and because Beam is a distributed programming framework, there is very little for Beam to do here. We just have to write the Python code to parse the intcode:

def handle_ints(ints, startpos=0):
if ints[startpos] == 99:
return ints
x1 = ints[startpos+1]
x2 = ints[startpos+2]
outpos = ints[startpos+3]
if ints[startpos] == 1:
ints[outpos] = ints[x1] + ints[x2]
elif ints[startpos] == 2:
ints[outpos] = ints[x1] * ints[x2]
return handle_ints(ints, startpos+4)
def handle_intcode(intcode):
input = [int(x) for x in intcode.split(',')]
output = handle_ints(input)
return ','.join([str(x) for x in output])

As before, we use the provided examples to verify that our code is correct:

   assert handle_intcode('1,0,0,0,99') == '2,0,0,0,99'
assert handle_intcode('2,3,0,3,99') == '2,3,0,6,99'
assert handle_intcode('2,4,4,5,99,0') == '2,4,4,5,99,9801'
assert handle_intcode('1,1,1,4,99,5,6,0,99') == '30,1,1,4,2,5,6,0,99'

Once that is done, the problem asks us to replace the first two inputs and obtain the result at the first position. So:

def run_1202(intcode):
input = [int(x) for x in intcode.split(',')]
input[1] = 12
input[2] = 2
output = handle_ints(input)
return output[0]

The Beam code is trivial because there is only input and it needs to have the run_1202 function applied to it:

(p
| 'read' >> beam.io.textio.ReadFromText(options.input)
| 'run_1202' >> beam.Map(run_1202)
| 'output' >> beam.io.textio.WriteToText(options.output)
)

Day 2b: Filtering (Concepts: Create, FlatMap)

The second part of the problem requires parallel processing, and Beam is back in play. Instead of the hardcoded 12 and 2 in Day 2a, we have to make them parameters:

def run_noun_verb(intcode, noun, verb):
input = [int(x) for x in intcode.split(',')]
input[1] = noun
input[2] = verb
output = handle_ints(input)
return (noun, verb, output[0])

Because we have to find the noun-verb that results in a specific value of output, we return a tuple consists of the two inputs and the output.

The Beam pipeline starts with generating all the possible values of the noun and verb:

nouns_and_verbs = [(x,[y for y in range(0,99)]) for x in range(0,99)]
...
(p
| 'input' >> beam.Create(nouns_and_verbs)

The Python generator creates inputs of the form (3, [0,1,2,…,99]), and Beam.Create makes the Python list of tuples a PCollection of tuples.

Each of the tuples themselves has a list in the second position. So, the next step is to split the second part. Here, because the input is a single tuple, and the output has 100, we need to use a FlatMap (use a Map for 1:1 transformations, FlatMap for 1:many):

'noun_verb' >> beam.FlatMap(split_verbs)

where split_verbs is:

def split_verbs(xys):
x, ys = xys
for y in ys:
yield (x,y)

As before, we then run the program for each (noun, verb):

'run_noun_verb' >> beam.Map(lambda nv: run_noun_verb(intcode, nv[0], nv[1]) )

Note the idiom here when using Map — just pass in a function name if it takes only one parameter, but it takes multiple parameters, then use a lamda to specify the positional/named parameters.

Then, we filter the set of outputs for the desired output value:

'filter' >> beam.FlatMap(filter_for_desired)

where filter_for_desired is:

def filter_for_desired(noun_verb_output):
noun, verb, output = noun_verb_output
if output == 19690720:
yield noun*100 + verb

Here, again, we see that we are using FlatMap because it is not a 1:1 transform. The input consists of the results of processing 100x100 intcodes and the output is a single one that matches the desired output.

Day 3a (Concepts: GroupByKey, CoGroupByKey, Top)

This time, we are given a set of navigation and need to first retrieve a set of points. We can do it by building a Python list:

def find_locations(wire):
positions = [(0,0)]
for nav in wire.split(','):
dir = nav[0]
if dir == 'R':
update = (1, 0)
elif dir == 'U':
update = (0, 1)
elif dir == 'L':
update = (-1, 0)
else:
update = (0, -1)

n = int(nav[1:])
for x in range(n):
lastpos = positions[-1]
newpos = (lastpos[0] + update[0],
lastpos[1] + update[1])
positions.append(newpos)

return positions[1:] # remove the 0,0

Once we have the locations for the two wires, we need to find all the intersection points. A scalable way to do this is to find the intersection points in each row. For that, we will make up a key-value collection for each wire where the key is the row number and the value is the columns that the wire touches the grid:

locations = {'wire1': 
(p | 'create1' >> beam.Create(find_locations(wires[0]))
| 'group1' >> beam.GroupByKey()),
'wire2':
(p | 'create2' >> beam.Create(find_locations(wires[1]))
| 'group2' >> beam.GroupByKey())
}

Then, to find the intersections of the wires, we co-join the two collections by row:

(locations 
| 'cogroup' >> beam.CoGroupByKey()

This yields something that looks like this, where for row #217, wire1 has 9 points and wire2 has only one point:

(217, {'wire1': [[4, 5, 6, 7, 8, 9, 10, 11]], 'wire2': [[-12]]})

Given this structure, we can easily find the intersection points — this is any column that appears in both wire1 and wire2:

def find_intersection(kv):
row, d = kv
if d['wire1'] and d['wire2']:
wire1 = d['wire1'][0]
wire2 = d['wire2'][0]
for col in wire1:
if col in wire2:
yield (row, col)

Once we find the intersection, then finding the Manhattan distance is quite easy:

(locations 
| 'cogroup' >> beam.CoGroupByKey()
| 'intersect' >> beam.FlatMap(find_intersection)
| 'distance' >> beam.Map(manhattan)
| 'mindist' >> beam.CombineGlobally(beam.transforms.combiners.TopCombineFn(1, reverse=True))
| 'output' >> beam.io.textio.WriteToText(options.output)
)

Note that to find the minimum distance, I’m using the TopCombineFunction, asking Beam to find the top 1, and because Top normally finds the greatest, I tell it to reverse the comparison.

Day 3b: signal delay

The second part basically changes the metric. Instead of the Manhattan distance, we need to minimize the total number of steps to reach the intersection. While we could compute the Manhattan distance simply from the row, col of the points, now we have to keep track of the steps as well. We can modify the code from the previous section to store the column as well as the step number:

for x in range(n):
row, (col, steps) = positions[-1]
newpos = (row + update[0],
(col + update[1], steps+1))
positions.append(newpos)

Then, when doing the search, we make sure to store the total number of steps, which is the signal delay:

def find_intersection(kv):
row, d = kv
if d['wire1'] and d['wire2']:
wire1 = d['wire1'][0]
wire2 = d['wire2'][0]
for col1, steps1 in wire1:
for col2, steps2 in wire2:
if col1 == col2:
yield (row, col1, steps1+steps2)

Instead of the Manhattan distance, minimize this signal delay and you’ve got it.

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade