slopeone原理
Slope One 是一种很简单的、基于评分差值的协同过滤算法。其大体意思是:已有用户 A、B 都对 item1、item2 打过分,
那么对于只给其中一个 item 打过分的新用户 C,用 C 对已知 item 的打分,减去其他用户在这两个 item 上评分差的平均值,就得到 C 对未知 item 的预测打分。
它有个很好的优点:数据少的时候效果也很好。
user | item1 | item2 |
A | 7 | 2 |
B | 8 | 3 |
C | 9 | ? |
那么 C的item2 打分 为 9-((7-2)+(8-3))/2=4
其实分成2步
1. 计算物品之间评分差的平均值,记为物品间的评分偏差;
2.根据物品间的评分偏差和用户的历史评分,给用户生成预测评分高的推荐物品列表。
python版本
# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>.
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, version 2 or later, which is
# incorporated herein by reference.


class SlopeOne(object):
    """Weighted Slope One rating predictor.

    Attributes:
        diffs: ``diffs[a][b]`` is the average of ``rating(a) - rating(b)``
            over every user passed to :meth:`update` who rated both items.
        freqs: ``freqs[a][b]`` is the number of such users.
    """

    def __init__(self):
        self.diffs = {}
        self.freqs = {}
        # Running (un-averaged) difference totals. Keeping them separate from
        # ``diffs`` lets update() be called repeatedly without mixing new raw
        # differences into values that were already averaged.
        self._sums = {}

    def predict(self, userprefs):
        """Predict ratings for items the user has not rated yet.

        Args:
            userprefs: mapping of item -> rating given by the user.

        Returns:
            dict mapping each predictable item that is not in ``userprefs``
            to its predicted rating. Every (candidate, rated item) pair
            contributes ``diffs[candidate][rated] + rating``, weighted by
            how many users rated both items.
        """
        preds, freqs = {}, {}
        for item, rating in userprefs.items():
            for diffitem, diffratings in self.diffs.items():
                try:
                    freq = self.freqs[diffitem][item]
                except KeyError:
                    # No user rated both items, so there is no deviation.
                    continue
                preds.setdefault(diffitem, 0.0)
                freqs.setdefault(diffitem, 0)
                preds[diffitem] += freq * (diffratings[item] + rating)
                freqs[diffitem] += freq
        return {
            item: value / freqs[item]
            for item, value in preds.items()
            if item not in userprefs and freqs[item] > 0
        }

    def update(self, userdata):
        """Fold more user ratings into the model.

        May be called any number of times; the resulting model is the same
        as if all the data had been supplied in a single call.

        Args:
            userdata: mapping of user -> {item: rating}.
        """
        for ratings in userdata.values():
            for item1, rating1 in ratings.items():
                pair_freqs = self.freqs.setdefault(item1, {})
                pair_sums = self._sums.setdefault(item1, {})
                self.diffs.setdefault(item1, {})
                for item2, rating2 in ratings.items():
                    pair_freqs[item2] = pair_freqs.get(item2, 0) + 1
                    pair_sums[item2] = (
                        pair_sums.get(item2, 0.0) + (rating1 - rating2)
                    )
        # Re-derive the public averages from the running totals.
        for item1, pair_sums in self._sums.items():
            pair_freqs = self.freqs[item1]
            pair_diffs = self.diffs[item1]
            for item2, total in pair_sums.items():
                pair_diffs[item2] = total / pair_freqs[item2]


if __name__ == '__main__':
    userdata = dict(
        alice=dict(squid=1.0, cuttlefish=0.5, octopus=0.2),
        bob=dict(squid=1.0, octopus=0.5, nautilus=0.2),
        carole=dict(squid=0.2, octopus=1.0, cuttlefish=0.4, nautilus=0.4),
        dave=dict(cuttlefish=0.9, octopus=0.4, nautilus=0.5),
    )
    s = SlopeOne()
    s.update(userdata)
    print(s.predict(dict(squid=0.4)))
spark版本
Spark 版本主要使用 Spark SQL 模块来实现。
"""Slope One on Spark SQL.

Reads comma-separated ``uid,mid,rating`` lines for a training set and a
testing set, derives the average rating deviation of every item pair from
the training set, and prints a prediction for each testing rating paired
with each item that has a deviation against the rated item.
"""
import math
import operator
import sys

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import Rating
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Slope One")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

training_inputs = '/home/cuijian/slope_one_train.txt'
testing_inputs = '/home/cuijian/slope_one_test.txt'


def get_tuple(line):
    """Parse one ``uid,mid,rating`` line into an ``(int, int, float)`` tuple."""
    elems = line.split(',')
    return int(elems[0]), int(elems[1]), float(elems[2])


def main():
    """Build the item-pair deviation table and show the predictions."""
    training_in = sc.textFile(training_inputs)
    testing_in = sc.textFile(testing_inputs)

    training_data = training_in.map(get_tuple)
    testing_data = testing_in.map(get_tuple).cache()

    training_df = sqlContext.createDataFrame(
        training_data, ['uid', 'mid', 'rating'])
    testing_df = sqlContext.createDataFrame(
        testing_data, ['uid', 'mid', 'rating'])

    training_df.registerTempTable("TrainingTable")
    testing_df.registerTempTable("TestingTable")

    # Step 1a: for every user, the rating difference of every ordered pair
    # of items that user rated.
    # NOTE(review): the join condition is only on uid, so pairs with
    # mid1 == mid2 (difference 0) are included as well.
    joined_user_df = sqlContext.sql("""
    SELECT t1.uid, t1.mid as mid1, t2.mid as mid2,
           (t1.rating-t2.rating) as rating_diff
    FROM TrainingTable t1
    JOIN TrainingTable t2
    ON (t1.uid = t2.uid)
    """)
    joined_user_df.registerTempTable("JoinedUserTable")

    # Step 1b: per item pair, the average difference (dev) and the number
    # of contributing rows (c).
    mpair_dev_c_df = sqlContext.sql("""
    SELECT mid1, mid2,
           sum(rating_diff)/count(rating_diff) as dev,
           count(rating_diff) as c
    FROM JoinedUserTable
    Group By mid1, mid2
    """)
    mpair_dev_c_df.registerTempTable('mpair_dev_c_dfTable')

    # Step 2: dev is avg(rating(mid1) - rating(mid2)), so a known rating on
    # mid1 minus dev is the estimate for mid2.
    result = sqlContext.sql("""
    SELECT a.mid, b.mid2, a.rating-b.dev
    FROM TestingTable a
    JOIN mpair_dev_c_dfTable b
    ON a.mid=b.mid1
    """)
    result.show()

    # Disabled evaluation path, kept for reference: it computes a
    # count-weighted prediction for each testing rating and the RMSE
    # against the true rating.
    #
    # testing_training_df = sqlContext.sql("""
    # SELECT t1.uid, t1.mid as midj, t2.mid as midi,
    #        t1.rating as rating_j, t2.rating as rating_i
    # FROM TestingTable t1
    # JOIN TrainingTable t2
    # ON (t1.uid = t2.uid)
    # """)
    # cond = [testing_training_df.midj == mpair_dev_c_df.mid1,
    #         testing_training_df.midi == mpair_dev_c_df.mid2]
    # df = testing_training_df.join(mpair_dev_c_df, cond)
    # df.registerTempTable("AllTable")
    # ps = sqlContext.sql("""
    # SELECT uid, midj, sum((dev+rating_i)*c)/sum(c) as p,
    #        rating_j as true_rating
    # FROM AllTable
    # Group By uid, midj, rating_j
    # """)
    # ps.registerTempTable("PTable")
    # rmse = sqlContext.sql("""
    # SELECT sqrt(sum(power(true_rating-p, 2))/count(true_rating)) as RMSE
    # FROM PTable
    # """)
    # rmse.show()


if __name__ == '__main__':
    main()
参考资料 http://www.codexiu.cn/spark/blog/13452/