Indexing into Elasticsearch using Spark — code snippets

Akshesh Doshi
3 min read · Oct 4, 2018


Here at Innoplexus, we recently took on the task of building the largest search over Life Sciences data. We had been using Elasticsearch for our search requirements in many of our earlier products, but this time the challenge was to scale our existing capabilities to search over Terabytes, if not Petabytes, of raw data. We were hit hard when we tried using our traditional Python scripts to index this huge amount of data into ES. We reckoned that we needed to distribute the indexing job; otherwise it would take weeks, or even months, to index even a single collection of our data sources.

When we decided to write our “scalable” Elasticsearch indexer, the technology we chose to have at its core was Apache Spark, for the flexibility and ease of task distribution it provides. I started searching for Spark-Elasticsearch connectors and soon realised that I had entered a region with quite poor documentation, where nothing was readily available. Hence, now that I have shipped a few stable versions of our indexer, I would like to compile and share the minimal code required to stitch these technologies together and make them work!

Along the way I kept improving the stack we were using, and there are three snippets that I have to share:

  • Indexing data into Elasticsearch via Python through Spark RDDs
  • Indexing data into Elasticsearch via Python through Spark DataFrames
  • Indexing data into Elasticsearch via Scala through Spark DataFrames

These snippets can be used with the various Spark clients, including spark-shell, pyspark and spark-submit. One thing common to all of them is that they (necessarily) require the Elasticsearch-Hadoop jar file to run. ES-Hadoop is a connector between ES and various Hadoop services; the snippets below effectively use only its Spark-ES part. For instance, to use it with pyspark the command would be:

pyspark --jars elasticsearch-hadoop-5.6.4.jar --driver-class-path elasticsearch-hadoop-5.6.4.jar
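
The same jars are passed the same way to spark-submit. A minimal sketch, assuming your indexing code lives in a script called my_indexer.py (the script name and the ES-Hadoop version are just placeholders for this example):

spark-submit --jars elasticsearch-hadoop-5.6.4.jar --driver-class-path elasticsearch-hadoop-5.6.4.jar my_indexer.py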

Indexing via PySpark RDDs

Open a PySpark shell or use spark-submit to run this code.

from pyspark import SparkContext
import json

sc = SparkContext()
rdd = sc.parallelize([{'num': i} for i in xrange(10)])

def remove__id(doc):
    # The `_id` field needs to be removed from the document
    # before indexing; alternatively, configure this in `conf`
    # while calling the `saveAsNewAPIHadoopFile` API
    doc.pop('_id', '')
    return doc

new_rdd = rdd.map(remove__id).map(json.dumps).map(lambda x: ('key', x))

new_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": 'localhost',
        "es.port": '9200',
        "es.resource": '%s/%s' % ('index_name', 'doc_type_name'),
        "es.input.json": 'true'
    }
)
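
To sanity-check that the documents actually landed in ES, one option is to read them back through the same connector. Below is a minimal sketch (not part of the original indexer), assuming the same index_name/doc_type_name and an ES node on localhost:9200:

# Read the freshly indexed documents back as an RDD of (id, document) pairs
read_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": 'localhost',
        "es.port": '9200',
        "es.resource": '%s/%s' % ('index_name', 'doc_type_name')
    }
)
print(read_rdd.count())  # should roughly match the number of documents indexed above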

Indexing via PySpark DataFrames

Open a PySpark shell or use spark-submit to run this code.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')

df.write.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'localhost'
).option(
    'es.port', '9200'
).option(
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name')
).save()
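
Reading data back with the DataFrame API is just as compact. This is a minimal sketch under the same assumptions (local ES node, same index_name/doc_type_name):

# Load the index back into a DataFrame through the same connector
read_df = spark.read.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'localhost'
).option(
    'es.port', '9200'
).load('%s/%s' % ('index_name', 'doc_type_name'))
read_df.show()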

Indexing via Scala DataFrames

Open a Spark shell or use spark-submit to run this code.

import org.elasticsearch.spark.sql._

// Create a DataFrame and store it as `df`
val df_no_id = df.drop("_id")

df_no_id.saveToEs(
  "index_name/doc_type_name",
  Map("es.nodes" -> "localhost:9200")
)

I would like to thank Gaurav Tripathi, Prashant Bhatwadekar and Manish Kumar Pal for their constant help while creating this module. :)

A few other resources that might be useful:
https://stackoverflow.com/questions/31410608/does-spark-not-support-arraylist-when-writing-to-elasticsearch/50942356#50942356
https://stackoverflow.com/questions/46762678/how-to-push-a-spark-dataframe-to-elastic-search-pyspark/52199097#52199097
