Indexing into Elasticsearch using Spark — code snippets

  • Indexing data into Elasticsearch from Python using Spark RDDs
  • Indexing data into Elasticsearch from Python using Spark DataFrames
  • Indexing data into Elasticsearch from Scala using Spark DataFrames
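
All of these snippets rely on the elasticsearch-hadoop connector, so its jar must be on Spark's classpath. One way to do that is shown below; the Maven coordinates and version here are assumptions, so pick the artifact that matches your Spark, Scala, and Elasticsearch versions.

# Launch PySpark with the connector pulled from Maven Central
# (elasticsearch-spark-20 targets Spark 2.x / Scala 2.11; the
# version 7.10.0 is an assumption)
pyspark --packages org.elasticsearch:elasticsearch-spark-20_2.11:7.10.0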

Indexing via PySpark RDDs

from pyspark import SparkContext
import json

sc = SparkContext()
rdd = sc.parallelize([{'num': i} for i in range(10)])

def remove__id(doc):
    # The `_id` field must be removed from the document before
    # indexing; alternatively, configure this in `conf` when
    # calling the `saveAsNewAPIHadoopFile` API.
    doc.pop('_id', '')
    return doc

# Serialize each document to JSON and pair it with a dummy key;
# only the value is indexed by EsOutputFormat.
new_rdd = rdd.map(remove__id).map(json.dumps).map(lambda x: ('key', x))
new_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass='org.elasticsearch.hadoop.mr.EsOutputFormat',
    keyClass='org.apache.hadoop.io.NullWritable',
    valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable',
    conf={
        'es.nodes': 'localhost',
        'es.port': '9200',
        'es.resource': '%s/%s' % ('index_name', 'doc_type_name'),
        'es.input.json': 'true'
    }
)
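
To sanity-check the write, one option is to query the index with the official Elasticsearch Python client. This is a minimal sketch, assuming the `elasticsearch` pip package is installed and the index name from the snippet above.

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])
# Refresh the index so freshly written documents are searchable
es.indices.refresh(index='index_name')
print(es.count(index='index_name'))  # expect a count of 10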

Indexing via PySpark DataFrames

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in range(10)])
# Drop the `_id` column (if present) before writing, since
# Elasticsearch reserves `_id` as a metadata field.
df = df.drop('_id')
df.write.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'localhost'
).option(
    'es.port', '9200'
).option(
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name')
).save()
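
The same data source also works in reverse, which is a handy way to verify the write. A sketch of reading the index back into a DataFrame, with the same assumed index and type names:

df_read = spark.read.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'localhost'
).option(
    'es.port', '9200'
).load('%s/%s' % ('index_name', 'doc_type_name'))
df_read.show()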

Indexing via Scala DataFrames

import org.elasticsearch.spark.sql._

// Create a DataFrame and store it as `df`, then drop the
// reserved `_id` column before writing.
val df_no_id = df.drop("_id")
df_no_id.saveToEs(
  "index_name/doc_type_name",
  Map("es.nodes" -> "localhost:9200")
)
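
The connector supports reads from Scala as well; here is a minimal read-back sketch, assuming a SparkSession named `spark` and the same index and type as above:

// Read the index back into a DataFrame for verification
val df_read = spark.read.format("org.elasticsearch.spark.sql")
  .option("es.nodes", "localhost:9200")
  .load("index_name/doc_type_name")
df_read.show()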
