Getting Started with Luigi — Setting up Pipelines

M Haseeb Asif
Big Data Processing
Jan 30, 2023

This article is the third and last in a series about getting started with Luigi. The first article introduced Luigi: what it is, why we need it, and some of its essential components. The second covered building pipelines, including sequential, parallel, and more complex interdependent ones. This article covers how to make pipelines configurable, pass them different configurations, and execute them in different ways.

In the previous articles, we created different pipelines using Luigi tasks, but most of their configuration was hardcoded. Now let's take the country pipeline we built last time and filter it by a country passed in as a parameter. To do that, we update our task to accept the specific country.

Luigi has a few different types of parameters, and their names describe the values they accept:

  • IntParameter for integer values
  • BoolParameter for boolean values
  • Parameter for strings and other values
  • DateParameter for dates
  • ListParameter for lists
  • DictParameter for a dictionary of items

Parameters make your pipelines more generic and avoid code duplication. In our case, instead of writing a separate pipeline for each country, we can have a single pipeline that takes the country as a parameter and run it for whichever country we need. Counting only the employees under a certain age would follow the same pattern with a second parameter. We start by adding the country parameter to CountryCount:

```python
from luigi import Task, Parameter

class CountryCount(Task):
    country_name = Parameter()

    def requires(self):
        ...  # rest of the task as built in the previous article
```

Adding a parameter in Luigi is as simple as declaring a class attribute of the parameter type you want. CountryCount now needs a value for country_name, so any parent task, SummarizeReport in our case, has to supply it by passing it through the pipeline.

So we update SummarizeReport to accept the parameter and pass it on to CountryCount:

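```python
from luigi import Task, Parameter

class SummarizeReport(Task):
    country_name = Parameter()

    def requires(self):
        # ProcessSalaries and AverageAge come from the previous article;
        # country_name is passed down to CountryCount.
        return [ProcessSalaries(),
                CountryCount(country_name=self.country_name),
                AverageAge()]
```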

There are two ways to pass parameters to a pipeline: on the command line or through a config file. We have been running the pipeline from the command line so far, so we add the parameter to the command. Luigi exposes the country_name parameter as the --country-name flag:

```shell
python -m luigi --module pipelines.wc SummarizeReport --country-name Sweden --local-scheduler
```

The other approach is a config file, which is the recommended one because managing parameters on the command line becomes a challenge as their number grows. Config files can also hold settings other than task parameters, such as the log level and connection details for different databases. Finally, it is easy to keep a separate config file for each environment.

Luigi looks for its configuration in a file named luigi.cfg in the working directory. You can also configure a custom config directory with the LUIGI_CONFIG_DIR environment variable.

So we create the luigi.cfg file with a section named after the task, [SummarizeReport], since a single config file can hold configuration for many tasks. Mine looks as follows:

```ini
[SummarizeReport]
country_name=Sweden
```

Once the file is in place, we run the same command without the parameter, and Luigi picks up the value from the config file:

```shell
python -m luigi --module pipelines.wc SummarizeReport --local-scheduler
```

Some pipelines need to run continuously, while others should be scheduled first thing in the morning or at night, when the load on the cluster is at its lowest. Some pipelines may also take more than an hour, or even several days, to complete, so it is good to have a mechanism to track their progress.

So far we have been developing the pipelines, probably on a local machine or in a development environment. Running them in a production environment is different.

Running pipelines in development or production

During development we can use the local scheduler with hard-coded parameters, but in production it is recommended to use the Luigi central scheduler daemon, luigid. The daemon can also run on a different host from the one launching the pipelines; clients reach it through the --scheduler-host option.

Furthermore, it is recommended to use the LUIGI_CONFIG_PATH environment variable to point to the config file for each environment, so that production pipelines never run with development settings or vice versa. A common approach is to include the environment in the file name, such as luigi.dev.cfg or luigi.prod.cfg, and point LUIGI_CONFIG_PATH at the matching file. For example, the following command runs the pipeline with the production config; it omits --local-scheduler, so it assumes luigid is running:

```shell
LUIGI_CONFIG_PATH=luigi.prod.cfg python -m luigi --module pipelines.wc SummarizeReport
```

In a nutshell, Luigi lets you build configurable pipelines that receive their parameters in different ways, on the command line or from a config file. It also lets you run a pipeline as a single execution with the local scheduler or against the luigid daemon.
