User-Based Collaborative Filtering


I. Overview:

1. Compute the similarity matrix between users.

2. Using the user similarity matrix (say we are recommending for user B), find user A, the user most similar to B, and recommend to B the items that A likes but B has not yet seen.
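
Before walking through the full pipeline, here is a minimal in-memory sketch of these two steps (toy data and plain Scala only, not the production code; all ids and scores are made up):

object UserCfSketch {
  def cosine(a: Map[String, Double], b: Map[String, Double]): Double = {
    // dot product over the items both users rated, divided by the product of the full norms
    val dot = a.keySet.intersect(b.keySet).toSeq.map(k => a(k) * b(k)).sum
    val norm = (m: Map[String, Double]) => math.sqrt(m.values.map(v => v * v).sum)
    dot / (norm(a) * norm(b))
  }

  def main(args: Array[String]): Unit = {
    // toy ratings: userId -> (itemId -> preference score)
    val ratings = Map(
      "A" -> Map("i1" -> 0.9, "i2" -> 0.8, "i3" -> 0.5),
      "B" -> Map("i1" -> 0.7, "i2" -> 0.6),
      "C" -> Map("i4" -> 0.9))
    val target = "B"
    // step 1: similarity of every other user to B
    val sims = (ratings - target).map { case (u, v) => (u, cosine(ratings(target), v)) }
    // step 2: take the most similar user and recommend the items B has not seen yet
    val mostSimilar = sims.maxBy(_._2)._1
    val recommended = ratings(mostSimilar).keySet -- ratings(target).keySet
    println(s"most similar to $target: $mostSimilar, recommend: $recommended")
  }
}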

II. Overall Workflow

1. Raw data: the user listening-log table (table: user_listen)

The four fields are:

userId          user id
musicId         song id
remainTime      listening duration (seconds)
durationHour    the hour of day at which the listening happened

2. Build a preference score for each (user, song) pair (essentially: the total time the user spent listening to that song / the total time the user spent listening overall)

SELECT userid, musicid, (t1.itemTotalTime/t2.totalTime) as rating FROM
(SELECT userid, musicid, SUM(remaintime) as itemTotalTime FROM default.user_listen group by userid, musicid) t1  -- total time the user spent listening to this song
LEFT JOIN
(SELECT userid as userid2, SUM(remaintime) as totalTime FROM default.user_listen group by userid) t2  -- total time the user spent listening overall
ON t1.userid=t2.userid2

This yields the fields userid, musicid, rating (the user's preference score for the song).

3. Compute the similarity between users

From the previous step, each user can be turned into a list of preference scores over all the songs they listened to, in the following form:

ui=[i1:0.3,i19:0.5,i21:0.3,i41:0.5]
uj=[i1:0.4,i21:0.2,i41:0.7]

Compute the pairwise similarity score between users with cosine similarity.

Note: strictly following that reading of the cosine formula, the denominator would be the root of the sum of squared scores over only the items ui and uj have in common, i.e. i1, i21, i41.

Here it is instead taken as the root of the sum of squares over each user's own full item list; since both terms shrink together, the final relative scores are not affected.

Denominator: sqrt(0.3^2+0.5^2+0.3^2+0.5^2) * sqrt(0.4^2+0.2^2+0.7^2)

A group by userid then yields the fields: userid, denominator for that userid.

Numerator: duplicate the (userid, musicid, rating) data and join the two copies on musicid, which gives:

userid1,userid2,musicid,rating1,rating2
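
As a quick sanity check of the ui/uj example above (plain Scala, using the same numbers): the numerator sums the rating products over the common items i1, i21, i41, and the denominator multiplies the two full-vector norms.

val ui = Map("i1" -> 0.3, "i19" -> 0.5, "i21" -> 0.3, "i41" -> 0.5)
val uj = Map("i1" -> 0.4, "i21" -> 0.2, "i41" -> 0.7)
val numerator = ui.keySet.intersect(uj.keySet).toSeq.map(k => ui(k) * uj(k)).sum   // 0.12 + 0.06 + 0.35 = 0.53
val denominator = math.sqrt(ui.values.map(v => v * v).sum) *
  math.sqrt(uj.values.map(v => v * v).sum)                                         // sqrt(0.68) * sqrt(0.69)
val cosineSim = numerator / denominator                                            // ≈ 0.77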

Optimizations for this join:

(1) Use business rules so that not every user takes part in the similarity computation: filter by user attributes such as age, gender, and geographic location to cut down the amount of computation.

(2) Cluster users on business features, so that pairwise similarity is only computed within each cluster rather than across all users (see the sketch below).
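
A minimal sketch of optimization (2), assuming the (userId, itemId, rating) DataFrame from step 2 is available as data (as in the reference code below) and that the user profile table exposes some grouping column; cluster_id here is a hypothetical column name used only for illustration:

// Sketch only: restrict the self-join to users in the same (assumed) cluster_id group,
// so pairwise similarity is never computed across clusters.
val profile = spark.sql("select userId, cluster_id from badou.user_profile")   // cluster_id is an assumed column
val dataWithCluster = data.join(profile, "userId")
val dataWithClusterCopy = dataWithCluster.selectExpr(
  "userId as userId1", "itemId as itemId1", "rating as rating1", "cluster_id as cluster_id1")
val pairsWithinCluster = dataWithCluster.
  join(dataWithClusterCopy,
    dataWithCluster("itemId") === dataWithClusterCopy("itemId1") &&
      dataWithCluster("cluster_id") === dataWithClusterCopy("cluster_id1")).
  filter("userId <> userId1")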

That is the flow for computing user-user similarity scores. Because the pairwise similarity computation is relatively heavy, its results can be written to Hive and refreshed weekly.
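
For example, a minimal sketch of the weekly persistence, assuming the similarity DataFrame is df_sim as in the reference code below; the target table name badou.user_sim_weekly is an assumption:

// Sketch only: persist the pairwise similarities to Hive and schedule this job weekly.
df_sim.write.
  mode("overwrite").
  saveAsTable("badou.user_sim_weekly")   // table name is hypothetical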

4. Produce user-item recommendation scores for online serving, refreshed daily

Reference code:

import breeze.numerics.{pow, sqrt}
import org.apache.spark.sql.SparkSession

object RecallUserCf {
  def main(args: Array[String]): Unit = {

    //    create the SparkSession on the driver
    val spark = SparkSession.
      builder().
//      master("local").
      appName("RecallUserCf").
      config("hive.metastore.uris",
        "thrift://"+"kf1-82-hdp"+":9083").
      //      config("spark.sql.warehouse.dir", "F:/recommend/spark-warehouse").
      enableHiveSupport().
      getOrCreate()

    /***
     * 1. Load the data and build the rating data
     */
    //    ~100k users
    //    In day-to-day Spark development, roughly 90% of the data is read from Hive.
    //    user_listen is about 500 MB; how many partitions does its RDD have?
    //    With 128 MB blocks, 4. When reading from HDFS, the number of RDD partitions
    //    is determined by the HDFS blocks.
    val user_profile = spark.sql("select * from badou.user_profile")
//    val user_listen = spark.sql("select userid,musicid,cast(remaintime as double),cast(durationhour as double) from badou.user_listen ")
    val user_listen = spark.sql("select userid,musicid,cast(remaintime as double),cast(durationhour as double) from default.ul_log ")
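    //    (sketch, not in the original) the actual partition count can be checked with:
    //    println(user_listen.rdd.getNumPartitions)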

    //    2. Compute the total time each user spent listening to each song
    val itemTotalTime = user_listen.
      selectExpr("userId",
        "musicId", "cast(remainTime as double)",
        "cast(durationHour as double)").
      groupBy("userId", "musicId").
      sum("remainTime").
      withColumnRenamed("sum(remainTime)", "itemTotalTime")
    //    withColumnRenamed renames a column
    //    total listening time per user
    val totalTime = user_listen.
      selectExpr("userId", "musicId",
        "cast(remainTime as double)",
        "cast(durationHour as double)").
      groupBy("userId").
      sum("remainTime").
      withColumnRenamed("sum(remainTime)", "totalTime")

    //    uid ,iid ,rating
    //    itemTotalTime
    //|              userId|   musicId|itemTotalTime|
    //+--------------------+----------+-------------+
    //|00941e652b84b9967...|6326709127|        200.0|
    //|01e7b27c256cc06c1...|5652309207|        280.0|
    //|0115519e4f2e7488d...| 536400214|        195.0|
    //+--------------------+----------+-------------+
    //    the totalTime table is relatively small: ~10M rows (~1 GB)
    //+--------------------+---------+
    //|              userId|totalTime|
    //+--------------------+---------+
    //|001182ecc8be5e709...|     75.0|
    //|003bfa68b61dd5c9e...|   1810.0|
    //|00c68f637da1a8731...|    763.0|
    //+--------------------+---------+
    //    pre-aggregate the data before doing the join
    val data = itemTotalTime.
      join(totalTime, "userId").
      selectExpr("userId", "musicId as itemId", "itemTotalTime/totalTime as rating")
    data.createOrReplaceTempView("user")
    data.cache()
    //        data.show(2)
    /**
     *
     * +--------------------+---------+-------------------+
     * |              userId|   itemId|             rating|
     * +--------------------+---------+-------------------+
     * |000005a451226ca84...|177200319|                1.0|
     * |0000cbb6ea39957f7...|068800255|0.20651420651420652|
     * +--------------------+---------+-------------------+
     */
    //    data.write.mode("overwrite").saveAsTable("user_item_rating")
    /***
     * 2. Compute user similarity
     */
    //    cosine(X, Y) = (x1*y1 + x2*y2 + ...) / (|X| * |Y|)
    //    |X| = sqrt(x1^2 + x2^2 + x3^2)
    //    |Y| = sqrt(y1^2 + y2^2 + y3^2)
    //    2.1 Compute the denominator
    import spark.implicits._

    //    userSumPowRating: for each user, the root of the sum of squared ratings over all of that user's items (|X| = sqrt(x1^2+x2^2+x3^2))
    val userSumPowRating =  data.
      rdd.
      map(x => (x(0).toString, x(2).toString)).
      groupByKey().
      // one record looks like:
      // (003d60470fbc5eb862940b4a9c8b3b26,CompactBuffer(0.6582343830665979, 0.24109447599380485, 0.10067114093959731))
      mapValues(x => sqrt(x.toArray.map(rating => pow(rating.toDouble, 2)).sum)).
      toDF("userId", "sqrt_rating_sum")

    userSumPowRating.cache()
    userSumPowRating.show(3)
    //    2.2 Compute the numerator
    //    247999 distinct uids: computing pairwise similarity across all of them directly would blow up.
    val data_with_copy =data.selectExpr("userId as userId1","itemId as itemId1","rating as rating1")

    //    item -> user inverted list; drop pairs where both sides are the same user
    //    TODO: optimize the computation space; avoid a full Cartesian product over users
    //    before optimization:
    //    333499378 rows
    val user_item2item = data.
      join(data_with_copy,data_with_copy("itemId1")===data("itemId")).
      filter("userId <> userId1 ")
    user_item2item.selectExpr("userId").distinct().count()
    //    92964: some users share no songs with anyone else, so they get no recall results here; their similarity needs to be computed some other way.
    //    multiply the two users' ratings on the same item: part of the cosine numerator
    //    dot product
    import org.apache.spark.sql.functions._

    val product_udf = udf((s1:Double,s2:Double)=>s1*s2)

    val selectData = user_item2item.selectExpr("userId","rating","itemId","userId1","rating1")

    //    val user_data =  selectData.selectExpr("userId","userId1","rating1*rating as rating_product")
    val user_data =  selectData.
      withColumn("rating_product",product_udf(col("rating"),col("rating1"))).
      selectExpr("userId","userId1","itemId","rating_product")


    //    this is where the shuffle blows up
    val user_rating_sum =  user_data.
      groupBy("userId","userId1").
      agg("rating_product"->"sum").
      withColumnRenamed("sum(rating_product)","rating_dot")

    //    divide the numerator by the denominator to get the similarity

    val userSumPowRatingCopy = userSumPowRating.selectExpr("userId as userId1","sqrt_rating_sum as sqrt_rating_sum1")

    val df_sim = user_rating_sum.
      join(userSumPowRating,"userId").
      join(userSumPowRatingCopy,"userId1").
      selectExpr("userId","userId1",
        "rating_dot/(sqrt_rating_sum*sqrt_rating_sum1) as cosine_sim")

    //      3. Collect the item sets of the similar users
    //       3.1 Take the top-n most similar users
    val df_nsim = df_sim.
      rdd.
      map(x => (x(0).toString, (x(1).toString, x(2).toString))).
      groupByKey().
      mapValues { x =>
        // sort neighbors by similarity (descending, numerically) and keep the top 10
        x.toArray.sortWith((a, b) => a._2.toDouble > b._2.toDouble).slice(0, 10)
      }.
      flatMapValues(x => x).
      toDF("user_id", "user_v_sim").
      selectExpr("user_id", "user_v_sim._1 as user_v",
        "cast(user_v_sim._2 as double) as sim")

    //    3.2 Get each user's own item list (an array of item_rating strings per user), used later for filtering
    val df_user_item = data.
      rdd.
      map(x=>(x(0).toString,x(1).toString+"_"+x(2).toString)).
      groupByKey().mapValues(x=>x.toArray)
      .toDF("user_id","item_rating_arr")
    val df_user_item_v = df_user_item.selectExpr("user_id as user_v",
      "item_rating_arr as item_rating_arr_v")
    /** df_gen_item:
     * +------+-------+-------------------+--------------------+--------------------+
        |user_v|user_id|                sim|     item_rating_arr|   item_rating_arr_v|
        +------+-------+-------------------+--------------------+--------------------+
        |   296|     71|0.33828954632615976|[89_5, 134_3, 346...|[705_5, 508_5, 20...|
        |   467|     69| 0.4284583738949647|[256_5, 240_3, 26...|[1017_2, 50_4, 15...|
        |   467|    139|0.32266158985444504|[268_4, 303_5, 45...|[1017_2, 50_4, 15...|
        |   467|    176|0.44033327143526596|[875_4, 324_5, 32...|[1017_2, 50_4, 15...|
        |   467|    150|0.47038691576507874|[293_4, 181_5, 12...|[1017_2, 50_4, 15...|
        +------+-------+-------------------+--------------------+--------------------+
     * */
    val df_gen_item = df_nsim.join(df_user_item,"user_id")
      .join(df_user_item_v,"user_v")
    //       3.3 Use a UDF to drop, from the similar user user_v's list, the items user_id has already rated


    val filter_udf = udf{(items:Seq[String],items_v:Seq[String])=>
      // item ids the target user has already rated
      val ratedItems = items.map(_.split("_")(0)).toSet
      // keep only the similar user's items that the target user has not rated yet
      items_v.filter(x => !ratedItems.contains(x.split("_")(0)))
    }

    val df_filter_item = df_gen_item.withColumn("filtered_item",
      filter_udf(col("item_rating_arr"),col("item_rating_arr_v")))
      .select("user_id","sim","filtered_item")


    /** df_filter_item:
     * +-------+-------------------+--------------------+
     * |user_id|                sim|       filtered_item|
        +-------+-------------------+--------------------+
        |     71|0.33828954632615976|[705_5, 508_5, 20...|
        |     69| 0.4284583738949647|[762_3, 264_2, 25...|
        |    139|0.32266158985444504|[1017_2, 50_4, 76...|
        |    176|0.44033327143526596|[1017_2, 762_3, 2...|
        |    150|0.47038691576507874|[1017_2, 762_3, 2...|
        +-------+-------------------+--------------------+
     * */


    //        3.4 Apply the formula: similarity * rating
    val simRatingUDF = udf{(sim:Double,items:Seq[String])=>
      items.map{x=>
        val l = x.split("_")
        l(0)+"_"+l(1).toDouble*sim
      }
    }
    val itemSimRating = df_filter_item.withColumn("item_prod",
      simRatingUDF(col("sim"),col("filtered_item")))
      .select("user_id","item_prod")

    /**itemSimRating:
     *+-------+--------------------+
        |user_id|           item_prod|
        +-------+--------------------+
        |     71|[705_1.6914477316...|
        |     69|[762_1.2853751216...|
        |    139|[1017_0.645323179...|
        |    176|[1017_0.880666542...|
        |    150|[1017_0.940773831...|
        +-------+--------------------+
     */

    val userItemScore = itemSimRating.select(itemSimRating("user_id"),
      explode(itemSimRating("item_prod"))).toDF("user_id","item_prod")
      .selectExpr("user_id","split(item_prod,'_')[0] as item_id",
        "cast(split(item_prod,'_')[1] as double) as score")

    /**userItemScore:
     *+-------+-------+------------------+
        |user_id|item_id|             score|
        +-------+-------+------------------+
        |     71|    705|1.6914477316307988|
        |     71|    508|1.6914477316307988|
        |     71|     20|1.6914477316307988|
        |     71|    228| 1.353158185304639|
        |     71|    855|1.6914477316307988|
        +-------+-------+------------------+
     */
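
    //    (Sketch, not in the original:) when several similar neighbors contribute the
    //    same item for a user, the contributions can be summed before serving; the
    //    target Hive table name below is an assumption.
    val userItemScoreAgg = userItemScore.
      groupBy("user_id", "item_id").
      agg("score" -> "sum").
      withColumnRenamed("sum(score)", "score")
    //    userItemScoreAgg.write.mode("overwrite").saveAsTable("badou.user_item_score_daily")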
  }
}