Save an Apache Spark dataframe to a database

Sujee
Jul 28, 2017 · 2 min read

Some of my readers asked about saving a Spark dataframe to a database. You may be surprised to hear that it can be done in a single line with the new Spark JDBC datasource API. It is true. Let’s look at it in detail.

In this example, I’m going to save a sample dataframe to my local MySQL database.

  • Define MySQL connection properties.
private static final String MYSQL_USERNAME = "expertuser";
private static final String MYSQL_PWD = "expertuser123";
private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/employees?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PWD;
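Instead of embedding the credentials in the URL, you can also carry them in a `java.util.Properties` object, which is the form the JDBC driver (and later Spark write APIs) accept. A small sketch, using the same hypothetical credentials as above:

```java
import java.util.Properties;

public class MySQLConfig {
    // Same sample credentials as above; in real code, load these from a
    // secure source rather than hard-coding them.
    static final String MYSQL_USERNAME = "expertuser";
    static final String MYSQL_PWD = "expertuser123";
    static final String MYSQL_URL = "jdbc:mysql://localhost:3306/employees";

    // Build the connection properties the JDBC driver expects.
    static Properties connectionProperties() {
        Properties props = new Properties();
        props.setProperty("user", MYSQL_USERNAME);
        props.setProperty("password", MYSQL_PWD);
        return props;
    }
}
```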
  • Create Spark context and sql context.
private static final JavaSparkContext sc =
    new JavaSparkContext(new SparkConf().setAppName("SparkSaveToDb").setMaster("local[*]"));
private static final SQLContext sqlContext = new SQLContext(sc);
  • Now we’ll create a sample dataframe that will later be saved to a MySQL table. In this example, I’m going to load a sample JSON file from the local filesystem as a dataframe. Any dataframe can be used here. Refer to the Spark SQL documentation to learn about other ways of creating dataframes.
DataFrame usersDf = sqlContext.jsonFile("src/main/resources/users.json");
  • The Spark JDBC datasource API provides two options for saving a dataframe to a database.
  • Option 1: Create a new table and insert all records using the “createJDBCTable” function.
def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit

This function creates a new table with the given name; the table’s schema is derived from the schema of the dataframe. Once the table is created, all records are inserted.

Parameters
url: connection URL
table: table name
allowExisting: if true, drop any existing table with the same name before creating. If false and a table with the same name already exists, an exception is thrown.

e.g.

usersDf.createJDBCTable(MYSQL_CONNECTION_URL, "users", true);
  • Option 2: Insert all records into an existing table using the “insertIntoJDBC” function.
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

This function is useful if you already have a table and want to insert records into it. Please note that the dataframe’s schema and the existing table’s schema must match.

Parameters
url: connection URL
table: table name
overwrite: if true, the table is truncated before the records are inserted.

e.g.

usersDf.insertIntoJDBC(MYSQL_CONNECTION_URL, "users", false);
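A side note: “createJDBCTable” and “insertIntoJDBC” belong to the Spark 1.3 API and were deprecated in later releases in favor of the DataFrameWriter API. A rough sketch of the roughly equivalent calls (assuming Spark 1.4+ and the MySQL JDBC driver on the classpath; the credentials are passed via a Properties object):

```java
import java.util.Properties;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;

public class JdbcWriteSketch {
    // Sketch only: needs a live MySQL instance reachable at the given URL.
    static void save(DataFrame usersDf, String url, String user, String pwd) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", pwd);

        // Roughly equivalent to createJDBCTable(url, "users", true):
        // drop/recreate the table, then insert all rows.
        usersDf.write().mode(SaveMode.Overwrite).jdbc(url, "users", props);

        // Roughly equivalent to insertIntoJDBC(url, "users", false):
        // append rows to the existing table.
        usersDf.write().mode(SaveMode.Append).jdbc(url, "users", props);
    }
}
```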
  • Now run the program and verify that the “users” table is created and all records are there.
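If you’d rather verify from code than from the mysql client, a plain JDBC count query does the job. A sketch, assuming MySQL is running locally with the credentials above and the MySQL JDBC driver is on the classpath:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyUsersTable {
    public static void main(String[] args) throws Exception {
        // Same sample URL and credentials as above.
        String url = "jdbc:mysql://localhost:3306/employees"
                + "?user=expertuser&password=expertuser123";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM users")) {
            rs.next();
            // Prints the number of rows Spark inserted into the table.
            System.out.println("users rows: " + rs.getLong(1));
        }
    }
}
```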

The complete application is available in my GitHub repo: https://github.com/sujee81/SparkApps.

Spark Experts

Apache Spark Blog

Written by Sujee

Java developer. Apache Spark enthusiast.
