Data Preprocessing with AWS Glue
If you are new to AWS, its services can be confusing, and I can bet you won't know where or how to start your mission. If you have done some getting-started examples and still don't understand the concepts or mechanisms, I recommend reading the AWS Glue FAQ as well. It will be a bit boring, but I've heard the FAQs are very important for the SAA (AWS Certified Solutions Architect - Associate) certification. You don't need to understand everything; the AWS world is bigger than you think.
Development Environment
In most cases your mission will be more complicated than the basic examples, and you will need custom scripts beyond the Glue Studio GUI. AWS Glue Studio is not convenient for testing and debugging source code, so it is easier to develop locally with a Docker image.
https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/
But there are some limitations: it currently supports Glue 1.0 only and does not support the AWS Glue Parquet writer (format="glueparquet").
If you want to develop locally without a container, or check the full list of limitations, see the next link:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-libraries.html
If you succeed in creating a Docker container from the Glue image, you can access Jupyter Notebook via http://localhost:8888. In my case, port 8888 was already taken by another Jupyter Notebook, so I changed it to port 8080.
docker run -itd -p 8080:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh
You should have run 'aws configure' before running this command.
You can create a new notebook as PySpark, Python 3, Spark, or SparkR. Since the default Glue script includes 'from pyspark.context import SparkContext', PySpark is the right choice.
At first, I recommend using the Job Visual Editor of Glue Studio. After setting up the basics, such as the S3 bucket source and target, reusing the auto-generated script is helpful; copy or download it.
There is one thing you have to fix in the auto-generated script for the development environment, because there are no 'sys.argv' values:
## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = { "JOB_NAME": "job0"}
When a cell containing only the script's import lines runs correctly, you will see output like the image below, including 'SparkSession available'.
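For reference, the initialization part of an auto-generated Glue script looks roughly like this (a sketch; the exact lines vary by Glue version), with the args lookup replaced as described above:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# reuse the notebook's SparkContext if one already exists
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = {"JOB_NAME": "job0"}
job.init(args['JOB_NAME'], args)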
Mission
My mission was to append weather data to a time-series dataset. Three datasets are used:
- OBS: Observations, each with an ID, name, location, and so on.
- Weathers: Daily weather data containing a timestamp, average temperature, rainfall, and average wind speed per observation.
- Auctions: A daily time-series dataset containing item_id, timestamp, target_value, latitude, and longitude.
It needs two steps:
1. Finding the closest OBS
2. Joining weather data with OBS
Using udf
This is the basic usage of udf (from pyspark.sql.functions). DataSource1 has a location field whose value looks like '35.1234_127.1234'; I had to split it and set new columns. The types are from 'pyspark.sql.types'.
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# split 'location' ('35.1234_127.1234') into latitude and longitude columns
get_lat = udf(lambda x: float(x.split('_')[0]), FloatType())
get_lon = udf(lambda x: float(x.split('_')[1]), FloatType())
df1 = DataSource1.toDF()
df1 = df1.withColumn('lat', get_lat(df1['location'])) \
    .withColumn('lon', get_lon(df1['location'])) \
    .drop('location')
df1.printSchema()
DynamicFrame.fromDF
To write the result to S3, a DynamicFrame (from 'awsglue.dynamicframe') is required. And, as mentioned previously, you cannot use the 'glueparquet' format in this development environment; change it in the console for production, or use job parameters.
from awsglue.dynamicframe import DynamicFrame

DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = DynamicFrame.fromDF(df1, glueContext, 'OBS'),
    connection_type = "s3",
    format = "csv",
    connection_options = {"path": "s3://target/to/path/", "partitionKeys": []},
    transformation_ctx = "DataSink0")
job.commit()
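In the production job on the console, the same call can be switched to the Glue Parquet writer, which the local Glue 1.0 container does not support. A sketch of that variant:

# production-only variant: 'glueparquet' is not available in the local container
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame = DynamicFrame.fromDF(df1, glueContext, 'OBS'),
    connection_type = "s3",
    format = "glueparquet",
    connection_options = {"path": "s3://target/to/path/", "partitionKeys": []},
    transformation_ctx = "DataSink0")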
Glue 1.0 vs Glue 2.0
AWS Glue version 2.0 features 10x faster job start times and a 1-minute minimum billing duration. In my case, Glue 1.0 took about 5 minutes longer to run a job on the AWS cloud, and I couldn't find a reason not to use Glue 2.0. However, we cannot escape Glue 1.0 in the Docker-based development environment yet (our test environment is fast enough anyway). Both support Spark 2.4.3, but they sometimes showed different results: a job that succeeded in development (Glue 1.0) failed in the AWS Glue console run (Glue 2.0) when a 'udf' return type was written as a string.
AttributeError: 'NoneType' object has no attribute '_jvm'
That may be part of the reason. Nevertheless, Glue 2.0 is the recommended setting in the console environment for production. Yes, it's still going to be a bit confusing.
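If you hit the same error, the difference in my code came down to how the udf's return type was declared. A hedged sketch (the root cause is my guess based on the behavior I saw, and 'to_float' is just a hypothetical udf):

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# return type given as a DDL string; this form triggered the '_jvm' error on Glue 2.0 for me
to_float = udf(lambda x: float(x), "float")

# return type given as an explicit type object; this form ran on both versions
to_float = udf(lambda x: float(x), FloatType())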
withColumn
This is the source code for finding the closest observation by latitude and longitude. 'df0' is the observations dataset and 'df1' is the auctions dataset, which has a location; the result appends the closest observation's ID to df1. At first I thought it would need row iteration, since it uses two columns and a double loop, but it is possible with a single 'withColumn'.
from math import cos, asin, sqrt

# source from https://stackoverflow.com/a/41337005/14539284
def distance(lat1, lon1, lat2, lon2):
    p = 0.017453292519943295  # pi / 180
    hav = 0.5 - cos((lat2-lat1)*p)/2 + cos(lat1*p)*cos(lat2*p) * (1-cos((lon2-lon1)*p)) / 2
    return 12742 * asin(sqrt(hav))  # haversine distance in km

def closest(data, v):
    return min(data, key=lambda p: distance(v['lat'], v['lon'], p['lat'], p['lon']))

# collect the observations to the driver as a list of dicts
obs = list(map(lambda row: row.asDict(), df0.collect()))
set_obs = udf(lambda x, y: closest(obs, {"lat": x, "lon": y})['id'])
df1 = df1.withColumn('obs', set_obs(df1["lat"], df1["lon"]))
df1.printSchema()
iterrows
Actually, I'm also a newbie with Spark; I had only used 'applyInPandas' with Spark 3.0, and most of those methods failed in this environment. We are limited to AWS Glue 1.0, which supports Spark 2.4 (recently, Glue 3.0 became available in the console and supports Spark 3.1). There are methods for looping over the rows of a Spark DataFrame, but I couldn't get them to work; I may have applied something wrong, but they failed, at least in my development environment. You can check more PySpark techniques at the following links:
- https://towardsdatascience.com/5-ways-to-add-a-new-column-in-a-pyspark-dataframe-4e75c2fd8c08
- https://sparkbyexamples.com/pyspark/pyspark-loop-iterate-through-rows-in-dataframe/
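For reference, one row-looping approach from those links looks roughly like this in Spark 2.4 (a sketch; as noted above, not every technique worked for me inside the Glue 1.0 container):

# iterate rows on the driver; only reasonable for small DataFrames
for row in df1.toLocalIterator():
    print(row['lat'], row['lon'])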
Restart kernel or spark.stop()
While working, you may see the following error. After finishing your job, just run 'spark.stop()' at the end of every task, or restart the kernel, to continue working.
An error was encountered:
Invalid status code '404' from http://localhost:8998/sessions/8 with error payload: {"msg":"Session '8' not found."}
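In practice, that just means ending each task with an explicit stop before starting the next one:

# release the current Spark session so a fresh one is created for the next task
spark.stop()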
Joining weather data
Finally, I created a new job with the result of the above and the weather dataset. A left join was used with obs (observation ID) and timestamp as the keys. This was done using the Glue Studio GUI only.
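For reference, the equivalent step in plain PySpark would be roughly a left join on those two keys (a sketch; 'auctions_df' and 'weather_df' are hypothetical names for the two inputs):

# left join the enriched auctions data with the daily weather data
# on the observation ID and the timestamp
joined_df = auctions_df.join(weather_df, on=['obs', 'timestamp'], how='left')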