天天看点

Spark协同过滤推荐

项目最近需要给用户推荐潜在的店铺,当时也在考虑是用协同过滤推荐还是用ALS训练模型,但是考虑到数据量是以一年为周期每天更新跑的,模型就算训练出来也没多大用处。耗时,调参,没有必要。所以还是决定使用协同过滤推荐。而我采用的是同现相似度矩阵来计算的。

相关的原理介绍我这里就不再重复了,大家可以搜搜,有很多源码,只是对于矩阵不熟悉的人想告诉大家每个步骤计算得到的是什么内容,看着rdd一步一步往下走但是不知所云就不好了。而且如果业务有新的需求,需要更改源码的计算逻辑,你都不知道在哪一步修改。最后再说说我的经验之谈。

首先贴下最基础的源码,然后我再说明每个rdd到底是什么。欧氏距离相似度、余弦相似度的代码我就不贴了。

package recommend

import scala.math._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

/**
 * 用户评分.
 * @param userid 用户
 * @param itemid 评分物品
 * @param pref 评分
 */
case class ItemPref(
  val userid: String,
  val itemid: String,
  val pref: Double) extends Serializable
/**
 * 用户推荐.
 * @param userid 用户
 * @param itemid 推荐物品
 * @param pref 评分
 */
case class UserRecomm(
  val userid: String,
  val itemid: String,
  val pref: Double) extends Serializable
/**
 * 相似度.
 * @param itemid1 物品
 * @param itemid2 物品
 * @param similar 相似度
 */
case class ItemSimi(
  val itemid1: String,
  val itemid2: String,
  val similar: Double) extends Serializable

/**
 * 相似度计算.
 * 支持:同现相似度、欧氏距离相似度、余弦相似度
 *
 */
class ItemSimilarity extends Serializable {

  /**
   * 相似度计算.
   * @param user_rdd 用户评分
   * @param stype 计算相似度公式
   * @param RDD[ItemSimi] 返回物品相似度
   *
   */
  def Similarity(user_rdd: RDD[ItemPref], stype: String): (RDD[ItemSimi]) = {
    val simil_rdd = stype match {
      case "cooccurrence" =>
        ItemSimilarity.CooccurrenceSimilarity(user_rdd)
      case _ =>
        ItemSimilarity.CooccurrenceSimilarity(user_rdd)
    }
    simil_rdd
  }

}

object ItemSimilarity {

  /**
   * 同现相似度矩阵计算.
   * w(i,j) = N(i)∩N(j)/sqrt(N(i)*N(j))
   * @param user_rdd 用户评分
   * @param RDD[ItemSimi] 返回物品相似度
   *
   */
  def CooccurrenceSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
    // 0 数据做准备
    val user_rdd1 = user_rdd.map(f => (f.userid, f.itemid, f.pref))
    val user_rdd2 = user_rdd1.map(f => (f._1, f._2))
    // 1 (用户:物品) 笛卡尔积 (用户:物品) => 物品:物品组合     
    val user_rdd3 = user_rdd2.join(user_rdd2)
    val user_rdd4 = user_rdd3.map(f => (f._2, 1))
    // 2 物品:物品:频次 
    val user_rdd5 = user_rdd4.reduceByKey((x, y) => x + y)
    // 3 对角矩阵 
    val user_rdd6 = user_rdd5.filter(f => f._1._1 == f._1._2)
    // 4 非对角矩阵 
    val user_rdd7 = user_rdd5.filter(f => f._1._1 != f._1._2)
    // 5 计算同现相似度(物品1,物品2,同现频次)
    val user_rdd8 = user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).
      join(user_rdd6.map(f => (f._1._1, f._2)))
    val user_rdd9 = user_rdd8.map(f => (f._2._1._2, (f._2._1._1,
      f._2._1._2, f._2._1._3, f._2._2)))
    val user_rdd10 = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
    val user_rdd11 = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
    val user_rdd12 = user_rdd11.map(f => (f._1, f._2, (f._3 / sqrt(f._4 * f._5))))
    // 6 结果返回
    user_rdd12.map(f => ItemSimi(f._1, f._2, f._3))
  }

}


           
package recommend

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

  /**
   * 用户推荐计算.
   * 根据物品相似度、用户评分、指定最大推荐数量进行用户推荐
   */

class RecommendedItem {
  /**
   * 用户推荐计算.
   * @param items_similar 物品相似度
   * @param user_prefer 用户评分
   * @param r_number 推荐数量
   * @param RDD[UserRecomm] 返回用户推荐物品
   *
   */
  def Recommend(items_similar: RDD[ItemSimi],
    user_prefer: RDD[ItemPref],
    r_number: Int): (RDD[UserRecomm]) = {
    //   0 数据准备  
    val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
    val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
    //   1 矩阵计算——i行与j列join
    val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
      join(user_prefer1.map(f => (f._2, (f._1, f._3))))
    //   2 矩阵计算——i行与j列元素相乘
    val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
    //   3 矩阵计算——用户:元素累加求和
    val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
    //   4 矩阵计算——用户:对结果过滤已有I2
    val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
      filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
    //   5 矩阵计算——用户:用户对结果排序,过滤
    val rdd_app1_R6 = rdd_app1_R5.groupByKey()
    val rdd_app1_R7 = rdd_app1_R6.map(f => {
      val i2 = f._2.toBuffer
      val i2_2 = i2.sortBy(_._2)
      if (i2_2.length > r_number) i2_2.remove(0, (i2_2.length - r_number))
      (f._1, i2_2.toIterable)
    })
    val rdd_app1_R8 = rdd_app1_R7.flatMap(f => {
      val id2 = f._2
      for (w <- id2) yield (f._1, w._1, w._2)
    })
    rdd_app1_R8.map(f => UserRecomm(f._1, f._2, f._3))
  }

  /**
   * 用户推荐计算.
   * @param items_similar 物品相似度
   * @param user_prefer 用户评分
   * @param RDD[UserRecomm] 返回用户推荐物品
   *
   */
  def Recommend(items_similar: RDD[ItemSimi],
    user_prefer: RDD[ItemPref]): (RDD[UserRecomm]) = {
    //   0 数据准备  
    val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
    val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
    //   1 矩阵计算——i行与j列join
    val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
      join(user_prefer1.map(f => (f._2, (f._1, f._3))))
    //   2 矩阵计算——i行与j列元素相乘
    val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
    //   3 矩阵计算——用户:元素累加求和
    val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
    //   4 矩阵计算——用户:对结果过滤已有I2
    val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
      filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
    //   5 矩阵计算——用户:用户对结果排序,过滤
    val rdd_app1_R6 = rdd_app1_R5.map(f => (f._1, f._2._1, f._2._2)).
      sortBy(f => (f._1, f._3))
    rdd_app1_R6.map(f => UserRecomm(f._1, f._2, f._3))
  }

}
           
package recommend

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD

object ItemCF {
  def main(args: Array[String]) {

    //0 构建Spark对象
    val conf = new SparkConf().setAppName("ItemCF")
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)

    //1 读取样本数据
    val data_path = "data.txt"
    val data = sc.textFile(data_path)
    val userdata = data.map(_.split(",")).map(f => (ItemPref(f(0), f(1), f(2).toDouble))).cache()

    //2 建立模型
    val mysimil = new ItemSimilarity()
    val simil_rdd1 = mysimil.Similarity(userdata, "cooccurrence")
    val recommd = new RecommendedItem
    val recommd_rdd1 = recommd.Recommend(simil_rdd1, userdata, 30)

    //3 打印结果
    println(s"物品相似度矩阵: ${simil_rdd1.count()}")
    simil_rdd1.collect().foreach { ItemSimi =>
      println(ItemSimi.itemid1 + ", " + ItemSimi.itemid2 + ", " + ItemSimi.similar)
    }
    println(s"用戶推荐列表: ${recommd_rdd1.count()}")
    recommd_rdd1.collect().foreach { UserRecomm =>
      println(UserRecomm.userid + ", " + UserRecomm.itemid + ", " + UserRecomm.pref)
    }    

  }
}

           

相似度计算的逻辑大致是先计算出物品与物品之间关联的矩阵,然后用物品关联的矩阵X用户购买物品矩阵,得出用户可能购买的矩阵。

原始数据说明data.txt

第一列是用户,第二列是商品,第三列可以理解为评分,比如我购买了一瓶啤酒,觉得味道不错,评价0.8分(满分1分),但是用户的评价数据一般是很难收集到的,如果收集到肯定要放入矩阵计算。但是大部分情况下收集不到,所以我们直接用1和0来代替,1是购买,0是未购买。未购买的直接提前就过滤掉,不放入模型计算。

1,1,1

1,2,1

2,1,1

2,3,1

3,3,1

3,4,1

4,2,1

4,4,1

5,1,1

5,2,1

5,3,1

6,4,1

现在就开始说明每个rdd得到的到底是什么?

类ItemSimilarity中

原始数据得到user_rdd1:

用户,物品,评分

1,1,1

1,2,1

2,1,1

2,3,1

3,3,1

3,4,1

4,2,1

4,4,1

5,1,1

5,2,1

5,3,1

6,4,1

user_rdd2:

用户,物品

(1,1)

(1,2)

(2,1)

(2,3)

(3,3)

(3,4)

(4,2)

(4,4)

(5,1)

(5,2)

(5,3)

(6,4)

user_rdd3,笛卡尔join操作

用户,物品    用户,物品

1,1            1,1    

1,2            1,2

2,1            2,1        

2,3            2,3

3,3            3,3

3,4            3,4

4,2            4,2

4,4            4,4

5,1            5,1

5,2            5,2

5,3            5,3

6,4            6,4

user_rdd3:

用户,(物品,物品)

(4,(2,2))

(4,(2,4))

(4,(4,2))

(4,(4,4))

(5,(1,1))

(5,(1,2))

(5,(1,3))

(5,(2,1))

(5,(2,2))

(5,(2,3))

(5,(3,1))

(5,(3,2))

(5,(3,3))

(6,(4,4))

(2,(1,1))

(2,(1,3))

(2,(3,1))

(2,(3,3))

(3,(3,3))

(3,(3,4))

(3,(4,3))

(3,(4,4))

(1,(1,1))

(1,(1,2))

(1,(2,1))

(1,(2,2))

user_rdd4:

(物品,物品),1

((2,2),1)

((2,4),1)

((4,2),1)

((4,4),1)

((1,1),1)

((1,2),1)

((1,3),1)

((2,1),1)

((2,2),1)

((2,3),1)

((3,1),1)

((3,2),1)

((3,3),1)

((4,4),1)

((1,1),1)

((1,3),1)

((3,1),1)

((3,3),1)

((3,3),1)

((3,4),1)

((4,3),1)

((4,4),1)

((1,1),1)

((1,2),1)

((2,1),1)

((2,2),1)

user_rdd5:

(物品,物品),频次

((1,1),3)

((2,1),2)

((1,3),2)

((4,3),1)

((4,2),1)

((3,4),1)

((3,3),3)

((2,2),3)

((4,4),3)

((2,4),1)

((1,2),2)

((2,3),1)

((3,1),2)

((3,2),1)

user_rdd6:

物品==物品(对角矩阵)

((1,1),3)

((3,3),3)

((2,2),3)

((4,4),3)

user_rdd7:

物品!=物品(非对角矩阵)

((2,1),2)

((1,3),2)

((4,3),1)

((4,2),1)

((3,4),1)

((2,4),1)

((1,2),2)

((2,3),1)

((3,1),2)

((3,2),1)

user_rdd8:

物品1,((物品1,物品2,同现频次),物品1购买次数)

(4,((4,3,1),3))

(4,((4,2,1),3))

(2,((2,1,2),3))

(2,((2,4,1),3))

(2,((2,3,1),3))

(3,((3,4,1),3))

(3,((3,1,2),3))

(3,((3,2,1),3))

(1,((1,3,2),3))

(1,((1,2,2),3))

user_rdd9:

物品2,(物品1,物品2,同现频次, 物品1购买次数)

(3,(4,3,1,3))

(2,(4,2,1,3))

(1,(2,1,2,3))

(4,(2,4,1,3))

(3,(2,3,1,3))

(4,(3,4,1,3))

(1,(3,1,2,3))

(2,(3,2,1,3))

(3,(1,3,2,3))

(2,(1,2,2,3))

user_rdd10:

物品2,((物品1,物品2,同现频次, 物品1购买次数), 物品2购买次数)

(4,((2,4,1,3),3))

(4,((3,4,1,3),3))

(2,((4,2,1,3),3))

(2,((3,2,1,3),3))

(2,((1,2,2,3),3))

(3,((4,3,1,3),3))

(3,((2,3,1,3),3))

(3,((1,3,2,3),3))

(1,((2,1,2,3),3))

(1,((3,1,2,3),3))

user_rdd11:

物品1,物品2,同现频次, 物品1购买次数,物品2购买次数

(2,4,1,3,3)

(3,4,1,3,3)

(4,2,1,3,3)

(3,2,1,3,3)

(1,2,2,3,3)

(4,3,1,3,3)

(2,3,1,3,3)

(1,3,2,3,3)

(2,1,2,3,3)

(3,1,2,3,3)

user_rdd12:

相似度定义:同时喜欢物品1又喜欢物品2的个数/sqrt(喜欢物品1的个数*喜欢物品2的个数),这里是同现相似度最核心的代码

物品1,物品2,相似度

(2,4,0.3333333333333333)

(3,4,0.3333333333333333)

(4,2,0.3333333333333333)

(3,2,0.3333333333333333)

(1,2,0.6666666666666666)

(4,3,0.3333333333333333)

(2,3,0.3333333333333333)

(1,3,0.6666666666666666)

(2,1,0.6666666666666666)

(3,1,0.6666666666666666)

用户推荐,类RecommendedItem:

rdd_app1_R1:

物品1,物品2,相似度

(2,4,0.3333333333333333)

(3,4,0.3333333333333333)

(4,2,0.3333333333333333)

(3,2,0.3333333333333333)

(1,2,0.6666666666666666)

(4,3,0.3333333333333333)

(2,3,0.3333333333333333)

(1,3,0.6666666666666666)

(2,1,0.6666666666666666)

(3,1,0.6666666666666666)

rdd_app1_R2:

物品1, ((物品2, 相似度),(用户,评分))

(4,((2,0.3333333333333333),(3,1.0)))

(4,((2,0.3333333333333333),(4,1.0)))

(4,((2,0.3333333333333333),(6,1.0)))

(4,((3,0.3333333333333333),(3,1.0)))

(4,((3,0.3333333333333333),(4,1.0)))

(4,((3,0.3333333333333333),(6,1.0)))

(2,((4,0.3333333333333333),(1,1.0)))

(2,((4,0.3333333333333333),(4,1.0)))

(2,((4,0.3333333333333333),(5,1.0)))

(2,((3,0.3333333333333333),(1,1.0)))

(2,((3,0.3333333333333333),(4,1.0)))

(2,((3,0.3333333333333333),(5,1.0)))

(2,((1,0.6666666666666666),(1,1.0)))

(2,((1,0.6666666666666666),(4,1.0)))

(2,((1,0.6666666666666666),(5,1.0)))

(3,((4,0.3333333333333333),(2,1.0)))

(3,((4,0.3333333333333333),(3,1.0)))

(3,((4,0.3333333333333333),(5,1.0)))

(3,((2,0.3333333333333333),(2,1.0)))

(3,((2,0.3333333333333333),(3,1.0)))

(3,((2,0.3333333333333333),(5,1.0)))

(3,((1,0.6666666666666666),(2,1.0)))

(3,((1,0.6666666666666666),(3,1.0)))

(3,((1,0.6666666666666666),(5,1.0)))

(1,((2,0.6666666666666666),(1,1.0)))

(1,((2,0.6666666666666666),(2,1.0)))

(1,((2,0.6666666666666666),(5,1.0)))

(1,((3,0.6666666666666666),(1,1.0)))

(1,((3,0.6666666666666666),(2,1.0)))

(1,((3,0.6666666666666666),(5,1.0)))

rdd_app1_R3:

(用户,物品2),评分*相似度

((3,2),0.3333333333333333)

((4,2),0.3333333333333333)

((6,2),0.3333333333333333)

((3,3),0.3333333333333333)

((4,3),0.3333333333333333)

((6,3),0.3333333333333333)

((1,4),0.3333333333333333)

((4,4),0.3333333333333333)

((5,4),0.3333333333333333)

((1,3),0.3333333333333333)

((4,3),0.3333333333333333)

((5,3),0.3333333333333333)

((1,1),0.6666666666666666)

((4,1),0.6666666666666666)

((5,1),0.6666666666666666)

((2,4),0.3333333333333333)

((3,4),0.3333333333333333)

((5,4),0.3333333333333333)

((2,2),0.3333333333333333)

((3,2),0.3333333333333333)

((5,2),0.3333333333333333)

((2,1),0.6666666666666666)

((3,1),0.6666666666666666)

((5,1),0.6666666666666666)

((1,2),0.6666666666666666)

((2,2),0.6666666666666666)

((5,2),0.6666666666666666)

((1,3),0.6666666666666666)

((2,3),0.6666666666666666)

((5,3),0.6666666666666666)

rdd_app1_R4:

(用户,物品2),评分*相似度

((1,1),0.6666666666666666)

((1,4),0.3333333333333333)

((2,1),0.6666666666666666)

((6,2),0.3333333333333333)

((1,3),1.0)

((4,3),0.6666666666666666)

((4,2),0.3333333333333333)

((5,3),1.0)

((3,4),0.3333333333333333)

((3,3),0.3333333333333333)

((2,2),1.0)

((4,4),0.3333333333333333)

((2,4),0.3333333333333333)

((1,2),0.6666666666666666)

((6,3),0.3333333333333333)

((2,3),0.6666666666666666)

((3,1),0.6666666666666666)

((3,2),0.6666666666666666)

((4,1),0.6666666666666666)

((5,4),0.6666666666666666)

((5,1),1.3333333333333333)

((5,2),1.0)

rdd_app1_R5:过滤出来了用户未买物品的推荐度

(用户,(物品2,评分*相似度))

(1,(4,0.3333333333333333))

(6,(2,0.3333333333333333))

(1,(3,1.0))

(4,(3,0.6666666666666666))

(2,(2,1.0))

(2,(4,0.3333333333333333))

(6,(3,0.3333333333333333))

(3,(1,0.6666666666666666))

(3,(2,0.6666666666666666))

(4,(1,0.6666666666666666))

(5,(4,0.6666666666666666))

rdd_app1_R6:

用户, [(未购买物品1,推荐度),(未购买物品2,推荐度)]

(4,CompactBuffer((3,0.6666666666666666), (1,0.6666666666666666)))

(5,CompactBuffer((4,0.6666666666666666)))

(6,CompactBuffer((2,0.3333333333333333), (3,0.3333333333333333)))

(2,CompactBuffer((2,1.0), (4,0.3333333333333333)))

(3,CompactBuffer((1,0.6666666666666666), (2,0.6666666666666666)))

(1,CompactBuffer((4,0.3333333333333333), (3,1.0)))

rdd_app1_R7:

这一步中会排序和筛选(筛选会改变长度,所以需要toBuffer)

用户, [(未购买物品1,推荐度),(未购买物品2,推荐度)]

(4,ArrayBuffer((3,0.6666666666666666), (1,0.6666666666666666)))

(5,ArrayBuffer((4,0.6666666666666666)))

(6,ArrayBuffer((2,0.3333333333333333), (3,0.3333333333333333)))

(2,ArrayBuffer((4,0.3333333333333333), (2,1.0)))

(3,ArrayBuffer((1,0.6666666666666666), (2,0.6666666666666666)))

(1,ArrayBuffer((4,0.3333333333333333), (3,1.0)))

rdd_app1_R8:

用户,未购买物品,推荐度

(4,3,0.6666666666666666)

(4,1,0.6666666666666666)

(5,4,0.6666666666666666)

(6,2,0.3333333333333333)

(6,3,0.3333333333333333)

(2,4,0.3333333333333333)

(2,2,1.0)

(3,1,0.6666666666666666)

(3,2,0.6666666666666666)

(1,4,0.3333333333333333)

(1,3,1.0)

经验之谈:

1.在类ItemSimilarity中有两个Recommend方法,这里我们推荐使用第一个Recommend方法,因为给用户推荐的数量太过庞大了,比如最后给某个人推荐了几百种东西,你觉得他都有可能会买吗?大家最关心的是他最有可能购买的是哪些物品,所以给定一个r_number限制一下,比如20.这样系统只会保存每个用户top20的商品。后面你再取数据排序的时候,数据量就大大缩小了,只需要排序top20的数据,而不是对每个用户几百个数据排序,所有会员排完序,那将是噩梦。

2.在这段代码最后加上过滤filter(f => f._2._2 > 0)

val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))). filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1))).filter(f => f._2._2 > 0)

因为最后计算出来的评分可能出现NaN,也就是为空的情况,会影响排序,提前过滤掉。如果数据量觉得还是太大,可以把0慢慢调大,过滤一些评分非常低的数据。

3.参数传入只能传入两个参数,用户和商品,最后得出来的矩阵是用户,商品,评分。这显然是不够的,难道我们还需要去关联用户表,商品表来获取相关数据吗?这个join也是非常恐怖的,所以提前传参的时候就把相关数据拼接起来,计算完毕后再拆分。比如输入数据是“用户id#用户姓名#用户性别#用户年龄”,“商品id#商品名称#商品价格#商品类别”。最后的输出数据再以#拆分。

继续阅读