Spark Day 1 Practice:
==============================================
SparkContext (sc) => Main entry point in Spark
RDD => Resilient Distributed Dataset
RDD Features:
------------------
-> Immutable
-> Lazily evaluated
-> Cacheable
-> Type inferred
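A minimal spark-shell sketch of these features (the names nums and plusOne are just illustrative; sc is already available in spark-shell):
val nums = sc.parallelize(List(1, 2, 3, 4))   // immutable: nums itself is never modified, only new RDDs are derived from it
val plusOne = nums.map(x => x + 1)            // lazy: nothing is computed yet, only the lineage is recorded
plusOne.cache()                               // cacheable: mark the RDD to be kept in memory after its first computation
plusOne.sum                                   // action: triggers the computation (and fills the cache); plusOne's type was inferred as RDD[Int]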
list <- (1,2,3,4)
RDD operations:
--------------------
Transformation:
---------------
old rdd => new rdd
f(x) -> { x + 1 }
f(list) <- (2,3,4,5)
f(x) -> { x * x }
f(list) <- (1,4,9,16)
Action:
---------------
rdd => result
min(list) -> 1
max(list) -> 4
sum(list) -> 10
avg(list) -> 2.5
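The same transformations and actions expressed in spark-shell (a sketch; listRdd is an illustrative name, and the average is computed here as sum / count):
val listRdd = sc.parallelize(List(1, 2, 3, 4))
listRdd.map(x => x + 1).collect      // transformation: Array(2, 3, 4, 5)
listRdd.map(x => x * x).collect      // transformation: Array(1, 4, 9, 16)
listRdd.min                          // action: 1
listRdd.max                          // action: 4
listRdd.sum                          // action: 10.0
listRdd.sum / listRdd.count          // average: 2.5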
Spark Supports Scala, Java, Python and R
-------------------------------------------
Scala + Spark => spark-shell
Python + Spark => pyspark
R + Spark => SparkR
Spark with Scala examples:
--------------------------------------------
Create an RDD in 2 ways:
------------------------
1. from a collection (list / set / seq / ...)
2. from a data set (text / csv / tsv / json / ...)
scala> val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
Transformations on RDD:
----------------------------
scala> rdd1.foreach(println)
3
4
2
1
scala> rdd1.map(x => x + 1).foreach(println)
5
4
2
3
scala> rdd1.map(x => x + 1).foreach(println)
2
4
5
3
How to get the number of partitions:
---------------------------------------
scala> rdd1.getNumPartitions
res7: Int = 4
scala> rdd1.partitions.length
res8: Int = 4
Create an RDD with a specific number of partitions:
----------------------------------------------
scala> val rdd2 = sc.parallelize(List(1,2,3,4), 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27
scala> rdd2.getNumPartitions
res9: Int = 2
scala> rdd2.partitions.length
res10: Int = 2
Use collect to preserve the order:
-----------------------------------------
collect brings every element back to the driver in partition order, so the printed order matches the original data.
Note: Don't use collect in a Production Environment (Large Data Sets) - it pulls the entire RDD into the driver's memory
scala> rdd2.collect.foreach(println)
1
2
3
4
scala> rdd2.collect.map(x => x + 1).foreach(println)
2
3
4
5
scala> rdd2.collect.map(x => x * x).foreach(println)
1
4
9
16
scala> rdd2.collect.filter(x => x % 2 == 0).foreach(println)
2
4
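Note that after collect the data is a plain Scala Array on the driver, so the map / filter calls above run locally, not as RDD transformations. The same results can be produced by transforming the RDD first and collecting only at the end, for example:
rdd2.map(x => x + 1).collect.foreach(println)          // map runs distributed on the RDD
rdd2.filter(x => x % 2 == 0).collect.foreach(println)  // filter runs distributed on the RDD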
Actions on RDD:
----------------------------
scala> rdd2.min
res25: Int = 1
scala> rdd2.max
res26: Int = 4
scala> rdd2.sum
res27: Double = 10.0
scala> rdd2.count
res28: Long = 4
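A few more commonly used actions on the same rdd2 (a sketch; mean is provided by the same numeric implicit that makes sum available):
rdd2.first                     // 1
rdd2.take(2)                   // Array(1, 2)
rdd2.reduce((a, b) => a + b)   // 10
rdd2.mean                      // 2.5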
Create an RDD from a Text File:
---------------------------------
scala> val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput")
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27
scala> fileRdd.getNumPartitions
res29: Int = 2
scala> fileRdd.partitions.length
res30: Int = 2
Create an RDD from a Text File with a specific number of partitions:
-------------------------------------------------------------
(the second argument to textFile is the minimum number of partitions)
scala> val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at textFile at <console>:27
scala> fileRdd.getNumPartitions
res31: Int = 1
scala> fileRdd.partitions.length
res32: Int = 1
Word Count Problem in Spark:
---------------------------------------
scala> val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at textFile at <console>:27
scala> fileRdd.collect.foreach(println)
I am going
to hyd
I am learning
hadoop course
scala> fileRdd.flatMap(line => line).collect.foreach(println)
(also flattens each line into individual characters - output omitted here)
scala> fileRdd.flatMap(line => line.split("")).collect.foreach(println)
I
a
m
g
o
i
n
g
.....
scala> fileRdd.flatMap(line => line.split(" ")).collect.foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course
scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at flatMap at <console>:29
scala> wordsRdd.collect.foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course
scala> val countsRdd = wordsRdd.map(word => (word,1))
countsRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:31
scala> countsRdd.collect.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(hyd,1)
(I,1)
(am,1)
(learning,1)
(hadoop,1)
(course,1)
scala> val wordCountRdd = countsRdd.reduceByKey( _ + _)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:33
scala> val wordCountRdd = countsRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[24] at reduceByKey at <console>:33
scala> wordCountRdd.collect.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)
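Two optional follow-ups on the same pipeline (a sketch): sortBy orders the counts before printing, and countByValue skips the explicit (word, 1) step but returns a local Map on the driver:
wordCountRdd.sortBy(pair => pair._2, ascending = false).collect.foreach(println)   // most frequent words first
wordsRdd.countByValue()   // Map(I -> 2, am -> 2, going -> 1, ...) - an action, the result lives on the driver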
Solutions:
-------------------
Solution1:
-------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
val wordsRdd = fileRdd.flatMap(line => line.split(" "))
val countsRdd = wordsRdd.map(word => (word,1))
val wordCountRdd = countsRdd.reduceByKey( _ + _)
wordCountRdd.saveAsTextFile("file:///home/orienit/work/output/wordcount-op")
Solution2:
-------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey( _ + _)
wordCountRdd.saveAsTextFile("file:///home/orienit/work/output/wordcount-op")
Solution3:
-------------
sc.textFile("file:///home/orienit/work/input/demoinput", 1).flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey( _ + _).saveAsTextFile("file:///home/orienit/work/output/wordcount-op")
Grep Job using Spark:
------------------------------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
val filterRdd = fileRdd.filter(line => line.contains("am"))
filterRdd.saveAsTextFile("file:///home/orienit/work/output/grep-op")
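Two related variations (a sketch): counting the matching lines (like grep -c) and matching case-insensitively (like grep -i):
fileRdd.filter(line => line.contains("am")).count                 // number of matching lines
fileRdd.filter(line => line.toLowerCase.contains("am")).collect   // case-insensitive match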
Sed Job using Spark:
------------------------------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
val mapRdd = fileRdd.map(line => line.replace("am", "at"))
mapRdd.saveAsTextFile("file:///home/orienit/work/output/sed-op")
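For pattern-based substitution closer to sed, String.replaceAll takes a regular expression; a sketch (the \b word boundaries replace only the whole word "am", and the output path is illustrative):
val regexRdd = fileRdd.map(line => line.replaceAll("\\bam\\b", "at"))        // regex-based replacement per line
regexRdd.saveAsTextFile("file:///home/orienit/work/output/sed-regex-op")     // illustrative output path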