# File size: 8,435 bytes (revision b321188)
# %%
import pandas as pd
import numpy as np
import ast
from pymilvus import MilvusClient
from sklearn.metrics.pairwise import cosine_similarity
# %%
# MilvusClient is imported from pymilvus above. If schema helpers are needed,
# they can be imported from the same package, for example:
# from pymilvus import DataType, FieldSchema, CollectionSchema
# ----------------------------
# 1. Load the precomputed rating-similarity data from CSV (items_similar.csv)
def compute_rating_similarity(csv_path):
    """Load precomputed item-to-item rating similarities from a CSV file.

    The CSV must contain two columns:

    * ``asin`` - the item identifier.
    * ``top50_similar`` - a string holding a Python list literal whose
      elements are ``(asin, sim_score)`` pairs.

    Args:
        csv_path: Location of the similarity file, passed straight to
            ``pandas.read_csv``.

    Returns:
        dict: ``{asin: {other_asin: sim_score, ...}}``. A row whose
        ``top50_similar`` cell is missing, is not a valid literal, or does
        not evaluate to a list/tuple is kept with an empty neighbour dict.
    """
    df = pd.read_csv(csv_path)
    rating_sim = {}
    # Walk the two columns in lockstep; iterrows() would build a Series per row.
    for asin, raw_neighbours in zip(df["asin"], df["top50_similar"]):
        try:
            neighbours = ast.literal_eval(raw_neighbours)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # These are the errors literal_eval raises for malformed input
            # (a NaN cell ends up here as well): treat as "no neighbours".
            neighbours = []
        if not isinstance(neighbours, (list, tuple)):
            # A valid literal that is not a sequence of pairs (e.g. "7").
            neighbours = []
        rating_sim[asin] = {pair[0]: pair[1] for pair in neighbours}
    return rating_sim
# ----------------------------
# 2. Milvus query module: image and description similarity
def query_image_similarity(client, asin, top_k=50):
    """Return the items whose image embedding is most similar to ``asin``.

    The stored embedding of ``asin`` is fetched from the
    ``image_embeddings`` collection and used as the query vector of a COSINE
    search over the same collection, with ``asin`` itself filtered out.

    Args:
        client: Milvus client exposing ``query`` and ``search``.
        asin: Identifier of the reference item.
        top_k: Maximum number of neighbours to retrieve.

    Returns:
        dict: ``{asin: sim_score}``. Empty when ``asin`` has no stored
        embedding or the search returns nothing.
    """
    query_res = client.query(
        collection_name="image_embeddings",
        filter=f"asin == '{asin}'",
        output_fields=["embedding"],
    )
    if not query_res:
        return {}
    target_embedding = query_res[0]["embedding"]

    search_results = client.search(
        collection_name="image_embeddings",
        data=[target_embedding],
        anns_field="embedding",
        search_params={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=top_k,
        filter=f"asin != '{asin}'",  # exclude the reference item itself
        # Ask for the asin explicitly; without it the hit entity carries no
        # fields and every neighbour would be keyed by None.
        output_fields=["asin"],
    )
    if not search_results:
        return {}

    sim_dict = {}
    for hit in search_results[0]:
        entity = hit.get("entity") or {}
        # Fall back to the primary key for collections keyed by asin.
        sim_asin = entity.get("asin", hit.get("id"))
        if sim_asin is None:
            continue
        # With the COSINE metric Milvus reports the cosine similarity in
        # "distance" (larger means more similar), so it is used as the score
        # directly; computing 1 - distance would invert the ranking.
        sim_dict[sim_asin] = hit["distance"]
    return sim_dict
def query_desc_similarity(client, asin, top_k=50):
    """Return the items whose description embedding is most similar to ``asin``.

    The stored embedding of ``asin`` is fetched from the
    ``metadata_embeddings`` collection and used as the query vector of a
    COSINE search over the same collection, with ``asin`` itself filtered
    out.

    Args:
        client: Milvus client exposing ``query`` and ``search``.
        asin: Identifier of the reference item.
        top_k: Maximum number of neighbours to retrieve.

    Returns:
        dict: ``{asin: sim_score}``. Empty when ``asin`` has no stored
        embedding or the search returns nothing.
    """
    query_res = client.query(
        collection_name="metadata_embeddings",
        filter=f"asin == '{asin}'",
        output_fields=["embedding"],
    )
    if not query_res:
        return {}
    target_embedding = query_res[0]["embedding"]

    search_results = client.search(
        collection_name="metadata_embeddings",
        data=[target_embedding],
        anns_field="embedding",
        search_params={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=top_k,
        filter=f"asin != '{asin}'",  # exclude the reference item itself
        # Request the asin so the result does not depend on asin being the
        # collection's primary key.
        output_fields=["asin"],
    )
    if not search_results:
        return {}

    sim_dict = {}
    for hit in search_results[0]:
        entity = hit.get("entity") or {}
        # Fall back to the primary key when the entity carries no asin.
        sim_asin = entity.get("asin", hit.get("id"))
        if sim_asin is None:
            continue
        # With the COSINE metric Milvus reports the cosine similarity in
        # "distance" (larger means more similar), so it is used as the score
        # directly; computing 1 - distance would invert the ranking.
        sim_dict[sim_asin] = hit["distance"]
    return sim_dict
def query_milvus_similarity(client, asin, similarity_type="image", top_k=50):
    """Route a similarity lookup to the matching Milvus query function.

    Args:
        client: Milvus client forwarded to the selected query function.
        asin: Identifier of the reference item.
        similarity_type: ``"image"`` for the image-based lookup or
            ``"description"`` for the description-based lookup.
        top_k: Maximum number of neighbours, forwarded unchanged.

    Returns:
        dict: Whatever the selected query function returns; an empty dict
        for any other ``similarity_type``.
    """
    wants_image = similarity_type == "image"
    # Guard clause: anything other than the two known modalities yields {}.
    if not wants_image and not (similarity_type == "description"):
        return {}
    lookup = query_image_similarity if wants_image else query_desc_similarity
    return lookup(client, asin, top_k)
# ----------------------------
# 3. Hybrid similarity (combines the preloaded rating similarities with the Milvus query results)
def get_hybrid_similarity(
    asin1, asin2, rating_sim_dict, weights, client, milvus_cache=None
):
    """Compute the weighted hybrid similarity of ``asin2`` relative to ``asin1``.

    Three scores are combined: the precomputed rating similarity, the image
    similarity and the description similarity. Each one is read from the
    neighbour list of ``asin1`` and counts as 0 when ``asin2`` is not in
    that list.

    Args:
        asin1: Item whose neighbour lists are consulted.
        asin2: Item looked up inside those neighbour lists.
        rating_sim_dict: ``{asin: {other_asin: sim_score}}`` loaded from CSV.
        weights: Weight per component, e.g.
            ``{"rating": 0.6, "image": 0.2, "description": 0.2}``; a missing
            key counts as weight 0.
        client: Milvus client used when a neighbour list is not cached.
        milvus_cache: Optional dict keyed by ``(asin, similarity_type)``
            holding Milvus neighbour lists; it is filled in place. When
            omitted, a cache local to this call is used.

    Returns:
        The weighted sum of the three component scores.
    """
    cache = {} if milvus_cache is None else milvus_cache

    def neighbours_of(kind):
        # Fetch the neighbour list of asin1 for one modality, hitting Milvus
        # only when the cache has no entry yet.
        cache_key = (asin1, kind)
        if cache_key not in cache:
            cache[cache_key] = query_milvus_similarity(
                client, asin1, similarity_type=kind, top_k=50
            )
        return cache[cache_key]

    # (1) rating similarity from the precomputed table
    rating_score = rating_sim_dict[asin1].get(asin2, 0) if asin1 in rating_sim_dict else 0
    # (2) image similarity
    image_score = neighbours_of("image").get(asin2, 0)
    # (3) description similarity
    desc_score = neighbours_of("description").get(asin2, 0)

    return (
        weights.get("rating", 0) * rating_score
        + weights.get("image", 0) * image_score
        + weights.get("description", 0) * desc_score
    )
# ----------------------------
# 4. Recommendation: generate personalised recommendations for a user
def recommend_for_user(
    user_id,
    user_rating_df,
    rating_sim_dict,
    weights,
    client,
    milvus_cache=None,
    top_n=10,
):
    """Recommend unseen items to a user from their rating history.

    Every item that appears as a key of ``rating_sim_dict`` and has not been
    rated by the user is a candidate. A candidate's score is the mean hybrid
    similarity between each rated item and the candidate.

    Args:
        user_id: Identifier of the target user.
        user_rating_df: DataFrame with at least ``user_id`` and ``asin``
            columns.
        rating_sim_dict: Precomputed rating similarities,
            ``{asin: {other_asin: sim_score}}``.
        weights: Component weights passed to ``get_hybrid_similarity``.
        client: Milvus client passed to ``get_hybrid_similarity``.
        milvus_cache: Optional dict used to memoise Milvus neighbour lists.
            When omitted, one cache is created for the duration of the call.
        top_n: Number of recommendations to return.

    Returns:
        list: ``(asin, score)`` tuples ordered by descending score, ties
        broken by ascending asin. A user without rated items gets candidates
        with score 0.
    """
    if milvus_cache is None:
        # Create the cache once here. Forwarding None would make the
        # similarity function start from an empty cache on every pair and
        # query Milvus again for each (rated, candidate) combination.
        milvus_cache = {}

    # Items the user has already rated.
    is_target_user = user_rating_df["user_id"] == user_id
    rated_items = set(user_rating_df.loc[is_target_user, "asin"])
    # Candidates: known to the rating-similarity table but not yet rated.
    candidate_items = set(rating_sim_dict) - rated_items
    n_rated = len(rated_items)  # same for every candidate, so computed once

    scores = {}
    for candidate in candidate_items:
        if n_rated == 0:
            scores[candidate] = 0
            continue
        total_score = sum(
            get_hybrid_similarity(
                rated, candidate, rating_sim_dict, weights, client, milvus_cache
            )
            for rated in rated_items
        )
        scores[candidate] = total_score / n_rated

    # Order by score, then by asin. Most candidates tie (typically at 0), and
    # set iteration order changes between interpreter runs, so without the
    # secondary key the returned top-N would not be reproducible.
    ranked = sorted(scores.items(), key=lambda item: (-item[1], str(item[0])))
    return ranked[:top_n]
# %%
# (1) Load the user ratings; the CSV needs at least 'user_id' and 'asin' columns.
user_rating_df = pd.read_csv("ratings.csv")

# (2) Load the precomputed rating similarities (items_similar.csv).
rating_sim_dict = compute_rating_similarity("items_similar.csv")

# (3) Weights of the three components of the hybrid similarity.
weights = {
    "rating": 0.6,
    "image": 0.2,
    "description": 0.2,
}

# %%
# (4) Open the Milvus client.
client = MilvusClient(uri="./Amazon_electronics.db")

# (5) Cache for Milvus neighbour lists, shared by all similarity lookups.
milvus_cache = {}

# %%
# (6) Target user (replace with a real user id), then compute and print
#     the top-10 recommendations.
target_user = "A192HO2ICJ75VU"
recommendations = recommend_for_user(
    user_id=target_user,
    user_rating_df=user_rating_df,
    rating_sim_dict=rating_sim_dict,
    weights=weights,
    client=client,
    milvus_cache=milvus_cache,
    top_n=10,
)

print(f"为用户 {target_user} 推荐的商品列表:")
for asin, score in recommendations:
    print(f"ASIN: {asin},得分: {score:.4f}")
|