AWS Glue Job with PySpark

Yuvaraj Ravikumar
Oct 24, 2019 · 3 min read

How to create a custom AWS Glue job and perform ETL by leveraging Python and Spark (PySpark) for the transformations.

In this walkthrough, the source data lives in S3, the target is also S3, and the transformations are done with PySpark inside an AWS Glue job.

Let me first upload my file to the S3 source bucket. Below is the content of the file.

Input File
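The screenshot of the file is not reproduced here. Based on the columns the job script references later (sno, Name, City), the source CSV could look roughly like this (sample rows are purely illustrative):

sno,Name,City
1,john,chennai
2,priya,bangalore
3,arun,mumbai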

Here we need to transform the file by adding a timestamp column at the end and converting the Name column values to upper case. Let's go ahead and upload the file to the respective S3 location.

S3 Input file uploaded

Let's proceed to create a table in the Glue Data Catalog and write the transformation job.

Add Table in Glue
Create Table in Glue console
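If you prefer to script this step instead of clicking through the console, the same table can be registered in the Glue Data Catalog with boto3. This is a minimal sketch; the column types, classification parameters, and source bucket path are assumptions that should be adjusted to match your actual file:

import boto3

glue = boto3.client("glue")

# Register a CSV table in the Data Catalog (names and paths below are placeholders)
glue.create_table(
    DatabaseName="test-only",
    TableInput={
        "Name": "mytable",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "sno", "Type": "bigint"},
                {"Name": "name", "Type": "string"},
                {"Name": "city", "Type": "string"},
            ],
            "Location": "s3://your-source-bucket/input/",  # assumed source path
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {"field.delim": ","},
            },
        },
        "Parameters": {"classification": "csv", "skip.header.line.count": "1"},
    },
)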

Once the table is created, proceed to write the job.

Create a new job with a script authored by you, and paste the code below.

#<Source Code — — — Begins>

import sys
import datetime
import logging

import pyspark
from pyspark.sql.functions import col, lit, upper

# ################## Glue imports ##################
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

# ################## Create Spark and Glue contexts ##################

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = pyspark.SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

logger = logging.getLogger(__name__)
start_dttm = datetime.datetime.now()

print("Job Execution Started...")

# ################## Create a Glue DynamicFrame from the catalog table ##################
ds = glueContext.create_dynamic_frame.from_catalog(database = "test-only", table_name = "mytable", transformation_ctx = "datasource0")

# ################## Convert the DynamicFrame to a Spark DataFrame ##################
ds_df = ds.toDF()
ds_df.show()

time_stamp_final = datetime.datetime.now()
ds_df.select("sno", "Name", "City").show()

# Upper-case the Name column and append a Timestamp column
ds_df1 = ds_df.select("sno", upper(col("Name")).alias("Name"), "City")
ds_df2 = ds_df1.withColumn("Timestamp", lit(time_stamp_final))

# Convert back to a DynamicFrame and write the result to S3 as JSON
datasource0 = DynamicFrame.fromDF(ds_df2, glueContext, "datasource0")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://xxxxxxxxxxxxxxx/xxxx/outputfile"}, format = "json", transformation_ctx = "datasink2")

job.commit()

#<Source Code — — — Ends>

Glue Job

Run the job and wait until it completes successfully.
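Alternatively, the job can be started and monitored programmatically with boto3 (the job name below is a placeholder for whatever you named the Glue job):

import time
import boto3

glue = boto3.client("glue")

# Start the job run and poll until it reaches a terminal state
run = glue.start_job_run(JobName="my-glue-pyspark-job")
run_id = run["JobRunId"]

while True:
    state = glue.get_job_run(JobName="my-glue-pyspark-job", RunId=run_id)["JobRun"]["JobRunState"]
    print(state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)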

Here is the transformed output file in the target S3 location.

Output file in JSON
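The job writes JSON Lines, one object per record. With the transformations above, a record should look roughly like the following (values are illustrative, and the exact serialization of the timestamp depends on the Spark/Glue version):

{"sno": 1, "Name": "JOHN", "City": "chennai", "Timestamp": "2019-10-24T10:15:30.123"}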

Please comment below for any queries. Thanks for visiting my page.

Keep Learning!
