Spark troubleshooting from the trenches

Part II — Tricks and external data source management

Yann Moisan
Dec 7, 2018 · 9 min read

This post is the second episode of the “Spark from the trenches” article series. In the previous post, we covered best practices and optimization tips.

We will continue to dig into some real-world situations that we have dealt with and focus on two topics:

  • First, we will see some operation tricks we actively use for troubleshooting. At Teads, we embrace the following motto: You build it, you run it. We had to make sure that we have the right tools to look at our system’s health and understand what’s going on.
  • Then, we will talk about best practices to use external data sources in your workflows with JDBC.

1- Operation tricks

Monitoring Spark applications

Spark includes a configurable metrics system based on the Dropwizard Metrics library. It is set up via the Spark configuration. As we are already heavy users of Graphite and Grafana, we use the provided Graphite sink.

The Graphite sink needs to be used with caution: for each metric, Graphite creates a fixed-size database (a Whisper file) to store data points. These files are zeroed out at creation, so they consume quite a lot of disk space even when they hold very few data points.

By default, the application id is added to the namespace, which means that every spark-submit operation creates a new set of metrics, and therefore new Whisper files. Thanks to SPARK-5847 it is now possible to mitigate this behavior by removing the spark.app.id from the namespace.

spark.metrics.namespace=$name

Going further, we have built an in-house deploy stack that will automatically call spark-submit using this configuration. As developers, we have nothing to do to enable the monitoring and we are able to manage dozens of jobs.
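As an illustration, here is a minimal sketch of the kind of settings such a deploy stack could pass to spark-submit. It is not our actual tooling: the object name, the Graphite host, the port (2003 is assumed here as the usual Graphite plaintext port) and the 10-second reporting period are hypothetical values. The property names are the standard Spark ones, using the spark.metrics.conf. prefix so that no separate metrics.properties file is needed.

```scala
// Hypothetical helper (not the actual Teads deploy stack): builds the Spark
// settings that enable the Graphite sink with a stable metrics namespace.
object GraphiteMetricsConf {

  def settings(jobName: String, graphiteHost: String, graphitePort: Int = 2003): Map[String, String] =
    Map(
      // Use the job name instead of the default spark.app.id as namespace,
      // so that each run reuses the same Whisper files.
      "spark.metrics.namespace" -> jobName,
      "spark.metrics.conf.*.sink.graphite.class" -> "org.apache.spark.metrics.sink.GraphiteSink",
      "spark.metrics.conf.*.sink.graphite.host" -> graphiteHost,
      "spark.metrics.conf.*.sink.graphite.port" -> graphitePort.toString,
      // Reporting period: assumed values, to be tuned.
      "spark.metrics.conf.*.sink.graphite.period" -> "10",
      "spark.metrics.conf.*.sink.graphite.unit" -> "seconds"
    )

  // Turns the settings into "--conf key=value" arguments for spark-submit.
  def toSubmitArgs(settings: Map[String, String]): Seq[String] =
    settings.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Generating these arguments in one place is what lets developers get monitoring without adding anything to their own job.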

On our Grafana dashboard (see below), we can choose the job we want to inspect. We especially observe memory consumption and task-related metrics.

With one quick look at the number of active tasks per executor, we are able to check if the cluster resources are efficiently used. Here, the visible drop on the graph is due to a shuffle and can help to identify bottlenecks.

Task monitoring

We monitor memory issues using two metrics: PS Scavenge (Minor GC) and PS MarkSweep (full GC). When we observe a simultaneous decrease of active tasks with an increase of full GCs we know that we need to review the sizing of our cluster.

Garbage Collector monitoring

This first set of dashboards is useful to detect memory issues. However, on the CPU side, we could go further by using flame-graphs to observe the executors. This could help detect hotspots and faulty instructions. Tools like Babar (open sourced by Criteo) can be used to aggregate Spark flame-graphs.

Log management

At Teads, we use Sumologic, a cloud-based solution, to manage our logs. With our deploy stack, a log4j configuration is generated for all our jobs using the SyslogAppender. So once again, there is nothing to do when a developer creates a new Spark job.

A log configuration is enforced for all our jobs to send data to our log collector. When an exception occurs, we can quickly know how many jobs are impacted, how many executions of these jobs are running, etc.

In the example below, we can see a read timeout on Amazon S3 that affects a job retrying continuously.

Read timeout on Amazon S3

Troubleshooting with logs

Logs are the best troubleshooting material there is, but investigating them can be a pain.

Let’s dig into a real case study. In this example, one of our jobs was failing. After a quick look in the driver’s log, we can see some errors:

[WARN ] 2018-08-02 22:03:23,672 org.apache.spark.deploy.yarn.YarnAllocator - Container marked as failed: container_1532943674849_0820_01_000012 on host: ip-10-20-109-19.eu-west-1.compute.internal. Exit status: -100. Diagnostics: Container released on a *lost* node
[WARN ] 2018-08-02 22:03:23,674 org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint - Requesting driver to remove executor 5 for reason Container marked as failed: container_1532943674849_0820_01_000012 on host: ip-10-20-109-19.eu-west-1.compute.internal. Exit status: -100. Diagnostics: Container released on a *lost* node

Apparently, some Spark executors died (Container released on a *lost* node), but the reason remains to be explained.

The usual suspect is memory, so let’s have a look at the GC logs.

With our deploy stack, GC logs are activated on all production jobs as it doesn’t have an impact on performance. Also, our clusters are configured to archive log files to S3.
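For reference, here is a minimal sketch of how GC logging can be switched on for executors. The object name is hypothetical and the exact flags used by our deploy stack are not shown in this post; the ones below are the classic Java 8 HotSpot flags (recent JVMs use -Xlog:gc* instead).

```scala
// Hypothetical helper: Spark setting that turns on GC logging on executors.
object GcLogging {

  // Java 8 HotSpot flags; assumed here, adapt them to your JVM version.
  val gcFlags: Seq[String] =
    Seq("-verbose:gc", "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps")

  // Passed to spark-submit as: --conf spark.executor.extraJavaOptions="..."
  val sparkSettings: Map[String, String] =
    Map("spark.executor.extraJavaOptions" -> gcFlags.mkString(" "))
}
```

With these flags the GC output ends up in the container's stdout, next to the other executor logs.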

aws s3 cp --recursive "s3://teads-hadoop/engine/archive-logs/j-2FUFXCE83DD2B/containers/application_1532943674849_0820/container_1532943674849_0820_01_000038/" ../

Unfortunately, GC logs are stored in what you could call a cryptic text file. These are actually pretty hard to analyze without the appropriate tooling.

I have found a very useful service named GCeasy that helps me a lot for this task. GCeasy creates graphs and meaningful aggregates based on your logs. As this data (GC logs) isn’t sensitive for us I find it quite handy.

One of the graphs generated by GCeasy, which also offers the option to export a nice PDF report.

As we can see on the graph, there is a plateau from 12:01:20, meaning that we fail to free enough memory.

The solution here was to increase the number of partitions so that each task has less data to process and therefore needs less memory.
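To make the reasoning concrete, here is a small back-of-the-envelope sketch. The helper name and the figures are purely illustrative (they are not taken from the failing job) and it assumes evenly sized partitions, which is rarely exactly true.

```scala
// Hypothetical helper: average amount of data a single task has to process,
// assuming the data is evenly spread across partitions.
object PartitionSizing {

  def bytesPerTask(totalBytes: Long, numPartitions: Int): Long = {
    require(numPartitions > 0, "numPartitions must be positive")
    totalBytes / numPartitions
  }

  // Illustrative figure only: a 200 GiB dataset.
  val totalBytes: Long = 200L * 1024 * 1024 * 1024

  // With 200 partitions each task handles about 1 GiB,
  // with 2000 partitions about 102 MiB.
  val with200: Long = bytesPerTask(totalBytes, 200)
  val with2000: Long = bytesPerTask(totalBytes, 2000)
}
```

In a job, the number of partitions is typically raised with df.repartition(n) or, for shuffles in Spark SQL, with the spark.sql.shuffle.partitions setting.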

2- Using external data sources with JDBC

It’s sometimes handy to be able to work with external data sources. Spark has built-in support for JDBC data sources. At Teads, we use MySQL to store data of reasonable size like:

  • Reference data (e.g. currency exchange rates),
  • Ad campaign parameters and so on …

Let’s see some read and write use cases.

JDBC Read

There are two cases in which we want to read a table:

  • To join it with an existing DataFrame (so we need to have a DataFrame),
  • To use it in a User Defined Function (UDF). In this case, we need to have a plain object, for example, to transform a column (e.g. convert money into a reference currency).

A classic use case is to load a tiny reference table, like exchange rates, in order to use it in a UDF.

spark
  .read
  .jdbc(url, table, props)
  .collect()

By default, the reading operation happens on a single executor and then a collect operation sends data over the network to the driver.

In that case, it’s faster to directly load the table on the driver with a JDBC library. Coding a Spark job does not mean that we must use Spark to do everything.
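Here is a minimal sketch of that driver-side load using only java.sql from the JDK. The object and method names are hypothetical; the table and column names are the ones used in the subquery example of this post. The conversion rule (dividing by the rate) is an assumption about how the rates are expressed.

```scala
import java.sql.DriverManager

// Hypothetical helper: loads a tiny reference table on the driver with plain JDBC.
object ExchangeRates {

  // Reads the whole table into an immutable Map, without involving Spark.
  def load(url: String, user: String, password: String): Map[String, Double] = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      val stmt = conn.createStatement()
      try {
        val rs = stmt.executeQuery("SELECT dest_cur, rate FROM currency_exchange_rates")
        val rates = Map.newBuilder[String, Double]
        while (rs.next()) {
          rates += (rs.getString("dest_cur") -> rs.getDouble("rate"))
        }
        rates.result()
      } finally stmt.close()
    } finally conn.close()
  }

  // Assumption: rate = units of `currency` per unit of the reference currency.
  // Returns None when the currency is unknown.
  def toReference(rates: Map[String, Double])(currency: String, amount: Double): Option[Double] =
    rates.get(currency).map(amount / _)
}
```

The resulting Map is a plain serializable object, so it can be captured by a UDF (or broadcast) to convert a money column on the executors.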

JDBC Read subquery

Spark SQL has a useful feature that isn’t well-known: it can load data from a subquery instead of a full table.

From the Spark documentation we can read:

The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.

Here is a working example that we previously used to load the last known exchange rates for a given date.

val table =
  s"""(SELECT dest_cur, rate
     |FROM currency_exchange_rates
     |WHERE time = (
     |  SELECT MAX(time) FROM currency_exchange_rates WHERE time < '${dtf.format(dt)}'
     |)) AS currency_exchange_rates""".stripMargin
spark.read.jdbc(url, table, props)

Note: with Spark 2.4.0 it is even easier, since the JDBC source accepts a query option directly (cf. SPARK-24423).
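As a sketch of what this looks like with the query option of the JDBC source (Spark 2.4.0+), the helper below only builds the reader options. The object name and the date pattern are assumptions, since the dtf formatter of the example above is not shown in this post.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: builds the options for a JDBC read based on `query`.
object LastKnownRates {

  // Assumed date format; the original formatter is not shown in the article.
  private val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // No wrapping parentheses and no alias are needed with the `query` option.
  def query(dt: LocalDateTime): String =
    s"""SELECT dest_cur, rate
       |FROM currency_exchange_rates
       |WHERE time = (
       |  SELECT MAX(time) FROM currency_exchange_rates WHERE time < '${dtf.format(dt)}'
       |)""".stripMargin

  // `query` replaces `dbtable`: the two options cannot be set together.
  def readerOptions(url: String, dt: LocalDateTime): Map[String, String] =
    Map("url" -> url, "query" -> query(dt))
}

// Usage in a job (requires a SparkSession named `spark`):
//   spark.read.format("jdbc").options(LastKnownRates.readerOptions(url, dt)).load()
```

Credentials would be added to the same option map (user and password keys), as with the Properties object used by spark.read.jdbc.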

JDBC Write

In Spark, there are 4 save modes: Append, Overwrite, ErrorIfExists and Ignore.

Let’s consider the following use case: a job needs to replace the entire content of a table. Some tables are read by online services like the buying-engine, one of our services that handles bid requests.

Two of these modes are interesting for us:

  • Append: for adding new rows to a table,
  • Overwrite: to overwrite the full table (this could be misleading because existing rows that are not in the DataFrame will be deleted).

Note: Updating only a subset of rows is not supported.

JDBC Write — Naive approach

dataset
.write
.mode(SaveMode.Overwrite)
.jdbc(…)

Using the overwrite mode, Spark will drop and recreate the table, so all its metadata (like indexes or foreign keys) will be lost. For that reason, we cannot use this mode.

JDBC Write — A slightly better way

This approach consists of deleting everything using a standard JDBC client (like scalikejdbc) and then letting Spark insert the data with its built-in append mode.

db.autoCommit { implicit session =>
  sql"""DELETE FROM my_table""".update.apply()
}
dataset
  .write
  .mode("append")
  .jdbc(…)

The main drawback is that it’s not atomic. If something bad happens in-between, the table will stay empty.

JDBC Write — A better way

Spark has an option to truncate a table.

dataset
  .write
  .mode(SaveMode.Overwrite)
  .option("truncate", "true")
  .jdbc(…)

A word of warning: since the truncate operation requires the DROP privilege, some database users might not be authorized to perform it.

Also, truncate is still not an ideal solution. It looks like a single operation but it’s actually performed in three steps, in the following order:

  1. The table is truncated (emptied, but not dropped),
  2. The dataset is computed,
  3. The table is filled up.

Between steps 1 and 3, the table is empty. This is a critical issue for online services that need to read this data.

JDBC Write — Preferred solution

All examples below are based on the tiny and convenient scalikejdbc library.

If the volume of data is reasonable, it’s possible to collect the whole dataset on the driver and use a classic JDBC client to do the job. The delete and the inserts then run in a single transaction, so the replacement is atomic.

As we can see, it does not even use Spark:

db.localTx { implicit session =>
  sql"DELETE FROM my_table".update.apply()
  sql"""INSERT INTO my_table (…) VALUES (…)""".batch(batchParams: _*).apply()
}

It shows that when using a high-level tool like Spark, we still need to understand what happens under the hood.
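To illustrate what such a transaction boils down to, here is a sketch of the same delete-then-insert written against plain java.sql. The object and method names are hypothetical and this is not how scalikejdbc is implemented internally; it only shows the commit/rollback logic that makes the replacement atomic. It assumes a transactional storage engine (e.g. InnoDB for MySQL).

```scala
import java.sql.Connection

// Hypothetical helper: replaces the content of a table in one transaction.
object AtomicReplace {

  // Builds a parameterized INSERT statement for the given columns.
  def insertSql(table: String, columns: Seq[String]): String =
    s"INSERT INTO $table (${columns.mkString(", ")}) VALUES (${columns.map(_ => "?").mkString(", ")})"

  // `rows` is the dataset already collected on the driver, one Seq per row.
  def replaceAll(conn: Connection, table: String, columns: Seq[String], rows: Seq[Seq[Any]]): Unit = {
    val previousAutoCommit = conn.getAutoCommit
    conn.setAutoCommit(false) // start a transaction
    try {
      val delete = conn.createStatement()
      try delete.executeUpdate(s"DELETE FROM $table") finally delete.close()

      val insert = conn.prepareStatement(insertSql(table, columns))
      try {
        rows.foreach { row =>
          row.zipWithIndex.foreach { case (value, i) =>
            insert.setObject(i + 1, value.asInstanceOf[AnyRef]) // JDBC parameters are 1-based
          }
          insert.addBatch()
        }
        insert.executeBatch()
      } finally insert.close()

      conn.commit() // readers see the new content only from this point
    } catch {
      case e: Exception =>
        conn.rollback() // on failure, the previous content stays in place
        throw e
    } finally conn.setAutoCommit(previousAutoCommit)
  }
}
```

Until the commit, online readers keep seeing the previous rows, which is exactly what the truncate-based approaches cannot guarantee.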

Coalesce vs repartition

In the literature, it’s often mentioned that coalesce should be preferred over repartition to reduce the number of partitions because it avoids a shuffle step in some cases.

But coalesce has some limitations (outside the scope of this article): it cannot increase the number of partitions and may generate skewed partitions.

Here is one case where a repartition should be preferred. In this case, we filter most of a dataset.

df.doSth().coalesce(10).write(…)

The good point about coalesce is that it avoids a shuffle. However, all the computation is done by only 10 tasks.

This is because coalesce does not introduce a stage boundary: the number of tasks in a stage depends on the number of partitions of its output, so the whole stage, filtering included, runs as 10 tasks, each one processing a big chunk of data. At most 10 cores will therefore perform the computation.

df.doSth().repartition(10).write(…)

Using repartition, the total duration is much shorter (a few seconds instead of 31 minutes). The filtering is done by 200 tasks, each one working on a small subset of data. It’s also much smoother from a memory point of view, as we can see in the graph below.

JVM Heap monitoring

Takeaways

We have seen how important it is to take operations into account and to build tools that ensure all jobs are started with the right parameters for debugging.

On external data source management, Spark brings a lot of magic but that shouldn’t always be taken for granted. As usual, we can gain more reliability by getting a better understanding of the underlying mechanisms.

By Yann Moisan and Benjamin DAVY
