Building Production PySpark Jobs

Data processing, insights and analytics are at the heart of Addictive Mobility, a division of Pelmorex Corp. We take pride in our data expertise and proprietary technology, which we use to offer mobile advertising solutions to our clients. Our primary product offering is a Demand Side Platform (DSP): we provide the technology that allows ad spaces to be bid on in real time and delivers qualified ads to customers. We build and host customized campaigns for our advertisers based on non-personally identifiable data, collected across different sources in a way that respects user choices and consent. This anonymous data, combined with our technology and analytics, allows us to serve relevant advertisements using our in-house data management platform (DMP).

About two years ago we decided to move from DiscoMR (a traditional MapReduce framework) to Apache Spark. Spark took the Big Data world by storm when it was introduced a few years ago, with in-memory processing capabilities that significantly reduce processing times for SQL-like queries, machine learning modelling and similar workloads. It has built-in modules for streaming, SQL, machine learning and data aggregation, and can speed up applications by up to 100 times compared to traditional MapReduce engines.

This post describes some of the procedures we followed for designing production-ready Spark jobs and some challenges we faced along the way. Whether you are planning to introduce Spark into your big data ecosystem or you already have a mature practice, this document should be useful in either case.

We use Spark extensively for making informed bidding decisions and predictions about user behaviour. Addictive's tech stack is Python-heavy, so we naturally decided to use PySpark. We deal with a few TB of data every day, which calls for high-throughput, low-latency systems. Since our Spark jobs drive the behaviour of our critical systems, continuous testing and monitoring are needed to ensure quality and intended functionality. Here we describe some of the practices we follow in designing our production jobs:

a.) Job Directory Structure

As we are in the process of migrating all machine learning and data processing systems to Spark, the codebase is going to be sizeable. This necessitates a set of standard practices for writing modular jobs, building, packaging, handling dependencies, testing and so on, so that it scales well with our growing engineering team. We took inspiration from Best Practices Writing Production-Grade PySpark Jobs; the job structure looks like this:

├── spark_job.sh
├── Makefile
├── README.md
├── requirements.txt
└── src
    ├── main.py
    ├── jobs
    │   └── sample_job
    │       └── __init__.py
    ├── libs
    │   └── requests
    │       └── ...
    └── shared
        └── __init__.py
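In this layout, main.py acts as the entry point: it resolves the requested job package under jobs/ and hands control to it. A minimal sketch of that dispatch is shown below; the --job-name flag and the analyze() entry function are illustrative assumptions rather than our exact interface.

# src/main.py -- illustrative job dispatcher (names and flags are assumptions)
import argparse
import importlib

from pyspark.sql import SparkSession

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--job-name', dest='job_name', required=True)
    args = parser.parse_args()

    spark = SparkSession.builder.appName(args.job_name).getOrCreate()

    # resolve jobs.<job_name> (shipped to the cluster via --py-files) and run it
    job_module = importlib.import_module('jobs.%s' % args.job_name)
    job_module.analyze(spark)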

b.) Logging

Logging for a Spark application running on Yarn is handled via the Apache Log4j service. If log aggregation is turned on (with the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted from the local machine. These logs can be viewed from anywhere on the cluster with the yarn logs command as follows:

yarn logs -applicationId <appID>

where appID is the Yarn application ID. If you are using a Hortonworks or Cloudera Spark distribution, it can be found on the Spark Web UI page in the 'ID' column; otherwise it can also be extracted from the SparkContext within the job, as shown below. These logs can also be accessed through the Spark Web UI under the 'Executors' tab.
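
For reference, here is a minimal sketch of pulling the application ID from inside the job, assuming an active SparkSession named spark:

# Yarn assigns the application ID at submission time; SparkContext exposes it directly
app_id = spark.sparkContext.applicationId
print("yarn logs -applicationId %s" % app_id)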

For our use case, we want:

  • the logs to be aggregated under the system's designated directory in HDFS,
  • job stats to be stored separately from the Yarn container logs (since the latter are verbose), and
  • an email client to send 'critical' alerts to the project owner and Data team members.

To meet the above needs, the 'logging' module from Python's standard library is used alongside Log4j (controlled/modified as per project requirements). The logger is configured via a configuration file that defines the logger and handler settings.
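
For the third requirement, one option (not shown in the config files below) is to attach a logging.handlers.SMTPHandler at the CRITICAL level to the job logger; a minimal sketch with placeholder SMTP settings and addresses:

import logging
import logging.handlers

# send an email only for records at the 'critical' level
# (mail host and addresses below are placeholders, not our settings)
email_handler = logging.handlers.SMTPHandler(
    mailhost=("smtp.example.com", 25),
    fromaddr="spark-jobs@example.com",
    toaddrs=["project-owner@example.com", "data-team@example.com"],
    subject="Critical alert from Spark job",
)
email_handler.setLevel(logging.CRITICAL)

logging.getLogger("job").addHandler(email_handler)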

As you may know, each system that goes into production has a designated path on HDFS for storing the relevant project artifacts, e.g. logs, data and models. Let's demonstrate the logging procedure with a sample project, 'test_logger'. It has the standard directory structure that we follow, and we add logging to it.

Here’s the directory structure with log management files:

Directory Structure of 'test_logger'
├── Makefile
├── Requirements.txt
├── spark_submit.sh
└── src
    ├── config.json
    ├── jobs
    │   ├── __init__.py
    │   └── sample_job
    │       ├── __init__.py
    │       └── job_method.py
    ├── libs
    │   └── __init__.py
    ├── log4j.properties
    ├── logging.json
    ├── main.py
    └── shared
        └── __init__.py

Now, let's break it down. Assuming the reader is familiar with this directory structure for a Spark application, there are two additional files here: the config files for logging and Log4j. logging.json defines the logger and handler properties for the logging module. There are three handlers: one for the console, one for the info file and one for the error file. The logger therefore generates two files: info.log, which contains information about the job (e.g. how long it took to complete, the number of data points/users processed), and errors.log, whose handler has a higher-level filter and only handles messages at or above the 'error' level of importance. The console handler writes to stdout so that the user can also see the output through the Spark Web UI.

logging.json

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        }
    },

    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout"
        },

        "info_file_handler": {
            "class": "logging.FileHandler",
            "level": "INFO",
            "formatter": "simple",
            "filename": "info.log"
        },

        "error_file_handler": {
            "class": "logging.FileHandler",
            "level": "ERROR",
            "formatter": "simple",
            "filename": "errors.log"
        }
    },

    "loggers": {
        "job": {
            "handlers": ["console", "info_file_handler", "error_file_handler"]
        }
    },

    "root": {
        "level": "INFO",
        "handlers": ["console", "info_file_handler", "error_file_handler"]
    }
}

Storing Yarn Cluster Logs

To debug memory leaks that may occur in the JVM, or to extract information about resource allocation during the job run, it is convenient to store the Yarn logs in the application directory. Spark uses the Apache Log4j service for logging, which can be configured to write logs to a specified file instead of the default location, and we can also control the logging level. The configuration file for Log4j looks like this:

log4j.properties

# Root logger option: attach both the file and console appenders
log4j.rootLogger=WARN, file, stdout

# Direct log messages to a rolling log file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=spark_job_log4j.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

In the above config file, rootLogger is the logger and we can attach multiple appenders to it (appenders are analogous to handlers in Python's standard logging module). There are two appenders: a ConsoleAppender, which directs output to stdout (the default behaviour, but with a controllable logging level), and a RollingFileAppender, which directs the logs to a specified file (spark_job_log4j.log in the file above).

Within the PySpark script the Log4j logger only needs to be obtained once, and it logs all the relevant information from then on. Add the following snippet to the PySpark script:

Log4j initialization

# initialize logger for yarn cluster logs
log4jLogger = sc._jvm.org.apache.log4j
logger_pyspark = log4jLogger.LogManager.getLogger(__name__)
logger_pyspark.info("pyspark script logger initialized")
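
The same handle can also be used to adjust how verbose Spark's own loggers are from within the job (a sketch; the equivalent can be done via the rootLogger level in log4j.properties):

# reduce chatter from Spark internals in the Yarn container logs
log4jLogger.LogManager.getLogger("org.apache.spark").setLevel(log4jLogger.Level.WARN)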

Putting it all Together

This logging approach can be seamlessly integrated into any Spark application. The user needs to provide the two config files, log4j.properties and logging.json, as shown above; these need not change. The only thing that needs modification is the HDFS destination path, which is set in config.json. For example, if the path to the project data in HDFS is "/foo/bar", then set

config["logger_config"]["logging_config_path"] = "/foo/bar"

In spark_submit.sh, add the following argument:

--driver-java-options '-Dlog4j.configuration=file:log4j.properties'

Then in main.py, initialize the loggers as shown below:

main.py

import argparse
import datetime
import json
import logging
import logging.config

from hdfs3 import HDFileSystem

if __name__ == '__main__':

    from pyspark.sql import SQLContext, SparkSession

    # parse the job name passed on the command line (flag name assumed here)
    parser = argparse.ArgumentParser()
    parser.add_argument('--job-name', dest='job_name', required=True)
    args = parser.parse_args()

    spark = SparkSession.builder.appName(args.job_name).getOrCreate()
    sc = spark.sparkContext
    sqlcontext = SQLContext(sc)

    # load the project config (HDFS host/port and log destination path)
    with open('./config.json') as config_json:
        job_config = json.load(config_json)

    # setup logging to be passed to each module;
    # logs are stored in the driver directory and then shipped to hdfs once the job is complete
    with open('./logging.json') as log_json:
        logging_config = json.load(log_json)
    logging.config.dictConfig(logging_config)
    logger_main = logging.getLogger(__name__)

    # to test the various levels
    logger_main.debug('debug message')
    logger_main.info('info message')
    logger_main.warning('warn message')
    logger_main.error('error message')
    logger_main.critical('critical message')

    # initialize log4j for yarn cluster logs
    log4jLogger = sc._jvm.org.apache.log4j
    logger_pyspark = log4jLogger.LogManager.getLogger(__name__)
    logger_pyspark.info("pyspark script logger initialized")

    # push logs to hdfs under <logpath>/yyyy-mm-dd/<hour>/
    logpath = job_config['logger_config']['path']
    # connect to the hdfs directory where logs are to be stored
    # (the port key name in config.json is assumed here)
    hdfs = HDFileSystem(host=job_config['HDFS_host'], port=job_config['HDFS_port'])
    current_job_logpath = logpath + f"{datetime.datetime.now():%Y-%m-%d}"
    job_run_path = current_job_logpath + '/' + str(int(f"{datetime.datetime.now():%H}") - 2) + '/'

    if not hdfs.exists(current_job_logpath):
        # create the directory for today
        hdfs.mkdir(current_job_logpath)
        hdfs.chmod(current_job_logpath, mode=0o777)

    hdfs.put("info.log", job_run_path + "info.log")
    hdfs.put("errors.log", job_run_path + "errors.log")
    hdfs.put("spark_job_log4j.log", job_run_path + "spark_job_log4j.log")

Loggers are initialized in main.py and can then be used in any of the modules simply by referring to them. Once the job is complete, the log files are transferred to the designated path in HDFS. In the above code the logs are put under the path "<logpath>/yyyy-mm-dd/<hour>".

To append to the "logger_main" logger initialized above from the jobs/sample_job/job_method.py module:

job_method.py

import logging


class test_class(object):
    def __init__(self):
        # child logger; records propagate up to the handlers configured in main.py
        self.logger_1 = logging.getLogger(__name__)

    def analyze(self, sc, spark, datapath, job_config, **kwargs):
        self.logger_1.info("inside analyze")

c.) Unit Testing a Spark Application

There's still no standard practice for unit testing Spark applications, though there are several suggestions on various online forums. Spark's documentation doesn't provide explicit guidance but points to http://spark.apache.org/docs/2.1.1/programming-guide.html#unit-testing. Here we provide a guide geared towards writing unit tests for the various stages of a Spark job (map, reduce and output). It can be extended or altered to suit a specific application that may not include all of these cases, or that may have additional ones such as an ML train/test job.

Step-by-step guide

To begin, the reader is assumed to be familiar with Python's unittest module. In this application, one SparkContext is created and used for all the unit tests, since it is not possible to run multiple SparkContexts within a job.
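
A common way to enforce this single SparkContext is to create the SparkSession once for the whole test run, e.g. in unittest's setUpClass, and have every test case reuse it. A minimal sketch is shown below; the base-class name and appName are illustrative assumptions.

import unittest

from pyspark.sql import SparkSession


class SparkTestBase(unittest.TestCase):
    """Owns the single SparkSession/SparkContext shared by all test cases."""

    @classmethod
    def setUpClass(cls):
        # getOrCreate() picks up whatever master spark-submit was launched with
        cls.spark = SparkSession.builder.appName("unit-tests").getOrCreate()
        cls.sc = cls.spark.sparkContext

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()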

The unit tests use a directory structure identical to that of the actual job, so that there is minimal disparity between the two. Unit testing is done under a new folder, <main_spark_job_folder>/tests/. As you can see, the ./src and ./tests directories mirror each other in terms of subfolders, and main_test.py contains all the test cases.

├── Makefile
├── README.md
├── Requirements.txt
├── spark_submit.sh
├── src
│   ├── config.json
│   ├── config_dev.json
│   ├── jobs
│   │   ├── __init__.py
│   │   ├── job_1
│   │   │   ├── __init__.py
│   │   │   └── script1.py
│   │   └── job_2
│   │       ├── __init__.py
│   │       └── script2.py
│   ├── libs
│   ├── log4j.properties
│   ├── logging.json
│   └── main.py
└── tests
    ├── config.json
    ├── jobs
    │   ├── __init__.py
    │   ├── job_1
    │   │   ├── __init__.py
    │   │   └── script1.py
    │   └── job_2
    │       ├── __init__.py
    │       └── script2.py
    ├── libs
    ├── log4j.properties
    ├── logging.json
    └── main_test.py

Now let's take a look at how a test case is defined:

import unittest


class SparkJobTests(unittest.TestCase):
    # Assume one of the columns in the dataframe holds the country a record
    # originated from; if it is not in the valid list of countries, the map
    # step should return a default value.
    # (spark, sqlcontext, timestamp, job_module and DEFAULT_INVALID_VALUE are
    # set up once at module level, as in main_test.py below.)
    def test_consume_map_when_country_is_incorrect(self):
        df_test = spark.createDataFrame([("S. Korea", timestamp)],
                                        ["geo_country", "time"])
        # since "S. Korea" is not in the valid list, the map function should
        # return the default value
        expected_result = DEFAULT_INVALID_VALUE
        instance = job_module.job_1(spark, sqlcontext, logs_=df_test)
        result = instance.job_controller('map')[0].Value
        print(result)
        self.assertEqual(result, expected_result)

As shown above, to perform testing with Python's unittest package, the prebuilt unittest.TestCase class is extended by the SparkJobTests class. Each method of SparkJobTests represents a specific test case you would want to run, whether to check a map/reduce function, verify input/output to an external system or database, or test the logic inside a nested loop.

Skeleton Code for writing PySpark unit tests

main_test.py

import importlib
import logging
import sys
import unittest

# declare all the variables that are shared among the unit test methods
shared_variable = "foo"

# import the job module to be tested, given the job name
# (for example, to run the tests on job_1)
job_to_test = "job_1"
job_module = importlib.import_module('jobs.%s' % job_to_test)
sys.path.insert(0, './jobs/%s' % job_to_test)


# the test class and its unit test methods

class SparkJobTests(unittest.TestCase):
    # the methods below represent the test cases you'd want to run on a
    # section of a job, as described in the code example above
    def method_1(self):
        pass

    def method_2(self):
        pass

    # ...

    def method_n(self):
        pass


# start the testing

if __name__ == '__main__':
    # run all tests
    unittest.main()

    # alternatively, run only a specific test, say the one in method_k:
    # suite = unittest.TestSuite()
    # suite.addTest(SparkJobTests("method_k"))
    # runner = unittest.TextTestRunner()
    # runner.run(suite)

To run the tests, put the following in a bash file, say spark_submit.sh (assuming you are working on a Linux server):

sudo pip3 install --upgrade -r Requirements.txt -t ./tests/libs
mkdir -p ./dist
# copy the entry point and the configs that spark-submit distributes via --files
cp ./tests/main_test.py ./tests/config.json ./tests/log4j.properties ./tests/logging.json ./dist
# package the job modules and their dependencies
(cd ./tests && zip -x main_test.py -x config.json -x \*libs\* -r ../dist/jobs.zip .)
(cd ./tests/libs && zip -r ../../dist/libs.zip .)
cd ./dist
spark-submit --master yarn --deploy-mode cluster \
    --driver-memory 28g --executor-memory 28g --num-executors 5 --executor-cores 5 \
    --driver-java-options '-Dlog4j.configuration=file:log4j.properties' \
    --name "Spark Sample Test" \
    --queue large \
    --conf spark.logConf=true \
    --py-files jobs.zip,libs.zip \
    --files log4j.properties,config.json,logging.json \
    main_test.py

and to run it, use the following commands in a Linux console:

chmod +x spark_submit.sh
./spark_submit.sh

d.) Alerts through Server Density

It can be cumbersome to visit the Spark UI over and over to confirm that a scheduled run of a certain Spark job completed successfully, so we wanted an alert system. A job can fail for a myriad of reasons, intrinsic or extrinsic, and we felt the need to be notified of such occurrences, so we built an email alerting system facilitated by Server Density, a web service platform for security monitoring and alerting. Server Density runs daemon processes on the server and reads a status file every minute (this interval can be configured to suit a specific application, based on how frequently the job status is updated).

Generating the Status File

Server Density requires a status file from which to build a custom alert. In our case we create a status file and store it in HDFS, where it is picked up by Server Density; ideally you want to put this file in the HDFS path of your Spark system. The file needs to contain the Yarn application ID of the Spark job in question, along with state variables reflecting the desired state the job needs to maintain; Server Density uses these to run the correctness checks.

Let's look at an example alert setup for one of our production systems. For this specific system, a job needs to start every 3 hours and finish before it's time for the next one. The status file looks like this:

{
    "app_id": "application_1526666425147_0697",
    "hour_now": 18,
    "hour_start": "18",
    "hour_end": "21"
}

Whenever a new job is submitted to the Spark cluster, the above status file is regenerated with the new application ID (app_id) as well as the hour_now, hour_start and hour_end fields. These hour fields indicate that this job started at 18:00 (hour_start) and must end before 21:00 (hour_end); hour_now is used to check whether the current hour is past hour_end, in which case an alert needs to be generated. Once this file is stored in HDFS, Server Density kicks in, reads the application ID and checks the status of the job with

yarn application -status <Application ID>

An alert is issued if the status is "Failed" or "Killed", or if the job did not finish by hour_end.

Following this routine, we can create a status file for any Spark application that needs an alert system. Of course, the state variables stored in the file will vary depending on the scenarios in which the user/developer wants to be alerted.
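
As an illustration, the job itself can write such a status file to HDFS right after the SparkSession is created. Below is a minimal sketch using hdfs3; the helper name, the status path argument and the three-hour window are assumptions for this example, not our exact implementation.

import datetime
import json

from hdfs3 import HDFileSystem


def write_status_file(sc, hdfs_host, hdfs_port, status_dir):
    """Write the alerting status file for the current run to HDFS (illustrative helper)."""
    now = datetime.datetime.now()
    status = {
        "app_id": sc.applicationId,             # Yarn application ID of this run
        "hour_now": now.hour,
        "hour_start": str(now.hour),
        "hour_end": str((now.hour + 3) % 24),   # this job must finish within 3 hours
    }
    hdfs = HDFileSystem(host=hdfs_host, port=hdfs_port)
    with hdfs.open(status_dir + "/status.json", "wb") as f:
        f.write(json.dumps(status).encode("utf-8"))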

After the status file is set up, the next step is to create a Server Density plugin that opens the status file, performs some simple manipulation on it, and runs a Yarn command using the extracted information:

# This snippet runs inside the Server Density plugin's check method;
# `hdfs` is an hdfs3 HDFileSystem client pointed at the cluster that holds the status file.
import json
from os import system

with hdfs.open("<path to status file on hdfs>/status.json") as f:
    foo = f.read()
jsonstuff = json.loads(foo)
appid = jsonstuff['app_id']
hournow = jsonstuff['hour_now']
hourstart = jsonstuff['hour_start']
hourend = jsonstuff['hour_end']

# handle jobs whose window wraps around midnight
if (int(hourend) - int(hourstart)) < 0:
    hourdiff = (int(hourend) - int(hournow)) + 24
else:
    hourdiff = int(hourend) - int(hournow)

tmp_file = "/tmp/locus"

# ask Yarn for the job's final state and stash it in a temp file
cmd = ("yarn application -status " + appid +
       " | grep Final-State | cut -d':' -f2 | sed -e 's/^[[:space:]]*//' >%s" % tmp_file)
retval = system(cmd)
file = open(tmp_file)
finalstate = file.read().strip()
file.close()
if retval != 0:
    finalstate = "yarn command failed"

# marshall the data into a new json structure and send it off to Server Density
data = {
    "app_id": appid,
    "hour_now": hournow,
    "hour_start": hourstart,
    "hour_end": hourend,
    "hour_diff": hourdiff,
    "final_state": finalstate
}

# return it to the plugin framework
return data

The final step is to create the custom alert in the Server Density UI (we won't go through that here; more details can be found on their website).

Conclusion

Here we've shared some of the practices that we follow internally for our production jobs; they ensure a certain level of quality is met and that issues are quickly flagged and dealt with. We took inspiration from many other blog posts, and this is our attempt at contributing back to the community. The practices listed here are a good and simple start, but as jobs grow more complex, many other features should be considered: advanced scheduling and dependency management, monitoring of jobs' storage and time consumption, measures to recover or redo processing when failures occur, and management of configs and third-party libraries.

Authors