一, 数据源
1, 相似人群数据存在 TDW 库中, 数据字典说明:
```sql
CREATE TABLE sim_people_tdw_tbl(
    uid STRING COMMENT 'reader id',
    sim_uids STRING COMMENT 'sim_uids',
    sim_num BIGINT COMMENT 'sim_num',
    update_date STRING COMMENT 'update_date'
)
```
字段 | 类型 | 含义 |
---|---|---|
uid | string | 用户标识 |
sim_uids | string | 与 uid 喜好相似的人群,每个相似用户的格式为 `用户编号:相同阅读量`,相似用户之间以逗号分隔 |
sim_num | BIGINT | 相似人群的人数 |
update_date | string | 数据日期 |
2, 基础用户画像存在 MongoDB 中
基础用户画像
字段 | 含义 |
---|---|
_id | 用户 id |
profile(离线)positive(实时) | 用户正画像(喜欢),每个维度以分号间隔,每个子维度以逗号间隔,值格式为 key_id:weight,维度含义依次为一级分类、二级分类、关键字、topic、阅读来源 |
negative | 负画像(不喜欢),其他字段的含义与正画像一样 |
update_time | 更新时间 |
cityCode 或 city | 城市编码 |
3, 相似人群画像也存在 MongoDB 中
二, 整体思路
由于 TESLA 集群无法直接操作 MongoDB, 需要将 TDW 里面的用户画像数据, 通过洛子系统导出至 HDFS, 再与 MongoDB 中原有群画像进行合并.
整体流程
三, 算法流程
算法流程图
四, 核心代码
- #! /usr/bin/python2.7
- # -*- coding: utf8 -*-
- import decimal
- import time
- import math
- import sys
- import os
- import param_map
- from pymongo.collection import Collection
- from decimal import Decimal
- import datetime
- reload(sys)
- sys.setdefaultencoding("utf-8")
- sys.path.append("../")
- from utils import mongoUtils, confUtils
# Decimal arithmetic (used for profile weights) is limited to 6 significant digits.
decimal.getcontext().prec = 6

# Maximum number of user ids sent to MongoDB in one ``$in`` query.
BATCH_NUM = 100000

# Unix timestamp (seconds, local time) of "30 days ago", computed once at import.
# Basic profiles whose ``update_time`` is not newer than this are ignored.
now_time = datetime.datetime.now()
delta = datetime.timedelta(days=30)
delta30 = now_time - delta
time_limit = int(time.mktime(delta30.timetuple()))
print(time_limit)
def split_uid_similarity(uid_num_str):
    """Split a ``"uid:similarity"`` token into its two parts.

    :param uid_num_str: token such as ``"1_757155427:8"``
    :return: tuple ``(uid, similarity)``; the similarity is converted to float
    :raises IndexError: if the token contains no ``":"``
    :raises ValueError: if the part after the first ``":"`` is not a number
    """
    uid_num = uid_num_str.split(":")
    return uid_num[0], float(uid_num[1])
def split_uid_sim_user(user_hd):
    """Split one tab-separated input line into the uid and its similar users.

    Surrounding whitespace (including the trailing newline) is stripped first.

    :param user_hd: raw line of the form ``"<uid>\\t<sim_users>"``
    :return: tuple ``(uid, sim_users)``; ``sim_users`` is returned unparsed
    :raises IndexError: if the line contains no tab character
    """
    uid_sim_user = user_hd.strip().split("\t")
    return uid_sim_user[0], uid_sim_user[1]
def dimension_profile_limit(dimension_profile, min_i, max_i, limit, cluster_profile_str):
    """Append the strongest tags of one dimension to ``cluster_profile_str``.

    The tags are sorted by weight (descending) and only the first ``limit``
    are considered. Each considered weight is capped at ``max_i``; tags whose
    capped weight is below ``min_i`` are dropped. The survivors are rendered
    as ``"tag_id:weight"`` and joined with commas.

    :param dimension_profile: dict mapping tag id to weight for one dimension
    :param min_i: minimum weight a tag needs in order to be kept
    :param max_i: upper bound applied to every weight
    :param limit: maximum number of top-ranked tags to consider
    :param cluster_profile_str: text built so far; returned unchanged when
        nothing is kept
    :return: ``cluster_profile_str`` followed by the rendered tags
    """
    if not dimension_profile:
        return cluster_profile_str

    ranked = sorted(dimension_profile.items(), key=lambda c: c[1], reverse=True)

    kept = []
    for tag_id, tag_value in ranked[:max(limit, 0)]:
        if tag_value > max_i:
            tag_value = max_i
        if tag_value >= min_i:
            kept.append(str(tag_id) + ":" + str(tag_value))

    # Joining only the kept tags means there is no trailing comma to strip, so
    # the prefix is never truncated when every tag falls below ``min_i``.
    return cluster_profile_str + ",".join(kept)
def cluster_profile_dic2list(cluster_profile, dimension_param_dic):
    """Apply per-dimension thresholds to a cluster profile and return a list.

    The result has one string per dimension of ``cluster_profile``, ordered
    by dimension key so that a dimension's position in the list is stable.
    A dimension with no configured thresholds, with a ``None`` profile, or
    with no tag passing the thresholds is rendered as ``""``.

    :param cluster_profile: dict mapping dimension key to a ``{tag: weight}``
        dict (keys must be mutually sortable, e.g. all ints)
    :param dimension_param_dic: thresholds per dimension, looked up by
        ``str(key)``; each entry provides ``"min"``, ``"max"`` and ``"limit"``
    :return: list of ``"tag:weight,..."`` strings, or ``None`` when
        ``cluster_profile`` is empty
    """
    if not cluster_profile:
        return None

    cluster_profile_list = []
    for key in sorted(cluster_profile):
        dimension_profile = cluster_profile[key]
        dimension_str = ""
        # Look up the thresholds configured for this dimension.
        dimention_param = dimension_param_dic.get(str(key))
        if dimention_param is not None and dimension_profile is not None:
            # Each dimension is rendered on its own (empty prefix), so one
            # dimension can never alter the text of another.
            dimension_str = dimension_profile_limit(
                dimension_profile,
                dimention_param.get("min"),
                dimention_param.get("max"),
                dimention_param.get("limit"),
                "")
        # Every dimension contributes an entry, even an empty one, so that
        # positions keep their meaning.
        cluster_profile_list.append(dimension_str)
    return cluster_profile_list
def sim_users_dic2list(cluster_dic, sim_users_max_size):
    """Keep the most similar users and return them as a list and as a dict.

    :param cluster_dic: dict mapping user id (string) to similarity
    :param sim_users_max_size: maximum number of similar users to keep
    :return: tuple ``(sim_users_list, new_cluster_dic)`` where
        ``sim_users_list`` holds ``"uid:similarity"`` strings ordered by
        descending similarity and ``new_cluster_dic`` holds the same users as
        a dict. Both are empty when there is nothing to keep.
    """
    ranked = sorted(cluster_dic.items(), key=lambda b: b[1], reverse=True)
    top = ranked[:max(sim_users_max_size, 0)]

    # Build the list directly instead of joining and re-splitting a string;
    # an empty input therefore yields [] rather than [""].
    sim_users_list = [key + ":" + str(value) for key, value in top]
    return sim_users_list, dict(top)
class ClusterProfileComputer(object):
    """Recompute and store the cluster (similar-user) profile of every user.

    For each input file the computer loads the basic profiles of the listed
    users from the XW, PAC and OM collections, merges the new similar-user
    lists with the stored ones, derives one cluster profile per platform and
    writes the result to the cluster-profile collection.
    """

    # Parsed settings file, shared by all instances.
    cf = confUtils.getConfig("../conf/setting.conf")

    # Fixed processing order, so each platform's result is addressed by name
    # rather than by its position in an unordered dict iteration.
    _PLATFORMS = ("XW", "PAC", "OM")

    def __init__(self, environment):
        """Open the MongoDB connections and read the tuning parameters.

        :param environment: ``"local"`` selects ``LOCAL_SIM_USERS_PATH``;
            any other value selects ``SIM_USERS_PATH``
        """
        self.xw_database, self.xw_client = mongoUtils.getMongodb("XW")
        self.pac_database, self.pac_client = mongoUtils.getMongodb("PAC")
        # Keep the OM client in its own attribute instead of overwriting
        # the PAC client.
        self.om_database, self.om_client = mongoUtils.getMongodb("OM")
        item = "LOCAL_SIM_USERS_PATH" if environment == "local" else "SIM_USERS_PATH"
        self.sim_users_path = confUtils.getFilePath(self.cf, "SIM_USERS", item)
        self.decay_factor = param_map.SIM_USERS_PARAM.get("decay_factor")
        self.sim_users_max_size = param_map.SIM_USERS_PARAM.get("sim_users_max_size")
        self.similarity_low = param_map.SIM_USERS_PARAM.get("similarity_low")
        self.similarity_high = param_map.SIM_USERS_PARAM.get("similarity_high")

    @staticmethod
    def basic_cursor2dic(platform, mongodb_cursor):
        """Index basic-profile documents by user id.

        :param platform: platform name; PAC documents are keyed by ``"name"``,
            all others by ``"_id"``
        :param mongodb_cursor: iterable of profile documents
        :return: dict mapping user id to its profile document
        """
        id_key = "name" if platform == "PAC" else "_id"
        users_profile_map = {}
        for user_profile in mongodb_cursor:
            users_profile_map[user_profile[id_key]] = user_profile
        return users_profile_map

    @staticmethod
    def get_sim_users_profile(all_users_profile, users_similarity):
        """Collect the basic profiles of the given similar users.

        :param all_users_profile: dict mapping user id to profile document
        :param users_similarity: list of ``"uid:similarity"`` strings
        :return: list of the profiles that exist, in input order
        """
        rs = []
        for uid_similarity in users_similarity:
            # Only the uid part is needed here, so the similarity is not
            # parsed; a malformed token simply finds no profile.
            uid = uid_similarity.split(":")[0]
            profile = all_users_profile.get(uid)
            if profile is not None:
                rs.append(profile)
        return rs

    def dump_basic_profile(self, all_uid, batch_num, platform, profile_collection):
        """Load the recently updated basic profiles of the given users.

        The ids are queried in batches of ``batch_num``; only documents whose
        ``update_time`` is greater than the module-level ``time_limit`` are
        returned.

        :param all_uid: list of user ids
        :param batch_num: number of ids per query
        :param platform: platform name (decides the id field, see
            ``basic_cursor2dic``)
        :param profile_collection: collection to query
        :return: dict mapping user id to its profile document
        """
        id_key = "name" if platform == "PAC" else "_id"
        rs = {}
        for start in range(0, len(all_uid), batch_num):
            batch = all_uid[start:start + batch_num]
            cursor = profile_collection.find(
                {"$and": [{id_key: {'$in': batch}},
                          {"update_time": {"$gt": time_limit}}]},
                no_cursor_timeout=True)
            # no_cursor_timeout cursors must be closed explicitly, also when
            # reading fails.
            try:
                rs.update(self.basic_cursor2dic(platform, cursor))
            finally:
                cursor.close()
        return rs

    def compute_single_file(self, path, xw_profile_collection, pac_profile_collection, om_profile_collection):
        """Process one exported file and upsert one cluster document per uid.

        :param path: file whose lines have the form ``"<uid>\\t<sim_users>"``
        :param xw_profile_collection: XW basic-profile collection
        :param pac_profile_collection: PAC basic-profile collection
        :param om_profile_collection: OM basic-profile collection
        """
        all_uid_list = []
        uid_sim_map = {}
        with open(path) as users:
            for user_str in users:
                if not user_str.strip():
                    # Skip blank lines instead of failing on them.
                    continue
                uid_hf, sim_users_hd = split_uid_sim_user(user_str)
                uid_sim_map[uid_hf] = sim_users_hd.split(",")
                all_uid_list.append(uid_hf)
        print("uid_sim_map : %d" % len(uid_sim_map))

        # Basic profiles of all listed users, per platform.
        platform_collections = {
            "XW": xw_profile_collection,
            "PAC": pac_profile_collection,
            "OM": om_profile_collection,
        }
        platform_basic_profile_dic = {}
        for platform_name in self._PLATFORMS:
            platform_basic_profile_dic[platform_name] = self.dump_basic_profile(
                all_uid_list, BATCH_NUM, platform_name, platform_collections[platform_name])

        # Previously stored cluster profiles of the same users.
        cluster_profile_collection = self.xw_database.get_collection(
            param_map.MONGODB_CLUSTER_PROFILE_MAP["Cluster"])  # type: Collection
        old_cluster_profile_map = dump_cluster_profile_history(self, all_uid_list, cluster_profile_collection,
                                                               BATCH_NUM)
        print("dump cluster profile %d records" % len(old_cluster_profile_map))

        for uid, sim_users_list in uid_sim_map.items():
            print("uid = %s" % uid)
            # Merge new and stored similar users; stored weights are decayed.
            users_similarity_dic = merge_sim_users(uid, sim_users_list, self.decay_factor, self.similarity_low,
                                                   self.similarity_high, old_cluster_profile_map)
            # Keep only the most similar users, in the list form stored in MongoDB.
            sim_users_list, users_similarity_dic = sim_users_dic2list(users_similarity_dic, self.sim_users_max_size)
            print("similar people len: %d" % len(sim_users_list))

            platform_cluster_profile = {}
            for platform_name in self._PLATFORMS:
                sim_users_profile_list = self.get_sim_users_profile(
                    platform_basic_profile_dic[platform_name], sim_users_list)
                cluster_profile_dic = cluster_profile_compute(platform_name, sim_users_profile_list,
                                                              users_similarity_dic)
                # Threshold the weights and convert to the stored list form.
                platform_cluster_profile[platform_name] = cluster_profile_dic2list(
                    cluster_profile_dic, param_map.DIMENSION_PARAM)

            now = int(time.time())
            # The stored document was already loaded in bulk above, so no
            # extra query per uid is needed to preserve its creation time.
            old_profile = old_cluster_profile_map.get(uid)
            create_time = now if old_profile is None else old_profile.get("create_time", now)
            document = {"_id": uid, "sim_users": sim_users_list,
                        "xw_cluster_profile": platform_cluster_profile["XW"],
                        "pac_cluster_profile": platform_cluster_profile["PAC"],
                        "om_cluster_profile": platform_cluster_profile["OM"],
                        "create_time": create_time,
                        "update_time": now}
            # Insert or fully replace the document with this _id
            # (Collection.save is deprecated in PyMongo 3).
            cluster_profile_collection.replace_one({"_id": uid}, document, upsert=True)
        print("end")

    def run(self):
        """Process every ``attempt_*`` file found below ``sim_users_path``."""
        xw_profile_collection = self.xw_database.get_collection(param_map.MONGODB_PROFILE_MAP["XW"])
        pac_profile_collection = self.pac_database.get_collection(param_map.MONGODB_PROFILE_MAP["PAC"])
        om_profile_collection = self.om_database.get_collection(param_map.MONGODB_PROFILE_MAP["OM"])
        for dir_path, dir_names, file_names in os.walk(self.sim_users_path):
            print(dir_names)
            for file_name in file_names:
                if "attempt_" in file_name:
                    print(file_name)
                    path = os.path.join(dir_path, file_name)
                    self.compute_single_file(path, xw_profile_collection, pac_profile_collection,
                                             om_profile_collection)
def accumulate_dimension_profile(cluster_dimension_feature, user_dimension, ratio):
    """Add one user's tags for a single dimension to the cluster profile.

    ``user_dimension`` is a comma-separated list of ``"tag:weight"`` items.
    Each weight is multiplied by ``ratio`` and added to the tag's running
    total. Items that do not consist of exactly two ``":"``-separated parts
    are ignored.

    :param cluster_dimension_feature: dict mapping tag to accumulated weight;
        updated in place
    :param user_dimension: the user's tags for this dimension (may be ``""``)
    :param ratio: Decimal weight of this user; the caller derives it as
        similarity / (similarity + 10)
    :return: the same ``cluster_dimension_feature`` dict
    :raises decimal.InvalidOperation: if a weight is not a valid number
    """
    if user_dimension != "":
        for feature in user_dimension.split(","):
            atom = feature.split(":")
            if len(atom) != 2:
                continue
            tag_id, weight = atom
            contribution = Decimal(weight) * ratio
            previous = cluster_dimension_feature.get(tag_id)
            if previous is None:
                cluster_dimension_feature[tag_id] = contribution
            else:
                cluster_dimension_feature[tag_id] = contribution + Decimal(previous)
    return cluster_dimension_feature
def dump_cluster_profile_history(self, all_uid, collection, batch_num):
    """Load the stored cluster profiles of the given users.

    The ids are queried in batches of ``batch_num`` with ``$in``.

    :param self: unused; kept because the caller passes its instance as the
        first argument
    :param all_uid: list of user ids
    :param collection: cluster-profile collection to query
    :param batch_num: number of ids per query
    :return: dict mapping ``_id`` to the stored document
    """
    rs = {}
    for start in range(0, len(all_uid), batch_num):
        cursor = collection.find({'_id': {'$in': all_uid[start:start + batch_num]}},
                                 no_cursor_timeout=True)
        # no_cursor_timeout cursors must be closed explicitly, also when
        # reading fails.
        try:
            rs.update(cluster_cursor2dic(cursor))
        finally:
            cursor.close()
    return rs
def cluster_cursor2dic(mongodb_cursor):
    """Index cluster-profile documents by their ``_id``.

    :param mongodb_cursor: iterable of documents, each with an ``"_id"`` key
    :return: dict mapping ``_id`` to the document; a later document with the
        same ``_id`` replaces an earlier one
    """
    users_profile_map = {}
    for user_profile in mongodb_cursor:
        users_profile_map[user_profile["_id"]] = user_profile
    return users_profile_map
def merge_sim_users(uid_hdf, sim_users_new, decay_factor, similarity_low, similarity_high, old_cluster_profile_dic):
    """Merge a user's new similar users with the previously stored ones.

    New users are taken over with their similarity as is. Every stored user
    has its weight multiplied by ``decay_factor`` and is then handled as
    follows:

    * not among the new users: kept only if the decayed weight is at least
      ``similarity_low``;
    * also among the new users: both weights are added, the sum is capped at
      ``similarity_high``, and the user is removed if the result is below
      ``similarity_low``.

    Stored entries that are not of the form ``"uid:number"`` are skipped.

    :param uid_hdf: id of the user being processed
    :param sim_users_new: list of new ``"uid:similarity"`` strings
    :param decay_factor: multiplier applied to stored weights
    :param similarity_low: lowest similarity that is kept
    :param similarity_high: highest similarity a merged entry may have
    :param old_cluster_profile_dic: dict mapping user id to its stored cluster
        document (its ``"sim_users"`` list is read)
    :return: dict mapping similar-user id to merged similarity
    """
    cluster_union_dic = {}
    for user_similarity in sim_users_new:
        _uid, similarity = split_uid_similarity(user_similarity)
        cluster_union_dic[_uid] = similarity

    old = old_cluster_profile_dic.get(uid_hdf)
    if old is None:
        return cluster_union_dic

    decay = float(decay_factor)
    for uid_similarity_old in old.get('sim_users') or []:
        parts = uid_similarity_old.split(":")
        if len(parts) != 2:
            continue
        sim_uid_old = parts[0]
        try:
            weight_old = float(parts[1]) * decay
        except ValueError:
            # A non-numeric stored weight cannot be merged; skip the entry.
            continue

        weight_current = cluster_union_dic.get(sim_uid_old)
        if weight_current is None:
            # Stored-only user: keep it while it is still similar enough.
            # A weak one is dropped rather than looked up in the new users,
            # where it does not exist.
            if weight_old >= similarity_low:
                cluster_union_dic[sim_uid_old] = weight_old
            continue

        weight_new = weight_old + weight_current
        if weight_new > similarity_high:
            weight_new = similarity_high
        if weight_new < similarity_low:
            del cluster_union_dic[sim_uid_old]
        else:
            cluster_union_dic[sim_uid_old] = weight_new
    return cluster_union_dic
def cluster_profile_compute(platform, sim_users_profile_array, sim_users_dic):
    """Aggregate the profiles of similar users into one cluster profile.

    Each user contributes with the weight ``similarity / (10 + similarity)``.
    A user's ``"profile"`` string is split on ``";"`` into dimensions, and
    dimension ``i`` is accumulated under key ``i`` of the result. Users
    without an entry in ``sim_users_dic`` are ignored.

    :param platform: platform name; PAC documents carry the user id in
        ``"name"``, all others in ``"_id"``
    :param sim_users_profile_array: list of basic-profile documents
    :param sim_users_dic: dict mapping user id to similarity
    :return: dict mapping dimension index to a ``{tag: weight}`` dict
    """
    id_key = "name" if platform == "PAC" else "_id"
    cluster_profile_rs = {}
    for sim_user_obj in sim_users_profile_array:
        # Similarity between the target user and this similar user.
        similarity = sim_users_dic.get(sim_user_obj.get(id_key))
        if similarity is None:
            continue
        sim_num = Decimal(similarity)
        # Weight of this user's contribution.
        rate = Decimal(sim_num / (10 + sim_num))

        profile = sim_user_obj.get("profile")
        if profile is None:
            profile = ""
        for i, u_dimension in enumerate(profile.split(";")):
            dimension_feature = cluster_profile_rs.get(i)
            if dimension_feature is None:
                dimension_feature = {}
            cluster_profile_rs[i] = accumulate_dimension_profile(dimension_feature, u_dimension, rate)
    return cluster_profile_rs
if __name__ == "__main__":
    # Exactly one command-line argument selects the environment; with any
    # other argument count the script runs against the "local" settings.
    env = sys.argv[1] if len(sys.argv) == 2 else "local"
    computer = ClusterProfileComputer(env)
    computer.run()
来源: https://juejin.im/post/5b56b3b66fb9a04fa42fb8d0