Write data from HDFS to Elasticsearch using Apache Spark in a kerberized cluster
This article explains how to index data from HDFS into Elasticsearch using Apache Spark.
Many Hadoop users store their data on HDFS, which is highly fault-tolerant and has native support for large data sets.
To process data stored on HDFS, you may run MapReduce jobs or an equivalent. While these jobs are very efficient for some use cases, they are not appropriate for others.
Imagine you have a retail website with millions of users and you want to let your users search within seconds across billions of items. How do you deal with this?
MapReduce jobs definitely cannot give you the performance you expect.
In such cases Elasticsearch provides a powerful, highly scalable search engine with full-text search capabilities, allowing you to store, search, and analyze big volumes of data quickly and in near real time.
So, how do you write your data from HDFS to an Elasticsearch cluster?
Many solutions are available; in this article I explain how to do it using the elasticsearch-hadoop library, which provides native integration between Elasticsearch and Apache Spark.
Stack
For this example I am using a kerberized Hortonworks HDP 2.5.3 cluster, which comes with Apache Spark 1.6.2. The version of our Elasticsearch cluster is 5.4.3.
Diving into the code
For this example our data is stored on HDFS as a Hive table. My Spark job is written in Scala, and I am using sbt 1.1.2 to build the project.
Dependencies: Every dependency is declared in build.sbt
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.6.2" % "provided"
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-13" % "5.4.3"
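Since the elasticsearch-spark dependency is not marked "provided", it must be bundled into the job jar. One common way to do this is the sbt-assembly plugin; a minimal sketch, where the plugin version shown is an assumption:

```scala
// project/plugins.sbt -- sbt-assembly version is an assumption, pick one
// compatible with your sbt release
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
```

With the plugin enabled, `sbt assembly` produces a fat jar containing the elasticsearch-hadoop classes alongside your own.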
Main class: As our cluster uses MIT Kerberos for authentication, we need to configure the application accordingly.
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")
System.setProperty("sun.security.krb5.debug", "true")
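Besides pointing the JVM at krb5.conf, the user submitting the job needs a valid Kerberos ticket. A sketch, where the principal and keytab path are hypothetical and must be replaced with your own:

```shell
# Hypothetical principal and keytab path -- replace with your own
kinit -kt /etc/security/keytabs/myuser.keytab myuser@EXAMPLE.COM
klist   # verify that a ticket was granted
```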
We start by defining the Elasticsearch nodes and port.
val esNodes = "localhost"
val esPort = "9200"
Then we set some application configurations. We first import two classes
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Write Data To ES")
We activate Kerberos for the application
conf.set("spark.hadoop.hadoop.security.authentication", "kerberos")
conf.set("spark.hadoop.hadoop.security.authorization", "true")
and set serializer for Apache Spark
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Then we set the Elasticsearch cluster configuration and the index auto-creation property: the index will be created if it does not already exist.
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", esNodes)
conf.set("es.port", esPort)
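Beyond these required settings, elasticsearch-hadoop exposes many more configuration options. Two commonly tuned ones, shown here as a sketch with example values rather than recommendations:

```scala
// Optional elasticsearch-hadoop tuning -- values are examples only
conf.set("es.batch.size.entries", "1000") // max documents per bulk write request
conf.set("es.nodes.wan.only", "true")     // talk only to the declared nodes, useful when
                                          // Spark cannot reach the ES data nodes directly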
To run our job we need to create a SparkContext, which is the main entry point of any Apache Spark job (since Spark 2.0, SparkSession has become the single entry point for interacting with Apache Spark).
val sc = new SparkContext(conf)
As our data is stored in a Hive table and we are using DataFrames, we also declare an SQLContext and a HiveContext. For this we first import the SQLContext and HiveContext classes
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
then we define the sqlContext and hiveContext
val sqlContext = new SQLContext(sc)
val hiveContext = new HiveContext(sc)
Now that everything is set, we will use the DataFrame API to get the data from the Hive table.
val hiveDF = hiveContext.sql("select * from mydatabase.mytable")
Spark uses lazy evaluation, so at this stage nothing has been executed yet. We need to call a Spark action on our DataFrame.
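Lazy evaluation can be illustrated without a cluster using plain Scala views, which behave analogously: a `map` on a view is deferred, like a Spark transformation, and nothing runs until a terminal operation forces it, like a Spark action.

```scala
// Pure-Scala analogy for Spark's lazy evaluation (no cluster needed)
var evaluated = 0
val pipeline = (1 to 5).view.map { x => evaluated += 1; x * 2 }
val before = evaluated   // still 0: the map has not run yet
val total = pipeline.sum // terminal operation ("action"): forces evaluation
```

After `sum` runs, `evaluated` is 5 and `total` is 30; before it, no element had been processed.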
For this last action, we will use the saveToEs method provided by the native elasticsearch-hadoop library.
For this we first need to import some classes
import org.elasticsearch.spark.sql._
and then call the saveToEs method
hiveDF.saveToEs("myindex/mytype")
How to use it?
Package the project using sbt assembly, then submit your jar using spark-submit.
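A typical invocation looks like the following sketch; the class name, jar path, and master settings are assumptions to adapt to your project and cluster:

```shell
# Hypothetical class name and jar path -- adjust to your project.
# Spark 1.6 on HDP is typically built against Scala 2.10, hence the path.
spark-submit \
  --class com.example.WriteToEs \
  --master yarn \
  --deploy-mode client \
  target/scala-2.10/write-to-es-assembly-1.0.jar
```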
After the Spark job finishes, you can check that your data is correctly indexed using the Elasticsearch search API:
curl -XGET 'localhost:9200/myindex/_search'
You can find the original code on my GitHub here.
Conclusion
In this article we saw that indexing data into Elasticsearch using the native elasticsearch-hadoop library is very easy.
You can now enjoy the power of Elasticsearch.
For more information about this subject you can read the documentation on the Elastic website.