Invoke Jobs in Matillion using AWS Lambda in Python

Harish Sankaranarayanan
Published in BI3 Technologies
Sep 17, 2021

When a new object is added to S3, a Matillion ETL process is immediately triggered: it loads the object, transforms it, and appends the transformed data to a fact table.

FILES IN S3:

A file can be uploaded to the bucket from any source and in any format, such as JSON, CSV, or an export from an RDBMS.

HOW TO INITIALIZE AN AWS LAMBDA FUNCTION:

  1. When a new object arrives in S3, a Lambda function can be invoked. Some metadata, including the object path, is supplied to the function.
  2. When a message arrives on an SQS queue, Matillion ETL can launch a job. The message tells the tool which job to run and which variables it needs. (In this case, the only variable we require is the name of the file that just arrived in S3.)
The diagram outlines the basic architecture
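For reference, here is a minimal sketch of the event Lambda receives from S3, abridged to the fields we use; the bucket and key values are illustrative:

event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-landing-bucket"},   # hypothetical bucket
                "object": {"key": "incoming/flights.csv"}  # hypothetical object key
            }
        }
    ]
}
# The object path we forward to Matillion:
filename = event["Records"][0]["s3"]["object"]["key"]  # "incoming/flights.csv"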

GET STARTED WITH LAMBDA FUNCTIONS

  1. Open the AWS Management Console and navigate to Lambda under Services.
  2. Select “Create a Lambda Function.”
  3. Choose s3-get-object-python.
  4. Configure the bucket’s S3 source.
Configure the Lambda to S3
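The console steps above wire the bucket to the function. The same trigger can also be added programmatically; here is a minimal boto3 sketch, assuming a hypothetical bucket name and function ARN:

import boto3

s3 = boto3.client('s3')
# Note: the function must first allow S3 to invoke it
# (lambda add_permission with principal s3.amazonaws.com).
s3.put_bucket_notification_configuration(
    Bucket='my-landing-bucket',  # hypothetical bucket
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:trigger-matillion',
            'Events': ['s3:ObjectCreated:*'],
        }]
    }
)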

AWS Lambda console: create a Lambda function (using the s3-get-object-python blueprint) and test it with sample event data.

To create the Lambda function (console):

  1. Log in to the AWS Management Console and go to https://console.aws.amazon.com/lambda/.
  2. Choose Create function.
  3. On the Create function page, choose Use a blueprint. The blueprints are available in Node.js as well as Python; for this activity, use the Python-based blueprint.
  4. On the Basic information page, do the following:
  • Enter a name for your Lambda function.
  • For the execution role, choose Create a new role with basic Lambda permissions.
  • The rest of the settings can be left as-is.

5. Choose Create function.

SAMPLE CODE FOR LAMBDA:

Example Lambda function to trigger Matillion:
import boto3
import json

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='ETL_Launchpad')

def handler(event, context):
    for record in event['Records']:
        filename_from_event = record['s3']['object']['key']
        sqs_msg = {
            "group": "GROUP_NAME_MATILLION",      # e.g. AWS
            "project": "PROJECT_NAME_MATILLION",  # e.g. LAMBDA_TEST
            "version": "default",                 # 'default' unless you use named versions
            "environment": "ENV_NAME",            # e.g. live_db
            "job": "JOB_NAME",                    # e.g. RunETL
            "variables": {
                "file_to_load": filename_from_event
            }
        }
        # Note: if the queue is FIFO, its name must end in .fifo and
        # send_message also requires a MessageGroupId argument.
        queue.send_message(MessageBody=json.dumps(sqs_msg))
    return event
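To sanity-check the handler before wiring up the S3 trigger, you can invoke it with a hand-built event. This is only a sketch; it needs valid AWS credentials and an existing queue, and the object key is illustrative:

test_event = {
    "Records": [
        {"s3": {"object": {"key": "incoming/flights.csv"}}}  # hypothetical key
    ]
}
handler(test_event, None)  # should enqueue one message for Matillion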

The following fields are used in the example above. The name strings must exactly match the corresponding resource names in your Matillion ETL instance.

Queue name: The Amazon SQS queue that will be used to store and pass the message(s).

Group: Matillion ETL’s project group name.

Project: The name of the project in Matillion ETL.

Version: The name of the Matillion ETL project’s version.

Environment: The name of the environment in which Matillion ETL is to be executed.

Job: The name of the orchestration job in Matillion ETL.

Variables: The Matillion ETL variables passed on to the job. You can pass as many variables as you need.
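With the example values above filled in (and a hypothetical file name), the serialized message body that lands on the queue looks like this:

{
  "group": "AWS",
  "project": "LAMBDA_TEST",
  "version": "default",
  "environment": "live_db",
  "job": "RunETL",
  "variables": {
    "file_to_load": "incoming/flights.csv"
  }
}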

To create the SQS queue:

  1. Go to https://console.aws.amazon.com/sqs/ to access the Amazon SQS console.
  2. Choose Create queue.
  3. On the Create queue page, make sure you are in the correct Region.
  4. The Standard queue type is selected by default; choose FIFO. (You can't change the queue type after you create a queue.)
  5. Enter a name for your queue. The name of a FIFO queue must end with the .fifo suffix.
  6. To create your queue with the default parameters, scroll to the bottom and choose Create queue. Amazon SQS creates the queue and displays the queue's Details page.
  7. Amazon SQS propagates information about the new queue across the system. Because Amazon SQS is a distributed system, you may experience a slight delay before the queue is displayed on the Queues page.
Creating queue in AWS SQS
Give Access Policy for queue
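The queue can equally be created with boto3; a minimal sketch matching the console steps above:

import boto3

sqs = boto3.client('sqs')
sqs.create_queue(
    QueueName='ETL_Launchpad.fifo',  # FIFO queue names must end in .fifo
    Attributes={
        'FifoQueue': 'true',
        # Lets senders omit MessageDeduplicationId:
        'ContentBasedDeduplication': 'true',
    }
)

Note that if you do use a FIFO queue, the Lambda function above must target the .fifo queue name and pass a MessageGroupId to send_message.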

Consider adding more error checking, logging, and comments. For these functions to work properly, you need to understand the format of the event and context objects that are passed in, which depends on the services that trigger the function.

The function needs an execution role that can monitor the S3 bucket and send messages to SQS. With the IAM console, you can manage all of this with simple managed policies.

1. In the Role dropdown, create a new S3 execution role. Once this is created, we will also need to modify the role to allow SQS access:

  • In Services → IAM → Roles, select the role you created.
  • Click on Attach Policy and add AmazonSQSFullAccess.

2. Select Next, then Review and Create Function.
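Attaching the SQS policy can also be scripted; here is a minimal boto3 sketch, where the role name is a placeholder for the role you created above:

import boto3

iam = boto3.client('iam')
iam.attach_role_policy(
    RoleName='lambda-s3-execution-role',  # hypothetical role name
    PolicyArn='arn:aws:iam::aws:policy/AmazonSQSFullAccess',
)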

EXTRACT, LOAD AND TRANSFORM DATA BY TRIGGERING AN ETL JOB:

Each time a new file arrives, the Matillion ETL job loads the data. On each run, it truncates the target table and loads the most recent file into it. It then adds calculated fields, looks up information about the airline and airport, and finally appends the results to the final fact table.

Job for ETL

Define variables:

We want to load a different file each time this job runs, so we define a variable for it. We already saw in the Lambda function that we pass a variable named "file_to_load," so we should define that within Matillion ETL and give it a default value for testing the job.

a) Go to Project > Environment Variables.

b) Make an environment variable as shown below.

Create Job Variables

In the S3 Load component ("Load Latest File"), the S3 Object Prefix parameter refers to this variable. The Lambda function supplies the full path to the file.

S3 Load Component

Because all variables have a default value, the job can be tested in isolation. Then, to have it run whenever a new queue message arrives (via our Lambda function), configure SQS in Matillion ETL under Project → SQS.

SQS Configuration

The Listen Queue is the queue our Lambda function writes to. Matillion ETL here uses the Instance Credentials associated with the EC2 instance; a manual access key and secret key could be used instead, but either way the credentials must be able to read from SQS to pick up the message and read from S3 to load the data files.

As a result, we can use a Lambda function to trigger Matillion from AWS. Since errors can occur anywhere along the way, we can also post "success" or "failure" messages to SQS, or send an SNS notification indicating whether a load was successful or unsuccessful.
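A minimal boto3 sketch of such a notification, assuming a hypothetical SNS topic:

import boto3

sns = boto3.client('sns')
sns.publish(
    TopicArn='arn:aws:sns:us-east-1:123456789012:matillion-load-status',  # hypothetical topic
    Subject='Matillion load status',
    Message='Load of incoming/flights.csv: SUCCESS',
)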

About Us

Bi3 has been recognized for being one of the fastest-growing companies in Australia. Our team has delivered substantial and complex projects for some of the largest organizations around the globe and we’re quickly building a brand that is well known for superior delivery.

Website : https://bi3technologies.com/

Follow us on,
LinkedIn : https://www.linkedin.com/company/bi3technologies
Instagram : https://www.instagram.com/bi3technologies/
Twitter : https://twitter.com/Bi3Technologies
