
Friday, 7 April 2017

SPARK BASICS Practice on 02 Apr 2017

`Spark` is meant for `In-Memory Distributed Computing`

`Spark` provides 4 libraries:
1. Spark SQL
2. Spark Streaming
3. Spark MLLib
4. Spark GraphX

`Spark Context` is the Entry point for any `Spark Operations`

`Resilient Distributed DataSets` => RDD

RDD features:
-------------------
1. immutability
2. lazy evaluation
3. cacheable
4. type inference
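A minimal spark-shell sketch of the lazy-evaluation and cacheable features (assumes the usual `sc` from spark-shell; this is an illustrative sketch, not from the original session):

```scala
// Transformations only build a lineage; nothing runs until an action.
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
val mapped = rdd.map(x => x + 1)   // lazy: no computation happens here
mapped.cache()                     // mark for caching on first evaluation
mapped.collect()                   // action: triggers the actual work
mapped.collect()                   // this time served from the cache
```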

RDD operations:
-----------------
1. Transformations
<old rdd> ----> <new rdd>

2. Actions
<rdd> ---> <result>


Examples on RDD:
-------------------------
list <- {1,2,3,4,5}

1. Transformations:
---------------------
Ex1:
-----
f(x) <- {x + 1}

f(list) <- {2,3,4,5,6}


Ex2:
-----
f(x) <- {x * x}

f(list) <- {1,4,9,16,25}


2. Actions:
---------------------

sum(list) -> 15
min(list) -> 1
max(list) -> 5



How to Start Spark:
-----------------------------
scala => spark-shell
python => pyspark
R => sparkR

Spark-1.x:
--------------------------------------
Spark context available as 'sc'
Spark SQL context available as 'sqlContext'


Spark-2.x:
--------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'


How to Create an RDD:
---------------------------------------
We can create an RDD in 2 ways:
1. from collections (List, Seq, Set, ....)
2. from data sets (text, csv, tsv, json, ...)


1. from collections:
---------------------------------------
val list = List(1,2,3,4,5)

val rdd = sc.parallelize(list)

val rdd = sc.parallelize(list, 2)

Syntax:
-----------------------
val rdd = sc.parallelize(<collection object>, <no.of partitions>)


2. from datasets:
---------------------------------------
val file = "file:///home/orienit/work/input/demoinput"

val rdd = sc.textFile(file)

val rdd = sc.textFile(file, 1)


Syntax:
-----------------------
val rdd = sc.textFile(<file path>, <no.of partitions>)

------------------------------------------------------

scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.getNumPartitions
res0: Int = 4

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> rdd.getNumPartitions
res1: Int = 2


------------------------------------------------------

scala> val file = "file:///home/orienit/work/input/demoinput"
file: String = file:///home/orienit/work/input/demoinput

scala> val rdd = sc.textFile(file)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[3] at textFile at <console>:26

scala> rdd.getNumPartitions
res2: Int = 2

scala> val rdd = sc.textFile(file, 1)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[5] at textFile at <console>:26

scala> rdd.getNumPartitions
res3: Int = 1

------------------------------------------------------
Examples on RDD
------------------------------------------------------
val list = List(1,2,3,4,5)

val rdd1 = sc.parallelize(list, 2)

------------------------------------------------------

val rdd2 = rdd1.map(x => x + 1)

val rdd3 = rdd1.map(x => x * x)

------------------------------------------------------

scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(list, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

------------------------------------------------------

scala> val rdd2 = rdd1.map(x => x + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:28

scala> rdd1.collect
res4: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd2.collect
res5: Array[Int] = Array(2, 3, 4, 5, 6)


scala> val rdd3 = rdd1.map(x => x * x)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:28

scala> rdd3.collect
res6: Array[Int] = Array(1, 4, 9, 16, 25)

------------------------------------------------------

rdd1.min

rdd1.max

rdd1.sum


------------------------------------------------------
scala> rdd1.min
res7: Int = 1

scala> rdd1.max
res8: Int = 5

scala> rdd1.sum
res9: Double = 15.0

scala> rdd1.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd1.count
res11: Long = 5

------------------------------------------------------
Word Count in Spark using Scala
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-op"

val fileRdd = sc.textFile(input, 1)

val wordsRdd = fileRdd.flatMap(line => line.split(" "))

val tuplesRdd = wordsRdd.map(word => (word, 1))

val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(output)

------------------------------------------------------
Optimize the Code:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-op"

val fileRdd = sc.textFile(input, 1)

val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(output)


------------------------------------------------------

scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput

scala> val output = "file:///home/orienit/work/output/spark-op"
output: String = file:///home/orienit/work/output/spark-op

scala> val fileRdd = sc.textFile(input, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[11] at textFile at <console>:26

scala> fileRdd.collect
res12: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)

scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at flatMap at <console>:28

scala> wordsRdd.collect
res13: Array[String] = Array(I, am, going, to, hyd, I, am, learning, hadoop, course)

scala> val tuplesRdd = wordsRdd.map(word => (word, 1))
tuplesRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:30

scala> tuplesRdd.collect
res14: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))

scala> val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:32

scala> wordCountRdd.collect
res15: Array[(String, Int)] = Array((learning,1), (hadoop,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> wordCountRdd.saveAsTextFile(output)




------------------------------------------------------
Grep Job in Spark using Scala:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-grep-op"

val fileRdd = sc.textFile(input, 1)

val grepRdd = fileRdd.filter(line => line.contains("am"))

grepRdd.saveAsTextFile(output)



------------------------------------------------------
Sed Job in Spark using Scala:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-sed-op"

val fileRdd = sc.textFile(input, 1)

val sedRdd = fileRdd.map(line => line.replaceAll("am", "xyz"))

sedRdd.saveAsTextFile(output)

------------------------------------------------------
Spark SQL:
------------------------------------------------------

hive> select * from kalyan.student;
scala> spark.sql("select * from kalyan.student").show

------------------------------------------------------

hive> select year, count(*) from kalyan.student group by year;
scala> spark.sql("select year, count(*) from kalyan.student group by year").show

------------------------------------------------------
Data Frames in Spark
------------------------------------------------------
val hiveDf = spark.sql("select * from kalyan.student")

hiveDf.show


hiveDf.registerTempTable("hivetbl")


val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)

jdbcDf.show

jdbcDf.registerTempTable("jdbctbl")

------------------------------------------------------
scala> val hiveDf = spark.sql("select * from kalyan.student")
hiveDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]

scala> hiveDf.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  arun|  1|   cse|   1|
| sunil|  2|   cse|   1|
|   raj|  3|   cse|   1|
|naveen|  4|   cse|   1|
| venki|  5|   cse|   2|
|prasad|  6|   cse|   2|
| sudha|  7|   cse|   2|
|  ravi|  1|  mech|   1|
|  raju|  2|  mech|   1|
|  roja|  3|  mech|   1|
|  anil|  4|  mech|   2|
|  rani|  5|  mech|   2|
|anvith|  6|  mech|   2|
| madhu|  7|  mech|   2|
|  arun|  1|    it|   3|
| sunil|  2|    it|   3|
|   raj|  3|    it|   3|
|naveen|  4|    it|   3|
| venki|  5|    it|   4|
|prasad|  6|    it|   4|
+------+---+------+----+
only showing top 20 rows

------------------------------------------------------

scala> val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]

scala> jdbcDf.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+

------------------------------------------------------

scala> hiveDf.registerTempTable("hivetbl")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> jdbcDf.registerTempTable("jdbctbl")
warning: there was one deprecation warning; re-run with -deprecation for details

------------------------------------------------------

spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show

------------------------------------------------------

scala> spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show
+------+---+------+----+------+---+------+----+
|  name| id|course|year|  name| id|course|year|
+------+---+------+----+------+---+------+----+
|  anil|  4|   ece|   4|  anil|  1| spark|2016|
|  anil|  4|  mech|   2|  anil|  1| spark|2016|
|anvith|  6|   ece|   4|anvith|  5|hadoop|2015|
|anvith|  6|  mech|   2|anvith|  5|hadoop|2015|
|   raj|  3|    it|   3|   raj|  3| spark|2016|
|   raj|  3|   cse|   1|   raj|  3| spark|2016|
| sunil|  2|    it|   3| sunil|  4|hadoop|2015|
| sunil|  2|   cse|   1| sunil|  4|hadoop|2015|
+------+---+------+----+------+---+------+----+



------------------------------------------------------

val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()

casDf.show

------------------------------------------------------


scala> val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()
casDf: org.apache.spark.sql.DataFrame = [name: string, course: string ... 2 more fields]

scala> casDf.show
+------+------+---+----+
|  name|course| id|year|
+------+------+---+----+
|  anil| spark|  1|2016|
|   raj| spark|  3|2016|
|anvith|hadoop|  5|2015|
|   dev|hadoop|  6|2015|
| sunil|hadoop|  4|2015|
|venkat| spark|  2|2016|
|    kk|hadoop|  7|2015|
+------+------+---+----+

------------------------------------------------------
INSERT INTO kalyan.student(name, id, course, year) VALUES ('rajesh', 8, 'hadoop', 2017);

------------------------------------------------------

scala> casDf.show
+------+------+---+----+
|  name|course| id|year|
+------+------+---+----+
|  anil| spark|  1|2016|
|   raj| spark|  3|2016|
|anvith|hadoop|  5|2015|
|   dev|hadoop|  6|2015|
| sunil|hadoop|  4|2015|
|venkat| spark|  2|2016|
|    kk|hadoop|  7|2015|
|rajesh|hadoop|  8|2017|
+------+------+---+----+

------------------------------------------------------

cqlsh:kalyan> select year, count(*) from kalyan.student group by year;
SyntaxException: line 1:42 missing EOF at 'group' (...(*) from kalyan.student [group] by...)

------------------------------------------------------

casDf.registerTempTable("castbl")


spark.sql("select year, count(*) from castbl group by year").show

------------------------------------------------------

scala> casDf.registerTempTable("castbl")
warning: there was one deprecation warning; re-run with -deprecation for details


scala> spark.sql("select year, count(*) from castbl group by year").show 
+----+--------+
|year|count(1)|
+----+--------+
|2015|       4|
|2016|       3|
|2017|       1|
+----+--------+



------------------------------------------------------
spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
------------------------------------------------------

scala> spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
+------+------+---+----+------+---+------+----+
|  name|course| id|year|  name| id|course|year|
+------+------+---+----+------+---+------+----+
|  anil| spark|  1|2016|  anil|  1| spark|2016|
|anvith|hadoop|  5|2015|anvith|  5|hadoop|2015|
|   dev|hadoop|  6|2015|   dev|  6|hadoop|2015|
|   raj| spark|  3|2016|   raj|  3| spark|2016|
| sunil|hadoop|  4|2015| sunil|  4|hadoop|2015|
|venkat| spark|  2|2016|venkat|  2| spark|2016|
+------+------+---+----+------+---+------+----+


------------------------------------------------------
scala> casDf.toJSON.collect.foreach(println)
{"name":"anil","course":"spark","id":1,"year":2016}
{"name":"raj","course":"spark","id":3,"year":2016}
{"name":"anvith","course":"hadoop","id":5,"year":2015}
{"name":"dev","course":"hadoop","id":6,"year":2015}
{"name":"sunil","course":"hadoop","id":4,"year":2015}
{"name":"venkat","course":"spark","id":2,"year":2016}
{"name":"kk","course":"hadoop","id":7,"year":2015}
{"name":"rajesh","course":"hadoop","id":8,"year":2017}


scala> hiveDf.toJSON.collect.foreach(println)
{"name":"arun","id":1,"course":"cse","year":1}
{"name":"sunil","id":2,"course":"cse","year":1}
{"name":"raj","id":3,"course":"cse","year":1}
{"name":"naveen","id":4,"course":"cse","year":1}
{"name":"venki","id":5,"course":"cse","year":2}
{"name":"prasad","id":6,"course":"cse","year":2}
{"name":"sudha","id":7,"course":"cse","year":2}
{"name":"ravi","id":1,"course":"mech","year":1}
{"name":"raju","id":2,"course":"mech","year":1}
{"name":"roja","id":3,"course":"mech","year":1}
{"name":"anil","id":4,"course":"mech","year":2}
{"name":"rani","id":5,"course":"mech","year":2}
{"name":"anvith","id":6,"course":"mech","year":2}
{"name":"madhu","id":7,"course":"mech","year":2}


scala> jdbcDf.toJSON.collect.foreach(println)
{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}

------------------------------------------------------

Sunday, 19 February 2017

SPARK BASICS Practice on 05 Feb 2017

Spark:
--------------------
`Spark Context` => Entry point for Spark Operations

`RDD` => Resilient Distributed DataSets

`RDD` features:
----------------------
1. Immutability
2. Lazy Evaluation
3. Cacheable
4. Type Inference

`RDD` operations:
----------------------
1. Transformations

(input is RDD) => (output is RDD)


2. Actions

(input is RDD) => (output is Result)


Examples on RDD:
-----------------------
list <- {1,2,3,4}

Transformations:
----------------------
f(x) <- {x + 1}
f(list) <- {2,3,4,5}

f(x) <- {x * x}
f(list) <- {1,4,9,16}

Actions:
----------------------
sum(list) <- 10
min(list) <- 1
max(list) <- 4

Spark Libraries:
------------------
1. Spark SQL
2. Spark Streaming
3. Spark MLLib
4. Spark GraphX


Command Line approach:
---------------------------
Scala => spark-shell
Python => pyspark
R   => sparkR

Spark context available as sc
SQL context available as sqlContext


How to Create an RDD from the Spark Context:
----------------------------------------
We can create an RDD from the Spark Context in 2 ways:
1. From Collections (List, Seq, Set, ..)
2. From Data Sets (text, csv, tsv, ...)


Creating an RDD from Collections:
-------------------------------
val list = List(1,2,3,4)
val rdd = sc.parallelize(list)

-------------------------------------------

scala> val list = List(1,2,3,4)
list: List[Int] = List(1, 2, 3, 4)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> rdd.getNumPartitions
res1: Int = 4

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:29

scala> rdd.getNumPartitions
res2: Int = 2

-------------------------------------------

scala> rdd.collect()
res3: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.collect().foreach(println)
1
2
3
4

scala> rdd.foreach(println)
3
4
1
2

scala> rdd.foreach(println)
1
2
3
4

-------------------------------------------

scala> rdd.collect
res10: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.map(x => x + 1)
res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:32

scala> rdd.map(x => x + 1).collect
res12: Array[Int] = Array(2, 3, 4, 5)

scala> rdd.map(x => x * x).collect
res13: Array[Int] = Array(1, 4, 9, 16)


-------------------------------------------

scala> rdd.collect
res14: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.sum
res15: Double = 10.0

scala> rdd.min
res16: Int = 1

scala> rdd.max
res17: Int = 4

scala> rdd.count
res18: Long = 4

-------------------------------------------

Creating an RDD from DataSets:
-------------------------------
val file = "file:///home/orienit/work/input/demoinput"

val rdd = sc.textFile(file)

-------------------------------------------

scala> val file = "file:///home/orienit/work/input/demoinput"
file: String = file:///home/orienit/work/input/demoinput

scala> val rdd = sc.textFile(file)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at <console>:29

scala> rdd.getNumPartitions
res19: Int = 2

scala> val rdd = sc.textFile(file, 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:29

scala> rdd.getNumPartitions
res20: Int = 1

-------------------------------------------
Word Count Job in Spark
-------------------------------------------
val file = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/demooutput"

// creating a rdd from text file
val rdd = sc.textFile(file)

// get all the words from each line
val words = rdd.flatMap(line => line.split(" "))

// create tuple for each word
val tuple = words.map(word => (word, 1))

// final word & count from tuples
val wordcount = tuple.reduceByKey((a,b) => a + b)

// sort the data based on word
val sorted = wordcount.sortBy(word => word)

// save the data into output folder
sorted.saveAsTextFile(output)


-------------------------------------------

scala> rdd.collect
res21: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)

scala> rdd.flatMap(line => line.split(" ")).collect
res22: Array[String] = Array(I, am, going, to, hyd, I, am, learning, hadoop, course)

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).collect
res23: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).collect
res24: Array[(String, Int)] = Array((learning,1), (hadoop,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word)
res25: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at sortBy at <console>:32

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word).collect
res26: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hadoop,1), (hyd,1), (learning,1), (to,1))


scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word).collect.foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)

-------------------------------------------


-------------------------------------------
Grep Job in Spark
-------------------------------------------
val file = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/demooutput-1"

// creating a rdd from text file
val rdd = sc.textFile(file)

// filter the data based on pattern
val filterData = rdd.filter(line => line.contains("am"))

// save the data into output folder
filterData.saveAsTextFile(output)

-------------------------------------------

val df1 = sqlContext.sql("SELECT * from kalyan.src")

val df2 = sqlContext.sql("SELECT * from student")

df1.show

df2.show

-------------------------------------------

df1.createOrReplaceTempView("tbl1")

df2.createOrReplaceTempView("tbl2")


sqlContext.sql("SELECT * from tbl1").show()

sqlContext.sql("SELECT * from tbl2").show()

sqlContext.sql("SELECT tbl1.*, tbl2.* from tbl1 join tbl2 on tbl1.key = tbl2.id").show()


-------------------------------------------

Sunday, 27 November 2016

SPARK BASICS Practice on 27 Nov 2016



---------------------------------------------------------------
Spark Basics
---------------------------------------------------------------

1. SparkContext (sc) is the main entry point for Spark operations

2. From the Spark context, we will create an RDD

3. Create an RDD in 2 ways
  -> from collections (List, Seq, Set, ....)
  -> from datasets (csv, tsv, text, json, hive, hdfs, cassandra, ....)

---------------------------------------------------------------
Start the spark shell using 3 approaches:

1. scala command => spark-shell

2. python command => pyspark

3. R command => sparkR


Spark-1.6.x is compatible with Scala-2.10 & Scala-2.11

Spark-2.0.x is compatible with Scala-2.11

Spark context available as sc.

SQL context available as sqlContext.

------------------------------------------------------------------
RDD => Resilient Distributed DataSets

RDD features:
--------------
1. Immutable
2. Lazy Evaluated (bottom to top approach)
3. Cacheable
4. Type Inference


RDD Operations:
--------------------
1. Transformations

old rdd ====> new rdd

2. Actions

rdd ====> result


---------------------------------------------
Examples in RDD:
--------------------
list <-- {1, 2, 3, 4}

Ex1: Transformations
-------------------------
f(x) => x + 1

f(list) <-- {2, 3, 4, 5}


f(x) => x * x

f(list) <-- {1, 4, 9, 16}



Ex2: Actions
-------------------------
min(list) <--  1

max(list) <--  4

sum(list) <--  10

avg(list) <--  2.5


---------------------------------------------

val list = List(1,2,3,4)

val rdd = sc.parallelize(list)

---------------------------------------------
scala> val list = List(1,2,3,4)
list: List[Int] = List(1, 2, 3, 4)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> rdd.getNumPartitions
res0: Int = 4

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:29

scala> rdd.getNumPartitions
res1: Int = 2


---------------------------------------------

scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.map(x => x + 1).collect
res4: Array[Int] = Array(2, 3, 4, 5)

scala> rdd.map(x => x + 1)
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:32

scala> rdd.map(x => x + 1).collect
res6: Array[Int] = Array(2, 3, 4, 5)

scala> rdd.map(x => x * x).collect
res7: Array[Int] = Array(1, 4, 9, 16)


scala> rdd.min
res8: Int = 1

scala> rdd.max
res9: Int = 4

scala> rdd.sum
res10: Double = 10.0

---------------------------------------------
1. create an RDD with the numbers 1 to 10

val rdd = sc.parallelize(1 to 10)

2. add 1 to each element

val addRdd = rdd.map(x => x + 1)

3. filter the odd numbers in the above data

val filterRdd = addRdd.filter(x => x % 2 == 1)

4. sum of the all numbers in above data

val sum = filterRdd.sum

5. print all the numbers in above data

filterRdd.foreach(x => println(x))
filterRdd.foreach(println)


6. save this data into local file system / hdfs

val output = "file:///home/orienit/work/output/data"
val output = "hdfs://localhost:8020/output/data"

filterRdd.saveAsTextFile(output)


7. repartition the data

val repartitionRdd = filterRdd.repartition(1)


8. save this data into local file system / hdfs

val output = "file:///home/orienit/work/output/data1"
val output = "hdfs://localhost:8020/output/data1"

repartitionRdd.saveAsTextFile(output)

---------------------------------------------

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27

scala> rdd.collect
res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


scala> val addRdd = rdd.map(x => x + 1)
addRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:29

scala> addRdd.collect
res12: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)


scala> val filterRdd = addRdd.filter(x => x % 2 == 1)
filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at filter at <console>:31

scala> filterRdd.collect
res13: Array[Int] = Array(3, 5, 7, 9, 11)


scala> val sum = filterRdd.sum
sum: Double = 35.0


scala> filterRdd.foreach(x => println(x))
3
5
9
11
7

scala> filterRdd.foreach(println)
7
3
5
9
11



scala> val output = "file:///home/orienit/work/output/data"
output: String = file:///home/orienit/work/output/data

scala> filterRdd.saveAsTextFile(output)


scala> val repartitionRdd = filterRdd.repartition(1)
repartitionRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at repartition at <console>:33

scala> val output = "file:///home/orienit/work/output/data1"
output: String = file:///home/orienit/work/output/data1

scala> repartitionRdd.saveAsTextFile(output)

scala> val output = "hdfs://localhost:8020/output/data1"
output: String = hdfs://localhost:8020/output/data1

scala> repartitionRdd.saveAsTextFile(output)

---------------------------------------------

Creating an RDD from DataSets:
--------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val rdd = sc.textFile(input)


scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput

scala> val rdd = sc.textFile(input)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at textFile at <console>:29

scala> rdd.getNumPartitions
res20: Int = 2

scala> val rdd = sc.textFile(input, 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[23] at textFile at <console>:29

scala> rdd.getNumPartitions
res21: Int = 1


scala> rdd.collect
res22: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)

scala> rdd.collect.foreach(println)
I am going
to hyd
I am learning
hadoop course

---------------------------------------------
WordCount in Spark using Scala:
-------------------------------------
// input path
val input = "file:///home/orienit/work/input/demoinput"

// creating a rdd from input
val fileRdd = sc.textFile(input, 1)

// read the file line by line
// split the line into words
val words = fileRdd.flatMap(line => line.split(" "))

// assign count(1) to each word
val counts = words.map(word => (word, 1))


// sum the list of values
val wordcount = counts.reduceByKey((a,b) => a + b)

// save the output into local file system / hdfs
val output = "file:///home/orienit/work/output/wordcount"
val output = "hdfs://localhost:8020/output/wordcount"
wordcount.saveAsTextFile(output)





scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput

scala> val fileRdd = sc.textFile(input, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at textFile at <console>:29

scala> val words = fileRdd.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at flatMap at <console>:31

scala> val counts = words.map(word => (word, 1))
counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:33

scala> val wordcount = counts.reduceByKey((a,b) => a + b)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:35

scala> val output = "file:///home/orienit/work/output/wordcount"
output: String = file:///home/orienit/work/output/wordcount

scala> wordcount.saveAsTextFile(output)

scala> val output = "hdfs://localhost:8020/output/wordcount"
output: String = hdfs://localhost:8020/output/wordcount

scala> wordcount.saveAsTextFile(output)


------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/wordcount"

val fileRdd = sc.textFile(input, 1)
val words = fileRdd.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1))
val wordcount = counts.reduceByKey((a,b) => a + b)
wordcount.saveAsTextFile(output)

------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/wordcount1"

val fileRdd = sc.textFile(input, 1)
val wordcount = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)
wordcount.saveAsTextFile(output)





Grep in Spark using Scala:
-------------------------------------

val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/grepjob"

val fileRdd = sc.textFile(input, 1)
val filterRdd = fileRdd.filter(line => line.contains("am"))
filterRdd.saveAsTextFile(output)



WordCount in Spark using Python:
-------------------------------------
input = "file:///home/orienit/work/input/demoinput"
output = "file:///home/orienit/work/output/wordcount-py"

fileRdd = sc.textFile(input, 1)
words = fileRdd.flatMap(lambda line : line.split(" "))
counts = words.map(lambda word : (word, 1))
wordcount = counts.reduceByKey(lambda a,b : a + b)
wordcount.saveAsTextFile(output)


>>> input = "file:///home/orienit/work/input/demoinput"
>>> output = "file:///home/orienit/work/output/wordcount-py"
>>> fileRdd = sc.textFile(input, 1)
>>> words = fileRdd.flatMap(lambda line : line.split(" "))
>>> counts = words.map(lambda word : (word, 1))
>>> wordcount = counts.reduceByKey(lambda a,b : a + b)
>>> wordcount.saveAsTextFile(output)


---------------------------------------------
Spark SQL:
---------------------
DataFrame <=> Table


orienit@kalyan:~$ cat /home/orienit/spark/input/student.json

{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}


val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")


scala> val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]

scala> df.show()
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
| spark|  1|  anil|2016|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
|hadoop|  4| sunil|2015|
| spark|  2|venkat|2016|
+------+---+------+----+


scala> val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


---------------------------------------------

A DataFrame supports 2 kinds of operations:
1. DSL - Domain-Specific Language
2. SQL - Structured Query Language

---------------------------------------------

DSL Examples:
------------------

scala> df.show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


scala> df.show(3)
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+
only showing top 3 rows


scala> df.limit(3)
res32: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.limit(3).show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


scala> df.select("name", "id")
res34: org.apache.spark.sql.DataFrame = [name: string, id: int]

scala> 

scala> df.select("name", "id").show()
+------+---+
|  name| id|
+------+---+
|  anil|  1|
|anvith|  5|
|   dev|  6|
|   raj|  3|
| sunil|  4|
|venkat|  2|
+------+---+


scala> df.select("name", "id").limit(2).show()
+------+---+
|  name| id|
+------+---+
|  anil|  1|
|anvith|  5|
+------+---+


scala> df.filter("id > 4").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


scala> df.where("id > 4").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


---------------------------------------------
SQL Examples:
---------------------
Register the DataFrame as a temp table so it can be queried with SQL:

scala> df.registerTempTable("student")


scala> sqlContext.sql("select * from student")
res42: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> sqlContext.sql("select * from student").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


scala> sqlContext.sql("select name, id from student").show()
+------+---+
|  name| id|
+------+---+
|  anil|  1|
|anvith|  5|
|   dev|  6|
|   raj|  3|
| sunil|  4|
|venkat|  2|
+------+---+


scala> sqlContext.sql("select * from student where id > 4").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


scala> sqlContext.sql("select year, count(*) from student group by year").show()
+----+---+
|year|_c1|
+----+---+
|2015|  3|
|2016|  3|
+----+---+


scala> sqlContext.sql("select year, count(*) as cnt from student group by year").show()
+----+---+
|year|cnt|
+----+---+
|2015|  3|
|2016|  3|
+----+---+
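
The group-by result above can be verified by hand. A plain-Python tally over the same six rows from student.json gives the same counts per year:

```python
from collections import Counter

# the six (name, id, course, year) rows from student.json above
students = [("anil", 1, "spark", 2016), ("anvith", 5, "hadoop", 2015),
            ("dev", 6, "hadoop", 2015), ("raj", 3, "spark", 2016),
            ("sunil", 4, "hadoop", 2015), ("venkat", 2, "spark", 2016)]

# count rows per year, like "select year, count(*) ... group by year"
cnt = Counter(year for _name, _id, _course, year in students)
print(cnt)  # Counter({2015: 3, 2016: 3}) -- matches the SQL output
```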




---------------------------------------------

case class Contact(cid: Int, name: String, loc: String, pincode:Int)
case class Orders(oid: Int, cid: Int, status: String)

val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
val cdf = contact.map(c => Contact(c(0).trim.toInt, c(1), c(2), c(3).trim.toInt)).toDF()

val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()
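
The parsing step (split, trim, convert) can be mirrored in plain Python. The two sample rows here are stand-ins in the same cid,name,loc,pincode format as contact.csv, not the full file:

```python
# two rows in the contact.csv format: cid,name,loc,pincode
raw = ["1,kalyan,hyd,500072", "2,venkat,hyd,500073"]

def to_contact(line):
    # split(","), then trim/convert the int fields, like the Scala map above
    cid, name, loc, pincode = line.split(",")
    return (int(cid.strip()), name, loc, int(pincode.strip()))

contacts = [to_contact(l) for l in raw]
print(contacts)  # [(1, 'kalyan', 'hyd', 500072), (2, 'venkat', 'hyd', 500073)]
```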

---------------------------------------------

scala> case class Contact(cid: Int, name: String, loc: String, pincode:Int)
defined class Contact

scala> case class Orders(oid: Int, cid: Int, status: String)
defined class Orders

scala> 

scala> val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
contact: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[113] at map at <console>:27

scala> val cdf = contact.map(c => Contact(c(0).trim.toInt, c(1), c(2), c(3).trim.toInt)).toDF()
cdf: org.apache.spark.sql.DataFrame = [cid: int, name: string, loc: string, pincode: int]

scala> 

scala> val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
orders: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[118] at map at <console>:27

scala> val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()
odf: org.apache.spark.sql.DataFrame = [oid: int, cid: int, status: string]


---------------------------------------------

scala> cdf.show()
+---+------+----+-------+
|cid|  name| loc|pincode|
+---+------+----+-------+
|  1|kalyan| hyd| 500072|
|  2|venkat| hyd| 500073|
|  3|  anil| hyd| 500071|
|  4|   raj| hyd| 500075|
|  5|  arun| hyd| 500074|
|  6|  vani|bang| 600072|
|  7| vamsi|bang| 600073|
|  8|prasad|bang| 600076|
|  9|anvith|bang| 600075|
| 10| swamy|bang| 600071|
+---+------+----+-------+


scala> odf.show()
+---+---+-------+
|oid|cid| status|
+---+---+-------+
|111|  1|success|
|112|  1|failure|
|113|  2|success|
|114|  3|success|
|115|  2|failure|
|116|  3|failure|
|117|  2|success|
|118|  5|success|
|119|  6|failure|
|120|  2|success|
|121|  3|failure|
|122|  7|success|
|123|  3|failure|
|124|  2|success|
|125|  1|failure|
|126|  5|success|
+---+---+-------+



---------------------------------------------

scala> cdf.registerTempTable("customers")

scala> odf.registerTempTable("orders")

scala> sqlContext.sql("select customers.*, orders.* from customers join orders on customers.cid == orders.cid").show()
+---+------+----+-------+---+---+-------+
|cid|  name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
|  1|kalyan| hyd| 500072|111|  1|success|
|  1|kalyan| hyd| 500072|112|  1|failure|
|  1|kalyan| hyd| 500072|125|  1|failure|
|  2|venkat| hyd| 500073|113|  2|success|
|  2|venkat| hyd| 500073|115|  2|failure|
|  2|venkat| hyd| 500073|117|  2|success|
|  2|venkat| hyd| 500073|120|  2|success|
|  2|venkat| hyd| 500073|124|  2|success|
|  3|  anil| hyd| 500071|114|  3|success|
|  3|  anil| hyd| 500071|116|  3|failure|
|  3|  anil| hyd| 500071|121|  3|failure|
|  3|  anil| hyd| 500071|123|  3|failure|
|  5|  arun| hyd| 500074|118|  5|success|
|  5|  arun| hyd| 500074|126|  5|success|
|  6|  vani|bang| 600072|119|  6|failure|
|  7| vamsi|bang| 600073|122|  7|success|
+---+------+----+-------+---+---+-------+
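
The join condition pairs every customer with each of that customer's orders, so rows fan out. A plain-Python nested-loop join over a small subset of the rows above shows the same behaviour:

```python
# subsets of the customers / orders rows shown above
customers = [(1, "kalyan", "hyd", 500072), (2, "venkat", "hyd", 500073)]
orders = [(111, 1, "success"), (112, 1, "failure"), (113, 2, "success")]

# customers.cid == orders.cid: inner join as a nested loop
joined = [c + o for c in customers for o in orders if c[0] == o[1]]

for row in joined:
    print(row)
# kalyan matches orders 111 and 112; venkat matches order 113
```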


---------------------------------------------

scala> val jdf = sqlContext.sql("select customers.cid, customers.name, customers.loc, customers.pincode, orders.oid, orders.status from customers join orders on customers.cid == orders.cid")
jdf: org.apache.spark.sql.DataFrame = [cid: int, name: string, loc: string, pincode: int, oid: int, status: string]


Save the joined DataFrame into a MySQL table over JDBC:

val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("user", "root")
prop.setProperty("password", "hadoop")

val url = "jdbc:mysql://localhost:3306/kalyan"

jdf.write.jdbc(url, "jointable", prop)


---------------------------------------------