Spark Day 2 Practice:
==============================================
To work with Spark SQL + Hive:
----------------------------------------------------------
1. Copy "hive-site.xml" from the "$HIVE_HOME/conf" folder to "$SPARK_HOME/conf"
2. Copy the "mysql-connector jar" from the "$HIVE_HOME/lib" folder to "$SPARK_HOME/lib"
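Note: with "hive-site.xml" on the classpath, the spark-shell's sqlContext is
already Hive-enabled. A minimal sketch, assuming Spark 1.x, of creating a
HiveContext explicitly:

import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)   // sc is the spark-shell's SparkContext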
----------------------------------------------------------
hive (kalyan)> select course, count(*) from kalyan.student group by course;
OK
cse 7
mech 7
Time taken: 22.567 seconds, Fetched: 2 row(s)
----------------------------------------------------------
sqlContext.sql("select course, count(*) from kalyan.student group by course").show()
scala> sqlContext.sql("select course, count(*) from kalyan.student group by course").show()
+------+---+
|course|_c1|
+------+---+
| mech| 7|
| cse| 7|
+------+---+
scala> sqlContext.sql("select year, count(*) from kalyan.student group by year").show()
+----+---+
|year|_c1|
+----+---+
| 1| 7|
| 2| 7|
+----+---+
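The "_c1" header is the auto-generated name Spark 1.x assigns to an unaliased
aggregate. A minimal sketch, aliasing the count to get a readable column name:

sqlContext.sql("select course, count(*) as cnt from kalyan.student group by course").show()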
----------------------------------------------------------
scala> sqlContext.sql("select course, count(*) from kalyan.student group by course")
res7: org.apache.spark.sql.DataFrame = [course: string, _c1: bigint]
hive (kalyan)> describe kalyan.student;
OK
name string student name
id int student id
course string student course
year int student year
Time taken: 0.349 seconds, Fetched: 4 row(s)
scala> sqlContext.sql("select * from kalyan.student")
res8: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> val df = sqlContext.sql("select * from kalyan.student")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| arun| 1| cse| 1|
| sunil| 2| cse| 1|
| raj| 3| cse| 1|
|naveen| 4| cse| 1|
| venki| 5| cse| 2|
|prasad| 6| cse| 2|
| sudha| 7| cse| 2|
| ravi| 1| mech| 1|
| raju| 2| mech| 1|
| roja| 3| mech| 1|
| anil| 4| mech| 2|
| rani| 5| mech| 2|
|anvith| 6| mech| 2|
| madhu| 7| mech| 2|
+------+---+------+----+
scala> df.select("name","id").show
+------+---+
| name| id|
+------+---+
| arun| 1|
| sunil| 2|
| raj| 3|
|naveen| 4|
| venki| 5|
|prasad| 6|
| sudha| 7|
| ravi| 1|
| raju| 2|
| roja| 3|
| anil| 4|
| rani| 5|
|anvith| 6|
| madhu| 7|
+------+---+
scala> df.filter(df("year") > 1).show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| venki| 5| cse| 2|
|prasad| 6| cse| 2|
| sudha| 7| cse| 2|
| anil| 4| mech| 2|
| rani| 5| mech| 2|
|anvith| 6| mech| 2|
| madhu| 7| mech| 2|
+------+---+------+----+
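filter also accepts a SQL expression string instead of a Column; a small
sketch (the extra course condition is just for illustration):

df.filter("year > 1 and course = 'cse'").show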
----------------------------------------------------------
scala> df.filter(df("year") > 1).filter(df("id") === 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|venki| 5| cse| 2|
| rani| 5| mech| 2|
+-----+---+------+----+
scala> df.filter(df("year") > 1).where(df("id") === 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|venki| 5| cse| 2|
| rani| 5| mech| 2|
+-----+---+------+----+
scala> df.filter(df("year") > 1).where(df("id") === 5).select("name","id").show
+-----+---+
| name| id|
+-----+---+
|venki| 5|
| rani| 5|
+-----+---+
scala> val gd = df.groupBy(df("year"), df("course"))
gd: org.apache.spark.sql.GroupedData = org.apache.spark.sql.GroupedData@cab0abd
scala> gd.count.show
+----+------+-----+
|year|course|count|
+----+------+-----+
| 1| mech| 3|
| 1| cse| 4|
| 2| mech| 4|
| 2| cse| 3|
+----+------+-----+
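GroupedData supports more aggregates than count; a minimal sketch using agg
with aliased aggregates (the max on id is just for illustration):

import org.apache.spark.sql.functions._
df.groupBy("year", "course").agg(count("id").alias("cnt"), max("id").alias("max_id")).show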
----------------------------------------------------------
scala> df.registerTempTable("student")
scala> sqlContext.sql("select * from student")
res36: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> sqlContext.sql("select * from student").show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| arun| 1| cse| 1|
| sunil| 2| cse| 1|
| raj| 3| cse| 1|
|naveen| 4| cse| 1|
| venki| 5| cse| 2|
|prasad| 6| cse| 2|
| sudha| 7| cse| 2|
| ravi| 1| mech| 1|
| raju| 2| mech| 1|
| roja| 3| mech| 1|
| anil| 4| mech| 2|
| rani| 5| mech| 2|
|anvith| 6| mech| 2|
| madhu| 7| mech| 2|
+------+---+------+----+
scala> sqlContext.sql("select year, course, count(*) from student group by year, course").show
+----+------+---+
|year|course|_c2|
+----+------+---+
| 1| mech| 3|
| 1| cse| 4|
| 2| mech| 4|
| 2| cse| 3|
+----+------+---+
----------------------------------------------------------
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
----------------------------------------------------------
scala> val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
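JDBC reads can also be split across partitions for parallelism; a sketch,
where the partition column and bounds (taken from the id range in this
sample data) are assumptions:

val partitionedDF = sqlContext.read.jdbc(
  "jdbc:mysql://localhost:3306/kalyan", "student",
  "id",   // partition column
  1, 7,   // lowerBound, upperBound (assumed from the sample data)
  4,      // numPartitions
  prop)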
val hiveDF = sqlContext.sql("select * from kalyan.student")
scala> val hiveDF = sqlContext.sql("select * from kalyan.student")
hiveDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> hiveDF.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| arun| 1| cse| 1|
| sunil| 2| cse| 1|
| raj| 3| cse| 1|
|naveen| 4| cse| 1|
| venki| 5| cse| 2|
|prasad| 6| cse| 2|
| sudha| 7| cse| 2|
| ravi| 1| mech| 1|
| raju| 2| mech| 1|
| roja| 3| mech| 1|
| anil| 4| mech| 2|
| rani| 5| mech| 2|
|anvith| 6| mech| 2|
| madhu| 7| mech| 2|
+------+---+------+----+
----------------------------------------------------------
val jsonDF = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
val parquetDF = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
----------------------------------------------------------
scala> val jsonDF = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
jsonDF: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]
scala> jsonDF.show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
| spark| 1| anil|2016|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 3| raj|2016|
|hadoop| 4| sunil|2015|
| spark| 2|venkat|2016|
+------+---+------+----+
----------------------------------------------------------
scala> val parquetDF = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
parquetDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> parquetDF.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
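The same DataFrames can be written back out in either format; a minimal
sketch (the output paths are assumptions):

jsonDF.write.parquet("file:///home/orienit/spark/output/student_parquet")
parquetDF.write.json("file:///home/orienit/spark/output/student_json")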
----------------------------------------------------------
From RDBMS => jdbcDF => register temp table => jdbcstudent
From HIVE => hiveDF => register temp table => hivestudent
From JSON => jsonDF => register temp table => jsonstudent
From PARQUET => parquetDF => register temp table => parquetstudent
jdbcDF.registerTempTable("jdbcstudent")
hiveDF.registerTempTable("hivestudent")
jsonDF.registerTempTable("jsonstudent")
parquetDF.registerTempTable("parquetstudent")
val query1 = "select jdbcstudent.*, hivestudent.* from jdbcstudent join hivestudent on jdbcstudent.name == hivestudent.name"
sqlContext.sql(query1).show
scala> val query1 = "select jdbcstudent.*, hivestudent.* from jdbcstudent join hivestudent on jdbcstudent.name == hivestudent.name"
query1: String = select jdbcstudent.*, hivestudent.* from jdbcstudent join hivestudent on jdbcstudent.name == hivestudent.name
scala> sqlContext.sql(query1).show
+------+---+------+----+------+---+------+----+
| name| id|course|year| name| id|course|year|
+------+---+------+----+------+---+------+----+
| raj| 3| spark|2016| raj| 3| cse| 1|
|anvith| 5|hadoop|2015|anvith| 6| mech| 2|
| sunil| 4|hadoop|2015| sunil| 2| cse| 1|
| anil| 1| spark|2016| anil| 4| mech| 2|
+------+---+------+----+------+---+------+----+
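The same join can be expressed with the DataFrame API instead of SQL; a
minimal sketch:

jdbcDF.join(hiveDF, jdbcDF("name") === hiveDF("name")).show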
----------------------------------------------------------
val query2 = "select jsonstudent.*, parquetstudent.* from jsonstudent join parquetstudent on jsonstudent.name == parquetstudent.name"
sqlContext.sql(query2).show
----------------------------------------------------------
scala> val query2 = "select jsonstudent.*, parquetstudent.* from jsonstudent join parquetstudent on jsonstudent.name == parquetstudent.name"
query2: String = select jsonstudent.*, parquetstudent.* from jsonstudent join parquetstudent on jsonstudent.name == parquetstudent.name
scala> sqlContext.sql(query2).show
+------+---+------+----+------+---+------+----+
|course| id| name|year| name| id|course|year|
+------+---+------+----+------+---+------+----+
| spark| 1| anil|2016| anil| 1| spark|2016|
|hadoop| 5|anvith|2015|anvith| 5|hadoop|2015|
|hadoop| 6| dev|2015| dev| 6|hadoop|2015|
| spark| 3| raj|2016| raj| 3| spark|2016|
|hadoop| 4| sunil|2015| sunil| 4|hadoop|2015|
| spark| 2|venkat|2016|venkat| 2| spark|2016|
+------+---+------+----+------+---+------+----+
----------------------------------------------------------