`Spark` is a framework for `In-Memory Distributed Computing`
`Spark` provides 4 libraries:
1. Spark SQL
2. Spark Streaming
3. Spark MLlib
4. Spark GraphX
`Spark Context` is the entry point for all `Spark Operations`
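In the shell the context is already created for you (see "How to Start Spark" below); in a standalone Scala application it is built by hand. A minimal sketch, assuming a local master and an illustrative app name:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("demo-app")     // illustrative app name
  .setMaster("local[*]")      // illustrative master: run locally on all cores
val sc = new SparkContext(conf)
// ... build and process RDDs with sc ...
sc.stop()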
`Resilient Distributed DataSets` => RDD
RDD features:
-------------------
1. immutability
2. lazy evaluation (see the sketch after this list)
3. cacheable
4. type inference
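A minimal sketch of lazy evaluation and caching, assuming it runs inside spark-shell where `sc` already exists (the data is illustrative):

val nums = sc.parallelize(1 to 100)
val squares = nums.map(x => x * x)   // transformation only: nothing runs yet (lazy evaluation)
squares.cache()                      // mark as cacheable; materialized on the first action
squares.count()                      // first action triggers the computation and fills the cache
squares.count()                      // second action reuses the cached partitions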
RDD operations:
-----------------
1. Transformations
<old rdd> ----> <new rdd>
2. Actions
<rdd> ---> <result>
Examples on RDD:
-------------------------
list <- {1,2,3,4,5}
1. Transformations:
---------------------
Ex1:
-----
f(x) <- {x + 1}
f(list) <- {2,3,4,5,6}
Ex2:
-----
f(x) <- {x * x}
f(list) <- {1,4,9,16,25}
2. Actions:
---------------------
sum(list) -> 15
min(list) -> 1
max(list) -> 5
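The same abstract examples translate directly to plain Scala collections; a minimal sketch (results shown as comments):

val list = List(1, 2, 3, 4, 5)
list.map(x => x + 1)   // List(2, 3, 4, 5, 6)  -- Ex1
list.map(x => x * x)   // List(1, 4, 9, 16, 25) -- Ex2
list.sum               // 15
list.min               // 1
list.max               // 5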
How to Start Spark:
-----------------------------
scala => spark-shell
python => pyspark
R => sparkR
Spark-1.x:
--------------------------------------
Spark context available as 'sc'
Spark SQL context available as 'sqlContext'
Spark-2.x:
--------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'
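Outside the shell, the Spark 2.x entry point is built with `SparkSession.builder`; a minimal sketch (app name and master are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("demo-app")
  .master("local[*]")
  .enableHiveSupport()          // needed to query Hive tables like kalyan.student later
  .getOrCreate()
val sc = spark.sparkContext     // the underlying Spark context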
How to Create an RDD:
---------------------------------------
We can create an RDD in 2 ways:
1. from collections (List, Seq, Set, ....)
2. from data sets (text, csv, tsv, json, ...)
1. from collections:
---------------------------------------
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)
val rdd = sc.parallelize(list, 2)
Syntax:
-----------------------
val rdd = sc.parallelize(<collection object>, <no.of partitions>)
2. from datasets:
---------------------------------------
val file = "file:///home/orienit/work/input/demoinput"
val rdd = sc.textFile(file)
val rdd = sc.textFile(file, 1)
Syntax:
-----------------------
val rdd = sc.textFile(<file path>, <no.of partitions>)
------------------------------------------------------
scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.getNumPartitions
res0: Int = 4
scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> rdd.getNumPartitions
res1: Int = 2
------------------------------------------------------
scala> val file = "file:///home/orienit/work/input/demoinput"
file: String = file:///home/orienit/work/input/demoinput
scala> val rdd = sc.textFile(file)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[3] at textFile at <console>:26
scala> rdd.getNumPartitions
res2: Int = 2
scala> val rdd = sc.textFile(file, 1)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[5] at textFile at <console>:26
scala> rdd.getNumPartitions
res3: Int = 1
------------------------------------------------------
Examples on RDD
------------------------------------------------------
val list = List(1,2,3,4,5)
val rdd1 = sc.parallelize(list, 2)
------------------------------------------------------
val rdd2 = rdd1.map(x => x + 1)
val rdd3 = rdd1.map(x => x * x)
------------------------------------------------------
scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)
scala> val rdd1 = sc.parallelize(list, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26
------------------------------------------------------
scala> val rdd2 = rdd1.map(x => x + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:28
scala> rdd1.collect
res4: Array[Int] = Array(1, 2, 3, 4, 5)
scala> rdd2.collect
res5: Array[Int] = Array(2, 3, 4, 5, 6)
scala> val rdd3 = rdd1.map(x => x * x)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:28
scala> rdd3.collect
res6: Array[Int] = Array(1, 4, 9, 16, 25)
------------------------------------------------------
rdd1.min
rdd1.max
rdd1.sum
------------------------------------------------------
scala> rdd1.min
res7: Int = 1
scala> rdd1.max
res8: Int = 5
scala> rdd1.sum
res9: Double = 15.0
scala> rdd1.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5)
scala> rdd1.count
res11: Long = 5
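Two more common actions, not shown in the transcript above (expected results as comments):

rdd1.reduce((a, b) => a + b)   // 15
rdd1.first                     // 1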
------------------------------------------------------
Word Count in Spark using Scala
------------------------------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/spark-op"
val fileRdd = sc.textFile(input, 1)
val wordsRdd = fileRdd.flatMap(line => line.split(" "))
val tuplesRdd = wordsRdd.map(word => (word, 1))
val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)
wordCountRdd.saveAsTextFile(output)
------------------------------------------------------
Optimize the Code:
------------------------------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/spark-op"
val fileRdd = sc.textFile(input, 1)
val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)
wordCountRdd.saveAsTextFile(output)
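A possible follow-up, not part of the original job, is to sort the counts by frequency before saving; a sketch (the "-sorted" output path is illustrative):

val sortedRdd = wordCountRdd.sortBy(pair => pair._2, ascending = false)
sortedRdd.saveAsTextFile(output + "-sorted")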
------------------------------------------------------
scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput
scala> val output = "file:///home/orienit/work/output/spark-op"
output: String = file:///home/orienit/work/output/spark-op
scala> val fileRdd = sc.textFile(input, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[11] at textFile at <console>:26
scala> fileRdd.collect
res12: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)
scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at flatMap at <console>:28
scala> wordsRdd.collect
res13: Array[String] = Array(I, am, going, to, hyd, I, am, learning, hadoop, course)
scala> val tuplesRdd = wordsRdd.map(word => (word, 1))
tuplesRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:30
scala> tuplesRdd.collect
res14: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))
scala> val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:32
scala> wordCountRdd.collect
res15: Array[(String, Int)] = Array((learning,1), (hadoop,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))
scala> wordCountRdd.saveAsTextFile(output)
------------------------------------------------------
Grep Job in Spark using Scala:
------------------------------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/spark-grep-op"
val fileRdd = sc.textFile(input, 1)
val grepRdd = fileRdd.filter(line => line.contains("am"))
grepRdd.saveAsTextFile(output)
------------------------------------------------------
Sed Job in Spark using Scala:
------------------------------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/spark-sed-op"
val fileRdd = sc.textFile(input, 1)
val sedRdd = fileRdd.map(line => line.replaceAll("am", "xyz"))
sedRdd.saveAsTextFile(output)
------------------------------------------------------
Spark SQL:
------------------------------------------------------
hive> select * from kalyan.student;
scala> spark.sql("select * from kalyan.student").show
------------------------------------------------------
hive> select year, count(*) from kalyan.student group by year;
scala> spark.sql("select year, count(*) from kalyan.student group by year").show
------------------------------------------------------
Data Frames in Spark
------------------------------------------------------
val hiveDf = spark.sql("select * from kalyan.student")
hiveDf.show
hiveDf.registerTempTable("hivetbl")
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDf.show
jdbcDf.registerTempTable("jdbctbl")
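For completeness (not part of the original notes), a DataFrame can also be written back over JDBC; a sketch with an assumed target table name:

hiveDf.write.mode("append").jdbc("jdbc:mysql://localhost:3306/kalyan", "student_copy", prop)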
------------------------------------------------------
scala> val hiveDf = spark.sql("select * from kalyan.student")
hiveDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]
scala> hiveDf.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| arun| 1| cse| 1|
| sunil| 2| cse| 1|
| raj| 3| cse| 1|
|naveen| 4| cse| 1|
| venki| 5| cse| 2|
|prasad| 6| cse| 2|
| sudha| 7| cse| 2|
| ravi| 1| mech| 1|
| raju| 2| mech| 1|
| roja| 3| mech| 1|
| anil| 4| mech| 2|
| rani| 5| mech| 2|
|anvith| 6| mech| 2|
| madhu| 7| mech| 2|
| arun| 1| it| 3|
| sunil| 2| it| 3|
| raj| 3| it| 3|
|naveen| 4| it| 3|
| venki| 5| it| 4|
|prasad| 6| it| 4|
+------+---+------+----+
only showing top 20 rows
------------------------------------------------------
scala> val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]
scala> jdbcDf.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
------------------------------------------------------
scala> hiveDf.registerTempTable("hivetbl")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> jdbcDf.registerTempTable("jdbctbl")
warning: there was one deprecation warning; re-run with -deprecation for details
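The warnings appear because `registerTempTable` is deprecated in Spark 2.x; the non-deprecated equivalent is `createOrReplaceTempView`, e.g.:

hiveDf.createOrReplaceTempView("hivetbl")
jdbcDf.createOrReplaceTempView("jdbctbl")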
------------------------------------------------------
spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show
------------------------------------------------------
scala> spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show
+------+---+------+----+------+---+------+----+
| name| id|course|year| name| id|course|year|
+------+---+------+----+------+---+------+----+
| anil| 4| ece| 4| anil| 1| spark|2016|
| anil| 4| mech| 2| anil| 1| spark|2016|
|anvith| 6| ece| 4|anvith| 5|hadoop|2015|
|anvith| 6| mech| 2|anvith| 5|hadoop|2015|
| raj| 3| it| 3| raj| 3| spark|2016|
| raj| 3| cse| 1| raj| 3| spark|2016|
| sunil| 2| it| 3| sunil| 4|hadoop|2015|
| sunil| 2| cse| 1| sunil| 4|hadoop|2015|
+------+---+------+----+------+---+------+----+
------------------------------------------------------
val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()
casDf.show
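Note: `cassandraFormat` is not part of core Spark; it comes from the DataStax spark-cassandra-connector, which must be on the classpath and pointed at the cluster. A hedged sketch of a typical launch (the connector version and host are assumptions; check the connector docs for the exact coordinate):

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=localhost

// inside the shell, the implicits that add cassandraFormat to the reader come from:
import org.apache.spark.sql.cassandra._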
------------------------------------------------------
scala> val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()
casDf: org.apache.spark.sql.DataFrame = [name: string, course: string ... 2 more fields]
scala> casDf.show
+------+------+---+----+
| name|course| id|year|
+------+------+---+----+
| anil| spark| 1|2016|
| raj| spark| 3|2016|
|anvith|hadoop| 5|2015|
| dev|hadoop| 6|2015|
| sunil|hadoop| 4|2015|
|venkat| spark| 2|2016|
| kk|hadoop| 7|2015|
+------+------+---+----+
------------------------------------------------------
INSERT INTO kalyan.student(name, id, course, year) VALUES ('rajesh', 8, 'hadoop', 2017);
------------------------------------------------------
scala> casDf.show
+------+------+---+----+
| name|course| id|year|
+------+------+---+----+
| anil| spark| 1|2016|
| raj| spark| 3|2016|
|anvith|hadoop| 5|2015|
| dev|hadoop| 6|2015|
| sunil|hadoop| 4|2015|
|venkat| spark| 2|2016|
| kk|hadoop| 7|2015|
|rajesh|hadoop| 8|2017|
+------+------+---+----+
------------------------------------------------------
cqlsh:kalyan> select year, count(*) from kalyan.student group by year;
SyntaxException: line 1:42 missing EOF at 'group' (...(*) from kalyan.student [group] by...)
------------------------------------------------------
casDf.registerTempTable("castbl")
spark.sql("select year, count(*) from castbl group by year").show
------------------------------------------------------
scala> casDf.registerTempTable("castbl")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> spark.sql("select year, count(*) from castbl group by year").show
+----+--------+
|year|count(1)|
+----+--------+
|2015| 4|
|2016| 3|
|2017| 1|
+----+--------+
------------------------------------------------------
spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
------------------------------------------------------
scala> spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
+------+------+---+----+------+---+------+----+
| name|course| id|year| name| id|course|year|
+------+------+---+----+------+---+------+----+
| anil| spark| 1|2016| anil| 1| spark|2016|
|anvith|hadoop| 5|2015|anvith| 5|hadoop|2015|
| dev|hadoop| 6|2015| dev| 6|hadoop|2015|
| raj| spark| 3|2016| raj| 3| spark|2016|
| sunil|hadoop| 4|2015| sunil| 4|hadoop|2015|
|venkat| spark| 2|2016|venkat| 2| spark|2016|
+------+------+---+----+------+---+------+----+
------------------------------------------------------
scala> casDf.toJSON.collect.foreach(println)
{"name":"anil","course":"spark","id":1,"year":2016}
{"name":"raj","course":"spark","id":3,"year":2016}
{"name":"anvith","course":"hadoop","id":5,"year":2015}
{"name":"dev","course":"hadoop","id":6,"year":2015}
{"name":"sunil","course":"hadoop","id":4,"year":2015}
{"name":"venkat","course":"spark","id":2,"year":2016}
{"name":"kk","course":"hadoop","id":7,"year":2015}
{"name":"rajesh","course":"hadoop","id":8,"year":2017}
scala> hiveDf.toJSON.collect.foreach(println)
{"name":"arun","id":1,"course":"cse","year":1}
{"name":"sunil","id":2,"course":"cse","year":1}
{"name":"raj","id":3,"course":"cse","year":1}
{"name":"naveen","id":4,"course":"cse","year":1}
{"name":"venki","id":5,"course":"cse","year":2}
{"name":"prasad","id":6,"course":"cse","year":2}
{"name":"sudha","id":7,"course":"cse","year":2}
{"name":"ravi","id":1,"course":"mech","year":1}
{"name":"raju","id":2,"course":"mech","year":1}
{"name":"roja","id":3,"course":"mech","year":1}
{"name":"anil","id":4,"course":"mech","year":2}
{"name":"rani","id":5,"course":"mech","year":2}
{"name":"anvith","id":6,"course":"mech","year":2}
{"name":"madhu","id":7,"course":"mech","year":2}
scala> jdbcDf.toJSON.collect.foreach(println)
{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}
------------------------------------------------------