1. First, download the dataset from http://grouplens.org/datasets/movielens/ (the ml-1m.zip package under "MovieLens 1M Dataset").
2. Excerpts from the files:
movies.dat:
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller
11::American President, The (1995)::Comedy|Drama|Romance
12::Dracula: Dead and Loving It (1995)::Comedy|Horror
13::Balto (1995)::Animation|Children's
14::Nixon (1995)::Drama
15::Cutthroat Island (1995)::Action|Adventure|Romance
16::Casino (1995)::Drama|Thriller
17::Sense and Sensibility (1995)::Drama|Romance
18::Four Rooms (1995)::Thriller
19::Ace Ventura: When Nature Calls (1995)::Comedy
20::Money Train (1995)::Action
21::Get Shorty (1995)::Action|Comedy|Drama
22::Copycat (1995)::Crime|Drama|Thriller
23::Assassins (1995)::Thriller
24::Powder (1995)::Drama|Sci-Fi
25::Leaving Las Vegas (1995)::Drama|Romance
26::Othello (1995)::Drama
27::Now and Then (1995)::Drama
28::Persuasion (1995)::Romance
29::City of Lost Children, The (1995)::Adventure|Sci-Fi
30::Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)::Drama
31::Dangerous Minds (1995)::Drama
ratings.dat:
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
1::938::4::978301752
1::2398::4::978302281
1::2918::4::978302124
1::1035::5::978301753
1::2791::4::978302188
1::2687::3::978824268
1::2018::4::978301777
1::3105::5::978301713
1::2797::4::978302039
1::2321::3::978302205
1::720::3::978300760
1::1270::5::978300055
1::527::5::978824195
1::2340::3::978300103
users.dat:
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370
11::F::25::1::04093
12::M::25::12::32793
13::M::45::1::93304
14::M::35::0::60126
15::M::25::7::22903
16::F::35::0::20670
17::M::50::1::95350
18::F::18::3::95825
19::M::1::10::48073
20::M::25::14::55113
21::M::18::16::99353
22::M::18::15::53706
23::M::35::0::90049
24::F::25::7::10023
25::M::18::4::01609
26::M::25::7::23112
27::M::25::11::19130
28::F::25::1::14607
29::M::35::7::33407
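All three files use :: as the field separator: movies.dat holds MovieID::Title::Genres (genres separated by |), ratings.dat holds UserID::MovieID::Rating::Timestamp, and users.dat holds UserID::Gender::Age::Occupation::Zip-code. A minimal, Spark-free sketch of parsing one record of each kind (the case class names here are illustrative, not part of the programs below):

// Parse one sample record from each file; the case classes are hypothetical.
case class User(id: String, gender: String, age: String, occupation: String, zip: String)
case class Rating(userId: String, movieId: String, score: Int, timestamp: Long)
case class Movie(id: String, title: String, genres: Array[String])

object ParseDemo {
  def main(args: Array[String]): Unit = {
    val u = "1::F::1::10::48067".split("::")          // users.dat record
    val user = User(u(0), u(1), u(2), u(3), u(4))
    val r = "1::1193::5::978300760".split("::")       // ratings.dat record
    val rating = Rating(r(0), r(1), r(2).toInt, r(3).toLong)
    val m = "1::Toy Story (1995)::Animation|Children's|Comedy".split("::")
    val movie = Movie(m(0), m(1), m(2).split("\\|"))  // genres are |-separated
    println(user)
    println(rating)
    println(movie.id + " :: " + movie.title + " :: " + movie.genres.mkString(","))
  }
}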
package org.training.spark.core

import org.apache.spark._

/**
 * Age and gender distribution of the users who watched
 * "Lord of the Rings, The (1978)"
 */
object MovieUserAnalyzer {

  def main(args: Array[String]) {
    var masterUrl = "local[1]"
    var dataPath = "data/ml-1m/"
    // Note: these must be two independent checks; with an else-if,
    // dataPath could never be overridden once masterUrl was set.
    if (args.length > 0) {
      masterUrl = args(0)
    }
    if (args.length > 1) {
      dataPath = args(1)
    }

    // Create a SparkContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("MovieUserAnalyzer")
    val sc = new SparkContext(conf)

    /**
     * Step 1: Create RDDs
     */
    val DATA_PATH = dataPath
    val MOVIE_TITLE = "Lord of the Rings, The (1978)"
    val MOVIE_ID = "2116"

    val usersRdd = sc.textFile(DATA_PATH + "users.dat")
    val ratingsRdd = sc.textFile(DATA_PATH + "ratings.dat")

    /**
     * Step 2: Extract columns from RDDs
     */
    // users: RDD[(userID, (gender, age))]
    val users = usersRdd.map(_.split("::")).map { x =>
      (x(0), (x(1), x(2)))
    }

    // rating: RDD[Array(userID, movieID, rating, timestamp)]
    val rating = ratingsRdd.map(_.split("::"))

    // usermovie: RDD[(userID, movieID)], keeping only ratings of the target movie
    val usermovie = rating.map { x =>
      (x(0), x(1))
    }.filter(_._2.equals(MOVIE_ID))

    /**
     * Step 3: Join RDDs
     */
    // userRating: RDD[(userID, (movieID, (gender, age)))]
    val userRating = usermovie.join(users)
    //userRating.take(1).foreach(print)

    // userDistribution: RDD[((gender, age), count)]
    val userDistribution = userRating.map { x =>
      (x._2._2, 1)
    }.reduceByKey(_ + _)

    userDistribution.foreach(println)

    sc.stop()
  }
}
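The core of this program is the usermovie.join(users) shuffle join followed by a reduceByKey count. As a sanity check of that logic without downloading the dataset, here is a minimal, self-contained sketch; the two user lines come from the sample above, while the two ratings lines for movie 2116 are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}

// Self-contained sanity check of the join + distribution-count logic.
object MovieUserAnalyzerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("demo"))
    // Two user lines from the sample data: (userID, (gender, age))
    val users = sc.parallelize(Seq("1::F::1::10::48067", "2::M::56::16::70072"))
      .map(_.split("::")).map(x => (x(0), (x(1), x(2))))
    // Two hypothetical ratings of movie 2116, projected to (userID, movieID)
    val usermovie = sc.parallelize(Seq("1::2116::4::978300760", "2::2116::5::978300761"))
      .map(_.split("::")).map(x => (x(0), x(1))).filter(_._2 == "2116")
    // Expected output: ((F,1),1) and ((M,56),1)
    usermovie.join(users).map { case (_, (_, genderAge)) => (genderAge, 1) }
      .reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}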
package org.training.spark.core

import org.apache.spark._
import scala.collection.immutable.HashSet

/**
 * The 10 movies most watched by young men in the "18-24" age bracket
 */
object PopularMovieAnalyzer {

  def main(args: Array[String]) {
    var masterUrl = "local[1]"
    var dataPath = "data/ml-1m/"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    if (args.length > 1) {
      dataPath = args(1)
    }

    // Create a SparkContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("PopularMovieAnalyzer")
    val sc = new SparkContext(conf)

    /**
     * Step 1: Create RDDs
     */
    val DATA_PATH = dataPath
    val USER_GENDER = "M"
    val USER_AGE = "18" // in users.dat, age code 18 stands for the 18-24 bracket

    val usersRdd = sc.textFile(DATA_PATH + "users.dat")
    val moviesRdd = sc.textFile(DATA_PATH + "movies.dat")
    val ratingsRdd = sc.textFile(DATA_PATH + "ratings.dat")

    /**
     * Step 2: Extract columns from RDDs
     */
    // users: RDD[userID], filtered on gender as well as age so that only
    // male 18-24 users remain, matching the goal stated above
    val users = usersRdd.map(_.split("::")).filter { x =>
      x(1).equals(USER_GENDER) && x(2).equals(USER_AGE)
    }.map(_(0))

    // Array[String] of matching user IDs, small enough to collect
    val userlist = users.collect()

    // Broadcast the user ID set to every executor
    val userSet = HashSet() ++ userlist
    val broadcastUserSet = sc.broadcast(userSet)

    /**
     * Step 3: Map-side join RDDs
     */
    val topKmovies = ratingsRdd.map(_.split("::")).map { x =>
      (x(0), x(1))
    }.filter { x =>
      broadcastUserSet.value.contains(x._1)
    }.map { x =>
      (x._2, 1)
    }.reduceByKey(_ + _).map { x =>
      (x._2, x._1)
    }.sortByKey(false).map { x =>
      (x._2, x._1)
    }.take(10)

    /**
     * Transform movie IDs to movie names
     */
    val movieID2Name = moviesRdd.map(_.split("::")).map { x =>
      (x(0), x(1))
    }.collect().toMap

    topKmovies.map(x => (movieID2Name.getOrElse(x._1, null), x._2)).foreach(println)

    println(System.currentTimeMillis())

    sc.stop()
  }
}
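PopularMovieAnalyzer avoids a shuffle of the large ratings RDD: the small set of matching user IDs is broadcast to every executor and the ratings are filtered on the map side. For contrast, a shuffle-based equivalent of Step 3 might look like the sketch below; the input pairs are hypothetical stand-ins for the pre-filtered users and the projected ratings:

import org.apache.spark.{SparkConf, SparkContext}

// Shuffle-join counterpart of the broadcast/map-side join in Step 3.
// Both sides are repartitioned by userID, so the large ratings side is
// shuffled; the broadcast variant ships only the small user set instead.
object ShuffleJoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("join-demo"))
    // Hypothetical pre-filtered users: RDD[(userID, gender)]
    val users = sc.parallelize(Seq(("21", "M"), ("25", "M")))
    // Hypothetical ratings projected to (userID, movieID)
    val ratings = sc.parallelize(Seq(("21", "2116"), ("25", "2116"), ("3", "1193")))
    val topK = ratings.join(users)                      // keeps only matching users
      .map { case (_, (movieId, _)) => (movieId, 1) }
      .reduceByKey(_ + _)
      .sortBy(-_._2)                                    // by count, descending
      .take(10)
    topK.foreach(println)                               // expected: (2116,2)
    sc.stop()
  }
}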
package org.training.spark.core

import org.apache.spark._

/**
 * The 10 highest-rated movies; the top 10 users who rated the most movies;
 * the 10 movies watched most by women; the 10 movies watched most by men.
 * (The code below implements the first two; a sketch of the gender-specific
 * counts follows the listing.)
 */
object TopKMovieAnalyzer {

  def main(args: Array[String]) {
    var masterUrl = "local[1]"
    var dataPath = "data/ml-1m/"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    if (args.length > 1) {
      dataPath = args(1)
    }

    // Create a SparkContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("TopKMovieAnalyzer")
    val sc = new SparkContext(conf)

    /**
     * Step 1: Create RDDs
     */
    val DATA_PATH = dataPath
    val ratingsRdd = sc.textFile(DATA_PATH + "ratings.dat")

    /**
     * Step 2: Extract columns from RDDs
     */
    // ratings: RDD[(userID, movieID, score)], cached because it is reused twice
    val ratings = ratingsRdd.map(_.split("::")).map { x =>
      (x(0), x(1), x(2))
    }.cache()

    /**
     * Step 3: Analyze results
     */
    // Highest average score: accumulate (sum, count) per movie, then divide
    val topKScoreMostMovie = ratings.map { x =>
      (x._2, (x._3.toInt, 1))
    }.reduceByKey { (v1, v2) =>
      (v1._1 + v2._1, v1._2 + v2._2)
    }.map { x =>
      (x._2._1.toFloat / x._2._2.toFloat, x._1)
    }.sortByKey(false).
      take(10).
      foreach(println)

    // Most active users: count ratings per user, sort by count descending
    val topKmostPerson = ratings.map { x =>
      (x._1, 1)
    }.reduceByKey(_ + _).
      map(x => (x._2, x._1)).
      sortByKey(false).
      take(10).
      foreach(println)

    sc.stop()
  }
}
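The header comment also lists the 10 movies watched most by women and by men, which TopKMovieAnalyzer does not implement. One way to get them is to join the gender from users.dat onto the ratings and count per (gender, movieID); the following is a sketch of that approach, assuming the same file layout and data path as above:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: top 10 most-watched movies per gender, covering the two analyses
// the header comment lists but the code above leaves out.
object GenderTopKMovieAnalyzer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("GenderTopK"))
    val dataPath = "data/ml-1m/"
    // (userID, gender) from users.dat
    val userGender = sc.textFile(dataPath + "users.dat")
      .map(_.split("::")).map(x => (x(0), x(1)))
    // (userID, movieID) from ratings.dat
    val userMovie = sc.textFile(dataPath + "ratings.dat")
      .map(_.split("::")).map(x => (x(0), x(1)))
    // Count ratings per (gender, movieID); cached because it is scanned twice
    val counts = userMovie.join(userGender)
      .map { case (_, (movieId, gender)) => ((gender, movieId), 1) }
      .reduceByKey(_ + _)
      .cache()
    for (g <- Seq("F", "M")) {
      println(s"Top 10 movies for gender $g:")
      counts.filter(_._1._1 == g)
        .map { case ((_, movieId), n) => (movieId, n) }
        .sortBy(-_._2)
        .take(10)
        .foreach(println)
    }
    sc.stop()
  }
}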