Serverless Transactional Email Setup with Analytics using AWS SES, Glue, Athena, QuickSight

Pratik Bhopal
Aug 18 · 6 min read
-- Raw SES email-event log, one pipe-delimited text file row per event.
-- All columns are strings at this layer; typing/partitioning happens in the
-- Glue ETL job that writes the Parquet copy.
-- NOTE: the Glue-console-generated clauses `MAP KEYS TERMINATED BY 'undefined'`
-- and `WITH SERDEPROPERTIES ('collection.delim'='undefined')` were removed:
-- 'undefined' is not a valid single-character delimiter, and WITH
-- SERDEPROPERTIES is only legal after ROW FORMAT SERDE, not ROW FORMAT
-- DELIMITED, so the original DDL does not parse in Athena/Hive.
CREATE EXTERNAL TABLE IF NOT EXISTS `csv_emails_log` (
    `msg_id` string,            -- SES message id; repeats across events for one email
    `event` string,             -- e.g. Send / Open / Click
    `event_timestamp` string,   -- 'yyyy-MM-dd HH:mm:ss'; cast downstream
    `subject` string,
    `to` string,                -- reserved word, must stay backtick-quoted
    `from_email` string,
    `link_clicked` string,      -- empty for non-Click events
    `tag` string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
    's3://YOUR-S3-PATH'         -- replace with the bucket/prefix holding the log files
TBLPROPERTIES (
    'COLUMN_STATS_ACCURATE'='false',
    'has_encrypted_data'='false')
id-1|Click|2018-11-20 22:30:28|Email subject 1|toemail@gmail.com|fromemail@gmail.com|http://google.com|crm|
id-2|Send|2018-11-19 22:30:28|Email subject 2|toemail2@gmail.com|fromemail2@gmail.com||crm|
id-1|Open|2018-11-19 21:30:28|Email subject 1|toemail@gmail.com|fromemail@gmail.com||crm|
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from awsglue.dynamicframe import DynamicFrame
# --- Glue job bootstrap -----------------------------------------------------
# Original line fused two statements onto one line
# (`...['JOB_NAME'])sc = SparkContext()`) — a SyntaxError; split here.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the raw CSV-backed table registered in the Glue Data Catalog.
# transformation_ctx enables job bookmarks (incremental loads).
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "raw_data", table_name = "your_csv_file_table", transformation_ctx = "datasource0")
rec_df = datasource0.toDF()

# With bookmarks enabled an incremental run may return zero new rows;
# only transform/write when there is data. head(1) avoids a full count().
# NOTE: the body of this `if` was unindented in the original (an
# IndentationError as pasted); restored here.
if len(rec_df.head(1)) != 0:
    rec_df.createOrReplaceTempView("tempTable")
    # Derive year/month/day partition columns from the event timestamp.
    rec_sql_df = spark.sql("SELECT *, year(event_timestamp) as year_p, month(event_timestamp) as month_p, day(event_timestamp) as day_p FROM tempTable")
    mapped_dyF = DynamicFrame.fromDF(rec_sql_df, glueContext, "mapped_dyF")
    # Cast event_timestamp to a real timestamp and rename the derived
    # *_p columns to the partition names year/month/day.
    applymapping1 = ApplyMapping.apply(frame = mapped_dyF, mappings = [("msg_id", "string", "msg_id", "string"), ("event", "string", "event", "string"), ("event_timestamp", "string", "event_timestamp", "timestamp"), ("subject", "string", "subject", "string"), ("to", "string", "to", "string"), ("from_email", "string", "from_email", "string"), ("link_clicked", "string", "link_clicked", "string"), ("tag", "string", "tag", "string"), ("year_p", "int", "year", "int"), ("month_p", "int", "month", "int"), ("day_p", "int", "day", "int")], transformation_ctx = "applymapping1")
    resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
    dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
    # Write partitioned Parquet for efficient Athena/QuickSight queries.
    datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://path_to_store_parquet_files","partitionKeys": ["year","month","day"]}, format = "parquet", transformation_ctx = "datasink4")

# Required: commits the job bookmark so already-processed input is skipped
# on the next run. Without this the job reprocesses all data every run.
job.commit()

Pratik Bhopal

Written by

Web enthusiast | Engineering Manager | Founder BookZeal

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade