---------------------------------------------------------------
Spark Basics
---------------------------------------------------------------
1. SparkContext (sc) is the main entry point for Spark operations
2. From the Spark context, we will create an RDD
3. Create an RDD in 2 ways (a quick sketch follows):
-> from collections (List, Seq, Set, ...)
-> from datasets (csv, tsv, text, json, hive, hdfs, cassandra, ...)
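A minimal sketch of both ways in the Spark shell (sc is already available there; the file path is just the sample input reused later in these notes):
// 1. from a collection
val collRdd = sc.parallelize(List(1, 2, 3, 4))
// 2. from a dataset on disk
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput")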
---------------------------------------------------------------
Start the spark shell using 3 approaches:
1. scala command => spark-shell
2. python command => pyspark
3. R command => sparkR
Spark-1.6.x is compatible with Scala-2.10 & Scala-2.11
Spark-2.0.x is compatible with Scala-2.11
Spark context available as sc.
SQL context available as sqlContext.
------------------------------------------------------------------
RDD => Resilient Distributed Dataset
RDD features:
--------------
1. Immutable
2. Lazily evaluated (bottom-to-top approach)
3. Cacheable
4. Type inferred (see the sketch after this list)
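A small sketch illustrating these features in the shell (assuming sc is available; the value names are illustrative):
val nums = sc.parallelize(List(1, 2, 3, 4))   // immutable: no operation modifies nums in place
val plusOne = nums.map(x => x + 1)            // lazy: only the lineage is recorded, nothing runs yet
plusOne.cache()                               // cacheable: keep the data in memory after the first computation
plusOne.collect()                             // the action triggers evaluation; result type Array[Int] is inferred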
RDD Operations:
--------------------
1. Transformations
old rdd ====> new rdd
2. Actions
rdd ====> result
---------------------------------------------
Examples in RDD:
--------------------
list <-- {1, 2, 3, 4}
Ex1: Transformations
-------------------------
f(x) => x + 1
f(list) <-- {2, 3, 4, 5}
f(x) => x * x
f(list) <-- {1, 4, 9, 16}
Ex2: Actions
-------------------------
min(list) <-- 1
max(list) <-- 4
sum(list) <-- 10
avg(list) <-- 2.5
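The same examples expressed as RDD operations (a sketch; the full shell session follows below):
val list = List(1, 2, 3, 4)
val rdd = sc.parallelize(list)
rdd.map(x => x + 1).collect   // transformation + action => Array(2, 3, 4, 5)
rdd.map(x => x * x).collect   // => Array(1, 4, 9, 16)
rdd.min                       // => 1
rdd.max                       // => 4
rdd.sum                       // => 10.0
rdd.mean                      // => 2.5 (avg)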
---------------------------------------------
val list = List(1,2,3,4)
val rdd = sc.parallelize(list)
---------------------------------------------
scala> val list = List(1,2,3,4)
list: List[Int] = List(1, 2, 3, 4)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29
scala> rdd.getNumPartitions
res0: Int = 4
scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:29
scala> rdd.getNumPartitions
res1: Int = 2
---------------------------------------------
scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.map(x => x + 1).collect
res4: Array[Int] = Array(2, 3, 4, 5)
scala> rdd.map(x => x + 1)
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:32
scala> rdd.map(x => x + 1).collect
res6: Array[Int] = Array(2, 3, 4, 5)
scala> rdd.map(x => x * x).collect
res7: Array[Int] = Array(1, 4, 9, 16)
scala> rdd.min
res8: Int = 1
scala> rdd.max
res9: Int = 4
scala> rdd.sum
res10: Double = 10.0
---------------------------------------------
1. create an RDD with the numbers 1 to 10
val rdd = sc.parallelize(1 to 10)
2. add number 1 to each element
val addRdd = rdd.map(x => x + 1)
3. filter the odd numbers from the above data
val filterRdd = addRdd.filter(x => x % 2 == 1)
4. sum of all the numbers in the above data
val sum = filterRdd.sum
5. print all the numbers in the above data
filterRdd.foreach(x => println(x))
filterRdd.foreach(println)
6. save this data into local file system / hdfs
val output = "file:///home/orienit/work/output/data"
val output = "hdfs://localhost:8020/output/data"
filterRdd.saveAsTextFile(output)
7. repartition the data
val repartitionRdd = filterRdd.repartition(1)
8. save this data into local file system / hdfs
val output = "file:///home/orienit/work/output/data1"
val output = "hdfs://localhost:8020/output/data1"
repartitionRdd.saveAsTextFile(output)
---------------------------------------------
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> rdd.collect
res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val addRdd = rdd.map(x => x + 1)
addRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:29
scala> addRdd.collect
res12: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
scala> val filterRdd = addRdd.filter(x => x % 2 == 1)
filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at filter at <console>:31
scala> filterRdd.collect
res13: Array[Int] = Array(3, 5, 7, 9, 11)
scala> val sum = filterRdd.sum
sum: Double = 35.0
scala> filterRdd.foreach(x => println(x))
3
5
9
11
7
scala> filterRdd.foreach(println)
7
3
5
9
11
scala> val output = "file:///home/orienit/work/output/data"
output: String = file:///home/orienit/work/output/data
scala> filterRdd.saveAsTextFile(output)
scala> val repartitionRdd = filterRdd.repartition(1)
repartitionRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at repartition at <console>:33
scala> val output = "file:///home/orienit/work/output/data1"
output: String = file:///home/orienit/work/output/data1
scala> repartitionRdd.saveAsTextFile(output)
scala> val output = "hdfs://localhost:8020/output/data1"
output: String = hdfs://localhost:8020/output/data1
scala> repartitionRdd.saveAsTextFile(output)
---------------------------------------------
Creating an RDD from datasets:
--------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val rdd = sc.textFile(input)
scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput
scala> val rdd = sc.textFile(input)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at textFile at <console>:29
scala> rdd.getNumPartitions
res20: Int = 2
scala> val rdd = sc.textFile(input, 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[23] at textFile at <console>:29
scala> rdd.getNumPartitions
res21: Int = 1
scala> rdd.collect
res22: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)
scala> rdd.collect.foreach(println)
I am going
to hyd
I am learning
hadoop course
---------------------------------------------
WordCount in Spark using Scala:
-------------------------------------
// input path
val input = "file:///home/orienit/work/input/demoinput"
// creating a rdd from input
val fileRdd = sc.textFile(input, 1)
// read the file line by line
// split the line into words
val words = fileRdd.flatMap(line => line.split(" "))
// assign count(1) to each word
val counts = words.map(word => (word, 1))
// sum the list of values
val wordcount = counts.reduceByKey((a,b) => a + b)
// save the output into local file system / hdfs
val output = "file:///home/orienit/work/output/wordcount"
val output = "hdfs://localhost:8020/output/wordcount"
wordcount.saveAsTextFile(output)
scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput
scala> val fileRdd = sc.textFile(input, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at textFile at <console>:29
scala> val words = fileRdd.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at flatMap at <console>:31
scala> val counts = words.map(word => (word, 1))
counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:33
scala> val wordcount = counts.reduceByKey((a,b) => a + b)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:35
scala> val output = "file:///home/orienit/work/output/wordcount"
output: String = file:///home/orienit/work/output/wordcount
scala> wordcount.saveAsTextFile(output)
scala> val output = "hdfs://localhost:8020/output/wordcount"
output: String = hdfs://localhost:8020/output/wordcount
scala> wordcount.saveAsTextFile(output)
------------------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/wordcount"
val fileRdd = sc.textFile(input, 1)
val words = fileRdd.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1))
val wordcount = counts.reduceByKey((a,b) => a + b)
wordcount.saveAsTextFile(output)
------------------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/wordcount1"
val fileRdd = sc.textFile(input, 1)
val wordcount = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)
wordcount.saveAsTextFile(output)
------------------------------------------
Grep in Spark using Scala:
-------------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/grepjob"
val fileRdd = sc.textFile(input, 1)
val filterRdd = fileRdd.filter(line => line.contains("am"))
filterRdd.saveAsTextFile(output)
------------------------------------------
WordCount in Spark using Python:
-------------------------------------
input = "file:///home/orienit/work/input/demoinput"
output = "file:///home/orienit/work/output/wordcount-py"
fileRdd = sc.textFile(input, 1)
words = fileRdd.flatMap(lambda line : line.split(" "))
counts = words.map(lambda word : (word, 1))
wordcount = counts.reduceByKey(lambda a,b : a + b)
wordcount.saveAsTextFile(output)
>>> input = "file:///home/orienit/work/input/demoinput"
>>> output = "file:///home/orienit/work/output/wordcount-py"
>>> fileRdd = sc.textFile(input, 1)
16/11/27 15:08:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/11/27 15:08:17 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/11/27 15:08:17 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:42322 (size: 13.9 KB, free: 3.4 GB)
16/11/27 15:08:17 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
>>> words = fileRdd.flatMap(lambda line : line.split(" "))
>>> counts = words.map(lambda word : (word, 1))
>>> wordcount = counts.reduceByKey(lambda a,b : a + b)
>>> wordcount.saveAsTextFile(output)
---------------------------------------------
Spark SQL:
---------------------
DataFrame <=> Table
orienit@kalyan:~$ cat /home/orienit/spark/input/student.json
{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}
val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
scala> val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]
scala> df.show()
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
| spark| 1| anil|2016|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 3| raj|2016|
|hadoop| 4| sunil|2015|
| spark| 2|venkat|2016|
+------+---+------+----+
scala> val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.show()
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
---------------------------------------------
DataFrame supports 2 kinds of operations (compared in the sketch after this list):
1. DSL - Domain Specific Language
2. SQL - Structured Query Language
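For example, the same filter can be written either way (both appear in the shell sessions below; the SQL form assumes the DataFrame has been registered as the temp table "student"):
df.filter("id > 4").show()                                    // DSL
sqlContext.sql("select * from student where id > 4").show()   // SQL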
---------------------------------------------
DSL Examples:
------------------
scala> df.show()
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
scala> df.show(3)
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
+------+---+------+----+
only showing top 3 rows
scala> df.limit(3)
res32: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.limit(3).show()
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
+------+---+------+----+
scala> df.select("name", "id")
res34: org.apache.spark.sql.DataFrame = [name: string, id: int]
scala>
scala> df.select("name", "id").show()
+------+---+
| name| id|
+------+---+
| anil| 1|
|anvith| 5|
| dev| 6|
| raj| 3|
| sunil| 4|
|venkat| 2|
+------+---+
scala> df.select("name", "id").limit(2).show()
+------+---+
| name| id|
+------+---+
| anil| 1|
|anvith| 5|
+------+---+
scala> df.filter("id > 4").show()
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
+------+---+------+----+
scala> df.where("id > 4").show()
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
+------+---+------+----+
---------------------------------------------
SQL Examples:
---------------------
Register DataFrame as Temp Table
scala> df.registerTempTable("student")
scala> sqlContext.sql("select * from student")
res42: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> sqlContext.sql("select * from student").show()
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
scala> sqlContext.sql("select name, id from student").show()
+------+---+
| name| id|
+------+---+
| anil| 1|
|anvith| 5|
| dev| 6|
| raj| 3|
| sunil| 4|
|venkat| 2|
+------+---+
scala> sqlContext.sql("select * from student where id > 4").show()
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
+------+---+------+----+
scala> sqlContext.sql("select year, count(*) from student group by year").show()
+----+---+
|year|_c1|
+----+---+
|2015| 3|
|2016| 3|
+----+---+
scala> sqlContext.sql("select year, count(*) as cnt from student group by year").show()
+----+---+
|year|cnt|
+----+---+
|2015| 3|
|2016| 3|
+----+---+
---------------------------------------------
case class Contact(cid: Int, name: String, loc: String, pincode:Int)
case class Orders(oid: Int, cid: Int, status: String)
val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
val cdf = contact.map(c => Contact(c(0).trim.toInt, c(1), c(2), c(3).trim.toInt)).toDF()
val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()
---------------------------------------------
scala> case class Contact(cid: Int, name: String, loc: String, pincode:Int)
defined class Contact
scala> case class Orders(oid: Int, cid: Int, status: String)
defined class Orders
scala>
scala> val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
contact: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[113] at map at <console>:27
scala> val cdf = contact.map(c => Contact(c(0).trim.toInt, c(1), c(2), c(3).trim.toInt)).toDF()
cdf: org.apache.spark.sql.DataFrame = [cid: int, name: string, loc: string, pincode: int]
scala>
scala> val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
orders: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[118] at map at <console>:27
scala> val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()
odf: org.apache.spark.sql.DataFrame = [oid: int, cid: int, status: string]
---------------------------------------------
scala> cdf.show()
+---+------+----+-------+
|cid| name| loc|pincode|
+---+------+----+-------+
| 1|kalyan| hyd| 500072|
| 2|venkat| hyd| 500073|
| 3| anil| hyd| 500071|
| 4| raj| hyd| 500075|
| 5| arun| hyd| 500074|
| 6| vani|bang| 600072|
| 7| vamsi|bang| 600073|
| 8|prasad|bang| 600076|
| 9|anvith|bang| 600075|
| 10| swamy|bang| 600071|
+---+------+----+-------+
scala> odf.show()
+---+---+-------+
|oid|cid| status|
+---+---+-------+
|111| 1|success|
|112| 1|failure|
|113| 2|success|
|114| 3|success|
|115| 2|failure|
|116| 3|failure|
|117| 2|success|
|118| 5|success|
|119| 6|failure|
|120| 2|success|
|121| 3|failure|
|122| 7|success|
|123| 3|failure|
|124| 2|success|
|125| 1|failure|
|126| 5|success|
+---+---+-------+
---------------------------------------------
scala> cdf.registerTempTable("customers")
scala> odf.registerTempTable("orders")
scala> sqlContext.sql("select customers.*, orders.* from customers join orders on customers.cid == orders.cid").show()
+---+------+----+-------+---+---+-------+
|cid| name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
| 1|kalyan| hyd| 500072|111| 1|success|
| 1|kalyan| hyd| 500072|112| 1|failure|
| 1|kalyan| hyd| 500072|125| 1|failure|
| 2|venkat| hyd| 500073|113| 2|success|
| 2|venkat| hyd| 500073|115| 2|failure|
| 2|venkat| hyd| 500073|117| 2|success|
| 2|venkat| hyd| 500073|120| 2|success|
| 2|venkat| hyd| 500073|124| 2|success|
| 3| anil| hyd| 500071|114| 3|success|
| 3| anil| hyd| 500071|116| 3|failure|
| 3| anil| hyd| 500071|121| 3|failure|
| 3| anil| hyd| 500071|123| 3|failure|
| 5| arun| hyd| 500074|118| 5|success|
| 5| arun| hyd| 500074|126| 5|success|
| 6| vani|bang| 600072|119| 6|failure|
| 7| vamsi|bang| 600073|122| 7|success|
+---+------+----+-------+---+---+-------+
---------------------------------------------
scala> val jdf = sqlContext.sql("select customers.cid, customers.name, customers.loc, customers.pincode, orders.oid, orders.status from customers join orders on customers.cid == orders.cid")
jdf: org.apache.spark.sql.DataFrame = [cid: int, name: string, loc: string, pincode: int, oid: int, status: string]
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
val url = "jdbc:mysql://localhost:3306/kalyan"
jdf.write.jdbc(url, "jointable", prop)
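To verify the write, the table can be read back with the same url and connection properties (a sketch, assuming the MySQL JDBC driver jar is on the classpath):
val jointableDf = sqlContext.read.jdbc(url, "jointable", prop)
jointableDf.show()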
---------------------------------------------