Getting Started with Luigi—What, Why & How

M Haseeb Asif
Big Data Processing
5 min read · Oct 1, 2022

This is the first of a two-part series about getting started with Luigi. The second part is Getting started with Luigi — Building Pipelines.

The amount of data is multiplying, and every organization has recognized the potential benefit of leveraging it. However, processing data from various sources and providing insights to different groups within an enterprise requires orchestrating a plethora of data pipelines.

Data pipelines do most of the data movement across an organization, whether ETL or ELT. E is about extracting data from sources such as databases, file systems, connected IoT devices, websites, or real-time user activity. T is for transforming the data: modeling it, cleaning it, and then applying different aggregations and filters. Finally, L is for loading, or storing, the data into a specific system, such as a database, blob storage, etc.
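To make the three stages concrete, here is a minimal, self-contained Python sketch; the records, fields, and output file name are hypothetical, purely for illustration:

import json

def extract():
    # E: pull raw records from a source; here, an inline JSON string
    raw = '[{"item": "book", "price": 12.5}, {"item": "pen", "price": 1.2}]'
    return json.loads(raw)

def transform(records):
    # T: clean, filter, and aggregate the records
    return sum(r['price'] for r in records if r['price'] > 0)

def load(total):
    # L: store the result in a target system; here, a local file
    with open('total_sales.txt', 'w') as f:
        f.write(str(total))

load(transform(extract()))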

Software engineers, data engineers, or anyone building these pipelines translate the business logic into a set of interrelated computational batch or streaming jobs. These jobs consist of multi-step tasks that must be completed in a specific order. With the growing needs of organizations, it becomes hard to keep track of tasks, their dependencies, scheduled times, etc.

Most pipelines are DAGs (directed acyclic graphs). That means a data pipeline has inputs, outputs, and intermediate results while the data is being processed according to specific business rules. For example, the following is a sample data pipeline with a database and a JSON file as inputs, which are processed to generate intermediate results. Later stages use those intermediate results to generate the required insights.

DAG for a simple data pipeline

When pipelines grow, they become more complex and have multiple interdependent nodes in their DAG. In such cases, a failure at any single stage can force re-running the whole pipeline with manual intervention. To avoid that, we need workflow management tools such as Luigi or Airflow.

Workflow management software (WMS) helps you automate all of these processes. Directed acyclic graphs, or DAGs, are one way to design data workflows and keep track of the interlinked tasks. A WMS helps teams manage the complex web of DAGs across the organization, taking care of failures and preventing downstream tasks from running until upstream failures are cleared. Luigi and Apache Airflow are two widely used WMS in the industry.

Luigi

Luigi is a workflow management system for efficiently launching a group of tasks with defined dependencies. It is a Python-based package that Spotify® developed to build and execute pipelines. Developers can use it to create workflows that include external jobs written in R, Scala, or Spark. Luigi enables complex data pipelines for batch jobs and provides dependency resolution, workflow management, pipeline visualization, failure handling, command-line integration, and more.

It is easy to learn, and a strong community supports regular development. Some of the features offered by Luigi are:

  • Eases pipeline building
  • Separation of concerns
  • Common event and failure handling
  • Integration with other ecosystem jobs, such as Spark

Install Luigi

Since Luigi is written in Python, we need Python 3 for the latest versions. As you might have guessed, we will use pip to install Luigi on the machine.

pip install luigi

But, as I always recommend, we should use a virtual environment. So we will create the virtual environment, activate it, and install the Luigi package.

python -m venv luigienv
source luigienv/bin/activate
pip install luigi
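To confirm the installation succeeded, you can ask pip for the package details:

pip show luigi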

Luigi Pipeline Components

Unlike Airflow, Luigi doesn't make you define an explicit DAG; instead, a Luigi pipeline has two building blocks, Task and Target. A task is a transformation or operation that generates a target, such as an output file. Targets are both the results of one task and the inputs for the next task.

Tasks are the basic unit of work in the pipeline. A task can also require another task, which creates a dependency, similar to a DAG. Each task generates output targets, which are leveraged by other tasks downstream. A task is considered complete only if its target exists.

A target can be the final output or an intermediary output for other tasks to consume to produce the final result. For example, it can be a file on the file system or a database entry.
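As a minimal sketch of the Target API (the file name here is arbitrary), you can create a LocalTarget directly and see how completeness is tied to its existence:

import luigi

target = luigi.LocalTarget('result.txt')
print(target.exists())  # False until the file has been written

# open('w') writes through a temporary file and renames it on close,
# so the target only "exists" once the write has fully succeeded
with target.open('w') as f:
    f.write('some output')

print(target.exists())  # True; a task producing this target counts as complete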

Tasks are developed by extending the Task class in Luigi, which has three essential methods:

  • requires: specifies the dependencies on other tasks
  • run: the business logic or operations that need to be performed
  • output: defines where to store the output (the target)

Writing your first pipeline

Task is the base class for all tasks in Luigi. We define the target of a task through the output method on the Task class. The following is a simple hello-world task extending the Luigi Task:

import luigi
from luigi import Task

class HelloWorld(Task):

    def output(self):
        return luigi.LocalTarget('result.txt')

    def run(self):
        print("hello")
        with self.output().open('w') as f:
            f.write('Hello world')

if __name__ == '__main__':
    luigi.run(['HelloWorld', '--local-scheduler'])

The run method is the core of a Luigi task, where all the business logic or operations happen, while output defines where the results are stored. Here, run does the actual work, processing and writing 'Hello world' to the target, a local file named result.txt. Note that if result.txt already exists, Luigi considers the task complete and will skip it on subsequent runs.

Finally, we ran the Luigi pipeline from the main program. But, of course, you can run it from your IDE or the command line as well:

luigi --module file_name HelloWorld --local-scheduler

A Luigi run needs to know which pipeline to run and where to run it. Here, we said that we want to run the HelloWorld task on the local system: --local-scheduler indicates that you want to run this task locally, i.e., without connecting to a Luigi central scheduler.
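For completeness: if you omit --local-scheduler, Luigi expects a central scheduler to be running. You can start one with the luigid command, which by default also serves a web UI for visualizing pipelines at http://localhost:8082, and then run the task against it:

luigid
luigi --module file_name HelloWorld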

Writing a pipeline with multiple tasks

Now that we know how to create a pipeline in Luigi, we will create another example pipeline based on the Luigi documentation. This example has two tasks: one generates words while the second counts their letters. But, first, let's have a look at the code.

import luigi

class GenerateWords(luigi.Task):

    def output(self):
        return luigi.LocalTarget('words.txt')

    def run(self):
        # write a dummy list of words to output file
        words = ['apple', 'banana', 'grapefruit']
        with self.output().open('w') as f:
            for word in words:
                f.write('{word}\n'.format(word=word))

class CountLetters(luigi.Task):

    def requires(self):
        return GenerateWords()

    def output(self):
        return luigi.LocalTarget('letter_counts.txt')

    def run(self):
        # read in file as list
        with self.input().open('r') as infile:
            words = infile.read().splitlines()
        # write each word to output file with letter count
        with self.output().open('w') as outfile:
            for word in words:
                outfile.write('{word} | {letter_count}\n'.format(word=word, letter_count=len(word)))

CountLetters is the primary task, depending on the GenerateWords task. First, GenerateWords() is executed, since it doesn't have any dependency. It generates a target, a text file named words.txt. CountLetters() then starts its execution, reading its input from self.input(), which represents the targets of the tasks specified in the requires method. It reads all the input, counts the letters, and writes the result to a target named letter_counts.txt.
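As a usage sketch, you can also trigger the pipeline programmatically instead of via the CLI: luigi.build takes a list of task instances and, with local_scheduler=True, runs them without a central scheduler. Asking only for CountLetters() pulls in GenerateWords() automatically through requires:

import luigi

if __name__ == '__main__':
    # builds the full dependency chain: GenerateWords, then CountLetters
    luigi.build([CountLetters()], local_scheduler=True)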

This article explained what Luigi is and why we need it, then showed how to use it to create a basic pipeline. Next, we will discuss how to build more complicated, interdependent, and parallel pipelines in Luigi.
