Pre-Requisites of Flume Project:
hadoop-2.6.0
flume-1.6.0
mongodb-3.2.7
java-1.7
NOTE: Make sure that all of the above components are installed
Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`mongodb-linux-x86_64-ubuntu1404-3.2.7.tgz` ==> link
`kalyan-twitter-hdfs-mongo-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link
`mongodb-driver-core-3.3.0.jar` ==> link
`mongo-java-driver-3.3.0.jar` ==> link
-----------------------------------------------------------------------------
1. Create the "kalyan-twitter-hdfs-mongo-agent.conf" file with the below content
agent.sources = Twitter
agent.channels = MemChannel1 MemChannel2
agent.sinks = HDFS MongoDB
agent.sources.Twitter.type = com.orienit.kalyan.flume.source.KalyanTwitterSource
agent.sources.Twitter.channels = MemChannel1 MemChannel2
agent.sources.Twitter.consumerKey = ********
agent.sources.Twitter.consumerSecret = ********
agent.sources.Twitter.accessToken = ********
agent.sources.Twitter.accessTokenSecret = ********
agent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.channel = MemChannel1
agent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/flume/tweets
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.hdfs.useLocalTimeStamp = true
agent.sinks.MongoDB.type = com.orienit.kalyan.flume.sink.KalyanMongoSink
agent.sinks.MongoDB.hostNames = localhost
agent.sinks.MongoDB.database = flume
agent.sinks.MongoDB.collection = twitter
agent.sinks.MongoDB.batchSize = 10
agent.sinks.MongoDB.channel = MemChannel2
agent.channels.MemChannel1.type = memory
agent.channels.MemChannel1.capacity = 1000
agent.channels.MemChannel1.transactionCapacity = 100
agent.channels.MemChannel2.type = memory
agent.channels.MemChannel2.capacity = 1000
agent.channels.MemChannel2.transactionCapacity = 100
2. Copy the "kalyan-twitter-hdfs-mongo-agent.conf" file into the "$FLUME_HOME/conf" folder
3. Copy the "kalyan-flume-project-0.1.jar", "mongodb-driver-core-3.3.0.jar" and "mongo-java-driver-3.3.0.jar" files into the "$FLUME_HOME/lib" folder
4. Execute the below command to `Extract data from Twitter into HDFS & MongoDB using Flume`
$FLUME_HOME/bin/flume-ng agent -n agent --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kalyan-twitter-hdfs-mongo-agent.conf -Dflume.root.logger=DEBUG,console
5. Verify the data in console
6. Verify the data in HDFS and MongoDB
7. Start the MongoDB server using the below command (mongod)
8. Start the MongoDB client using the below command (mongo)
9. Verify the list of databases in MongoDB using the below command (show dbs)
10. Verify the list of operations in MongoDB using the below commands
// list of databases
show dbs
// use flume database
use flume
// list of collections
show collections
// find the count of documents in 'twitter' collection
db.twitter.count()
// display list of documents in 'twitter' collection
db.twitter.find()
Pre-Requisites of Flume Project:
hadoop-2.6.0
flume-1.6.0
mongodb-3.2.7
java-1.7
NOTE: Make sure that all of the above components are installed
Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`mongodb-linux-x86_64-ubuntu1404-3.2.7.tgz` ==> link
`kalyan-twitter-mongo-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link
`mongodb-driver-core-3.3.0.jar` ==> link
`mongo-java-driver-3.3.0.jar` ==> link
-----------------------------------------------------------------------------
1. Create the "kalyan-twitter-mongo-agent.conf" file with the below content
agent.sources = Twitter
agent.channels = MemChannel
agent.sinks = MongoDB
agent.sources.Twitter.type = com.orienit.kalyan.flume.source.KalyanTwitterSource
agent.sources.Twitter.channels = MemChannel
agent.sources.Twitter.consumerKey = ********
agent.sources.Twitter.consumerSecret = ********
agent.sources.Twitter.accessToken = ********
agent.sources.Twitter.accessTokenSecret = ********
agent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
agent.sinks.MongoDB.type = com.orienit.kalyan.flume.sink.KalyanMongoSink
agent.sinks.MongoDB.hostNames = localhost
agent.sinks.MongoDB.database = flume
agent.sinks.MongoDB.collection = twitter
agent.sinks.MongoDB.batchSize = 10
agent.sinks.MongoDB.channel = MemChannel
agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 1000
agent.channels.MemChannel.transactionCapacity = 100
2. Copy the "kalyan-twitter-mongo-agent.conf" file into the "$FLUME_HOME/conf" folder
3. Copy the "kalyan-flume-project-0.1.jar", "mongodb-driver-core-3.3.0.jar" and "mongo-java-driver-3.3.0.jar" files into the "$FLUME_HOME/lib" folder
4. Execute the below command to `Extract data from Twitter into MongoDB using Flume`
$FLUME_HOME/bin/flume-ng agent -n agent --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kalyan-twitter-mongo-agent.conf -Dflume.root.logger=DEBUG,console
5. Verify the data in console
6. Verify the data in MongoDB
7. Start the MongoDB server using the below command (mongod)
8. Start the MongoDB client using the below command (mongo)
9. Verify the list of databases in MongoDB using the below command (show dbs)
10. Verify the list of operations in MongoDB using the below commands
// list of databases
show dbs
// use flume database
use flume
// list of collections
show collections
// find the count of documents in 'twitter' collection
db.twitter.count()
// display list of documents in 'twitter' collection
db.twitter.find()
Pre-Requisites of Flume Project:
hadoop-2.6.0
flume-1.6.0
java-1.7
NOTE: Make sure that all of the above components are installed
Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`kalyan-twitter-hdfs-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link
-----------------------------------------------------------------------------
1. Create the "kalyan-twitter-hdfs-agent.conf" file with the below content
agent.sources = Twitter
agent.channels = MemChannel
agent.sinks = HDFS
agent.sources.Twitter.type = com.orienit.kalyan.flume.source.KalyanTwitterSource
agent.sources.Twitter.channels = MemChannel
agent.sources.Twitter.consumerKey = ********
agent.sources.Twitter.consumerSecret = ********
agent.sources.Twitter.accessToken = ********
agent.sources.Twitter.accessTokenSecret = ********
agent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/flume/tweets
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.hdfs.useLocalTimeStamp = true
agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 1000
agent.channels.MemChannel.transactionCapacity = 100
2. Copy the "kalyan-twitter-hdfs-agent.conf" file into the "$FLUME_HOME/conf" folder
3. Copy the "kalyan-flume-project-0.1.jar" file into the "$FLUME_HOME/lib" folder
4. Execute the below command to `Extract data from Twitter into Hadoop using Flume`
$FLUME_HOME/bin/flume-ng agent -n agent --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kalyan-twitter-hdfs-agent.conf -Dflume.root.logger=DEBUG,console
5. Verify the data in console
6. Verify the data in the HDFS location "hdfs://localhost:8020/user/flume/tweets"
SPARK SQL Practice
------------------------------------------------------------
To integrate Spark with Hive, we need to copy 'hive-site.xml' from the '$HIVE_HOME/conf' folder to the '$SPARK_HOME/conf' folder
hive> select * from test.student;
OK
arun 1 cse 1
sunil 2 cse 1
raj 3 cse 1
naveen 4 cse 1
venki 5 cse 2
prasad 6 cse 2
sudha 7 cse 2
ravi 1 mech 1
raju 2 mech 1
roja 3 mech 1
anil 4 mech 2
rani 5 mech 2
anvith 6 mech 2
madhu 7 mech 2
Time taken: 0.143 seconds, Fetched: 14 row(s)
hive> select year, count(*) from test.student group by year;
OK
1 7
2 7
Time taken: 21.865 seconds, Fetched: 2 row(s)
scala> sqlContext.sql("select year, count(*) from test.student group by year").show
+----+---+
|year|_c1|
+----+---+
| 1| 7|
| 2| 7|
+----+---+
--------------------------------------------------------------------
Once we convert any data into a `DataFrame`,
we can execute all SQL queries like select, join, group by, order by, etc.
----------------------------------------------------------------------
val df = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
val df = sqlContext.read.parquet("file:///home/hadoop/spark/input/student.parquet")
scala> val df = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]
scala> val df = sqlContext.read.parquet("file:///home/hadoop/spark/input/student.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
scala> df.filter(df("id") > 3)
res12: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.filter(df("id") > 3).show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| sunil| 4|hadoop|2015|
+------+---+------+----+
scala> df.filter(df("id") > 3).filter(df("id") < 5)
res14: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.filter(df("id") > 3).filter(df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil| 4|hadoop|2015|
+-----+---+------+----+
scala> df.filter(df("id") > 3 and df("id") < 5)
res16: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> df.filter(df("id") > 3 and df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil| 4|hadoop|2015|
+-----+---+------+----+
scala> df.where(df("id") > 3 and df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil| 4|hadoop|2015|
+-----+---+------+----+
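As a side note, `filter` also accepts a SQL expression string instead of Column expressions; a minimal sketch against the same `df` (an illustrative alternative, not used elsewhere in these notes):
// the same predicate written as a SQL expression string
df.filter("id > 3 and id < 5").show
// an expression-string predicate combined with a column projection
df.filter("year = 2015").select("name", "course").show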
scala> df.select(df("name"), df("year"))
res20: org.apache.spark.sql.DataFrame = [name: string, year: int]
scala> df.select(df("name"), df("year")).show
+------+----+
| name|year|
+------+----+
| anil|2016|
|anvith|2015|
| dev|2015|
| raj|2016|
| sunil|2015|
|venkat|2016|
+------+----+
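`select` can also take column expressions, so derived columns can be computed on the fly; a small sketch against the same `df` (the `next_year` alias is just an illustrative name):
// compute a derived column and give it a readable alias
df.select(df("name"), (df("year") + 1).as("next_year")).show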
// Register the DataFrame as a temporary table
scala> df.registerTempTable("studentdf")
// Retrieve the data from the table using SQL queries
scala> sqlContext.sql("select * from studentdf")
res23: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> sqlContext.sql("select * from studentdf").show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
// Retrieve the data from the DataFrame using the DSL
scala> df.show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
| anil| 1| spark|2016|
|anvith| 5|hadoop|2015|
| dev| 6|hadoop|2015|
| raj| 3| spark|2016|
| sunil| 4|hadoop|2015|
|venkat| 2| spark|2016|
+------+---+------+----+
scala> sqlContext.sql("select year, count(*) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015| 3|
|2016| 3|
+----+---+
scala> sqlContext.sql("select year, sum(id) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015| 15|
|2016| 6|
+----+---+
scala> sqlContext.sql("select year, min(id) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015| 4|
|2016| 1|
+----+---+
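The same year-wise aggregations can also be expressed with the DataFrame DSL instead of SQL; a minimal sketch (Spark 1.x API, assuming the functions import shown):
// aggregate functions live in org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{sum, min}
// year-wise count, sum(id) and min(id) without going through SQL
df.groupBy("year").count().show
df.groupBy("year").agg(sum("id"), min("id")).show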
----------------------------------------------------------
scala> val hiveDF = sqlContext.sql("select * from test.student")
hiveDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]
scala> val parDF = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
parDF: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]
scala> hiveDF.registerTempTable("hivetbl")
scala> parDF.registerTempTable("partbl")
scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id = partbl.id").show
+------+---+------+----+------+---+------+----+
| name| id|course|year|course| id| name|year|
+------+---+------+----+------+---+------+----+
| arun| 1| cse| 1| spark| 1| anil|2016|
| sunil| 2| cse| 1| spark| 2|venkat|2016|
| raj| 3| cse| 1| spark| 3| raj|2016|
|naveen| 4| cse| 1|hadoop| 4| sunil|2015|
| venki| 5| cse| 2|hadoop| 5|anvith|2015|
|prasad| 6| cse| 2|hadoop| 6| dev|2015|
| ravi| 1| mech| 1| spark| 1| anil|2016|
| raju| 2| mech| 1| spark| 2|venkat|2016|
| roja| 3| mech| 1| spark| 3| raj|2016|
| anil| 4| mech| 2|hadoop| 4| sunil|2015|
| rani| 5| mech| 2|hadoop| 5|anvith|2015|
|anvith| 6| mech| 2|hadoop| 6| dev|2015|
+------+---+------+----+------+---+------+----+
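The same equi-join can be written directly on the two DataFrames; a small sketch with the `hiveDF` and `parDF` defined above:
// inner join on the id column using the DataFrame API instead of SQL
hiveDF.join(parDF, hiveDF("id") === parDF("id")).show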
scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id != partbl.id").show
+------+---+------+----+------+---+------+----+
| name| id|course|year|course| id| name|year|
+------+---+------+----+------+---+------+----+
| arun| 1| cse| 1|hadoop| 5|anvith|2015|
| arun| 1| cse| 1|hadoop| 6| dev|2015|
| arun| 1| cse| 1| spark| 3| raj|2016|
| arun| 1| cse| 1|hadoop| 4| sunil|2015|
| arun| 1| cse| 1| spark| 2|venkat|2016|
| sunil| 2| cse| 1| spark| 1| anil|2016|
| sunil| 2| cse| 1|hadoop| 5|anvith|2015|
| sunil| 2| cse| 1|hadoop| 6| dev|2015|
| sunil| 2| cse| 1| spark| 3| raj|2016|
| sunil| 2| cse| 1|hadoop| 4| sunil|2015|
| raj| 3| cse| 1| spark| 1| anil|2016|
| raj| 3| cse| 1|hadoop| 5|anvith|2015|
| raj| 3| cse| 1|hadoop| 6| dev|2015|
| raj| 3| cse| 1|hadoop| 4| sunil|2015|
| raj| 3| cse| 1| spark| 2|venkat|2016|
|naveen| 4| cse| 1| spark| 1| anil|2016|
|naveen| 4| cse| 1|hadoop| 5|anvith|2015|
|naveen| 4| cse| 1|hadoop| 6| dev|2015|
|naveen| 4| cse| 1| spark| 3| raj|2016|
|naveen| 4| cse| 1| spark| 2|venkat|2016|
+------+---+------+----+------+---+------+----+
only showing top 20 rows
scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id != partbl.id").foreach(println)
[arun,1,cse,1,hadoop,5,anvith,2015]
[arun,1,cse,1,hadoop,6,dev,2015]
[arun,1,cse,1,spark,3,raj,2016]
[arun,1,cse,1,hadoop,4,sunil,2015]
[arun,1,cse,1,spark,2,venkat,2016]
[sunil,2,cse,1,spark,1,anil,2016]
[sunil,2,cse,1,hadoop,5,anvith,2015]
[sunil,2,cse,1,hadoop,6,dev,2015]
[sunil,2,cse,1,spark,3,raj,2016]
[sunil,2,cse,1,hadoop,4,sunil,2015]
[raj,3,cse,1,spark,1,anil,2016]
[raj,3,cse,1,hadoop,5,anvith,2015]
[raj,3,cse,1,hadoop,6,dev,2015]
[raj,3,cse,1,hadoop,4,sunil,2015]
[raj,3,cse,1,spark,2,venkat,2016]
[naveen,4,cse,1,spark,1,anil,2016]
[naveen,4,cse,1,hadoop,5,anvith,2015]
[naveen,4,cse,1,hadoop,6,dev,2015]
[naveen,4,cse,1,spark,3,raj,2016]
[naveen,4,cse,1,spark,2,venkat,2016]
[venki,5,cse,2,spark,1,anil,2016]
[venki,5,cse,2,hadoop,6,dev,2015]
[venki,5,cse,2,spark,3,raj,2016]
[venki,5,cse,2,hadoop,4,sunil,2015]
[venki,5,cse,2,spark,2,venkat,2016]
[prasad,6,cse,2,spark,1,anil,2016]
[prasad,6,cse,2,hadoop,5,anvith,2015]
[prasad,6,cse,2,spark,3,raj,2016]
[prasad,6,cse,2,hadoop,4,sunil,2015]
[prasad,6,cse,2,spark,2,venkat,2016]
[sudha,7,cse,2,spark,1,anil,2016]
[sudha,7,cse,2,hadoop,5,anvith,2015]
[sudha,7,cse,2,hadoop,6,dev,2015]
[sudha,7,cse,2,spark,3,raj,2016]
[sudha,7,cse,2,hadoop,4,sunil,2015]
[sudha,7,cse,2,spark,2,venkat,2016]
[ravi,1,mech,1,hadoop,5,anvith,2015]
[ravi,1,mech,1,hadoop,6,dev,2015]
[ravi,1,mech,1,spark,3,raj,2016]
[ravi,1,mech,1,hadoop,4,sunil,2015]
[ravi,1,mech,1,spark,2,venkat,2016]
[raju,2,mech,1,spark,1,anil,2016]
[raju,2,mech,1,hadoop,5,anvith,2015]
[raju,2,mech,1,hadoop,6,dev,2015]
[raju,2,mech,1,spark,3,raj,2016]
[raju,2,mech,1,hadoop,4,sunil,2015]
[roja,3,mech,1,spark,1,anil,2016]
[roja,3,mech,1,hadoop,5,anvith,2015]
[roja,3,mech,1,hadoop,6,dev,2015]
[roja,3,mech,1,hadoop,4,sunil,2015]
[roja,3,mech,1,spark,2,venkat,2016]
[anil,4,mech,2,spark,1,anil,2016]
[anil,4,mech,2,hadoop,5,anvith,2015]
[anil,4,mech,2,hadoop,6,dev,2015]
[anil,4,mech,2,spark,3,raj,2016]
[anil,4,mech,2,spark,2,venkat,2016]
[rani,5,mech,2,spark,1,anil,2016]
[rani,5,mech,2,hadoop,6,dev,2015]
[rani,5,mech,2,spark,3,raj,2016]
[rani,5,mech,2,hadoop,4,sunil,2015]
[rani,5,mech,2,spark,2,venkat,2016]
[anvith,6,mech,2,spark,1,anil,2016]
[anvith,6,mech,2,hadoop,5,anvith,2015]
[anvith,6,mech,2,spark,3,raj,2016]
[anvith,6,mech,2,hadoop,4,sunil,2015]
[anvith,6,mech,2,spark,2,venkat,2016]
[madhu,7,mech,2,spark,1,anil,2016]
[madhu,7,mech,2,hadoop,5,anvith,2015]
[madhu,7,mech,2,hadoop,6,dev,2015]
[madhu,7,mech,2,spark,3,raj,2016]
[madhu,7,mech,2,hadoop,4,sunil,2015]
[madhu,7,mech,2,spark,2,venkat,2016]
mysql> select * from student;
+----+-------------+-------+------+
| id | name | class | mark |
+----+-------------+-------+------+
| 1 | John Deo | Four | 75 |
| 2 | Max Ruin | Three | 85 |
| 3 | Arnold | Three | 55 |
| 4 | Krish Star | Four | 60 |
| 5 | John Mike | Four | 60 |
| 6 | Alex John | Four | 55 |
| 7 | My John Rob | Fifth | 78 |
| 8 | Asruid | Five | 85 |
| 9 | Tes Qry | Six | 78 |
| 10 | Big John | Four | 55 |
| 11 | Ronald | Six | 89 |
| 12 | Recky | Six | 94 |
| 13 | Kty | Seven | 88 |
| 14 | Bigy | Seven | 88 |
| 15 | Tade Row | Four | 88 |
| 16 | Gimmy | Four | 88 |
| 17 | Tumyu | Six | 54 |
| 18 | Honny | Five | 75 |
| 19 | Tinny | Nine | 18 |
| 20 | Jackly | Nine | 65 |
| 21 | Babby John | Four | 69 |
| 22 | Reggid | Seven | 55 |
| 23 | Herod | Eight | 79 |
| 24 | Tiddy Now | Seven | 78 |
| 25 | Giff Tow | Seven | 88 |
| 26 | Crelea | Seven | 79 |
| 27 | Big Nose | Three | 81 |
| 28 | Rojj Base | Seven | 86 |
| 29 | Tess Played | Seven | 55 |
| 30 | Reppy Red | Six | 79 |
| 31 | Marry Toeey | Four | 88 |
| 32 | Binn Rott | Seven | 90 |
| 33 | Kenn Rein | Six | 96 |
| 34 | Gain Toe | Seven | 69 |
| 35 | Rows Noump | Six | 88 |
+----+-------------+-------+------+
35 rows in set (0.00 sec)
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/sqoop", "student", prop)
jdbcDF.show()
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}
scala> prop.setProperty("driver","com.mysql.jdbc.Driver")
res35: Object = null
scala> prop.setProperty("user","root")
res36: Object = null
scala> prop.setProperty("password","hadoop")
res37: Object = null
scala>
scala> val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/sqoop", "student", prop)
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string, class: string, mark: int]
scala>
scala> jdbcDF.show()
+---+-----------+-----+----+
| id| name|class|mark|
+---+-----------+-----+----+
| 1| John Deo| Four| 75|
| 2| Max Ruin|Three| 85|
| 3| Arnold|Three| 55|
| 4| Krish Star| Four| 60|
| 5| John Mike| Four| 60|
| 6| Alex John| Four| 55|
| 7|My John Rob|Fifth| 78|
| 8| Asruid| Five| 85|
| 9| Tes Qry| Six| 78|
| 10| Big John| Four| 55|
| 11| Ronald| Six| 89|
| 12| Recky| Six| 94|
| 13| Kty|Seven| 88|
| 14| Bigy|Seven| 88|
| 15| Tade Row| Four| 88|
| 16| Gimmy| Four| 88|
| 17| Tumyu| Six| 54|
| 18| Honny| Five| 75|
| 19| Tinny| Nine| 18|
| 20| Jackly| Nine| 65|
+---+-----------+-----+----+
only showing top 20 rows
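Assuming the MySQL JDBC driver jar was put on the spark-shell classpath before running the above (for example via --jars), `jdbcDF` behaves like any other DataFrame; a small sketch (the `studentjdbc` table name is just illustrative):
// filter and project the JDBC-backed DataFrame like any other
jdbcDF.filter(jdbcDF("mark") >= 85).select("name", "class", "mark").show
// or register it as a temporary table and use SQL
jdbcDF.registerTempTable("studentjdbc")
sqlContext.sql("select `class`, count(*) from studentjdbc group by `class`").show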
Spark RDD operations
----------------------------------------
Command-line shells in Spark:
Scala => spark-shell
Python => pyspark
R => SparkR
----------------------------------------
// create an RDD from a Scala collection
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27
// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res1: Int = 4
// create an RDD from a Scala collection with a given number of partitions
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27
// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res2: Int = 2
// create an RDD from a Scala collection with a given number of partitions
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27
// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res3: Int = 1
----------------------------------------
// create an RDD from a Scala collection
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
// sum of the RDD data
scala> rdd.sum
res4: Double = 21.0
// minimum of the RDD data
scala> rdd.min
res5: Int = 1
// maximum of the RDD data
scala> rdd.max
res6: Int = 6
// transform the RDD data (x => x + 1)
scala> val rdd1 = rdd.map(x => x + 1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:29
scala> rdd.foreach(println)
1
2
3
5
6
4
scala> rdd1.foreach(println)
2
5
3
4
6
7
// filter the RDD data (x => x > 3)
scala> val rdd2 = rdd.filter(x => x > 3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:29
scala> rdd2.foreach(println)
4
5
6
----------------------------------------
We can create an RDD in two ways:
1. from a collection
val rdd = sc.parallelize(collection, number of partitions)
2. from a data source (text / csv / tsv / json files, ...)
val rdd = sc.textFile(path of the file, number of partitions)
Note: the file can be on the local file system, HDFS, S3, FTP, etc.
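A minimal combined sketch of both approaches (collRDD/fileRDD are just illustrative names; the file path is the one used in the transcripts below):
// 1. from a collection, with an explicit number of partitions
val collRDD = sc.parallelize(1 to 10, 2)
// 2. from a text file (a local path here; an hdfs:// path works the same way)
val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 2)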
----------------------------------------
scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:27
scala> rdd.getNumPartitions
res10: Int = 2
scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:27
scala> rdd.getNumPartitions
res11: Int = 1
scala> rdd.foreach(println)
I am going
to hyd
I am learning
hadoop course
----------------------------------------
scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27
scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course
scala> fileRDD.flatMap(line => line.split(" ")).foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course
scala> val wordsRDD = fileRDD.flatMap(line => line.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:29
scala> val tupleRDD = wordsRDD.map( word => (word, 1))
tupleRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:31
scala> tupleRDD.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(hyd,1)
(I,1)
(am,1)
(learning,1)
(hadoop,1)
(course,1)
scala> val wordcountRDD = tupleRDD.reduceByKey( (a,b) => a + b )
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:33
scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)
scala> val wordcountRDD = tupleRDD.reduceByKey( _ + _)
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33
scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)
scala> wordcountRDD.sortBy( t => t._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
scala> wordcountRDD.sortBy( t => t._2).foreach(println)
(learning,1)
(hadoop,1)
(hyd,1)
(to,1)
(going,1)
(course,1)
(am,2)
(I,2)
scala> wordcountRDD.sortBy( t => t._1, false).foreach(println)
(to,1)
(learning,1)
(hyd,1)
(hadoop,1)
(going,1)
(course,1)
(am,2)
(I,2)
scala> wordcountRDD.sortBy( t => t._1, true).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
scala> wordcountRDD.sortBy( t => t._1, true).getNumPartitions
res24: Int = 1
scala> wordcountRDD.sortBy( t => t._1, true, 2).getNumPartitions
res25: Int = 2
----------------------------------------------
Word Count in Spark using Scala
----------------------------------------------
val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
val wordsRDD = fileRDD.flatMap(line => line.split(" "))
val tupleRDD = wordsRDD.map( word => (word, 1))
val wordcountRDD = tupleRDD.reduceByKey( _ + _)
(or)
val wordcountRDD = fileRDD.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey( _ + _)
wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-op")
val sortedRDD = wordcountRDD.sortBy( t => t._1, true)
sortedRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-sorted-op")
sortedRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-sorted-op")
----------------------------------------------
Word Count in Spark using Python
----------------------------------------------
fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
wordsRDD = fileRDD.flatMap(lambda line : line.split(" "))
tupleRDD = wordsRDD.map(lambda word : (word, 1))
wordcountRDD = tupleRDD.reduceByKey( lambda a, b : a + b)
(or)
wordcountRDD = fileRDD.flatMap(lambda line : line.split(" ")).map(lambda word : (word, 1)).reduceByKey(lambda a, b : a + b)
wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-python-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-python-op")
----------------------------------------------
Grep Job in Spark using Scala
----------------------------------------------
scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at textFile at <console>:27
scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course
scala> val grepRDD = fileRDD.filter(line => line.contains("am"))
grepRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at filter at <console>:29
scala> grepRDD.foreach(println)
I am going
I am learning
--------------------------------------------------------
scala> val list1 = List(1,2,3)
list1: List[Int] = List(1, 2, 3)
scala> val list2 = List("aa","bb","cc")
list2: List[String] = List(aa, bb, cc)
scala> val arr1 = Array(1,2,3)
arr1: Array[Int] = Array(1, 2, 3)
scala> val arr2 = Array("aa","bb","cc")
arr2: Array[String] = Array(aa, bb, cc)
scala> val seq1 = Seq(1,2,3)
seq1: Seq[Int] = List(1, 2, 3)
scala> val set1 = Set(1,2,3)
set1: scala.collection.immutable.Set[Int] = Set(1, 2, 3)
scala> val vec1 = Vector(1,2,3)
vec1: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)
--------------------------------------------------------
scala> val map1 = Map( 1 -> "aa", 2 -> "bb", 3 -> "cc")
map1: scala.collection.immutable.Map[Int,String] = Map(1 -> aa, 2 -> bb, 3 -> cc)
scala> list1
res0: List[Int] = List(1, 2, 3)
scala> list2
res1: List[String] = List(aa, bb, cc)
scala> val map2 = list1.zip(list2)
map2: List[(Int, String)] = List((1,aa), (2,bb), (3,cc))
scala> val map12 = map2.toMap
map12: scala.collection.immutable.Map[Int,String] = Map(1 -> aa, 2 -> bb, 3 -> cc)
--------------------------------------------------------
scala> val list1 = List(1,2,3)
list1: List[Int] = List(1, 2, 3)
scala> val list2 = 1 :: 2 :: 3 :: Nil
list2: List[Int] = List(1, 2, 3)
scala> val list3 = (1 :: (2 :: (3 :: Nil)))
list3: List[Int] = List(1, 2, 3)
scala> val list4 = 0 :: list3
list4: List[Int] = List(0, 1, 2, 3)
scala> val list5 = list4 :: 4
<console>:12: error: value :: is not a member of Int
val list5 = list4 :: 4
^
scala> val list5 = list4 :: 4 :: Nil
list5: List[Any] = List(List(0, 1, 2, 3), 4)
scala> val list5 = list4 ::: 4 :: Nil
list5: List[Int] = List(0, 1, 2, 3, 4)
--------------------------------------------------------
scala> list1
res2: List[Int] = List(1, 2, 3)
scala> val list2 = list1.map((x : Int) => {x + 1})
list2: List[Int] = List(2, 3, 4)
scala> val list2 = list1.map((x : Int) => x + 1)
list2: List[Int] = List(2, 3, 4)
scala> val list2 = list1.map(x => x + 1)
list2: List[Int] = List(2, 3, 4)
scala> val list2 = list1.map( _ + 1)
list2: List[Int] = List(2, 3, 4)
scala> list1
res3: List[Int] = List(1, 2, 3)
scala> val list2 = list1.filter(x => x > 1)
list2: List[Int] = List(2, 3)
scala> val list2 = list1.filter(_ > 1)
list2: List[Int] = List(2, 3)
scala> val list1 = List(List(1,2,3), List(4,5,6))
list1: List[List[Int]] = List(List(1, 2, 3), List(4, 5, 6))
scala> val list2 = list1.flatMap(x => x)
list2: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val list1 = List(List("I am going", "to hyd"), List("I am learning", "hadoop course"))
list1: List[List[String]] = List(List(I am going, to hyd), List(I am learning, hadoop course))
scala> val list2 = list1.flatMap(x => x)
list2: List[String] = List(I am going, to hyd, I am learning, hadoop course)
scala> val list3 = list2.flatMap(x => x)
list3: List[Char] = List(I, , a, m, , g, o, i, n, g, t, o, , h, y, d, I, , a, m, , l, e, a, r, n, i, n, g, h, a, d, o, o, p, , c, o, u, r, s, e)
scala> val list3 = list2.flatMap(x => x.split(""))
list3: List[String] = List(I, " ", a, m, " ", g, o, i, n, g, t, o, " ", h, y, d, I, " ", a, m, " ", l, e, a, r, n, i, n, g, h, a, d, o, o, p, " ", c, o, u, r, s, e)
scala> val list3 = list2.flatMap(x => x.split(" "))
list3: List[String] = List(I, am, going, to, hyd, I, am, learning, hadoop, course)
scala> val list3 = list2.flatMap(x => x.split(" "))
list3: List[String] = List(I, am, going, to, hyd, I, am, learning, hadoop, course)
scala> val map = list3.map(x => (x , 1))
map: List[(String, Int)] = List((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))
scala> list3.count(x => true)
res10: Int = 10
scala> list3.count(x => x.length > 2)
res11: Int = 5
scala> val group = list3.groupBy(x => x)
group: scala.collection.immutable.Map[String,List[String]] = Map(course -> List(course), am -> List(am, am), going -> List(going), I -> List(I, I), hyd -> List(hyd), to -> List(to), hadoop -> List(hadoop), learning -> List(learning))
scala> val wordcount = group.map(x => (x._1, x._2))
wordcount: scala.collection.immutable.Map[String,List[String]] = Map(course -> List(course), am -> List(am, am), going -> List(going), I -> List(I, I), hyd -> List(hyd), to -> List(to), hadoop -> List(hadoop), learning -> List(learning))
scala> val wordcount = group.map(x => (x._1, x._2.count(y => true)))
wordcount: scala.collection.immutable.Map[String,Int] = Map(course -> 1, am -> 2, going -> 1, I -> 2, hyd -> 1, to -> 1, hadoop -> 1, learning -> 1)
scala> wordcount.foreach(x => println(x))
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
scala> wordcount.foreach(println)
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
scala> wordcount.toList.sorted.foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
scala> wordcount.toList.sortBy(x => x._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
scala> wordcount.toList.sortBy(x => x._2).foreach(println)
(course,1)
(going,1)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
(am,2)
(I,2)
scala> wordcount.toList.sortWith((x,y) => x._1 > y._1).foreach(println)
(to,1)
(learning,1)
(hyd,1)
(hadoop,1)
(going,1)
(course,1)
(am,2)
(I,2)
scala> wordcount.toList.sortWith((x,y) => x._1 < y._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
scala> wordcount.toList.sortWith((x,y) => x._2 < y._2).foreach(println)
(course,1)
(going,1)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
(am,2)
(I,2)
scala> wordcount.toList.sortWith((x,y) => x._2 > y._2).foreach(println)
(am,2)
(I,2)
(course,1)
(going,1)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
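For reference, the whole collections walk-through above can be collapsed into a single chain over the same list2; a minimal sketch:
// word count over the plain Scala collection in one expression
list2.flatMap(_.split(" ")).groupBy(identity).map { case (word, occurrences) => (word, occurrences.size) }.toList.sortBy(_._1).foreach(println)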