Computing cosine similarity with PySpark
Useful for similar-article recommendation
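The idea: the cosine similarity of two vectors a and b is cos(a, b) = (a . b) / (||a|| * ||b||). If every article vector is first L2-normalized, the denominators become 1, so multiplying the normalized TF-IDF matrix by its own transpose yields all pairwise cosine similarities at once; that is exactly what steps 3 and 4 below do.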
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("mllib") \
    .getOrCreate()
sc = spark.sparkContext
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.feature import Normalizer
# 0. original sample data: (article_id, comma-separated tags)
rdd_0 = sc.parallelize([["v1", "Delhi, Mumbai, Gandhinagar"],
                        ["v2", " Delhi, Mandi"],
                        ["v3", "Hyderabad, Jaipur"],
                        ["v4", "Delhi, Mumbai, Gandhinagar,xyz"]])
rdd_0.cache()
# index <-> article-id dictionaries (indices start at 1)
index2video = rdd_0.zipWithIndex().map(lambda x: (x[1] + 1, x[0][0])).collectAsMap()
video2index = {v: k for k, v in index2video.items()}
print(index2video)
print(video2index)
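# With the sample data above, the two dictionaries are deterministic:
# index2video = {1: 'v1', 2: 'v2', 3: 'v3', 4: 'v4'}
# video2index = {'v1': 1, 'v2': 2, 'v3': 3, 'v4': 4}
# Because indices start at 1, row/column 0 of the similarity matrix built
# below stays all-zero (a padding row).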
# 1. generate input data: replace article ids with their integer indices
rdd = rdd_0.map(lambda x: (video2index[x[0]], x[1]))
# 2. compute TF-IDF
documents = rdd.map(lambda x: x[1].replace(" ", "").split(","))  # strip stray spaces, split tags on commas
hashing = HashingTF(100)  # hash terms into 100 buckets; small, so collisions are possible
tf = hashing.transform(documents)
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)  # compute TF-IDF this way
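# Sanity check (optional): each document is now a SparseVector of length 100,
# the numFeatures chosen for HashingTF above; keys are hashed term indices,
# values are TF-IDF weights.
for vec in tfidf.take(2):
    print(vec)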
# 3. L2-normalize each TF-IDF vector
labels = rdd.map(lambda x: x[0])
features = tfidf
norm = Normalizer()
data = labels.zip(norm.transform(features))
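# Optional check: after Normalizer every vector has unit L2 norm, so the dot
# product of any two rows equals their cosine similarity. A minimal sketch
# (numpy is imported again further down, which is harmless):
import numpy as np
print(np.linalg.norm(data.first()[1].toArray()))  # should be ~1.0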
# 4. compute cosine similarity by multiplying the normalized matrix with its transpose
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
mat = IndexedRowMatrix(data).toBlockMatrix()
dot = mat.multiply(mat.transpose())
r = dot.toLocalMatrix().toArray()  # returns a numpy.ndarray; collects the full matrix to the driver
print(r)
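# r[i][j] is the cosine similarity between articles i and j; the diagonal is
# 1.0, and row/column 0 is the all-zero padding row from the 1-based indices.
# For example, to read off the similarity between "v1" and "v4":
print(r[video2index["v1"]][video2index["v4"]])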
import numpy as np
y = np.argsort(r)  # per-row indices sorted by similarity in ascending order
result = y[:, ::-1][:, :100]  # reverse each row to descending order, then keep the top 100
print(result)
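# A tiny 1-D illustration of the argsort-and-reverse trick:
# np.argsort([0.2, 0.9, 0.5]) -> [0, 2, 1] (ascending), and [::-1] gives
# [1, 2, 0], i.e. the indices ordered from most to least similar.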
def get_recmd(video_id):
    """
    Given an article id, return up to 100 similar article ids.
    :param video_id: the article to recommend for
    :return: (video_id, list of similar article ids, most similar first)
    """
    rr = []
    index = video2index[video_id]
    # result[index] holds this article's top indices, most similar first
    for ind in result[index]:
        temp_video_id = index2video.get(ind)  # index 0 (the padding row) maps to None
        if temp_video_id and temp_video_id != video_id:
            rr.append(temp_video_id)
    return video_id, rr
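# Usage example: with the sample data, "v4" shares "Delhi, Mumbai, Gandhinagar"
# with "v1", so "v1" should come out on top of its recommendation list.
print(get_recmd("v4"))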
final = rdd_0.map(lambda x: get_recmd(x[0]))
output = spark.createDataFrame(final, ["video_id", "recmd"]).collect()
print(output)
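# An alternative worth knowing (a sketch, not what the code above uses):
# mllib's columnSimilarities() returns the upper-triangular cosine
# similarities between COLUMNS, so transposing first makes the articles the
# columns; the resulting entries are MatrixEntry(article_i, article_j, cosine).
coord = IndexedRowMatrix(data).toCoordinateMatrix().transpose()
sims = coord.toRowMatrix().columnSimilarities()
print(sims.entries.collect())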
References
https://stackoverflow.com/questions/46758768/calculating-the-cosine-similarity-between-all-the-rows-of-a-dataframe-in-pyspark