Luigi: A workflow management system for Spark jobs

Prasanth Lade
3 min read · Nov 13, 2017


Luigi: A workflow management API from Spotify®

Luigi is a workflow management system to efficiently launch a group of tasks with defined dependencies between them. It is a Python-based API that was developed by Spotify® to build and execute pipelines of Hadoop jobs, but it can also be used to create workflows with any external jobs written in R, Scala, or Spark.

Some of the cool features it provides are workflow definition, failure handling, common event handling, task tracking, and smooth integration of regular tasks and Spark jobs.

Below is an overview of how Luigi works; we will dive into each of these pieces in detail.

The Luigi client helps in executing tasks built in Python/R, and the Luigi server helps in monitoring them

1. Install luigi

Since luigi is written in Python, you can install it using pip:

pip install luigi

2. Define luigi task

Here is an example task defined in the file luigi_example.py that checks if a file exists:
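A minimal sketch of what such a task could look like (with a file_path parameter matching the --file-path option used in the next step) is:

import luigi

class FileExistsTask(luigi.ExternalTask):
    # Path to the file whose existence we want to check
    file_path = luigi.Parameter()

    def output(self):
        # Luigi treats the task as complete once this target exists
        return luigi.LocalTarget(self.file_path)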

3. Execute your task

PYTHONPATH='.' luigi --module luigi_example FileExistsTask --file-path test_data.csv --local-scheduler

where each of the options is explained below:

PYTHONPATH='.' : Adds the current directory to the Python path; include here any modules your luigi task may need

--module luigi_example : Name of the Python script or module that contains the luigi task

FileExistsTask : Name of the luigi task you want to execute

--file-path test_data.csv : The luigi parameter that FileExistsTask requires

--local-scheduler : Indicates that you want to run this task locally, i.e. without connecting to a luigi server

4. Send email notifications

If you want luigi to send an email in case of an error, you need to set the following parameters in the luigi config file (named luigi.cfg or client.cfg). This file should be in the folder from which luigi is launched.

[email]
receiver:receiver_email_address
sender:sender_email_address
[smtp]
host:$SMTP_SERVER_HOST

If you are within your company network, find out your company's SMTP server address and add it to the file. Now relaunch the luigi task, modifying the command with the extra parameter --email-force-send as below:

PYTHONPATH='.' luigi --module luigi_example FileExistsTask --file-path test_data.csv --local-scheduler --email-force-send

5. Visualize luigi tasks

In order to track and visualize all your luigi tasks, you may want to connect to a centralized luigi server that is up and running.

Start luigi server:

The luigi server is installed when you install luigi through pip. Start it as:

luigid --background --logdir tmp

where tmp is the directory to store the luigi log files.

Edit luigi config file:

Add the following extra parameter to the luigi.cfg file so that all future luigi tasks can be tracked.

[email]
receiver:receiver_email_address
sender:sender_email_address
[smtp]
host:$SMTP_SERVER_HOST
[core]
default-scheduler-host:$LUIGI_SERVER_HOST

where $LUIGI_SERVER_HOST is the name or IP address of the machine on which you have launched the luigi server. Now try to relaunch the luigi task using the aforementioned command.

Visualize the tasks:

You can access the luigi server UI at the URL http://$LUIGI_SERVER_HOST:8082; below is a screenshot of the server:

Screenshot of luigi server

6. Launch Spark jobs

One of the most interesting features of luigi is the ability to launch Spark jobs as luigi tasks. Luigi API provides a PySparkTask class that can be extended to write custom Spark jobs. Below is an example of one such task where we try to copy a file from a regular filesystem to HDFS.
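As a minimal sketch (assuming input_filepath and output_filepath parameters that match the command at the end of this section, and an HdfsTarget output), such a task could look like this:

import luigi
from luigi.contrib.hdfs import HdfsTarget
from luigi.contrib.spark import PySparkTask

class MoveFileTask(PySparkTask):
    # Local source file and HDFS destination path
    input_filepath = luigi.Parameter()
    output_filepath = luigi.Parameter()

    def output(self):
        # The task is complete once the output path exists on HDFS
        return HdfsTarget(self.output_filepath)

    def main(self, sc, *args):
        # sc is the SparkContext that luigi creates via spark-submit.
        # Read the source file and write it out to the HDFS path
        # (depending on the cluster setup, the input may need a
        # file:// or hdfs:// scheme prefix).
        rdd = sc.textFile(self.input_filepath)
        rdd.saveAsTextFile(self.output_filepath)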

Before we launch these tasks we need to add the paths to Hadoop and Spark libraries to the luigi.cfg config file:

[email]
receiver:receiver_email_address
sender:sender_email_address
[smtp]
host:your_company_smtp_server
[core]
default-scheduler-host:name_of_your_luigi_server
python-home-dir:$PYTHON_HOME
[spark]
spark-submit:$SPARK_HOME/bin/spark-submit
hadoop-conf-dir:$HADOOP_HOME
yarn-conf-dir:$YARN_HOME
master:yarn
num-executors:10

You have to know the paths to your $PYTHON_HOME, $HADOOP_HOME, $SPARK_HOME and $YARN_HOME to launch Spark-based tasks. You can now launch the Spark task as below:

PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.1-src.zip:." luigi --module luigi_example MoveFileTask --input-filepath test_data.csv --output-filepath test_data_hdfs
