Getting Started with Luigi — Building Pipelines

M Haseeb Asif
Big Data Processing
5 min read · Oct 5, 2022

This article is the second in the series on getting started with Luigi. The first article introduced Luigi: what it is, why we need it, and its essential components. We saw that the Luigi task is the core of every pipeline. A task has three essential methods:

  • requires — declares the dependencies that must complete before the current task runs
  • output — defines the target (output) of the current task
  • run — where we define the business logic or the transformations applied to the data
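Putting the three methods together, the skeleton of a task looks like this (a minimal sketch; the class and file names are made up for illustration):

import luigi

class MyTask(luigi.Task):
    def requires(self):
        return []  # upstream tasks that must finish first

    def output(self):
        return luigi.LocalTarget('my_output.txt')  # marks the task as done

    def run(self):
        # Business logic: read from self.input(), write to self.output()
        with self.output().open('w') as f:
            f.write('result')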

Luigi tasks have unique names, and they are atomic. Tasks are also idempotent: running them again and again should produce the same result. Furthermore, each Luigi task acts as a processing checkpoint because it marks the completion of a stage in the pipeline. If the pipeline fails, it continues where it left off instead of starting from scratch.

So, if you want to execute a task again, you need to remove its output file first. Otherwise, Luigi will not rerun the task, assuming the last execution was successful.
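Concretely, forcing a rerun is just a matter of deleting the target file before invoking the pipeline again; a small sketch, using the summary.txt target from the pipeline we build below:

import os

# Luigi checks output().exists() to decide whether a task is complete,
# so removing the file makes the task eligible to run again.
if os.path.exists('summary.txt'):
    os.remove('summary.txt')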

In the last article, we created a virtual environment and ran our hello world pipeline. Now we will use the same environment and build a few more pipelines with varying dependencies, sequences, and parallelism.

Sequential Pipelines

We will create a dummy employee dataset and generate different analytical reports, such as the total salary paid to each employee or how much the company pays all of its employees combined. The pipeline will have three stages: generate the data, process the salaries, and then generate a summary report. (It could be simpler, but I have added multiple stages for learning purposes.)

Fig 1. Sequential Luigi pipeline

The following is the code for all three tasks.

import luigi
from luigi import Task, LocalTarget

class GenerateData(Task):
    def output(self):
        return LocalTarget('input.csv')

    def run(self):
        # Columns: name, age, month, salary, country
        with self.output().open('w') as f:
            print('Emma,27,6,500,Sweden', file=f)
            print('Emma,27,7,600,Sweden', file=f)
            print('August,29,6,2000,Sweden', file=f)
            # ... more rows ...

class ProcessSalaries(Task):
    def requires(self):
        return GenerateData()

    def output(self):
        return LocalTarget('salaries.csv')

    def run(self):
        # Sum the salaries per employee name
        emp = {}
        for line in self.input().open():
            cols = line.split(',')
            if cols[0] in emp:
                emp[cols[0]] += int(cols[3])
            else:
                emp[cols[0]] = int(cols[3])
        with self.output().open('w') as f:
            for e in emp:
                print("{},{}".format(e, emp[e]), file=f)

class SummarizeReport(Task):
    def requires(self):
        return ProcessSalaries()

    def output(self):
        return LocalTarget('summary.txt')

    def run(self):
        # Add up the per-employee totals from salaries.csv
        total = 0.0
        for line in self.input().open():
            name, amount = line.split(',')
            total += float(amount)
        with self.output().open('w') as f:
            f.write(str(total))

if __name__ == '__main__':
    luigi.run(['SummarizeReport', '--local-scheduler'])

As you can see in the code listing above, GenerateData has no dependencies and creates input.csv as its output file. Next, ProcessSalaries depends on the GenerateData task because it reads from input.csv; it sums the salaries for each employee and stores them in salaries.csv. Finally, the SummarizeReport task adds up all the employees’ salary totals and stores the result in summary.txt.
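If you prefer to trigger the pipeline from Python rather than the command line, luigi.build offers an equivalent entry point; a minimal sketch, assuming the listing above is importable:

import luigi

# Equivalent to luigi.run(['SummarizeReport', '--local-scheduler']):
# build the terminal task and let Luigi resolve and run its dependencies.
luigi.build([SummarizeReport()], local_scheduler=True)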

Parallel Pipelines

Let’s say we have a requirement to see the total salary paid to each employee, the number of employees in each country, and the average age of all the employees. In this case, we don’t need a fully sequential pipeline; the three report tasks can execute in parallel. So, we will design our pipeline as follows.

Fig 2. Luigi pipeline with dependent tasks

As you can see in fig 2, we have a GenerateData task feeding data to three tasks that run in parallel, and their outputs are consumed by the SummarizeReport task, which acts like a dashboard for the company.

GenerateData and ProcessSalaries keep the same code; the code for the other two tasks is as follows.

class CountryCount(Task):
    def requires(self):
        return GenerateData()

    def output(self):
        return LocalTarget('countrycount.csv')

    def run(self):
        # Count the number of employee rows per country
        cntry = {}
        for line in self.input().open():
            cols = line.split(',')
            country = cols[4].strip()
            if country in cntry:
                cntry[country] += 1
            else:
                cntry[country] = 1

        with self.output().open('w') as f:
            for c in cntry:
                print("{},{}".format(c, cntry[c]), file=f)

class AverageAge(Task):
    def requires(self):
        return GenerateData()

    def output(self):
        return LocalTarget('age.csv')

    def run(self):
        # Average the age over unique employees (each name counted once)
        emp = {}
        total = 0.0
        count = 0
        for line in self.input().open():
            cols = line.split(',')
            if cols[0] not in emp:
                emp[cols[0]] = 0  # mark this name as seen
                total += int(cols[1])
                count += 1

        with self.output().open('w') as f:
            f.write(str(total / count))

So, now we have all three report tasks in place. The only update left is the SummarizeReport task, whose requires method will now return three tasks. Additionally, since the task now has multiple inputs, we use indexing when accessing them: self.input() becomes self.input()[0], self.input()[1], and so on, following the order in the requires method. The updated SummarizeReport code follows.

class SummarizeReport(Task):
    def requires(self):
        return [ProcessSalaries(),
                CountryCount(),
                AverageAge()]

    def output(self):
        return LocalTarget('summary.txt')

    def run(self):
        # self.input() is a list in the same order as requires()
        total_salary = 0.0
        average_age = 0.0
        for line in self.input()[0].open():  # salaries.csv
            name, amount = line.split(',')
            total_salary += float(amount)

        for line in self.input()[2].open():  # age.csv
            average_age = float(line)

        with self.output().open('w') as f:
            print("Total salary payout is {}".format(total_salary), file=f)
            print("Average age is {}".format(average_age), file=f)

            for line in self.input()[1].open():  # countrycount.csv
                country, count = line.split(',')
                print("Total {} employees in {}".format(count.strip(), country), file=f)

Finally, instead of writing only to local files, we can write to an S3Target or a database.
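For example, here is a minimal sketch of pointing a task’s output at S3 instead of the local disk (the bucket path is a placeholder, and luigi.contrib.s3 requires boto3 plus AWS credentials to be configured):

from luigi.contrib.s3 import S3Target

class SummarizeReportS3(SummarizeReport):
    def output(self):
        # Hypothetical bucket and key; replace with your own
        return S3Target('s3://my-bucket/reports/summary.txt')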

Wrapper Task

The WrapperTask is a dummy task whose purpose is to define a workflow of tasks; it doesn’t perform any work itself. Instead, by defining several requirements, the task is complete when every requirement listed in its requires method has been completed. The main difference between a WrapperTask and a regular Task is that you don’t need to define an output method to signal that the task succeeded.
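A minimal sketch, reusing the three report tasks from above (the class name AllReports is made up):

import luigi

class AllReports(luigi.WrapperTask):
    # Complete once all three requirements are complete; no output() needed
    def requires(self):
        return [ProcessSalaries(), CountryCount(), AverageAge()]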

Open Source Tasks

In routine work, we want to do many of the same things, such as connecting to various databases or uploading and downloading files from different cloud providers; most of the time, it is repeated effort. So there are quite a few reusable tasks contributed by the community under the contrib sub-module, which can be used to get things done quickly. Among them are helpers for Hadoop, MongoDB, Kubernetes, and Docker.
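For instance, a few of the contrib modules can be pulled in like this (a sketch; module availability and extra dependencies vary by Luigi version):

from luigi.contrib.s3 import S3Target                    # S3 targets
from luigi.contrib.kubernetes import KubernetesJobTask   # run a task as a Kubernetes job
from luigi.contrib.docker_runner import DockerTask       # run a task inside a Docker container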

Finally, I would like to mention that instead of running on the local scheduler, you can run Luigi centrally and submit all the pipelines to the central instance. It also allows you to visualize the pipelines. So, first, you have to run the Luigi server with the luigid command. Then, you can access the server at http://localhost:8082. Once you run your job again without the local scheduler, you will be able to visualize the job, its dependencies, execution times, and many more details. You can switch to the central scheduler by updating the main method as follows, where --local-scheduler has been removed.

if __name__ == '__main__':
    luigi.run(['SummarizeReport'])

The next step is to customize and configure the Luigi per the organizational needs, which we will discuss in the following article.
