SPARK SQL Practice
------------------------------------------------------------
To integrate Spark with Hive, we need to copy 'hive-site.xml' from the '$HIVE_HOME/conf' folder to the '$SPARK_HOME/conf' folder.
hive> select * from test.student;
OK
arun 1 cse 1
sunil 2 cse 1
raj 3 cse 1
naveen 4 cse 1
venki 5 cse 2
prasad 6 cse 2
sudha 7 cse 2
ravi 1 mech 1
raju 2 mech 1
roja 3 mech 1
anil 4 mech 2
rani 5 mech 2
anvith 6 mech 2
madhu 7 mech 2
Time taken: 0.143 seconds, Fetched: 14 row(s)
hive> select year, count(*) from test.student group by year;
OK
1 7
2 7
Time taken: 21.865 seconds, Fetched: 2 row(s)
scala> sqlContext.sql("select year, count(*) from test.student group by year").show
+----+---+
|year|_c1|
+----+---+
| 1| 7|
| 2| 7|
+----+---+
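For the query above to work, the spark-shell's sqlContext must be Hive-aware. A minimal sketch of creating one explicitly (Spark 1.x API, assuming a Hive-enabled Spark build):

// Create a Hive-aware SQLContext from the SparkContext (`sc`) that spark-shell provides.
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
sqlContext.sql("select year, count(*) from test.student group by year").show()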
--------------------------------------------------------------------
Once we convert any data into a DataFrame, we can execute SQL queries on it: select, join, group by, order by, and so on.
----------------------------------------------------------------------
val df = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
val df = sqlContext.read.parquet("file:///home/hadoop/spark/input/student.parquet")
scala> val df = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]
scala> val df = sqlContext.read.parquet("file:///home/hadoop/spark/input/student.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
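The reverse direction works too; a short sketch (Spark 1.4+ DataFrameWriter API, with hypothetical output paths) of saving the DataFrame back out:

// Write the DataFrame back out as JSON and Parquet (paths are illustrative).
df.write.json("file:///home/hadoop/spark/output/student_json")
df.write.parquet("file:///home/hadoop/spark/output/student_parquet")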
scala> df.filter(df("id") > 3)
res12: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.filter(df("id") > 3).show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| sunil| 4|hadoop|2015|
+------+---+------+----+
scala> df.filter(df("id") > 3).filter(df("id") < 5)
res14: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.filter(df("id") > 3).filter(df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil| 4|hadoop|2015|
+-----+---+------+----+
scala> df.filter(df("id") > 3 and df("id") < 5)
res16: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.filter(df("id") > 3 and df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil| 4|hadoop|2015|
+-----+---+------+----+
scala> df.where(df("id") > 3 and df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil| 4|hadoop|2015|
+-----+---+------+----+
scala> df.select(df("name"), df("year"))
res20: org.apache.spark.sql.DataFrame = [name: string, year: int]
scala> df.select(df("name"), df("year")).show
+------+----+
| name|year|
+------+----+
| anil|2016|
|anvith|2015|
| dev|2015|
| raj|2016|
| sunil|2015|
|venkat|2016|
+------+----+
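select also accepts column expressions, not just plain columns; a small sketch (the derived column next_year is illustrative, not from the original session):

// Project a computed, aliased column alongside an existing one.
df.select(df("name"), (df("year") + 1).as("next_year")).show()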
// Register the DataFrame as a temporary table
scala> df.registerTempTable("studentdf")
// Retrieve the data from the table using SQL queries
scala> sqlContext.sql("select * from studentdf")
res23: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> sqlContext.sql("select * from studentdf").show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
// Retrieve the data from the DataFrame using the DSL
scala> df.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
scala> sqlContext.sql("select year, count(*) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015| 3|
|2016| 3|
+----+---+
scala> sqlContext.sql("select year, sum(id) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015| 15|
|2016| 6|
+----+---+
scala> sqlContext.sql("select year, min(id) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015| 4|
|2016| 1|
+----+---+
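The same group-by aggregations can be written with the DataFrame DSL instead of SQL; a sketch:

import org.apache.spark.sql.functions.{sum, min}

// count(*) per year, then sum(id) and min(id) per year, via the DSL.
df.groupBy("year").count().show()
df.groupBy("year").agg(sum("id"), min("id")).show()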
----------------------------------------------------------
scala> val hiveDF = sqlContext.sql("select * from test.student")
hiveDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> val parDF = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
parDF: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]
scala> hiveDF.registerTempTable("hivetbl")
scala> parDF.registerTempTable("partbl")
scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id = partbl.id").show
+------+---+------+----+------+---+------+----+
| name| id|course|year|course| id| name|year|
+------+---+------+----+------+---+------+----+
| arun| 1| cse| 1| spark| 1| anil|2016|
| sunil| 2| cse| 1| spark| 2|venkat|2016|
| raj| 3| cse| 1| spark| 3| raj|2016|
|naveen| 4| cse| 1|hadoop| 4| sunil|2015|
| venki| 5| cse| 2|hadoop| 5|anvith|2015|
|prasad| 6| cse| 2|hadoop| 6| dev|2015|
| ravi| 1| mech| 1| spark| 1| anil|2016|
| raju| 2| mech| 1| spark| 2|venkat|2016|
| roja| 3| mech| 1| spark| 3| raj|2016|
| anil| 4| mech| 2|hadoop| 4| sunil|2015|
| rani| 5| mech| 2|hadoop| 5|anvith|2015|
|anvith| 6| mech| 2|hadoop| 6| dev|2015|
+------+---+------+----+------+---+------+----+
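The same inner join can also be expressed with the DSL (a sketch equivalent to the SQL above):

// Join the Hive-backed and JSON-backed DataFrames on id.
hiveDF.join(parDF, hiveDF("id") === parDF("id")).show()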
scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id != partbl.id").show
+------+---+------+----+------+---+------+----+
| name| id|course|year|course| id| name|year|
+------+---+------+----+------+---+------+----+
| arun| 1| cse| 1|hadoop| 5|anvith|2015|
| arun| 1| cse| 1|hadoop| 6| dev|2015|
| arun| 1| cse| 1| spark| 3| raj|2016|
| arun| 1| cse| 1|hadoop| 4| sunil|2015|
| arun| 1| cse| 1| spark| 2|venkat|2016|
| sunil| 2| cse| 1| spark| 1| anil|2016|
| sunil| 2| cse| 1|hadoop| 5|anvith|2015|
| sunil| 2| cse| 1|hadoop| 6| dev|2015|
| sunil| 2| cse| 1| spark| 3| raj|2016|
| sunil| 2| cse| 1|hadoop| 4| sunil|2015|
| raj| 3| cse| 1| spark| 1| anil|2016|
| raj| 3| cse| 1|hadoop| 5|anvith|2015|
| raj| 3| cse| 1|hadoop| 6| dev|2015|
| raj| 3| cse| 1|hadoop| 4| sunil|2015|
| raj| 3| cse| 1| spark| 2|venkat|2016|
|naveen| 4| cse| 1| spark| 1| anil|2016|
|naveen| 4| cse| 1|hadoop| 5|anvith|2015|
|naveen| 4| cse| 1|hadoop| 6| dev|2015|
|naveen| 4| cse| 1| spark| 3| raj|2016|
|naveen| 4| cse| 1| spark| 2|venkat|2016|
+------+---+------+----+------+---+------+----+
only showing top 20 rows
scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id != partbl.id").foreach(println)
[arun,1,cse,1,hadoop,5,anvith,2015]
[arun,1,cse,1,hadoop,6,dev,2015]
[arun,1,cse,1,spark,3,raj,2016]
[arun,1,cse,1,hadoop,4,sunil,2015]
[arun,1,cse,1,spark,2,venkat,2016]
[sunil,2,cse,1,spark,1,anil,2016]
[sunil,2,cse,1,hadoop,5,anvith,2015]
[sunil,2,cse,1,hadoop,6,dev,2015]
[sunil,2,cse,1,spark,3,raj,2016]
[sunil,2,cse,1,hadoop,4,sunil,2015]
[raj,3,cse,1,spark,1,anil,2016]
[raj,3,cse,1,hadoop,5,anvith,2015]
[raj,3,cse,1,hadoop,6,dev,2015]
[raj,3,cse,1,hadoop,4,sunil,2015]
[raj,3,cse,1,spark,2,venkat,2016]
[naveen,4,cse,1,spark,1,anil,2016]
[naveen,4,cse,1,hadoop,5,anvith,2015]
[naveen,4,cse,1,hadoop,6,dev,2015]
[naveen,4,cse,1,spark,3,raj,2016]
[naveen,4,cse,1,spark,2,venkat,2016]
[venki,5,cse,2,spark,1,anil,2016]
[venki,5,cse,2,hadoop,6,dev,2015]
[venki,5,cse,2,spark,3,raj,2016]
[venki,5,cse,2,hadoop,4,sunil,2015]
[venki,5,cse,2,spark,2,venkat,2016]
[prasad,6,cse,2,spark,1,anil,2016]
[prasad,6,cse,2,hadoop,5,anvith,2015]
[prasad,6,cse,2,spark,3,raj,2016]
[prasad,6,cse,2,hadoop,4,sunil,2015]
[prasad,6,cse,2,spark,2,venkat,2016]
[sudha,7,cse,2,spark,1,anil,2016]
[sudha,7,cse,2,hadoop,5,anvith,2015]
[sudha,7,cse,2,hadoop,6,dev,2015]
[sudha,7,cse,2,spark,3,raj,2016]
[sudha,7,cse,2,hadoop,4,sunil,2015]
[sudha,7,cse,2,spark,2,venkat,2016]
[ravi,1,mech,1,hadoop,5,anvith,2015]
[ravi,1,mech,1,hadoop,6,dev,2015]
[ravi,1,mech,1,spark,3,raj,2016]
[ravi,1,mech,1,hadoop,4,sunil,2015]
[ravi,1,mech,1,spark,2,venkat,2016]
[raju,2,mech,1,spark,1,anil,2016]
[raju,2,mech,1,hadoop,5,anvith,2015]
[raju,2,mech,1,hadoop,6,dev,2015]
[raju,2,mech,1,spark,3,raj,2016]
[raju,2,mech,1,hadoop,4,sunil,2015]
[roja,3,mech,1,spark,1,anil,2016]
[roja,3,mech,1,hadoop,5,anvith,2015]
[roja,3,mech,1,hadoop,6,dev,2015]
[roja,3,mech,1,hadoop,4,sunil,2015]
[roja,3,mech,1,spark,2,venkat,2016]
[anil,4,mech,2,spark,1,anil,2016]
[anil,4,mech,2,hadoop,5,anvith,2015]
[anil,4,mech,2,hadoop,6,dev,2015]
[anil,4,mech,2,spark,3,raj,2016]
[anil,4,mech,2,spark,2,venkat,2016]
[rani,5,mech,2,spark,1,anil,2016]
[rani,5,mech,2,hadoop,6,dev,2015]
[rani,5,mech,2,spark,3,raj,2016]
[rani,5,mech,2,hadoop,4,sunil,2015]
[rani,5,mech,2,spark,2,venkat,2016]
[anvith,6,mech,2,spark,1,anil,2016]
[anvith,6,mech,2,hadoop,5,anvith,2015]
[anvith,6,mech,2,spark,3,raj,2016]
[anvith,6,mech,2,hadoop,4,sunil,2015]
[anvith,6,mech,2,spark,2,venkat,2016]
[madhu,7,mech,2,spark,1,anil,2016]
[madhu,7,mech,2,hadoop,5,anvith,2015]
[madhu,7,mech,2,hadoop,6,dev,2015]
[madhu,7,mech,2,spark,3,raj,2016]
[madhu,7,mech,2,hadoop,4,sunil,2015]
[madhu,7,mech,2,spark,2,venkat,2016]
mysql> select * from student;
+----+-------------+-------+------+
| id | name | class | mark |
+----+-------------+-------+------+
| 1 | John Deo | Four | 75 |
| 2 | Max Ruin | Three | 85 |
| 3 | Arnold | Three | 55 |
| 4 | Krish Star | Four | 60 |
| 5 | John Mike | Four | 60 |
| 6 | Alex John | Four | 55 |
| 7 | My John Rob | Fifth | 78 |
| 8 | Asruid | Five | 85 |
| 9 | Tes Qry | Six | 78 |
| 10 | Big John | Four | 55 |
| 11 | Ronald | Six | 89 |
| 12 | Recky | Six | 94 |
| 13 | Kty | Seven | 88 |
| 14 | Bigy | Seven | 88 |
| 15 | Tade Row | Four | 88 |
| 16 | Gimmy | Four | 88 |
| 17 | Tumyu | Six | 54 |
| 18 | Honny | Five | 75 |
| 19 | Tinny | Nine | 18 |
| 20 | Jackly | Nine | 65 |
| 21 | Babby John | Four | 69 |
| 22 | Reggid | Seven | 55 |
| 23 | Herod | Eight | 79 |
| 24 | Tiddy Now | Seven | 78 |
| 25 | Giff Tow | Seven | 88 |
| 26 | Crelea | Seven | 79 |
| 27 | Big Nose | Three | 81 |
| 28 | Rojj Base | Seven | 86 |
| 29 | Tess Played | Seven | 55 |
| 30 | Reppy Red | Six | 79 |
| 31 | Marry Toeey | Four | 88 |
| 32 | Binn Rott | Seven | 90 |
| 33 | Kenn Rein | Six | 96 |
| 34 | Gain Toe | Seven | 69 |
| 35 | Rows Noump | Six | 88 |
+----+-------------+-------+------+
35 rows in set (0.00 sec)
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/sqoop", "student", prop)
jdbcDF.show()
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}
scala> prop.setProperty("driver","com.mysql.jdbc.Driver")
res35: Object = null
scala> prop.setProperty("user","root")
res36: Object = null
scala> prop.setProperty("password","hadoop")
res37: Object = null
scala>
scala> val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/sqoop", "student", prop)
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string, class: string, mark: int]
scala>
scala> jdbcDF.show()
+---+-----------+-----+----+
| id| name|class|mark|
+---+-----------+-----+----+
| 1| John Deo| Four| 75|
| 2| Max Ruin|Three| 85|
| 3| Arnold|Three| 55|
| 4| Krish Star| Four| 60|
| 5| John Mike| Four| 60|
| 6| Alex John| Four| 55|
| 7|My John Rob|Fifth| 78|
| 8| Asruid| Five| 85|
| 9| Tes Qry| Six| 78|
| 10| Big John| Four| 55|
| 11| Ronald| Six| 89|
| 12| Recky| Six| 94|
| 13| Kty|Seven| 88|
| 14| Bigy|Seven| 88|
| 15| Tade Row| Four| 88|
| 16| Gimmy| Four| 88|
| 17| Tumyu| Six| 54|
| 18| Honny| Five| 75|
| 19| Tinny| Nine| 18|
| 20| Jackly| Nine| 65|
+---+-----------+-----+----+
only showing top 20 rows
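Once loaded, the JDBC-backed DataFrame behaves like any other; a sketch (the temp table name mysqltbl is illustrative, not from the original session):

// DSL filter on the JDBC DataFrame.
jdbcDF.filter(jdbcDF("mark") > 80).show()

// Register it and query with SQL as well.
jdbcDF.registerTempTable("mysqltbl")
sqlContext.sql("select class, count(*) from mysqltbl group by class").show()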