# slope one python版与spark sql版本代码分享

4,779次阅读

## slopeone原理

Slope One 是一种非常简单的基于物品评分差值的协同过滤算法。其基本思想是：不同的用户（如 A、B）对不同的物品（item1、item2）打分，利用物品两两之间评分差的平均值来预测用户对未评分物品的打分。

| user | item1 | item2 |
| ---- | ----- | ----- |
| A    | 7     | 2     |
| B    | 8     | 3     |
| C    | 9     | ?     |

例如：item1 与 item2 的平均评分差为 ((7-2)+(8-3))/2 = 5，则可预测 C 对 item2 的评分为 9 - 5 = 4。

1. 计算物品之间评分差的平均值，记为物品间的评分偏差；

2. 根据物品间的评分偏差和用户的历史评分，为用户生成预测评分较高的推荐物品列表。

## python版本

# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>.
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, version 2 or later, which is
# incorporated herein by reference.

class SlopeOne(object):
    """Weighted Slope One collaborative-filtering predictor.

    Ported to Python 3: the original used ``iteritems``/``itervalues``,
    which no longer exist, so the class could not run on a supported
    interpreter. Algorithm and interface are unchanged.
    """

    def __init__(self):
        # diffs[i][j]: after update(), the average rating deviation
        # (rating_i - rating_j) over users who rated both i and j.
        self.diffs = {}
        # freqs[i][j]: number of users who rated both i and j.
        self.freqs = {}

    def predict(self, userprefs):
        """Return {item: predicted rating} for items absent from *userprefs*.

        Each prediction is the deviation-adjusted rating of every rated
        item, weighted by the co-rating frequency.
        """
        preds, freqs = {}, {}
        for item, rating in userprefs.items():
            for diffitem, diffratings in self.diffs.items():
                try:
                    freq = self.freqs[diffitem][item]
                except KeyError:
                    # No user rated both items: this pair contributes nothing.
                    continue
                preds.setdefault(diffitem, 0.0)
                freqs.setdefault(diffitem, 0)
                preds[diffitem] += freq * (diffratings[item] + rating)
                freqs[diffitem] += freq
        # Only predict items the user has not already rated.
        return {item: value / freqs[item]
                for item, value in preds.items()
                if item not in userprefs and freqs[item] > 0}

    def update(self, userdata):
        """Train on {user: {item: rating}} data.

        NOTE: call exactly once. The final pass converts the accumulated
        deviation sums into averages in place, so a second call would
        divide already-averaged values and corrupt the model.
        """
        for ratings in userdata.values():
            for item1, rating1 in ratings.items():
                self.freqs.setdefault(item1, {})
                self.diffs.setdefault(item1, {})
                for item2, rating2 in ratings.items():
                    self.freqs[item1].setdefault(item2, 0)
                    self.diffs[item1].setdefault(item2, 0.0)
                    self.freqs[item1][item2] += 1
                    self.diffs[item1][item2] += rating1 - rating2
        # Turn deviation sums into per-pair averages.
        for item1, ratings in self.diffs.items():
            for item2 in ratings:
                ratings[item2] /= self.freqs[item1][item2]

if __name__ == '__main__':
    # Demo fixture: four users rating four items on a 0-1 scale.
    # Fix: the original Python 2 `print s.predict(...)` statement is a
    # SyntaxError on Python 3; use the print() function instead.
    userdata = dict(
        alice=dict(squid=1.0,
                   cuttlefish=0.5,
                   octopus=0.2),
        bob=dict(squid=1.0,
                 octopus=0.5,
                 nautilus=0.2),
        carole=dict(squid=0.2,
                    octopus=1.0,
                    cuttlefish=0.4,
                    nautilus=0.4),
        dave=dict(cuttlefish=0.9,
                  octopus=0.4,
                  nautilus=0.5),
    )
    s = SlopeOne()
    s.update(userdata)
    # Predict ratings for the items a new user (only rated squid=0.4) hasn't seen.
    print(s.predict(dict(squid=0.4)))

## spark版本

Spark 版本主要使用 Spark SQL 模块来实现同样的计算逻辑。

import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import Rating
from pyspark.sql import SQLContext
import operator
import math

# Driver-level setup: a single SparkContext/SQLContext shared by main() below.
conf = SparkConf().setAppName("Slope One")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Paths to comma-separated "uid,mid,rating" input files read by main().
# NOTE(review): hard-coded local paths — presumably meant to be edited per
# deployment; confirm whether these should be CLI arguments.
training_inputs='/home/cuijian/slope_one_train.txt'
testing_inputs='/home/cuijian/slope_one_test.txt'
def get_tuple(line):
    """Parse one comma-separated 'uid,mid,rating' record into typed fields."""
    fields = line.split(',')
    uid = int(fields[0])
    mid = int(fields[1])
    rating = float(fields[2])
    return uid, mid, rating

def main():
    """Slope One with Spark SQL: compute per-item-pair rating deviations
    from the training set, then join them against the test ratings to
    emit predicted scores.

    Relies on the module-level ``sc``/``sqlContext`` and the
    ``training_inputs``/``testing_inputs`` paths.
    """
    # Parse both input files into (uid, mid, rating) rows.
    train_rows = sc.textFile(training_inputs).map(get_tuple)
    test_rows = sc.textFile(testing_inputs).map(get_tuple).cache()

    columns = ['uid', 'mid', 'rating']
    train_df = sqlContext.createDataFrame(train_rows, columns)
    test_df = sqlContext.createDataFrame(test_rows, columns)

    train_df.registerTempTable("TrainingTable")
    test_df.registerTempTable("TestingTable")

    # Self-join the training set on user id: one row per user per ordered
    # item pair, carrying the rating difference mid1 - mid2.
    pair_df = sqlContext.sql("""
SELECT t1.uid, t1.mid as mid1, t2.mid as mid2, (t1.rating-t2.rating) as rating_diff FROM
TrainingTable t1
JOIN
TrainingTable t2
ON (t1.uid = t2.uid)
""")
    pair_df.registerTempTable("JoinedUserTable")

    # Aggregate per item pair: dev = mean rating difference,
    # c = number of co-rating users (the pair's support).
    dev_df = sqlContext.sql("""
SELECT mid1, mid2, sum(rating_diff)/count(rating_diff) as dev, count(rating_diff) as c FROM
JoinedUserTable
Group By mid1, mid2
""")
    dev_df.registerTempTable('mpair_dev_c_dfTable')

    # Predicted rating for mid2 given an observed rating of mid1 is
    # r(mid1) - dev(mid1, mid2), since dev averages (r1 - r2).
    predictions = sqlContext.sql('SELECT a.mid,b.mid2,a.rating-b.dev  FROM  TestingTable a JOIN mpair_dev_c_dfTable b ON a.mid=b.mid1 ')
    predictions.show()

    # Disabled in the original post: weighted Slope One prediction per
    # (user, test item) and the RMSE against the true test ratings.
    # testing_training_df = sqlContext.sql("""
    # SELECT t1.uid, t1.mid as midj, t2.mid as midi, t1.rating as rating_j, t2.rating as rating_i FROM
    # TestingTable t1
    # JOIN
    # TrainingTable t2
    # ON (t1.uid = t2.uid)
    # """)

    # cond = [testing_training_df.midj == mpair_dev_c_df.mid1, testing_training_df.midi == mpair_dev_c_df.mid2]
    # df = testing_training_df.join(mpair_dev_c_df, cond)

    # df.registerTempTable("AllTable")
    # ps = sqlContext.sql("""
    # SELECT uid, midj, sum((dev+rating_i)*c)/sum(c) as p, rating_j as true_rating FROM
    # AllTable
    # Group By uid, midj, rating_j
    # """)

    # ps.registerTempTable("PTable")
    # rmse = sqlContext.sql("""
    # SELECT sqrt(sum(power(true_rating-p, 2))/count(true_rating)) as RMSE FROM
    # PTable
    # """)
    # rmse.show()

# Script entry point when launched via spark-submit.
if __name__ == '__main__':
    main()

http://www.codexiu.cn/spark/blog/13452/