Wednesday, 28 September 2016

How To Stream Twitter Data Into Hadoop and MongoDB Using Apache Flume

Pre-Requisites of Flume Project:
hadoop-2.6.0
flume-1.6.0

mongodb-3.2.7
java-1.7


NOTE: Make sure that all of the above components are installed.

Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link

`mongodb-linux-x86_64-ubuntu1404-3.2.7.tgz` ==> link
`kalyan-twitter-hdfs-mongo-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link

`mongodb-driver-core-3.3.0.jar` ==> link
`mongo-java-driver-3.3.0.jar` ==> link

-----------------------------------------------------------------------------

1. Create the "kalyan-twitter-hdfs-mongo-agent.conf" file with the content below

agent.sources = Twitter
agent.channels = MemChannel1 MemChannel2
agent.sinks = HDFS MongoDB

agent.sources.Twitter.type = com.orienit.kalyan.flume.source.KalyanTwitterSource
agent.sources.Twitter.channels = MemChannel1 MemChannel2
agent.sources.Twitter.consumerKey = ********
agent.sources.Twitter.consumerSecret = ********
agent.sources.Twitter.accessToken = ********
agent.sources.Twitter.accessTokenSecret = ********
agent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing



agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.channel = MemChannel1
agent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/flume/tweets
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.hdfs.useLocalTimeStamp = true


agent.sinks.MongoDB.type = com.orienit.kalyan.flume.sink.KalyanMongoSink
agent.sinks.MongoDB.hostNames = localhost
agent.sinks.MongoDB.database = flume
agent.sinks.MongoDB.collection = twitter
agent.sinks.MongoDB.batchSize = 10
agent.sinks.MongoDB.channel = MemChannel2

agent.channels.MemChannel1.type = memory
agent.channels.MemChannel1.capacity = 1000
agent.channels.MemChannel1.transactionCapacity = 100


agent.channels.MemChannel2.type = memory
agent.channels.MemChannel2.capacity = 1000
agent.channels.MemChannel2.transactionCapacity = 100


2. Copy the "kalyan-twitter-hdfs-mongo-agent.conf" file into the "$FLUME_HOME/conf" folder

3. Copy the "kalyan-flume-project-0.1.jar", "mongodb-driver-core-3.3.0.jar" and "mongo-java-driver-3.3.0.jar" files into the "$FLUME_HOME/lib" folder

4. Execute the command below to `Extract data from Twitter into HDFS and MongoDB using Flume`

$FLUME_HOME/bin/flume-ng agent -n agent --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kalyan-twitter-hdfs-mongo-agent.conf -Dflume.root.logger=DEBUG,console





5. Verify the data in console





6. Verify the data in HDFS and MongoDB






7. Start the MongoDB server using the command below



8. Start the MongoDB client using the command below (mongo)



9. Verify the list of databases in MongoDB using the command below (show dbs)



10. Verify the data in MongoDB using the commands below

// list of databases
show dbs

// use flume database
use flume

// list of collections
show collections

// find the count of documents in 'twitter' collection
db.twitter.count()

// display list of documents in 'twitter' collection
db.twitter.find()
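
The agent file in step 1 connects one source to two channels and each sink to one of those channels. As a rough illustration, the Scala sketch below checks that every channel referenced by a source or sink is also listed in `agent.channels`. `FlumeWiringCheck` and `undeclaredChannels` are hypothetical names made up for this example and are not part of Flume; the sketch only assumes the property keys shown in the configuration above.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper, not part of Flume: a sanity check for the agent wiring.
object FlumeWiringCheck {
  // Splits a whitespace-separated list value such as "MemChannel1 MemChannel2".
  private def names(value: String): Set[String] =
    Option(value).map(_.trim.split("\\s+").filter(_.nonEmpty).toSet).getOrElse(Set.empty)

  // Returns one message per channel reference that is missing from <agent>.channels.
  def undeclaredChannels(conf: String, agent: String = "agent"): List[String] = {
    val props = new Properties()
    props.load(new StringReader(conf))
    val declared = names(props.getProperty(s"$agent.channels"))
    // Sources list their channels under the plural key "channels".
    val sourceRefs = names(props.getProperty(s"$agent.sources")).toList.flatMap { s =>
      names(props.getProperty(s"$agent.sources.$s.channels")).toList.map(c => (s"source $s", c))
    }
    // Sinks name exactly one channel under the singular key "channel".
    val sinkRefs = names(props.getProperty(s"$agent.sinks")).toList.flatMap { k =>
      names(props.getProperty(s"$agent.sinks.$k.channel")).toList.map(c => (s"sink $k", c))
    }
    (sourceRefs ++ sinkRefs).collect {
      case (owner, channel) if !declared.contains(channel) => s"$owner -> $channel"
    }
  }
}
```

Calling `FlumeWiringCheck.undeclaredChannels(text)` with the contents of the .conf file should return an empty list when the wiring is consistent. A non-empty result names the source or sink that points at a channel missing from `agent.channels`.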





How To Stream Twitter Data Into MongoDB Using Apache Flume

Pre-Requisites of Flume Project:
hadoop-2.6.0
flume-1.6.0

mongodb-3.2.7
java-1.7


NOTE: Make sure that all of the above components are installed.

Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link

`mongodb-linux-x86_64-ubuntu1404-3.2.7.tgz` ==> link
`kalyan-twitter-mongo-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link

`mongodb-driver-core-3.3.0.jar` ==> link
`mongo-java-driver-3.3.0.jar` ==> link

-----------------------------------------------------------------------------

1. Create the "kalyan-twitter-mongo-agent.conf" file with the content below

agent.sources = Twitter
agent.channels = MemChannel
agent.sinks = MongoDB

agent.sources.Twitter.type = com.orienit.kalyan.flume.source.KalyanTwitterSource
agent.sources.Twitter.channels = MemChannel
agent.sources.Twitter.consumerKey = ********
agent.sources.Twitter.consumerSecret = ********
agent.sources.Twitter.accessToken = ********
agent.sources.Twitter.accessTokenSecret = ********
agent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing


agent.sinks.MongoDB.type = com.orienit.kalyan.flume.sink.KalyanMongoSink
agent.sinks.MongoDB.hostNames = localhost
agent.sinks.MongoDB.database = flume
agent.sinks.MongoDB.collection = twitter
agent.sinks.MongoDB.batchSize = 10
agent.sinks.MongoDB.channel = MemChannel

agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 1000
agent.channels.MemChannel.transactionCapacity = 100


2. Copy the "kalyan-twitter-mongo-agent.conf" file into the "$FLUME_HOME/conf" folder

3. Copy the "kalyan-flume-project-0.1.jar", "mongodb-driver-core-3.3.0.jar" and "mongo-java-driver-3.3.0.jar" files into the "$FLUME_HOME/lib" folder

4. Execute the command below to `Extract data from Twitter into MongoDB using Flume`

$FLUME_HOME/bin/flume-ng agent -n agent --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kalyan-twitter-mongo-agent.conf -Dflume.root.logger=DEBUG,console





5. Verify the data in console





6. Verify the data in MongoDB


7. Start the MongoDB server using the command below



8. Start the MongoDB client using the command below (mongo)



9. Verify the list of databases in MongoDB using the command below (show dbs)



10. Verify the data in MongoDB using the commands below

// list of databases
show dbs

// use flume database
use flume

// list of collections
show collections

// find the count of documents in 'twitter' collection
db.twitter.count()


// display list of documents in 'twitter' collection
db.twitter.find()





How To Stream Twitter Data Into Hadoop Using Apache Flume

Pre-Requisites of Flume Project:
hadoop-2.6.0
flume-1.6.0
java-1.7


NOTE: Make sure that all of the above components are installed.

Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`kalyan-twitter-hdfs-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link


-----------------------------------------------------------------------------

1. Create the "kalyan-twitter-hdfs-agent.conf" file with the content below

agent.sources = Twitter
agent.channels = MemChannel
agent.sinks = HDFS

agent.sources.Twitter.type = com.orienit.kalyan.flume.source.KalyanTwitterSource
agent.sources.Twitter.channels = MemChannel
agent.sources.Twitter.consumerKey = ********
agent.sources.Twitter.consumerSecret = ********
agent.sources.Twitter.accessToken = ********
agent.sources.Twitter.accessTokenSecret = ********
agent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.channel = MemChannel
agent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/flume/tweets
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.batchSize = 100
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 100
agent.sinks.HDFS.hdfs.useLocalTimeStamp = true

agent.channels.MemChannel.type = memory
agent.channels.MemChannel.capacity = 1000
agent.channels.MemChannel.transactionCapacity = 100


2. Copy the "kalyan-twitter-hdfs-agent.conf" file into the "$FLUME_HOME/conf" folder

3. Copy the "kalyan-flume-project-0.1.jar" file into the "$FLUME_HOME/lib" folder

4. Execute the command below to `Extract data from Twitter into Hadoop using Flume`

$FLUME_HOME/bin/flume-ng agent -n agent --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kalyan-twitter-hdfs-agent.conf -Dflume.root.logger=DEBUG,console





5. Verify the data in console





6. Verify the data in the HDFS location "hdfs://localhost:8020/user/flume/tweets"







Friday, 23 September 2016

SPARK BASICS DAY 2 Practice on 24 Sept 2016


















SPARK SQL Practice
------------------------------------------------------------

To integrate Spark with Hive, copy 'hive-site.xml' from the '$HIVE_HOME/conf' folder to the '$SPARK_HOME/conf' folder


hive> select * from test.student;
OK
arun 1 cse 1
sunil 2 cse 1
raj 3 cse 1
naveen 4 cse 1
venki 5 cse 2
prasad 6 cse 2
sudha 7 cse 2
ravi 1 mech 1
raju 2 mech 1
roja 3 mech 1
anil 4 mech 2
rani 5 mech 2
anvith 6 mech 2
madhu 7 mech 2
Time taken: 0.143 seconds, Fetched: 14 row(s)

hive> select year, count(*) from test.student group by year;
OK
1 7
2 7
Time taken: 21.865 seconds, Fetched: 2 row(s)


scala> sqlContext.sql("select year, count(*) from test.student group by year").show
+----+---+
|year|_c1|
+----+---+
|   1|  7|
|   2|  7|
+----+---+

--------------------------------------------------------------------

Once any data is converted into a `DataFrame`, we can run SQL queries on it, such as select, join, group by and order by.

----------------------------------------------------------------------


val df = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")

val df = sqlContext.read.parquet("file:///home/hadoop/spark/input/student.parquet")


scala> val df = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]


scala> val df = sqlContext.read.parquet("file:///home/hadoop/spark/input/student.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]


scala> df.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


scala> df.filter(df("id") > 3)
res12: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.filter(df("id") > 3).show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
| sunil|  4|hadoop|2015|
+------+---+------+----+


scala> df.filter(df("id") > 3).filter(df("id") < 5)
res14: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.filter(df("id") > 3).filter(df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil|  4|hadoop|2015|
+-----+---+------+----+


scala> df.filter(df("id") > 3 and df("id") < 5)
res16: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.filter(df("id") > 3 and df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil|  4|hadoop|2015|
+-----+---+------+----+

scala> df.where(df("id") > 3 and df("id") < 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|sunil|  4|hadoop|2015|
+-----+---+------+----+

scala> df.select(df("name"), df("year"))
res20: org.apache.spark.sql.DataFrame = [name: string, year: int]

scala> df.select(df("name"), df("year")).show
+------+----+
|  name|year|
+------+----+
|  anil|2016|
|anvith|2015|
|   dev|2015|
|   raj|2016|
| sunil|2015|
|venkat|2016|
+------+----+


// Register DataFrame as Table

scala> df.registerTempTable("studentdf")


// Retrieve the data from Table using Sql Queries

scala> sqlContext.sql("select * from studentdf")
res23: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> sqlContext.sql("select * from studentdf").show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


// Retrieve the data from DataFrame using DSL

scala> df.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+

scala> sqlContext.sql("select year, count(*) from studentdf group by year").show 
+----+---+
|year|_c1|
+----+---+
|2015|  3|
|2016|  3|
+----+---+


scala> sqlContext.sql("select year, sum(id) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015| 15|
|2016|  6|
+----+---+


scala> sqlContext.sql("select year, min(id) from studentdf group by year").show
+----+---+
|year|_c1|
+----+---+
|2015|  4|
|2016|  1|
+----+---+
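
The three group-by queries above can also be traced by hand. The sketch below uses plain Scala collections instead of Spark, so it only illustrates what the aggregation computes; it is not how Spark SQL executes it. `Student`, `GroupBySketch` and `aggregateByYear` are names invented for this example, and the rows are the six shown by `df.show`.

```scala
// Hypothetical case class mirroring the columns of the student DataFrame.
final case class Student(name: String, id: Int, course: String, year: Int)

object GroupBySketch {
  // The six rows printed by df.show above.
  val students: List[Student] = List(
    Student("anil", 1, "spark", 2016),
    Student("anvith", 5, "hadoop", 2015),
    Student("dev", 6, "hadoop", 2015),
    Student("raj", 3, "spark", 2016),
    Student("sunil", 4, "hadoop", 2015),
    Student("venkat", 2, "spark", 2016)
  )

  // Groups rows by year, then reduces the ids of each group with the given aggregate.
  def aggregateByYear(agg: List[Int] => Int): Map[Int, Int] =
    students.groupBy(_.year).map { case (year, rows) => year -> agg(rows.map(_.id)) }

  val countByYear: Map[Int, Int] = aggregateByYear(_.size) // count(*)
  val sumByYear: Map[Int, Int] = aggregateByYear(_.sum)    // sum(id)
  val minByYear: Map[Int, Int] = aggregateByYear(_.min)    // min(id)
}
```

The resulting maps hold the same values as the three tables above: counts of 3 and 3, sums of 15 and 6, and minimums of 4 and 1 for 2015 and 2016.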


----------------------------------------------------------

scala> val hiveDF = sqlContext.sql("select * from test.student")
hiveDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> val parDF = sqlContext.read.json("file:///home/hadoop/spark/input/student.json")
parDF: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]


scala> hiveDF.registerTempTable("hivetbl")

scala> parDF.registerTempTable("partbl")

scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id = partbl.id").show
+------+---+------+----+------+---+------+----+
|  name| id|course|year|course| id|  name|year|
+------+---+------+----+------+---+------+----+
|  arun|  1|   cse|   1| spark|  1|  anil|2016|
| sunil|  2|   cse|   1| spark|  2|venkat|2016|
|   raj|  3|   cse|   1| spark|  3|   raj|2016|
|naveen|  4|   cse|   1|hadoop|  4| sunil|2015|
| venki|  5|   cse|   2|hadoop|  5|anvith|2015|
|prasad|  6|   cse|   2|hadoop|  6|   dev|2015|
|  ravi|  1|  mech|   1| spark|  1|  anil|2016|
|  raju|  2|  mech|   1| spark|  2|venkat|2016|
|  roja|  3|  mech|   1| spark|  3|   raj|2016|
|  anil|  4|  mech|   2|hadoop|  4| sunil|2015|
|  rani|  5|  mech|   2|hadoop|  5|anvith|2015|
|anvith|  6|  mech|   2|hadoop|  6|   dev|2015|
+------+---+------+----+------+---+------+----+


scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id != partbl.id").show
+------+---+------+----+------+---+------+----+
|  name| id|course|year|course| id|  name|year|
+------+---+------+----+------+---+------+----+
|  arun|  1|   cse|   1|hadoop|  5|anvith|2015|
|  arun|  1|   cse|   1|hadoop|  6|   dev|2015|
|  arun|  1|   cse|   1| spark|  3|   raj|2016|
|  arun|  1|   cse|   1|hadoop|  4| sunil|2015|
|  arun|  1|   cse|   1| spark|  2|venkat|2016|
| sunil|  2|   cse|   1| spark|  1|  anil|2016|
| sunil|  2|   cse|   1|hadoop|  5|anvith|2015|
| sunil|  2|   cse|   1|hadoop|  6|   dev|2015|
| sunil|  2|   cse|   1| spark|  3|   raj|2016|
| sunil|  2|   cse|   1|hadoop|  4| sunil|2015|
|   raj|  3|   cse|   1| spark|  1|  anil|2016|
|   raj|  3|   cse|   1|hadoop|  5|anvith|2015|
|   raj|  3|   cse|   1|hadoop|  6|   dev|2015|
|   raj|  3|   cse|   1|hadoop|  4| sunil|2015|
|   raj|  3|   cse|   1| spark|  2|venkat|2016|
|naveen|  4|   cse|   1| spark|  1|  anil|2016|
|naveen|  4|   cse|   1|hadoop|  5|anvith|2015|
|naveen|  4|   cse|   1|hadoop|  6|   dev|2015|
|naveen|  4|   cse|   1| spark|  3|   raj|2016|
|naveen|  4|   cse|   1| spark|  2|venkat|2016|
+------+---+------+----+------+---+------+----+
only showing top 20 rows


scala> sqlContext.sql("select hivetbl.*, partbl.* from hivetbl join partbl on hivetbl.id != partbl.id").foreach(println)
[arun,1,cse,1,hadoop,5,anvith,2015]
[arun,1,cse,1,hadoop,6,dev,2015]
[arun,1,cse,1,spark,3,raj,2016]
[arun,1,cse,1,hadoop,4,sunil,2015]
[arun,1,cse,1,spark,2,venkat,2016]
[sunil,2,cse,1,spark,1,anil,2016]
[sunil,2,cse,1,hadoop,5,anvith,2015]
[sunil,2,cse,1,hadoop,6,dev,2015]
[sunil,2,cse,1,spark,3,raj,2016]
[sunil,2,cse,1,hadoop,4,sunil,2015]
[raj,3,cse,1,spark,1,anil,2016]
[raj,3,cse,1,hadoop,5,anvith,2015]
[raj,3,cse,1,hadoop,6,dev,2015]
[raj,3,cse,1,hadoop,4,sunil,2015]
[raj,3,cse,1,spark,2,venkat,2016]
[naveen,4,cse,1,spark,1,anil,2016]
[naveen,4,cse,1,hadoop,5,anvith,2015]
[naveen,4,cse,1,hadoop,6,dev,2015]
[naveen,4,cse,1,spark,3,raj,2016]
[naveen,4,cse,1,spark,2,venkat,2016]
[venki,5,cse,2,spark,1,anil,2016]
[venki,5,cse,2,hadoop,6,dev,2015]
[venki,5,cse,2,spark,3,raj,2016]
[venki,5,cse,2,hadoop,4,sunil,2015]
[venki,5,cse,2,spark,2,venkat,2016]
[prasad,6,cse,2,spark,1,anil,2016]
[prasad,6,cse,2,hadoop,5,anvith,2015]
[prasad,6,cse,2,spark,3,raj,2016]
[prasad,6,cse,2,hadoop,4,sunil,2015]
[prasad,6,cse,2,spark,2,venkat,2016]
[sudha,7,cse,2,spark,1,anil,2016]
[sudha,7,cse,2,hadoop,5,anvith,2015]
[sudha,7,cse,2,hadoop,6,dev,2015]
[sudha,7,cse,2,spark,3,raj,2016]
[sudha,7,cse,2,hadoop,4,sunil,2015]
[sudha,7,cse,2,spark,2,venkat,2016]
[ravi,1,mech,1,hadoop,5,anvith,2015]
[ravi,1,mech,1,hadoop,6,dev,2015]
[ravi,1,mech,1,spark,3,raj,2016]
[ravi,1,mech,1,hadoop,4,sunil,2015]
[ravi,1,mech,1,spark,2,venkat,2016]
[raju,2,mech,1,spark,1,anil,2016]
[raju,2,mech,1,hadoop,5,anvith,2015]
[raju,2,mech,1,hadoop,6,dev,2015]
[raju,2,mech,1,spark,3,raj,2016]
[raju,2,mech,1,hadoop,4,sunil,2015]
[roja,3,mech,1,spark,1,anil,2016]
[roja,3,mech,1,hadoop,5,anvith,2015]
[roja,3,mech,1,hadoop,6,dev,2015]
[roja,3,mech,1,hadoop,4,sunil,2015]
[roja,3,mech,1,spark,2,venkat,2016]
[anil,4,mech,2,spark,1,anil,2016]
[anil,4,mech,2,hadoop,5,anvith,2015]
[anil,4,mech,2,hadoop,6,dev,2015]
[anil,4,mech,2,spark,3,raj,2016]
[anil,4,mech,2,spark,2,venkat,2016]
[rani,5,mech,2,spark,1,anil,2016]
[rani,5,mech,2,hadoop,6,dev,2015]
[rani,5,mech,2,spark,3,raj,2016]
[rani,5,mech,2,hadoop,4,sunil,2015]
[rani,5,mech,2,spark,2,venkat,2016]
[anvith,6,mech,2,spark,1,anil,2016]
[anvith,6,mech,2,hadoop,5,anvith,2015]
[anvith,6,mech,2,spark,3,raj,2016]
[anvith,6,mech,2,hadoop,4,sunil,2015]
[anvith,6,mech,2,spark,2,venkat,2016]
[madhu,7,mech,2,spark,1,anil,2016]
[madhu,7,mech,2,hadoop,5,anvith,2015]
[madhu,7,mech,2,hadoop,6,dev,2015]
[madhu,7,mech,2,spark,3,raj,2016]
[madhu,7,mech,2,hadoop,4,sunil,2015]
[madhu,7,mech,2,spark,2,venkat,2016]
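
The row counts of the two joins above follow from the join condition alone. The sketch below reproduces them with plain Scala lists rather than Spark, keeping only the `id` column of each table. `JoinSketch` and `join` are hypothetical names for this illustration.

```scala
object JoinSketch {
  // Only the id column matters for the join condition, so each table is reduced to its ids.
  val hiveIds: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7) // 14 rows of test.student
  val jsonIds: List[Int] = List(1, 2, 3, 4, 5, 6)                         // 6 rows of student.json

  // Pairs every left row with every right row that satisfies the condition.
  def join(left: List[Int], right: List[Int])(on: (Int, Int) => Boolean): List[(Int, Int)] =
    for { l <- left; r <- right; if on(l, r) } yield (l, r)

  val equiJoin: List[(Int, Int)] = join(hiveIds, jsonIds)(_ == _)    // hivetbl.id = partbl.id
  val nonEquiJoin: List[(Int, Int)] = join(hiveIds, jsonIds)(_ != _) // hivetbl.id != partbl.id
}
```

Here `equiJoin` has 12 pairs and `nonEquiJoin` has 72, matching the 12 rows of the first join and the 72 rows printed by `foreach(println)`. Together they cover all 14 x 6 = 84 combinations, which is why a `!=` join grows quickly on larger tables.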


mysql> select * from student;
+----+-------------+-------+------+
| id | name        | class | mark |
+----+-------------+-------+------+
|  1 | John Deo    | Four  |   75 |
|  2 | Max Ruin    | Three |   85 |
|  3 | Arnold      | Three |   55 |
|  4 | Krish Star  | Four  |   60 |
|  5 | John Mike   | Four  |   60 |
|  6 | Alex John   | Four  |   55 |
|  7 | My John Rob | Fifth |   78 |
|  8 | Asruid      | Five  |   85 |
|  9 | Tes Qry     | Six   |   78 |
| 10 | Big John    | Four  |   55 |
| 11 | Ronald      | Six   |   89 |
| 12 | Recky       | Six   |   94 |
| 13 | Kty         | Seven |   88 |
| 14 | Bigy        | Seven |   88 |
| 15 | Tade Row    | Four  |   88 |
| 16 | Gimmy       | Four  |   88 |
| 17 | Tumyu       | Six   |   54 |
| 18 | Honny       | Five  |   75 |
| 19 | Tinny       | Nine  |   18 |
| 20 | Jackly      | Nine  |   65 |
| 21 | Babby John  | Four  |   69 |
| 22 | Reggid      | Seven |   55 |
| 23 | Herod       | Eight |   79 |
| 24 | Tiddy Now   | Seven |   78 |
| 25 | Giff Tow    | Seven |   88 |
| 26 | Crelea      | Seven |   79 |
| 27 | Big Nose    | Three |   81 |
| 28 | Rojj Base   | Seven |   86 |
| 29 | Tess Played | Seven |   55 |
| 30 | Reppy Red   | Six   |   79 |
| 31 | Marry Toeey | Four  |   88 |
| 32 | Binn Rott   | Seven |   90 |
| 33 | Kenn Rein   | Six   |   96 |
| 34 | Gain Toe    | Seven |   69 |
| 35 | Rows Noump  | Six   |   88 |
+----+-------------+-------+------+
35 rows in set (0.00 sec)




val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/sqoop", "student", prop)

jdbcDF.show()



scala> val prop = new java.util.Properties
prop: java.util.Properties = {}

scala> prop.setProperty("driver","com.mysql.jdbc.Driver")
res35: Object = null

scala> prop.setProperty("user","root")
res36: Object = null

scala> prop.setProperty("password","hadoop")
res37: Object = null

scala> 

scala> val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/sqoop", "student", prop)
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string, class: string, mark: int]

scala> 

scala> jdbcDF.show()
+---+-----------+-----+----+
| id|       name|class|mark|
+---+-----------+-----+----+
|  1|   John Deo| Four|  75|
|  2|   Max Ruin|Three|  85|
|  3|     Arnold|Three|  55|
|  4| Krish Star| Four|  60|
|  5|  John Mike| Four|  60|
|  6|  Alex John| Four|  55|
|  7|My John Rob|Fifth|  78|
|  8|     Asruid| Five|  85|
|  9|    Tes Qry|  Six|  78|
| 10|   Big John| Four|  55|
| 11|     Ronald|  Six|  89|
| 12|      Recky|  Six|  94|
| 13|        Kty|Seven|  88|
| 14|       Bigy|Seven|  88|
| 15|   Tade Row| Four|  88|
| 16|      Gimmy| Four|  88|
| 17|      Tumyu|  Six|  54|
| 18|      Honny| Five|  75|
| 19|      Tinny| Nine|  18|
| 20|     Jackly| Nine|  65|
+---+-----------+-----+----+
only showing top 20 rows






Thursday, 22 September 2016

SPARK BASICS DAY 1 Practice on 23 Sept 2016















Spark RDD operations
----------------------------------------
Command-line shells in Spark:

Scala  => spark-shell
Python => pyspark
R      => sparkR

----------------------------------------
// create a RDD from scala collection

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27


// get the number of partitions from RDD

scala> rdd.getNumPartitions
res1: Int = 4


// create a RDD from scala collection using number of partitions

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27


// get the number of partitions from RDD

scala> rdd.getNumPartitions
res2: Int = 2

// create a RDD from scala collection using number of partitions

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

// get the number of partitions from RDD

scala> rdd.getNumPartitions
res3: Int = 1
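
To get a feel for what the partition count means, the sketch below cuts a collection into contiguous slices in plain Scala. It illustrates one even-split scheme under the assumption that elements stay in order within a slice; it is not taken from Spark's source, and `PartitionSketch` and `slice` are hypothetical names.

```scala
object PartitionSketch {
  // Cuts `data` into `numSlices` contiguous slices whose sizes differ by at most one.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "need at least one slice")
    (0 until numSlices).map { i =>
      // Integer division spreads the remainder across the slices.
      val start = (i.toLong * data.length / numSlices).toInt
      val end = ((i + 1).toLong * data.length / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```

With this scheme, `PartitionSketch.slice(List(1,2,3,4,5,6), 2)` gives `List(1, 2, 3)` and `List(4, 5, 6)`, while 4 slices give `List(1)`, `List(2, 3)`, `List(4)` and `List(5, 6)`. Each slice would be processed independently, which is one reason output printed from several partitions can appear out of order.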

----------------------------------------
// create a RDD from scala collection

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

// sum the RDD data

scala> rdd.sum
res4: Double = 21.0                                                             


// min the RDD data

scala> rdd.min
res5: Int = 1


// max the RDD data

scala> rdd.max
res6: Int = 6


// transform the RDD data (x => x + 1)

scala> val rdd1 = rdd.map(x => x + 1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:29

scala> rdd.foreach(println)
1
2
3
5
6
4

scala> rdd1.foreach(println)
2
5
3
4
6
7


// filter the RDD data (x => x > 3)

scala> val rdd2 = rdd.filter(x => x > 3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:29

scala> rdd2.foreach(println)
4
5
6

----------------------------------------

We can create an RDD in two ways

1. From a collection

val rdd = sc.parallelize(collection, numberOfPartitions)


2. From a data source (text / csv / tsv / json files ...)

val rdd = sc.textFile(pathOfTheFile, numberOfPartitions)

Note: the file can come from the local file system / HDFS / S3 / FTP ...

----------------------------------------

scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:27

scala> rdd.getNumPartitions
res10: Int = 2

scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:27

scala> rdd.getNumPartitions
res11: Int = 1


scala> rdd.foreach(println)
I am going
to hyd
I am learning
hadoop course

----------------------------------------

scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27

scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course


scala> fileRDD.flatMap(line => line.split(" ")).foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course


scala> val wordsRDD = fileRDD.flatMap(line => line.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:29

scala> val tupleRDD = wordsRDD.map( word => (word, 1))
tupleRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:31


scala> tupleRDD.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(hyd,1)
(I,1)
(am,1)
(learning,1)
(hadoop,1)
(course,1)


scala> val wordcountRDD = tupleRDD.reduceByKey( (a,b) => a + b )
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:33

scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)


scala> val wordcountRDD = tupleRDD.reduceByKey( _ + _)
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33

scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)
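
The effect of `reduceByKey` on the (word, 1) pairs can be imitated on an ordinary Scala sequence. The sketch below is a single-machine illustration only: it does no partitioning or shuffling, and the local `reduceByKey` function is a hypothetical stand-in, not the Spark method.

```scala
object ReduceByKeySketch {
  // Folds the pairs into a map, combining values that share a key with `reduce`.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(reduce: (V, V) => V): Map[K, V] =
    pairs.foldLeft(Map.empty[K, V]) { case (acc, (key, value)) =>
      // First occurrence of a key stores the value; later ones are merged into it.
      val merged = acc.get(key).map(previous => reduce(previous, value)).getOrElse(value)
      acc.updated(key, merged)
    }
}
```

For example, `ReduceByKeySketch.reduceByKey(Seq(("I", 1), ("am", 1), ("I", 1)))(_ + _)` yields `Map(I -> 2, am -> 1)`. The function passed in plays the same role as `(a,b) => a + b` or `_ + _` in the transcript above.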


scala> wordcountRDD.sortBy( t => t._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)

scala> wordcountRDD.sortBy( t => t._2).foreach(println)
(learning,1)
(hadoop,1)
(hyd,1)
(to,1)
(going,1)
(course,1)
(am,2)
(I,2)



scala> wordcountRDD.sortBy( t => t._1, false).foreach(println)
(to,1)
(learning,1)
(hyd,1)
(hadoop,1)
(going,1)
(course,1)
(am,2)
(I,2)

scala> wordcountRDD.sortBy( t => t._1, true).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)


scala> wordcountRDD.sortBy( t => t._1, true).getNumPartitions
res24: Int = 1

scala> wordcountRDD.sortBy( t => t._1, true, 2).getNumPartitions
res25: Int = 2

----------------------------------------------
Word Count in Spark using Scala
----------------------------------------------

val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
val wordsRDD = fileRDD.flatMap(line => line.split(" "))
val tupleRDD = wordsRDD.map( word => (word, 1))
val wordcountRDD = tupleRDD.reduceByKey( _ + _)


(or)

val wordcountRDD = fileRDD.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey( _ + _)


wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-op")


val sortedRDD = wordcountRDD.sortBy( t => t._1, true)
sortedRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-sorted-op")
sortedRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-sorted-op")


----------------------------------------------
Word Count in Spark using Python
----------------------------------------------

fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
wordsRDD = fileRDD.flatMap(lambda line : line.split(" "))
tupleRDD = wordsRDD.map(lambda word : (word, 1))
wordcountRDD = tupleRDD.reduceByKey( lambda a, b : a + b)


(or)

wordcountRDD = fileRDD.flatMap(lambda line : line.split(" ")).map( lambda word : (word, 1)).reduceByKey( lambda a, b : a + b )


wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-python-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-python-op")



----------------------------------------------
Grep Job in Spark using Scala
----------------------------------------------

scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at textFile at <console>:27

scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course

scala> val grepRDD = fileRDD.filter(line => line.contains("am"))
grepRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at filter at <console>:29

scala> grepRDD.foreach(println)
I am going
I am learning



Monday, 19 September 2016

SCALA BASICS DAY 2 Practice on 20 Sept 2016






















--------------------------------------------------------

scala> val list1 = List(1,2,3)
list1: List[Int] = List(1, 2, 3)

scala> val list2 = List("aa","bb","cc")
list2: List[String] = List(aa, bb, cc)

scala> val arr1 = Array(1,2,3)
arr1: Array[Int] = Array(1, 2, 3)

scala> val arr2 = Array("aa","bb","cc")
arr2: Array[String] = Array(aa, bb, cc)

scala> val seq1 = Seq(1,2,3)
seq1: Seq[Int] = List(1, 2, 3)

scala> val set1 = Set(1,2,3)
set1: scala.collection.immutable.Set[Int] = Set(1, 2, 3)

scala> val vec1 = Vector(1,2,3)
vec1: scala.collection.immutable.Vector[Int] = Vector(1, 2, 3)

--------------------------------------------------------

scala> val map1 = Map( 1 -> "aa", 2 -> "bb", 3 -> "cc")
map1: scala.collection.immutable.Map[Int,String] = Map(1 -> aa, 2 -> bb, 3 -> cc)

scala> list1
res0: List[Int] = List(1, 2, 3)

scala> list2
res1: List[String] = List(aa, bb, cc)

scala> val map2 = list1.zip(list2)
map2: List[(Int, String)] = List((1,aa), (2,bb), (3,cc))

scala> val map12 = map2.toMap
map12: scala.collection.immutable.Map[Int,String] = Map(1 -> aa, 2 -> bb, 3 -> cc)

--------------------------------------------------------

scala> val list1 = List(1,2,3)
list1: List[Int] = List(1, 2, 3)

scala> val list2 = 1 :: 2 :: 3 :: Nil
list2: List[Int] = List(1, 2, 3)

scala> val list3 = (1 :: (2 :: (3 :: Nil)))
list3: List[Int] = List(1, 2, 3)

scala> val list4 = 0 :: list3
list4: List[Int] = List(0, 1, 2, 3)

scala> val list5 = list4 :: 4
<console>:12: error: value :: is not a member of Int
       val list5 = list4 :: 4
                         ^

scala> val list5 = list4 :: 4 :: Nil
list5: List[Any] = List(List(0, 1, 2, 3), 4)


scala> val list5 = list4 ::: 4 :: Nil
list5: List[Int] = List(0, 1, 2, 3, 4)
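
The error and the `List[Any]` result above come from how the list operators bind. The short sketch below puts the three common operators side by side; `ListOpsSketch` and its value names are made up for this example.

```scala
object ListOpsSketch {
  val base: List[Int] = List(0, 1, 2, 3)

  // `::` prepends one element; it is right-associative, so this reads as base.::(-1).
  val prepended: List[Int] = -1 :: base

  // `:::` concatenates two lists, which is why `list4 ::: 4 :: Nil` works above.
  val concatenated: List[Int] = base ::: List(4)

  // `:+` appends a single element without wrapping it in a list first.
  val appended: List[Int] = base :+ 4
}
```

Operators ending in a colon are called on their right-hand operand, so `list4 :: 4` looks for `::` on the Int `4` and fails. Note that appending with `:+` copies the whole list, so prepending is usually the cheaper choice for a `List`.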

--------------------------------------------------------

scala> list1
res2: List[Int] = List(1, 2, 3)

scala> val list2 = list1.map((x : Int) => {x + 1})
list2: List[Int] = List(2, 3, 4)

scala> val list2 = list1.map((x : Int) => x + 1)
list2: List[Int] = List(2, 3, 4)

scala> val list2 = list1.map(x => x + 1)
list2: List[Int] = List(2, 3, 4)

scala> val list2 = list1.map( _ + 1)
list2: List[Int] = List(2, 3, 4)


scala> list1
res3: List[Int] = List(1, 2, 3)

scala> val list2 = list1.filter(x => x > 1)
list2: List[Int] = List(2, 3)

scala> val list2 = list1.filter(_ > 1)
list2: List[Int] = List(2, 3)



scala> val list1 = List(List(1,2,3), List(4,5,6))
list1: List[List[Int]] = List(List(1, 2, 3), List(4, 5, 6))

scala> val list2 = list1.flatMap(x => x)
list2: List[Int] = List(1, 2, 3, 4, 5, 6)


scala> val list1 = List(List("I am going", "to hyd"), List("I am learning", "hadoop course"))
list1: List[List[String]] = List(List(I am going, to hyd), List(I am learning, hadoop course))

scala> val list2 = list1.flatMap(x => x)
list2: List[String] = List(I am going, to hyd, I am learning, hadoop course)

scala> val list3 = list2.flatMap(x => x)
list3: List[Char] = List(I,  , a, m,  , g, o, i, n, g, t, o,  , h, y, d, I,  , a, m,  , l, e, a, r, n, i, n, g, h, a, d, o, o, p,  , c, o, u, r, s, e)


scala> val list3 = list2.flatMap(x => x.split(""))
list3: List[String] = List(I, " ", a, m, " ", g, o, i, n, g, t, o, " ", h, y, d, I, " ", a, m, " ", l, e, a, r, n, i, n, g, h, a, d, o, o, p, " ", c, o, u, r, s, e)

scala> val list3 = list2.flatMap(x => x.split(" "))
list3: List[String] = List(I, am, going, to, hyd, I, am, learning, hadoop, course)


scala> val list3 = list2.flatMap(x => x.split(" "))
list3: List[String] = List(I, am, going, to, hyd, I, am, learning, hadoop, course)

scala> val map = list3.map(x => (x , 1))
map: List[(String, Int)] = List((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))


scala> list3.count(x => true)
res10: Int = 10

scala> list3.count(x => x.length > 2)
res11: Int = 5


scala> val group = list3.groupBy(x => x)
group: scala.collection.immutable.Map[String,List[String]] = Map(course -> List(course), am -> List(am, am), going -> List(going), I -> List(I, I), hyd -> List(hyd), to -> List(to), hadoop -> List(hadoop), learning -> List(learning))



scala> val wordcount = group.map(x => (x._1, x._2))
wordcount: scala.collection.immutable.Map[String,List[String]] = Map(course -> List(course), am -> List(am, am), going -> List(going), I -> List(I, I), hyd -> List(hyd), to -> List(to), hadoop -> List(hadoop), learning -> List(learning))

scala> val wordcount = group.map(x => (x._1, x._2.count(y => true)))
wordcount: scala.collection.immutable.Map[String,Int] = Map(course -> 1, am -> 2, going -> 1, I -> 2, hyd -> 1, to -> 1, hadoop -> 1, learning -> 1)


scala> wordcount.foreach(x => println(x))
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)


scala> wordcount.foreach(println)
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)



scala> wordcount.toList.sorted.foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)



scala> wordcount.toList.sortBy(x => x._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)


scala> wordcount.toList.sortBy(x => x._2).foreach(println)
(course,1)
(going,1)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
(am,2)
(I,2)




scala> wordcount.toList.sortWith((x,y) => x._1 > y._1).foreach(println)
(to,1)
(learning,1)
(hyd,1)
(hadoop,1)
(going,1)
(course,1)
(am,2)
(I,2)


scala> wordcount.toList.sortWith((x,y) => x._1 < y._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)


scala> wordcount.toList.sortWith((x,y) => x._2 < y._2).foreach(println)
(course,1)
(going,1)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
(am,2)
(I,2)


scala> wordcount.toList.sortWith((x,y) => x._2 > y._2).foreach(println)
(am,2)
(I,2)
(course,1)
(going,1)
(hyd,1)
(to,1)
(hadoop,1)
(learning,1)
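
In the count-based sorts above, words with the same count keep whatever order the map happened to produce. One possible way to make the order deterministic is to sort on a composite key, as in the sketch below. `WordCountSketch`, `wordCount` and `ranked` are hypothetical names, and the whole thing is just a compact restatement of the steps practised in this session.

```scala
object WordCountSketch {
  // Counts whitespace-separated words across all lines.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  // Highest count first; ties are broken alphabetically so the order is deterministic.
  def ranked(counts: Map[String, Int]): List[(String, Int)] =
    counts.toList.sortBy { case (word, count) => (-count, word) }
}
```

Negating the count sorts it in descending order while the word stays ascending, so no custom `sortWith` comparison is needed. Uppercase letters sort before lowercase ones, so `I` still comes before `am`.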
