提到 ALS 相信大家应该都不会觉得陌生(不陌生你点进来干嘛[捂脸]), 它是协同过滤的一种, 并被集成到 Spark 的 Mllib 库中. 本文就 ALS 的基本原理进行讲解, 并手把手, 肩并肩地带您实现这一算法.
原理篇
我们用人话而不是大段的数学公式来讲讲 ALS 是怎么一回事.
1.1 你听说过推荐算法么
假如我是豆瓣的 CEO, 很多豆瓣的用户在豆瓣电影上都会对电影进行评分. 那么根据这个评分数据, 我们有可能知道这些用户除了自己评过分的电影之外还喜欢或讨厌哪些电影吗? 这就是一个典型的推荐问题, 解决这一类问题的算法被称为推荐算法.
1.2 什么是协同过滤
协同过滤的英文全称是 Collaborative Filtering, 简称 CF. 注意, 这不是一款游戏! 从字面上分析, 协同就是寻找共同点, 过滤就是筛选出优质的内容.
1.3 协同过滤的分类
一般来说, 协同过滤推荐分为三种类型:
基于用户 (user-based) 的协同过滤, 通过计算用户和用户的相似度找到跟用户 A 相似的用户 B, C, D... 再把这些用户喜欢的内容推荐给 A;
基于物品 (item-based) 的协同过滤, 通过计算物品和物品的相似度找到跟物品 1 相似的物品 2, 3, 4... 再把这些物品推荐给看过物品 1 的用户们;
基于模型 (model based) 的协同过滤. 主流的方法可以分为: 矩阵分解, 关联算法, 聚类算法, 分类算法, 回归算法, 神经网络.
1.4 矩阵分解
矩阵分解 (decomposition, factorization)是将矩阵拆解为数个矩阵的乘积. 比如豆瓣电影有 m 个用户, n 个电影. 那么用户对电影的评分可以形成一个 m 行 n 列的矩阵 R, 我们可以找到一个 m 行 k 列的矩阵 U, 和一个 k 行 n 列的矩阵 I, 通过 U * I 来得到矩阵 R.
1.5 ALS
如果想通过矩阵分解的方法实现基于模型的协同过滤, ALS 是一个不错的选择, 其英文全称是 Alternating Least Square, 翻译过来是交替最小二乘法. 假设用户为 a, 物品为 b, 评分矩阵为 R(m, n), 可分解为用户矩阵 U(k, m)和物品矩阵 I(k, n), 其中 m, n, k 代表矩阵的维度. 前方小段数学公式低能预警:
根据矩阵分解的定义, 有
用 MSE 作为损失函数, 为了方便化简, 加法符号左侧的常数改为 - 1/2
对损失函数求 U_a 的一阶偏导数, 那
令一阶偏导数等于 0
同理, 可证
1.6 求解用户矩阵 U 和物品矩阵 I
矩阵 R 是已知的, 我们随机生成用户矩阵 U,
利用 1.5 中的式 5,R 和 U 求出 I
利用 1.5 中的式 6,R 和 I 求出 U
如此交替地执行步骤 1 和步骤 2, 直到算法收敛或者迭代次数超过了最大限制, 最终我们用 RMSE 来评价模型的好坏.
实现篇
本人用全宇宙最简单的编程语言 --Python 实现了 ALS 算法, 没有依赖任何第三方库, 便于学习和使用. 简单说明一下实现过程, 更详细的注释请参考本人 GitHub 上的代码.
注: 代码中用到的 Matrix 类是我写的一个矩阵类, 可以取出矩阵的行或列, 计算矩阵的乘法, 转置和逆.
2.1 创建 ALS 类
初始化, 存储用户 ID, 物品 ID, 用户 ID 与用户矩阵列号的对应关系, 物品 ID 与物品矩阵列号的对应关系, 用户已经看过哪些物品, 评分矩阵的 Shape 以及 RMSE.
- class ALS(object):
- def __init__(self):
- self.user_ids = None
- self.item_ids = None
- self.user_ids_dict = None
- self.item_ids_dict = None
- self.user_matrix = None
- self.item_matrix = None
- self.user_items = None
- self.shape = None
- self.rmse = None
2.2 数据预处理
对训练数据进行处理, 得到用户 ID, 物品 ID, 用户 ID 与用户矩阵列号的对应关系, 物品 ID 与物品矩阵列号的对应关系, 评分矩阵的 Shape, 评分矩阵及评分矩阵的转置.
- def _process_data(self, X):
- self.user_ids = tuple((set(map(lambda x: x[0], X))))
- self.user_ids_dict = dict(map(lambda x: x[::-1],
- enumerate(self.user_ids)))
- self.item_ids = tuple((set(map(lambda x: x[1], X))))
- self.item_ids_dict = dict(map(lambda x: x[::-1],
- enumerate(self.item_ids)))
- self.shape = (len(self.user_ids), len(self.item_ids))
- ratings = defaultdict(lambda: defaultdict(int))
- ratings_T = defaultdict(lambda: defaultdict(int))
- for row in X:
- user_id, item_id, rating = row
- ratings[user_id][item_id] = rating
- ratings_T[item_id][user_id] = rating
- err_msg = "Length of user_ids %d and ratings %d not match!" % (
- len(self.user_ids), len(ratings))
- assert len(self.user_ids) == len(ratings), err_msg
- err_msg = "Length of item_ids %d and ratings_T %d not match!" % (
- len(self.item_ids), len(ratings_T))
- assert len(self.item_ids) == len(ratings_T), err_msg
- return ratings, ratings_T
2.3 用户矩阵乘以评分矩阵
实现稠密矩阵与稀疏矩阵的矩阵乘法, 得到用户矩阵与评分矩阵的乘积.
- def _users_mul_ratings(self, users, ratings_T):
- def f(users_row, item_id):
- user_ids = iter(ratings_T[item_id].keys())
- scores = iter(ratings_T[item_id].values())
- col_nos = map(lambda x: self.user_ids_dict[x], user_ids)
- _users_row = map(lambda x: users_row[x], col_nos)
- return sum(a * b for a, b in zip(_users_row, scores))
- ret = [[f(users_row, item_id) for item_id in self.item_ids]
- for users_row in users.data]
- return Matrix(ret)
2.4 物品矩阵乘以评分矩阵
实现稠密矩阵与稀疏矩阵的矩阵乘法, 得到物品矩阵与评分矩阵的乘积.
- def _items_mul_ratings(self, items, ratings):
- def f(items_row, user_id):
- item_ids = iter(ratings[user_id].keys())
- scores = iter(ratings[user_id].values())
- col_nos = map(lambda x: self.item_ids_dict[x], item_ids)
- _items_row = map(lambda x: items_row[x], col_nos)
- return sum(a * b for a, b in zip(_items_row, scores))
- ret = [[f(items_row, user_id) for user_id in self.user_ids]
- for items_row in items.data]
- return Matrix(ret)
2.5 生成随机矩阵
- def _gen_random_matrix(self, n_rows, n_colums):
- data = [[random() for _ in range(n_colums)] for _ in range(n_rows)]
- return Matrix(data)
2.6 计算 RMSE
- def _get_rmse(self, ratings):
- m, n = self.shape
- mse = 0.0
- n_elements = sum(map(len, ratings.values()))
- for i in range(m):
- for j in range(n):
- user_id = self.user_ids[i]
- item_id = self.item_ids[j]
- rating = ratings[user_id][item_id]
- if rating> 0:
- user_row = self.user_matrix.col(i).transpose
- item_col = self.item_matrix.col(j)
- rating_hat = user_row.mat_mul(item_col).data[0][0]
- square_error = (rating - rating_hat) ** 2
- mse += square_error / n_elements
- return mse ** 0.5
2.7 训练模型
数据预处理
变量 k 合法性检查
生成随机矩阵 U
交替计算矩阵 U 和矩阵 I, 并打印 RMSE 信息, 直到迭代次数达到 max_iter
保存最终的 RMSE
- def fit(self, X, k, max_iter=10):
- ratings, ratings_T = self._process_data(X)
- self.user_items = {k: set(v.keys()) for k, v in ratings.items()}
- m, n = self.shape
- error_msg = "Parameter k must be less than the rank of original matrix"
- assert k < min(m, n), error_msg
- self.user_matrix = self._gen_random_matrix(k, m)
- for i in range(max_iter):
- if i % 2:
- items = self.item_matrix
- self.user_matrix = self._items_mul_ratings(
- items.mat_mul(items.transpose).inverse.mat_mul(items),
- ratings
- )
- else:
- users = self.user_matrix
- self.item_matrix = self._users_mul_ratings(
- users.mat_mul(users.transpose).inverse.mat_mul(users),
- ratings_T
- )
- rmse = self._get_rmse(ratings)
- print("Iterations: %d, RMSE: %.6f" % (i + 1, rmse))
- self.rmse = rmse
2.8 预测一个用户
预测一个用户感兴趣的内容, 剔除用户已看过的内容. 然后按感兴趣分值排序, 取出前 n_items 个内容.
- def _predict(self, user_id, n_items):
- users_col = self.user_matrix.col(self.user_ids_dict[user_id])
- users_col = users_col.transpose
- items_col = enumerate(users_col.mat_mul(self.item_matrix).data[0])
- items_scores = map(lambda x: (self.item_ids[x[0]], x[1]), items_col)
- viewed_items = self.user_items[user_id]
- items_scores = filter(lambda x: x[0] not in viewed_items, items_scores)
- return sorted(items_scores, key=lambda x: x[1], reverse=True)[:n_items]
2.9 预测多个用户
循环调用 2.8, 预测多个用户感兴趣的内容.
- def predict(self, user_ids, n_items=10):
- return [self._predict(user_id, n_items) for user_id in user_ids]
3 效果评估
3.1 main 函数
使用电影评分数据集, 训练模型并统计 RMSE.
- @run_time
- def main():
- print("Tesing the accuracy of ALS...")
- X = load_movie_ratings()
- model = ALS()
- model.fit(X, k=3, max_iter=5)
- print()
- print("Showing the predictions of users...")
- user_ids = range(1, 5)
- predictions = model.predict(user_ids, n_items=2)
- for user_id, prediction in zip(user_ids, predictions):
- _prediction = [format_prediction(item_id, score)
- for item_id, score in prediction]
- print("User id:%d recommedation: %s" % (user_id, _prediction))
3.2 效果展示
设置 k=3, 迭代 5 次, 并展示了前 4 个用户的推荐内容, 最终 RMSE 为 0.370, 运行时间 46.5 秒, 效果还算不错~
3.3 工具函数
本人自定义了一些工具函数, 可以在 GitHub 上查看:
run_time - 测试函数运行时间
load_movie_ratings - 加载电影评分数据
总结
ALS 的原理: 鸡生蛋, 蛋生鸡
ALS 的实现: 基本上就是矩阵乘法
来源: https://yq.aliyun.com/articles/684195