Spark SQL: How to consume json data from a REST service as DataFrame
On Spark 1.6:
If you are on Python, use the requests library to get the information and then just create an RDD from it. There must be some similar library for Scala (relevant thread). Then just do:
json_str = '{"executorCores": 2, "kind": "pyspark", "driverMemory": 1000}'
rdd = sc.parallelize([json_str])
json_df = sqlContext.jsonRDD(rdd)
json_df
Code for Scala:
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
This is from: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
here you go :- spark 2.2
import org.apache.spark.sql._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
object SparkRestApi {
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger("blah")
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession.builder()
.appName("blah")
.config("spark.sql.warehouse.dir", "C:\\Temp\\hive")
.master("local[2]")
//.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val url = "https://api.github.com/users/hadley/orgs"
val result2 = List(scala.io.Source.fromURL(url).mkString)
val githubRdd2=spark.sparkContext.makeRDD(result2)
val gitHubDF2=spark.read.json(githubRdd2)
println(gitHubDF2)
gitHubDF2.show()
spark.stop()
}
}
Spark cannot parse an arbitrary json to dataframe, because json is hierarchical structure and dataframe as flat. If your json is not created by spark, chances are that it does not comply to condition "Each line must contain a separate, self-contained valid JSON object" and hence will need to be parsed using your custom code and then feed to dataframe as collection of case-class objects or spark sql Rows.
You can download like:
import scalaj.http._
val response = Http("proto:///path/to/json")
.header("key", "val").method("get")
.execute().asString.body
and then parse your json as shown in this answer. And then create a Seq of objects of your case-class (say seq) and create a dataframe as
seq.toDF