Write data from HDFS to Elasticsearch using Apache Spark in a kerberized cluster


This article explains how to index data from HDFS into Elasticsearch using Apache Spark.

Many Hadoop users store their data on HDFS, which comes with high fault tolerance and native support for large data sets.

To process data stored on HDFS, you can run MapReduce jobs or an equivalent. While these jobs are very efficient for some use cases, they are not appropriate for others.


Imagine you have a retail website with millions of users and you want to let your users search within seconds across billions of items. How do you deal with this?

MapReduce jobs definitely cannot give you the performance you expect.

In such cases, Elasticsearch provides a powerful, highly scalable search engine with full-text search capabilities, which lets you store, search, and analyze large volumes of data quickly and in near real time.


So, how do you write your data from HDFS to an Elasticsearch cluster?

Many solutions are available. In this article, I explain how to do it with the elasticsearch-hadoop library, which provides native integration between Elasticsearch and Apache Spark.


Stack

For this example I am using a kerberized Hortonworks cluster (HDP 2.5.3), which ships with Apache Spark 1.6.2. The version of our Elasticsearch cluster is 5.4.3.


Diving into the code

For this example, our data is stored on HDFS as a Hive table.
My Spark job is written in Scala, and I use sbt 1.1.2 to build the project.

Dependencies: Every dependency is declared in build.sbt

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.6.2" % "provided"
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-13" % "5.4.3"

Main class: As our cluster uses MIT Kerberos for authentication, we need to configure the application accordingly.

System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")
System.setProperty("sun.security.krb5.debug","true")

We start by defining the Elasticsearch nodes and port.

val esNodes = "localhost"
val esPort = "9200"

Then we set some application configurations.
We first import two classes

import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("Write Data To ES")

We activate Kerberos for the application

conf.set("spark.hadoop.hadoop.security.authentication", "kerberos")
conf.set("spark.hadoop.hadoop.security.authorization", "true")

and set the serializer for Apache Spark

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Then we set the Elasticsearch cluster configuration and the index-creation property, so that the index is created if it does not already exist.

conf.set("es.index.auto.create", "true")
conf.set("es.nodes", esNodes)
conf.set("es.port", esPort)

To run our job we need to create a SparkContext, which is the main entry point of any Apache Spark job (since Spark 2.0, SparkSession has become the single entry point for interacting with Apache Spark).

val sc = new SparkContext(conf)

As our data is stored in a Hive table and we are using DataFrames, we will also declare an SQLContext and a HiveContext.
For this, we first import the SQLContext and HiveContext classes

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

then we define the sqlContext and hiveContext

val sqlContext = new SQLContext(sc)
val hiveContext = new HiveContext(sc)

Now that everything is set, we use the DataFrame API to get the data from the Hive table.

val hiveDF = hiveContext.sql("select * from mydatabase.mytable")

Spark uses lazy evaluation, so at this stage nothing has actually been executed. We need to call a Spark action on our DataFrame.
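As an optional illustration of this laziness (not required for the indexing itself), printSchema() only reads metadata, while show() is an action that actually executes the Hive query:

// printSchema() inspects metadata only; show() triggers a real Spark job
hiveDF.printSchema()
hiveDF.show(5)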

For the actual write, we will use the saveToEs method provided by the elasticsearch-hadoop library.

For this, we first need to import some classes

import org.elasticsearch.spark.sql._

and then call the saveToEs method

hiveDF.saveToEs("myindex/mytype")
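saveToEs also accepts per-write settings. As a hedged variation, assuming your table has an id column you want to reuse as the Elasticsearch document _id (so that re-running the job updates documents instead of duplicating them):

// Hypothetical: map the "id" column to the document _id for idempotent writes
hiveDF.saveToEs("myindex/mytype", Map("es.mapping.id" -> "id"))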


How to use it?

Package the project with sbt assembly, and then submit your jar with spark-submit.
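For illustration, here is a hedged sketch of both steps; the sbt-assembly plugin version, the main class, and the jar name are assumptions to adapt to your project. First declare the plugin in project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

then build and submit (YARN client mode assumed, with a valid Kerberos ticket from kinit in the submitting session):

sbt assembly
spark-submit --master yarn --deploy-mode client \
  --class com.example.WriteDataToEs \
  target/scala-2.10/write-data-to-es-assembly-1.0.jar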

After the Spark job finishes, you can check that your data is correctly indexed using the Elasticsearch search API:

curl -XGET 'localhost:9200/myindex/_search'
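or simply check the number of indexed documents (assuming the same index name):

curl -XGET 'localhost:9200/myindex/_count'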

You can find the original code on my GitHub here.


Conclusion

In this article we saw that indexing data into Elasticsearch with the elasticsearch-hadoop library is straightforward. You can now enjoy the power of Elasticsearch.
For more information on this subject, you can read the documentation on the Elastic website.

