import streamlit as st import pandas as pd import io from openpyxl import load_workbook from openpyxl.drawing.image import Image as XLImage import numpy as np from datetime import datetime, timedelta import PIL from PIL import Image as PILImage import tempfile import os st.set_page_config(page_title="细分市场", layout="wide") st.title("🪆 细分市场分析") st.markdown(""" 上传卖家精灵数据+“分类”数据,自动得出不同产品分类的月销量等数据对比(如下载结果示例) """) # 添加下载按钮 st.link_button( "📥 数据源示例", "https://objects.mambate.top/example/细分市场数据源示例.xlsx", help="点击下载数据源示例文件" ) st.link_button( "📊 下载结果示例", "https://objects.mambate.top/example/细分市场结果.xlsx", help="点击下载结果示例文件" ) st.markdown(""" 源数据为卖家精灵+“分类”数据(需要新增一列字段名为“分类”的数据) """) uploaded_file = st.file_uploader("上传数据源xlsx文件", type="xlsx") @st.cache_data def read_excel_data(file_bytes, sheet_name=None): """Streamlit缓存装饰器函数,用于读取Excel数据""" excel_io = io.BytesIO(file_bytes) df = pd.read_excel(excel_io, sheet_name=sheet_name) return df @st.cache_data def get_excel_sheets(file_bytes): """Streamlit缓存装饰器函数,用于获取Excel的工作表列表""" excel_io = io.BytesIO(file_bytes) xls = pd.ExcelFile(excel_io) return xls.sheet_names def process_excel(file): # 一次性读取所有字节,只做一次I/O data = file.read() # 获取工作表列表(使用缓存) sheet_names = get_excel_sheets(data) # 默认使用第一个工作簿 sheet_name = sheet_names[0] st.info(f"正在处理工作簿: {sheet_name}") # 读取选定的sheet(使用缓存) df = read_excel_data(data, sheet_name=sheet_name) # 使用openpyxl读取Excel文件以提取图片 wb = load_workbook(io.BytesIO(data), data_only=True) ws = wb[sheet_name] # 显示原始数据 st.subheader("原始数据预览") st.dataframe(df.head(10)) # 找到所有分类 categories = [] for col in df.columns: if isinstance(col, str) and "分类" in col: unique_categories = df[col].dropna().unique().tolist() categories.extend(unique_categories) # 去除重复的分类 categories = list(dict.fromkeys(categories)) # 存储每个分类的图片 category_images = {category: [] for category in categories} # 找到图片列的索引 image_col_indices = [] for col_idx, col_name in enumerate(df.columns): if isinstance(col_name, str) and ("图片" in col_name or "产品图" in col_name or "主图" in col_name): image_col_indices.append(col_idx) # 提取嵌入在Excel中的图片 embedded_images = {} for image in ws._images: row = image.anchor._from.row col = image.anchor._from.col if col in image_col_indices and row > 0: # 跳过标题行 # 获取该行的分类 row_data = df.iloc[row-1] # 因为Excel行号从1开始,pandas从0开始 # 查找该行的分类 for cat_col in df.columns: if isinstance(cat_col, str) and "分类" in cat_col and pd.notna(row_data[cat_col]): category = row_data[cat_col] if category in categories: # 将图片对象添加到对应分类 category_images[category].append(image) # 在网页上展示随机选择的分类图片 st.subheader("各分类随机图片展示") image_cols = st.columns(len(categories)) # 存储用于导出的图片对象 export_images = {} for i, category in enumerate(categories): if category_images[category]: # 从该分类的图片中随机选择一个 random_image_idx = np.random.randint(0, len(category_images[category])) random_image = category_images[category][random_image_idx] # 使用NamedTemporaryFile安全地创建临时文件 with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp_file: # 获取图片数据并保存为文件 img_data = random_image._data() tmp_file.write(img_data) tmp_path = tmp_file.name try: # 在Streamlit界面上显示图片 with image_cols[i]: st.write(f"**{category}**") try: img = PILImage.open(tmp_path) st.image(img, caption=category, use_container_width=True) except Exception as e: st.error(f"无法显示{category}图片: {str(e)}") except Exception as e: with image_cols[i]: st.write(f"**{category}**") st.error(f"图片处理错误: {str(e)}") finally: # 清理临时文件 if os.path.exists(tmp_path): try: os.unlink(tmp_path) except: pass # 存储图片数据用于Excel导出(而非对象引用) export_images[category] = img_data else: with image_cols[i]: st.write(f"**{category}**") st.info(f"没有找到{category}的图片") # 创建新的DataFrame用于结果 result_df = pd.DataFrame() # 第一行:款式 result_df.loc[0, "A"] = "款式" for i, category in enumerate(categories): result_df.loc[0, chr(66 + i)] = category result_df.loc[0, chr(66 + len(categories))] = "总计" # 第二行:图片 result_df.loc[1, "A"] = "图片" # 第三行:月销量 result_df.loc[2, "A"] = "月销量" # 第四行:月销售额 result_df.loc[3, "A"] = "月销售额" # 第五行:平均月销售额 result_df.loc[4, "A"] = "平均月销售额" # 第六行:ASIN数 result_df.loc[5, "A"] = "ASIN数" # 第七行:平均价格 result_df.loc[6, "A"] = "平均价格" # 第八行:平均review result_df.loc[7, "A"] = "平均review" # 第九行:月销量占比 result_df.loc[8, "A"] = "月销量占比" # 第十行:月销售额占比 result_df.loc[9, "A"] = "月销售额占比" # 第十一行:ASIN占比 result_df.loc[10, "A"] = "ASIN占比" # 第十二行:半年新品数量 result_df.loc[11, "A"] = "半年新品数量" # 第十三行:半年新品占比 result_df.loc[12, "A"] = "半年新品占比" # 获取当前日期 current_date = datetime.now() # 计算半年前的日期 half_year_ago = current_date - timedelta(days=180) # 用于存储每个分类的统计数据 category_stats = {} for category in categories: category_stats[category] = { 'sales_volume': 0, # 月销量 'sales_amount': 0, # 月销售额 'asin_count': 0, # ASIN数 'price_sum': 0, # 价格总和 'review_sum': 0, # 评分总和 'new_product_count': 0 # 半年新品数量 } # 计算每个分类的统计数据 for col in df.columns: if isinstance(col, str) and "分类" in col: # 处理每个分类列 for category in categories: # 找到对应分类的行 category_rows = df[df[col] == category] # 计算ASIN数(该分类下的行数) category_stats[category]['asin_count'] += len(category_rows) # 计算半年新品数量 for _, row in category_rows.iterrows(): # 查找上架时间列 for col_name in row.index: if isinstance(col_name, str) and ("上架时间" in col_name or "上架日期" in col_name): try: # 尝试将上架时间转换为日期对象 if pd.notna(row[col_name]): listing_date = pd.to_datetime(row[col_name]) # 如果上架时间在半年内,则计为新品 if listing_date > half_year_ago: category_stats[category]['new_product_count'] += 1 break except: # 如果转换失败,则忽略该行 pass # 查找月销量和月销售额列 for sales_col in df.columns: if isinstance(sales_col, str): if sales_col == "月销量" or (isinstance(sales_col, str) and sales_col.strip() == "月销量"): # 累加月销量(确保为整数) category_stats[category]['sales_volume'] += int(category_rows[sales_col].sum()) elif sales_col.startswith("月销售额"): # 累加月销售额 category_stats[category]['sales_amount'] += category_rows[sales_col].sum() elif sales_col.startswith("价格"): # 累加价格 category_stats[category]['price_sum'] += category_rows[sales_col].sum() elif sales_col.startswith("评分数"): # 累加评分数 category_stats[category]['review_sum'] += category_rows[sales_col].sum() # 计算总计 total_sales_volume = 0 total_sales_amount = 0 total_asin_count = 0 total_price_sum = 0 total_review_sum = 0 # 为每个分类随机选择一个图片 for i, category in enumerate(categories): if category_images[category]: # 从该分类的图片中随机选择一个 random_image_idx = np.random.randint(0, len(category_images[category])) random_image = category_images[category][random_image_idx] # 填充结果DataFrame for i, category in enumerate(categories): # 获取该分类的统计数据 stats = category_stats[category] # 月销量(确保为整数) sales_volume = int(stats['sales_volume']) result_df.loc[2, chr(66 + i)] = sales_volume total_sales_volume += sales_volume # 月销售额 sales_amount = stats['sales_amount'] result_df.loc[3, chr(66 + i)] = sales_amount total_sales_amount += sales_amount # ASIN数 asin_count = stats['asin_count'] result_df.loc[5, chr(66 + i)] = asin_count total_asin_count += asin_count # 平均月销售额 = 月销售额 / ASIN数 avg_sales_amount = sales_amount / asin_count if asin_count > 0 else 0 result_df.loc[4, chr(66 + i)] = avg_sales_amount # 平均价格 = 价格总和 / ASIN数 price_sum = stats['price_sum'] avg_price = price_sum / asin_count if asin_count > 0 else 0 result_df.loc[6, chr(66 + i)] = avg_price total_price_sum += price_sum # 平均review = 评分总和 / ASIN数 review_sum = stats['review_sum'] avg_review = review_sum / asin_count if asin_count > 0 else 0 result_df.loc[7, chr(66 + i)] = avg_review total_review_sum += review_sum # 添加总计列 result_df.loc[2, chr(66 + len(categories))] = int(total_sales_volume) # 月销量总计(确保为整数) result_df.loc[3, chr(66 + len(categories))] = total_sales_amount # 月销售额总计 result_df.loc[5, chr(66 + len(categories))] = total_asin_count # ASIN数总计 # 总体平均月销售额 = 总月销售额 / 总ASIN数 total_avg_sales_amount = total_sales_amount / total_asin_count if total_asin_count > 0 else 0 result_df.loc[4, chr(66 + len(categories))] = total_avg_sales_amount # 总体平均价格 = 总价格总和 / 总ASIN数 total_avg_price = total_price_sum / total_asin_count if total_asin_count > 0 else 0 result_df.loc[6, chr(66 + len(categories))] = total_avg_price # 总体平均review = 总review总和 / 总ASIN数 total_avg_review = total_review_sum / total_asin_count if total_asin_count > 0 else 0 result_df.loc[7, chr(66 + len(categories))] = total_avg_review # 计算月销量占比和月销售额占比 for i, category in enumerate(categories): # 月销量占比 sales_volume = int(result_df.loc[2, chr(66 + i)]) # 确保为整数 sales_volume_percentage = (sales_volume / total_sales_volume * 100) if total_sales_volume > 0 else 0 result_df.loc[8, chr(66 + i)] = sales_volume_percentage # 月销售额占比 sales_amount = float(result_df.loc[3, chr(66 + i)]) sales_amount_percentage = (sales_amount / total_sales_amount * 100) if total_sales_amount > 0 else 0 result_df.loc[9, chr(66 + i)] = sales_amount_percentage # 计算半年新品总数 total_new_product_count = 0 for category in categories: new_product_count = category_stats[category]['new_product_count'] result_df.loc[11, chr(66 + categories.index(category))] = new_product_count total_new_product_count += new_product_count # 添加半年新品总数到总计列 result_df.loc[11, chr(66 + len(categories))] = total_new_product_count # 计算ASIN占比 for i, category in enumerate(categories): asin_count = float(result_df.loc[5, chr(66 + i)]) asin_percentage = (asin_count / total_asin_count * 100) if total_asin_count > 0 else 0 result_df.loc[10, chr(66 + i)] = asin_percentage # 计算半年新品占比 for i, category in enumerate(categories): new_product_count = float(result_df.loc[11, chr(66 + i)]) new_product_percentage = (new_product_count / total_new_product_count * 100) if total_new_product_count > 0 else 0 result_df.loc[12, chr(66 + i)] = new_product_percentage # 总计列的占比都是100% result_df.loc[8, chr(66 + len(categories))] = 100 # 月销量占比 result_df.loc[9, chr(66 + len(categories))] = 100 # 月销售额占比 result_df.loc[10, chr(66 + len(categories))] = 100 # ASIN占比 result_df.loc[12, chr(66 + len(categories))] = 100 # 半年新品占比 # 将数据框转换为字符串类型以避免Arrow兼容性问题 # 但先保存一个数值版本用于Excel导出 result_df_numeric = result_df.copy() # 显示结果(转换为字符串以避免Arrow兼容性问题) st.subheader("处理后的数据") # 格式化数值显示 display_df = result_df.copy() for i in range(2, 13): # 处理第3-13行(索引2-12)的数值格式 for j in range(1, len(categories) + 2): # B列到最后一列 col = chr(65 + j) if i in [4, 6, 7]: # 平均月销售额、平均价格、平均review保留2位小数 try: display_df.loc[i, col] = f"{float(display_df.loc[i, col]):.2f}" except: pass elif i in [8, 9, 10, 12]: # 月销量占比、月销售额占比、ASIN占比、半年新品占比保留2位小数并添加%符号 try: display_df.loc[i, col] = f"{float(display_df.loc[i, col]):.2f}%" except: pass elif i in [2, 3, 5, 11]: # 销量、销售额、ASIN数、半年新品数量使用千分位分隔符 try: display_df.loc[i, col] = f"{int(float(display_df.loc[i, col])):,}" except: pass st.dataframe(display_df) # 提供下载功能 # 创建一个新的BytesIO对象 output = io.BytesIO() # 预处理所有数据 export_df = result_df_numeric.copy() # 1. 预处理百分比数据 for i in [8, 9, 10, 12]: # 月销量占比、月销售额占比、ASIN占比、半年新品占比 for j in range(1, len(categories) + 2): col = chr(65 + j) try: cell_value = export_df.loc[i, col] if pd.notna(cell_value): # 将百分比值除以100转换为小数 export_df.loc[i, col] = float(cell_value) / 100 except: pass # 2. 处理小数值(平均月销售额、平均价格、平均review) for i in [4, 6, 7]: # 平均月销售额、平均价格、平均review for j in range(1, len(categories) + 2): col = chr(65 + j) try: cell_value = export_df.loc[i, col] if pd.notna(cell_value): # 将小数值四舍五入为两位小数 export_df.loc[i, col] = round(float(cell_value), 2) except: pass # 3. 处理销售额(保留两位小数) for j in range(1, len(categories) + 2): col = chr(65 + j) try: cell_value = export_df.loc[3, col] # 月销售额在第4行(索引3) if pd.notna(cell_value): export_df.loc[3, col] = round(float(cell_value), 2) except: pass # 写入Excel with pd.ExcelWriter(output, engine='openpyxl') as writer: # 直接写入数据,不包含表头 export_df.to_excel(writer, sheet_name="汇总结果", index=False, header=False) # 获取工作表 workbook = writer.book worksheet = writer.sheets["汇总结果"] # 设置图片行的高度为150 worksheet.row_dimensions[2].height = 150 # 设置A列宽度为15 worksheet.column_dimensions['A'].width = 15 # 设置所有分类列的宽度为25 for i in range(len(categories) + 1): col_letter = chr(66 + i) # B, C, D... worksheet.column_dimensions[col_letter].width = 25 # Excel列宽单位约为7像素 # 将已经在网页上显示的随机图片添加到工作表中 for i, category in enumerate(categories): if category in export_images: # 使用之前选择的随机图片,但重新创建新的Image对象 img_data = export_images[category] img_cell = chr(66 + i) + "2" # B2, C2, D2...等单元格 # 重新创建XLImage对象而不是直接使用原来的 new_img = XLImage(io.BytesIO(img_data)) worksheet.add_image(new_img, img_cell) # 导入所需的样式模块 from openpyxl.styles import numbers, Font, Border, Side, Alignment # 定义边框样式 - 实线边框 thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) # 定义粗体字体 bold_font = Font(bold=True) # 对A列标题加粗 for i in range(13): # 假设有13行数据 cell = worksheet.cell(row=i+1, column=1) # A列 cell.font = bold_font # 为所有单元格添加边框 # 获取工作表的最大行和列 max_row = 13 # 根据您的数据结构,可能需要调整 max_col = len(categories) + 2 # A列 + 所有分类列 + 总计列 for row in range(1, max_row + 1): for col in range(1, max_col + 1): cell = worksheet.cell(row=row, column=col) cell.border = thin_border # 对所有数值应用格式 # 1. 对百分比列应用格式 for i in [8, 9, 10, 12]: # 月销量占比、月销售额占比、ASIN占比、半年新品占比 for j in range(1, len(categories) + 2): # 计算单元格位置(不考虑标题行) cell = worksheet.cell(row=i+1, column=j+1) # +1是因为没有标题行,只考虑索引从0开始 # 设置为百分比格式 cell.number_format = '0.00%' # 2. 对小数值应用格式(平均月销售额、平均价格、平均review) for i in [4, 6, 7]: # 平均月销售额、平均价格、平均review for j in range(1, len(categories) + 2): cell = worksheet.cell(row=i+1, column=j+1) cell.number_format = '0.00' # 3. 对整数值应用千分位格式(月销量、月销售额、ASIN数、半年新品数量) for i in [2, 3, 5, 11]: # 销量、销售额、ASIN数、半年新品数量 for j in range(1, len(categories) + 2): cell = worksheet.cell(row=i+1, column=j+1) # 对于销售额(i=3),设置为带两位小数的千分位格式 if i == 3: cell.number_format = '#,##0.00' else: cell.number_format = '#,##0' # 确保output已经被正确写入并准备好读取 output.seek(0) # 使用download_button提供下载功能 st.download_button( label="下载处理后的Excel文件", data=output, file_name="处理后的数据.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ) if uploaded_file is not None: process_excel(uploaded_file) else: st.info("请上传一个Excel文件")