Spaces:
Running
Running
| # 云端Space代码/router_tasks.py | |
| # ========================================== | |
| # 📝 任务榜API路由 | |
| # ========================================== | |
| # 功能:任务发布、接单、提交、验收、申诉全流程 | |
| # 状态:open → in_progress → submitted → completed/disputed | |
| # 💳 P6支付增强: | |
| # - 发布时冻结全额,避免支付不足 | |
| # - 记录每笔交易流水 | |
| # - 超时自动退款 | |
| # - 支付通知推送 | |
| # ========================================== | |
| from fastapi import APIRouter, HTTPException, Depends, Query | |
| from sqlalchemy.orm import Session | |
| from models import TaskCreate, TaskUpdate, TaskApply, TaskAssign, TaskSubmit, TaskAccept | |
| import 数据库连接 as db | |
| from 安全认证 import require_auth, is_admin | |
| from notifications import add_notification | |
| from database_sql import get_db | |
| from models_sql import Wallet, Transaction | |
| from db_utils import record_view, sort_cache | |
| import time | |
| import uuid | |
| import hashlib | |
| import logging | |
| router = APIRouter() | |
| # 📝 审计日志 | |
| logger = logging.getLogger("ComfyUI-Ranking.Tasks") | |
| # ========================================== | |
| # 💳 交易记录工具函数 | |
| # ========================================== | |
| def calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash): | |
| data = f"{tx_id}{account}{tx_type}{amount}{prev_hash}" | |
| return hashlib.sha256(data.encode()).hexdigest() | |
| def create_task_transaction(db_session: Session, account: str, tx_type: str, amount: int, | |
| related_account: str = None, task_id: str = None, | |
| task_title: str = None, related_user_name: str = None): | |
| """ | |
| 创建任务相关交易记录 | |
| tx_type: TASK_FREEZE, TASK_DEPOSIT, TASK_PAYMENT, TASK_INCOME, TASK_REFUND | |
| """ | |
| tx_id = f"TASK_{tx_type}_{int(time.time())}_{uuid.uuid4().hex[:6]}" | |
| last_tx = db_session.query(Transaction).filter(Transaction.account == account).order_by(Transaction.created_at.desc()).first() | |
| prev_hash = last_tx.tx_hash if last_tx else "GENESIS_HASH" | |
| tx_hash = calculate_tx_hash(tx_id, account, tx_type, amount, prev_hash) | |
| # 根据 tx_type 构造 description | |
| type_labels = { | |
| "TASK_FREEZE": "任务冻结", | |
| "TASK_DEPOSIT": "任务订金", | |
| "TASK_PAYMENT": "任务尾款", | |
| "TASK_INCOME": "任务收入", | |
| "TASK_REFUND": "任务退款", | |
| "TASK_CANCEL_REFUND": "任务取消退款", | |
| } | |
| label = type_labels.get(tx_type, tx_type) | |
| desc = f"{label}: {task_title}" if task_title else label | |
| new_tx = Transaction( | |
| tx_id=tx_id, | |
| account=account, | |
| tx_type=tx_type, | |
| amount=amount, | |
| related_account=related_account, | |
| item_id=task_id, # 复用 item_id 字段存储任务ID | |
| prev_hash=prev_hash, | |
| tx_hash=tx_hash, | |
| description=desc, | |
| item_title=task_title, | |
| item_type="task", | |
| related_user_name=related_user_name | |
| ) | |
| db_session.add(new_tx) | |
| logger.info(f"TASK_TX | type={tx_type} | account={account} | amount={amount} | task={task_id} | tx={tx_id}") | |
| return tx_id | |
| # ========================================== | |
| # 📋 任务状态常量 | |
| # ========================================== | |
| TASK_STATUS = { | |
| "open": "开放接单", | |
| "in_progress": "进行中", | |
| "submitted": "待验收", | |
| "completed": "已完成", | |
| "disputed": "争议中", | |
| "cancelled": "已取消", | |
| "expired": "已过期" | |
| } | |
| # ========================================== | |
| # 📝 任务CRUD接口 | |
| # ========================================== | |
| # ========================================== | |
| # 🔄 过期任务自动处理 + 自动退款 | |
| # ========================================== | |
| import datetime | |
| def check_and_update_expired_tasks(tasks_db, db_session=None): | |
| """ | |
| 检查并更新过期任务状态 | |
| - open 状态且超过截止日期:自动取消,退还冻结金额 | |
| - in_progress 状态且超过截止日期:标记为过期(不自动取消,需双方处理) | |
| 💳 P6支付增强:过期时自动退款 | |
| 💳 P0修复:两阶段提交,确保SQL事务与JSON修改原子性 | |
| """ | |
| today = datetime.date.today().isoformat() # "2026-03-30" | |
| updated = False | |
| expired_task_indices = [] # 记录过期任务的索引 | |
| # ========== 第一阶段:预检查(只读,收集需要退款的任务列表) ========== | |
| for idx, task in enumerate(tasks_db): | |
| deadline = task.get("deadline", "") | |
| status = task.get("status", "") | |
| if not deadline: | |
| continue | |
| # 检查是否过期 | |
| if deadline < today: | |
| if status == "open": | |
| # 开放接单状态且过期:记录需要处理 | |
| frozen_amount = task.get("frozen_amount", task.get("total_price", 0)) | |
| expired_task_indices.append({ | |
| "index": idx, | |
| "task_id": task["id"], | |
| "publisher": task.get("publisher"), | |
| "amount": frozen_amount, | |
| "title": task.get("title", "") | |
| }) | |
| updated = True | |
| elif status == "in_progress": | |
| # 进行中且过期:直接标记(不涉及资金) | |
| task["is_overdue"] = True | |
| updated = True | |
| # 如果没有过期任务,直接返回 | |
| if not expired_task_indices: | |
| return updated | |
| # 如果没有 db_session,只标记状态,不处理退款 | |
| if not db_session: | |
| for item in expired_task_indices: | |
| task = tasks_db[item["index"]] | |
| task["status"] = "expired" | |
| task["expired_at"] = int(time.time()) | |
| # 不标记退款,等待下次带 db_session 的调用 | |
| return updated | |
| # ========== 第二阶段:执行SQL事务(钱包退款+交易记录) ========== | |
| refund_results = [] # 记录退款成功的任务 | |
| sql_committed = False | |
| try: | |
| for item in expired_task_indices: | |
| if item["amount"] > 0: | |
| wallet = db_session.query(Wallet).filter(Wallet.account == item["publisher"]).with_for_update().first() | |
| if wallet: | |
| wallet.frozen_balance = max(0, wallet.frozen_balance - item["amount"]) | |
| wallet.balance += item["amount"] | |
| # 记录退款交易 | |
| create_task_transaction( | |
| db_session, item["publisher"], "TASK_REFUND", | |
| item["amount"], task_id=item["task_id"], | |
| task_title=item["title"] | |
| ) | |
| refund_results.append(item) | |
| # 💳 P0修复:先 commit SQL 事务 | |
| db_session.commit() | |
| sql_committed = True | |
| logger.info(f"TASK_EXPIRED_SQL_COMMIT | refunded_tasks={len(refund_results)}") | |
| except Exception as e: | |
| # 💳 P0修复:SQL事务失败,回滚所有操作,不修改JSON | |
| db_session.rollback() | |
| logger.error(f"TASK_EXPIRED_SQL_ROLLBACK | error={str(e)}") | |
| # 不修改 JSON,下次再试 | |
| return False | |
| # ========== 第三阶段:SQL成功后修改JSON任务状态 ========== | |
| if sql_committed: | |
| try: | |
| for item in expired_task_indices: | |
| task = tasks_db[item["index"]] | |
| task["status"] = "expired" | |
| task["expired_at"] = int(time.time()) | |
| if item["amount"] > 0: | |
| task["refunded"] = True | |
| task["refund_amount"] = item["amount"] | |
| # 💳 P0修复:保存JSON数据 | |
| db.save_data("tasks.json", tasks_db) | |
| logger.info(f"TASK_EXPIRED_JSON_SAVED | tasks={len(expired_task_indices)}") | |
| except Exception as e: | |
| # 💳 P0修复:JSON保存失败,记录warning但不回滚SQL(资金已转移,可后续手动恢复) | |
| logger.warning(f"TASK_EXPIRED_JSON_SAVE_FAILED | error={str(e)} | 资金已退款但任务状态未更新,需手动修复") | |
| # 不抛出异常,因为SQL事务已经成功,资金已经转移 | |
| # 发送退款通知(在事务外执行,失败不影响主流程) | |
| for item in refund_results: | |
| try: | |
| add_notification(item["publisher"], { | |
| "type": "task_refund", | |
| "from_user": "system", | |
| "target_item_id": item["task_id"], | |
| "target_item_title": item["title"], | |
| "content": f"💰 任务《{item['title']}》已过期自动取消,{item['amount']}积分已退还" | |
| }) | |
| logger.info(f"TASK_REFUND | publisher={item['publisher']} | task={item['task_id']} | amount={item['amount']}") | |
| except Exception as e: | |
| logger.warning(f"TASK_REFUND_NOTIFY_ERROR | task={item['task_id']} | error={str(e)}") | |
| return updated | |
| async def get_tasks( | |
| page: int = 1, | |
| limit: int = 20, | |
| status: str = None, | |
| sort: str = "latest", | |
| db_session: Session = Depends(get_db) | |
| ): | |
| """ | |
| 获取任务列表(分页) | |
| - status: 筛选状态(open/in_progress/completed/expired等) | |
| - sort: latest(最新)/price(价格高)/deadline(截止日期近) | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # 自动检查并更新过期任务(带自动退款) | |
| if check_and_update_expired_tasks(tasks_db, db_session): | |
| db.save_data("tasks.json", tasks_db) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| # 状态筛选(默认排除已取消和过期的) | |
| filtered = tasks_db | |
| if status: | |
| filtered = [t for t in filtered if t.get("status") == status] | |
| else: | |
| # 默认不显示已取消和过期的任务 | |
| filtered = [t for t in filtered if t.get("status") not in ["cancelled", "expired"]] | |
| # 🗂️ 使用排序缓存优化排序性能 | |
| cache_key = f"tasks:{status or 'all'}:{sort}" | |
| def sort_fn(data): | |
| if sort == "price": | |
| data.sort(key=lambda x: x.get("total_price", 0), reverse=True) | |
| elif sort == "deadline": | |
| data.sort(key=lambda x: x.get("deadline", "9999")) | |
| elif sort == "views": # 👁️ 按总访问量排序 | |
| data.sort(key=lambda x: x.get("views", 0), reverse=True) | |
| elif sort == "daily_views": # 👁️ 按日访问量排序 | |
| data.sort(key=lambda x: x.get("daily_views", 0), reverse=True) | |
| elif sort == "likes": # ❤️ 按点赞数排序 | |
| data.sort(key=lambda x: x.get("likes", 0), reverse=True) | |
| elif sort == "favorites": # ❤️ 按收藏数排序 | |
| data.sort(key=lambda x: x.get("favorites", 0), reverse=True) | |
| else: # latest | |
| data.sort(key=lambda x: x.get("created_at", 0), reverse=True) | |
| filtered = sort_cache.get_sorted(cache_key, filtered, sort_fn) | |
| # 分页 | |
| start = (page - 1) * limit | |
| end = start + limit | |
| paged = filtered[start:end] | |
| # 附加发布者信息 | |
| result = [] | |
| for task in paged: | |
| publisher_info = user_map.get(task.get("publisher"), {}) | |
| # 👁️ 过滤敏感字段(列表接口过滤 liked_by, favorited_by, viewed_by) | |
| task_data = {k: v for k, v in task.items() if k not in ["viewed_by", "liked_by", "favorited_by"]} | |
| result.append({ | |
| **task_data, | |
| "publisher_name": publisher_info.get("name", task.get("publisher")), | |
| "publisher_avatar": publisher_info.get("avatarDataUrl", ""), | |
| "status_text": TASK_STATUS.get(task.get("status"), "未知") | |
| }) | |
| return { | |
| "status": "success", | |
| "data": result, | |
| "total": len(filtered), | |
| "page": page, | |
| "limit": limit | |
| } | |
| async def get_task_detail(task_id: str, current_user: str = None): | |
| """ | |
| 获取任务详情 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| publisher_info = user_map.get(task.get("publisher"), {}) | |
| assignee_info = user_map.get(task.get("assignee"), {}) if task.get("assignee") else None | |
| # 构建申请者列表(带用户信息) | |
| applicants_with_info = [] | |
| for app in task.get("applicants", []): | |
| app_user = user_map.get(app.get("account"), {}) | |
| applicants_with_info.append({ | |
| **app, | |
| "name": app_user.get("name", app.get("account")), | |
| "avatar": app_user.get("avatarDataUrl", "") | |
| }) | |
| # 👁️ 过滤敏感字段(详情接口保留 liked_by 和 favorited_by,过滤 viewed_by) | |
| task_data = {k: v for k, v in task.items() if k != "viewed_by"} | |
| return { | |
| "status": "success", | |
| "data": { | |
| **task_data, | |
| "publisher_name": publisher_info.get("name", task.get("publisher")), | |
| "publisher_avatar": publisher_info.get("avatarDataUrl", ""), | |
| "assignee_name": assignee_info.get("name") if assignee_info else None, | |
| "assignee_avatar": assignee_info.get("avatarDataUrl") if assignee_info else None, | |
| "applicants": applicants_with_info, | |
| "status_text": TASK_STATUS.get(task.get("status"), "未知") | |
| } | |
| } | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| async def create_task(task: TaskCreate, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): | |
| """ | |
| 发布新任务 | |
| 💳 P6支付增强:发布时冻结全额,避免支付时余额不足 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| # 验证订金比例 | |
| if task.depositRatio not in [10, 20, 30, 50]: | |
| raise HTTPException(status_code=400, detail="订金比例必须是 10/20/30/50 之一") | |
| # 验证价格 | |
| if task.totalPrice < 0: | |
| raise HTTPException(status_code=400, detail="任务价格不能低于0积分") | |
| # 💳 使用SQL钱包检查余额并冻结 | |
| wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() | |
| if not wallet: | |
| wallet = Wallet(account=current_user, balance=0) | |
| db_session.add(wallet) | |
| db_session.flush() | |
| if wallet.balance < task.totalPrice: | |
| raise HTTPException(status_code=400, detail=f"余额不足,需要{task.totalPrice}积分,当前{wallet.balance}积分") | |
| # 💳 冻结全额(从余额转移到冻结余额) | |
| wallet.balance -= task.totalPrice | |
| wallet.frozen_balance += task.totalPrice | |
| # 计算订金金额 | |
| deposit_amount = int(task.totalPrice * task.depositRatio / 100) | |
| task_id = f"task_{int(time.time())}_{uuid.uuid4().hex[:6]}" | |
| new_task = { | |
| "id": task_id, | |
| "title": task.title, | |
| "description": task.description, | |
| "reference_images": (task.referenceImages or [])[:6], | |
| "reference_link": task.referenceLink, | |
| "total_price": task.totalPrice, | |
| "deposit_ratio": task.depositRatio, | |
| "deposit_amount": deposit_amount, | |
| "deadline": task.deadline, | |
| "publisher": current_user, | |
| "status": "open", | |
| "created_at": int(time.time()), | |
| # 接单相关 | |
| "applicants": [], # 申请接单的用户列表 | |
| "assignee": None, # 指派的接单者 | |
| "assigned_at": None, # 指派时间 | |
| # 交付相关 | |
| "deliverables": [], # 提交的成果 | |
| "submit_note": None, # 提交备注 | |
| "submitted_at": None, # 提交时间 | |
| # 验收相关 | |
| "completed_at": None, # 完成时间 | |
| "feedback": None, # 验收反馈 | |
| # 申诉相关 | |
| "dispute_id": None, # 关联的申诉ID | |
| # 💳 支付状态 | |
| "frozen_amount": task.totalPrice, # 已冻结金额 | |
| # 互动数据 | |
| "likes": 0, | |
| "favorites": 0, | |
| "comments": 0, | |
| "liked_by": [], | |
| "favorited_by": [], | |
| "tip_board": [], # 打赏榜单 | |
| "views": 0, | |
| "daily_views": 0, | |
| "viewed_by": [], | |
| "daily_views_date": "" | |
| } | |
| tasks_db.insert(0, new_task) | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("tasks:") | |
| # 💳 记录冻结交易 | |
| create_task_transaction( | |
| db_session, current_user, "TASK_FREEZE", | |
| -task.totalPrice, task_id=task_id, | |
| task_title=task.title | |
| ) | |
| db_session.commit() | |
| logger.info(f"TASK_CREATE | publisher={current_user} | task={task_id} | price={task.totalPrice} | frozen={task.totalPrice}") | |
| return {"status": "success", "data": new_task, "frozen_amount": task.totalPrice} | |
| async def update_task(task_id: str, update_data: TaskUpdate, current_user: str = Depends(require_auth)): | |
| """ | |
| 更新任务(仅发布者可操作) | |
| - open状态:可修改所有字段 | |
| - in_progress/submitted状态:只能修改标题/描述/参考图/链接,不能改价格,并通知接单人 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| if task.get("publisher") != current_user: | |
| raise HTTPException(status_code=403, detail="无权修改他人任务") | |
| status = task.get("status") | |
| if status not in ["open", "in_progress", "submitted"]: | |
| raise HTTPException(status_code=400, detail="当前状态不允许修改") | |
| # 记录修改内容(用于通知) | |
| changes = [] | |
| if update_data.title is not None and update_data.title != task.get("title"): | |
| task["title"] = update_data.title | |
| changes.append("标题") | |
| if update_data.description is not None and update_data.description != task.get("description"): | |
| task["description"] = update_data.description | |
| changes.append("描述") | |
| if update_data.referenceImages is not None: | |
| task["reference_images"] = update_data.referenceImages[:6] | |
| changes.append("参考图") | |
| if update_data.referenceLink is not None and update_data.referenceLink != task.get("reference_link"): | |
| task["reference_link"] = update_data.referenceLink | |
| changes.append("参考链接") | |
| # 价格和订金只能在open状态修改 | |
| if status == "open": | |
| if update_data.totalPrice is not None: | |
| # 验证价格 | |
| if update_data.totalPrice < 0: | |
| raise HTTPException(status_code=400, detail="任务价格不能低于0积分") | |
| task["total_price"] = update_data.totalPrice | |
| task["deposit_amount"] = int(update_data.totalPrice * task["deposit_ratio"] / 100) | |
| changes.append("价格") | |
| if update_data.depositRatio is not None and update_data.depositRatio in [10, 20, 30, 50]: | |
| task["deposit_ratio"] = update_data.depositRatio | |
| task["deposit_amount"] = int(task["total_price"] * update_data.depositRatio / 100) | |
| changes.append("订金比例") | |
| if update_data.deadline is not None: | |
| task["deadline"] = update_data.deadline | |
| changes.append("截止日期") | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("tasks:") | |
| # 📢 如果有接单人,发送通知 | |
| if task.get("assignee") and changes: | |
| add_notification(task.get("assignee"), { | |
| "type": "task_updated", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task.get("title", ""), | |
| "content": f"任务《{task.get('title', '')}》已更新:{'、'.join(changes)}" | |
| }) | |
| return {"status": "success"} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| async def cancel_task(task_id: str, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): | |
| """ | |
| 取消任务(仅发布者可操作,且仅在 open 状态时可取消) | |
| 💳 P0修复:取消时退还冻结资金 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| if task.get("publisher") != current_user: | |
| raise HTTPException(status_code=403, detail="无权取消他人任务") | |
| if task.get("status") != "open": | |
| raise HTTPException(status_code=400, detail="只能取消开放状态的任务") | |
| # 💳 P0修复:退还冻结资金 | |
| frozen_amount = task.get("frozen_amount", task.get("total_price", 0)) | |
| refund_success = False | |
| if frozen_amount > 0: | |
| try: | |
| wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() | |
| if wallet: | |
| wallet.frozen_balance = max(0, wallet.frozen_balance - frozen_amount) | |
| wallet.balance += frozen_amount | |
| # 记录退款交易 | |
| create_task_transaction( | |
| db_session, current_user, "TASK_CANCEL_REFUND", | |
| frozen_amount, task_id=task_id, | |
| task_title=task.get("title") | |
| ) | |
| db_session.commit() | |
| refund_success = True | |
| logger.info(f"TASK_CANCEL_REFUND | user={current_user} | task={task_id} | amount={frozen_amount}") | |
| except Exception as e: | |
| db_session.rollback() | |
| logger.error(f"TASK_CANCEL_REFUND_ERROR | task={task_id} | error={str(e)}") | |
| raise HTTPException(status_code=500, detail="退款失败,请稍后重试") | |
| else: | |
| refund_success = True # 无需退款 | |
| # 只有退款成功才更新状态 | |
| if refund_success: | |
| task["status"] = "cancelled" | |
| task["cancelled_at"] = int(time.time()) | |
| task["refunded"] = frozen_amount > 0 | |
| task["refund_amount"] = frozen_amount | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("tasks:") | |
| return {"status": "success", "refunded_amount": frozen_amount} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # ========================================== | |
| # 🙋 申请接单 | |
| # ========================================== | |
| async def apply_task(task_id: str, message: str = None, current_user: str = Depends(require_auth)): | |
| """ | |
| 申请接单 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| if task.get("status") != "open": | |
| raise HTTPException(status_code=400, detail="该任务不在开放接单状态") | |
| if task.get("publisher") == current_user: | |
| raise HTTPException(status_code=400, detail="不能申请自己发布的任务") | |
| # 检查是否已申请 | |
| applicants = task.get("applicants", []) | |
| if any(a["account"] == current_user for a in applicants): | |
| raise HTTPException(status_code=400, detail="您已申请过该任务") | |
| # 添加申请 | |
| applicants.append({ | |
| "account": current_user, | |
| "message": message or "", | |
| "applied_at": int(time.time()) | |
| }) | |
| task["applicants"] = applicants | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("tasks:") | |
| # 🔔 通知发布者:有人申请接单 | |
| add_notification(task.get("publisher"), { | |
| "type": "task_apply", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task.get("title", ""), | |
| "content": f"申请了您的任务《{task.get('title', '')}》" | |
| }) | |
| return {"status": "success", "message": "申请成功,等待发布者选择"} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| async def cancel_apply(task_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 取消申请接单 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| if task.get("status") != "open": | |
| raise HTTPException(status_code=400, detail="任务已开始,无法取消申请") | |
| applicants = task.get("applicants", []) | |
| new_applicants = [a for a in applicants if a["account"] != current_user] | |
| if len(new_applicants) == len(applicants): | |
| raise HTTPException(status_code=400, detail="您未申请该任务") | |
| task["applicants"] = new_applicants | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("tasks:") | |
| return {"status": "success"} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # ========================================== | |
| # 🎯 指派接单者 | |
| # ========================================== | |
| async def assign_task(task_id: str, assignee: str, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): | |
| """ | |
| 发布者选择接单者 | |
| 💳 P6支付增强:从冻结金额中扣除订金,记录交易 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| if task.get("publisher") != current_user: | |
| raise HTTPException(status_code=403, detail="只有发布者可以指派接单者") | |
| if task.get("status") != "open": | |
| raise HTTPException(status_code=400, detail="该任务不在开放状态") | |
| # 验证接单者是否在申请列表中 | |
| applicants = task.get("applicants", []) | |
| if not any(a["account"] == assignee for a in applicants): | |
| raise HTTPException(status_code=400, detail="该用户未申请此任务") | |
| deposit = task.get("deposit_amount", 0) | |
| # 💳 从冻结金额中扣除订金(订金暂时不给接单者,待任务完成后一起给) | |
| wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() | |
| if not wallet or wallet.frozen_balance < deposit: | |
| raise HTTPException(status_code=400, detail="冻结余额异常") | |
| # 订金从冻结金额中扣除(但还不给接单者,待任务完成后一起支付) | |
| wallet.frozen_balance -= deposit | |
| # 获取接单者用户名 | |
| users_db = db.load_data("users.json", default_data={}) | |
| assignee_info = users_db.get(assignee, {}) | |
| assignee_name = assignee_info.get("name", assignee) | |
| # 💳 记录订金支付交易 | |
| create_task_transaction( | |
| db_session, current_user, "TASK_DEPOSIT", | |
| -deposit, related_account=assignee, task_id=task_id, | |
| task_title=task.get("title"), | |
| related_user_name=assignee_name | |
| ) | |
| # 更新任务状态 | |
| task["assignee"] = assignee | |
| task["assigned_at"] = int(time.time()) | |
| task["status"] = "in_progress" | |
| task["deposit_paid"] = deposit # 💳 记录已支付订金 | |
| db.save_data("tasks.json", tasks_db) | |
| db_session.commit() | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("tasks:") | |
| # 🔔 通知接单者:被指派接单 | |
| add_notification(assignee, { | |
| "type": "task_assigned", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task.get("title", ""), | |
| "content": f"您已被选为任务《{task.get('title', '')}》的接单者,订金{deposit}积分已缓冲" | |
| }) | |
| logger.info(f"TASK_ASSIGN | publisher={current_user} | assignee={assignee} | task={task_id} | deposit={deposit}") | |
| return {"status": "success", "message": f"已指派 {assignee} 接单,订金 {deposit} 积分已开始缓冲"} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # ========================================== | |
| # 📤 提交成果 | |
| # ========================================== | |
| async def submit_task(task_id: str, deliverables: list, note: str = None, current_user: str = Depends(require_auth)): | |
| """ | |
| 接单者提交成果 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| if task.get("assignee") != current_user: | |
| raise HTTPException(status_code=403, detail="只有接单者可以提交成果") | |
| if task.get("status") != "in_progress": | |
| raise HTTPException(status_code=400, detail="任务状态不允许提交") | |
| if not deliverables: | |
| raise HTTPException(status_code=400, detail="请上传交付成果") | |
| task["deliverables"] = deliverables | |
| task["submit_note"] = note | |
| task["submitted_at"] = int(time.time()) | |
| task["status"] = "submitted" | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存 | |
| sort_cache.invalidate("tasks:") | |
| # 🔔 通知发布者:接单者已提交成果 | |
| add_notification(task.get("publisher"), { | |
| "type": "task_submitted", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task.get("title", ""), | |
| "content": f"已提交任务《{task.get('title', '')}》的成果,请验收" | |
| }) | |
| return {"status": "success", "message": "成果已提交,等待发布者验收"} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # ========================================== | |
| # ✅ 验收成果 | |
| # ========================================== | |
| async def accept_task(task_id: str, is_accepted: bool, feedback: str = None, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): | |
| """ | |
| 发布者验收成果 | |
| - is_accepted=True: 验收通过,支付尾款给接单者 | |
| - is_accepted=False: 验收不通过,可以要求修改或发起申诉 | |
| 💳 P6支付增强:使用SQL钱包支付,记录交易流水,发送支付通知 | |
| 💳 P0修复:事务完整性保护,失败时回滚 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| if task.get("publisher") != current_user: | |
| raise HTTPException(status_code=403, detail="只有发布者可以验收") | |
| if task.get("status") != "submitted": | |
| raise HTTPException(status_code=400, detail="任务状态不允许验收") | |
| task["feedback"] = feedback | |
| assignee_account = task.get("assignee") | |
| total_price = task.get("total_price", 0) | |
| deposit = task.get("deposit_paid", task.get("deposit_amount", 0)) # 优先使用已记录的支付订金 | |
| remaining = total_price - deposit # 尾款 | |
| if is_accepted: | |
| # 💳 验收通过:支付尾款给接单者 | |
| # 💳 P0修复:使用事务保护,失败则回滚 | |
| try: | |
| publisher_wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() | |
| if not publisher_wallet or publisher_wallet.frozen_balance < remaining: | |
| raise HTTPException(status_code=400, detail="冻结余额不足支付尾款") | |
| # 扣除发布者尾款(从冻结余额) | |
| publisher_wallet.frozen_balance -= remaining | |
| # 给接单者全款(订金+尾款) | |
| assignee_wallet = db_session.query(Wallet).filter(Wallet.account == assignee_account).with_for_update().first() | |
| if not assignee_wallet: | |
| assignee_wallet = Wallet(account=assignee_account, balance=0) | |
| db_session.add(assignee_wallet) | |
| db_session.flush() | |
| assignee_wallet.balance += total_price # 全款进入可用余额 | |
| assignee_wallet.task_balance += total_price # 累计任务收益统计(只增不减) | |
| # 获取双方用户名 | |
| users_db = db.load_data("users.json", default_data={}) | |
| publisher_info = users_db.get(current_user, {}) | |
| assignee_info = users_db.get(assignee_account, {}) | |
| publisher_name = publisher_info.get("name", current_user) | |
| assignee_name = assignee_info.get("name", assignee_account) | |
| # 💳 记录交易流水 | |
| # 1. 发布者支付尾款 | |
| create_task_transaction( | |
| db_session, current_user, "TASK_PAYMENT", | |
| -remaining, related_account=assignee_account, task_id=task_id, | |
| task_title=task.get("title"), | |
| related_user_name=assignee_name | |
| ) | |
| # 2. 接单者收入 | |
| create_task_transaction( | |
| db_session, assignee_account, "TASK_INCOME", | |
| total_price, related_account=current_user, task_id=task_id, | |
| task_title=task.get("title"), | |
| related_user_name=publisher_name | |
| ) | |
| task["status"] = "completed" | |
| task["completed_at"] = int(time.time()) | |
| # 💳 P0修复:先commit SQL事务(资金为真),再最佳努力写入JSON | |
| db_session.commit() | |
| # 再最佳努力写入JSON | |
| try: | |
| db.save_data("tasks.json", tasks_db) | |
| except Exception as e: | |
| logger.warning(f"TASK_COMPLETE_JSON_SAVE_FAILED | 资金已结算但任务状态未更新,需手动修复: {str(e)}") | |
| logger.info(f"TASK_COMPLETE | publisher={current_user} | assignee={assignee_account} | task={task_id} | total={total_price}") | |
| # 🗂️ 清除排序缓存(任务状态变化) | |
| sort_cache.invalidate("tasks:") | |
| message = f"验收通过,已支付 {total_price} 积分给接单者" | |
| # 🔔 支付通知:接单者收到款项 | |
| add_notification(assignee_account, { | |
| "type": "task_payment", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task.get("title", ""), | |
| "content": f"💰 任务《{task.get('title', '')}》验收通过,{total_price}积分已到账" | |
| }) | |
| except HTTPException: | |
| raise # 重新抛出 HTTP 异常 | |
| except Exception as e: | |
| db_session.rollback() | |
| logger.error(f"TASK_ACCEPT_ERROR | task={task_id} | error={str(e)}") | |
| raise HTTPException(status_code=500, detail="验收处理失败,请稍后重试") | |
| else: | |
| # 验收不通过:回到进行中状态,允许修改后重新提交 | |
| task["status"] = "in_progress" | |
| task["deliverables"] = [] | |
| task["submitted_at"] = None | |
| message = "验收不通过,接单者可以修改后重新提交" | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存(任务状态变化) | |
| sort_cache.invalidate("tasks:") | |
| # 🔔 通知接单者:验收未通过 | |
| add_notification(assignee_account, { | |
| "type": "task_rejected", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task.get("title", ""), | |
| "content": f"任务《{task.get('title', '')}》验收未通过,请修改后重新提交" | |
| }) | |
| return {"status": "success", "message": message} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # ========================================== | |
| # ⚖️ 申诉仲裁系统 | |
| # ========================================== | |
| async def create_dispute(task_id: str, reason: str, evidence: list = None, current_user: str = Depends(require_auth)): | |
| """ | |
| 发起申诉(发布者或接单者均可) | |
| - evidence: 证据图片URL列表(可选) | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| disputes_db = db.load_data("disputes.json", default_data=[]) | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| # 验证权限 | |
| if current_user not in [task.get("publisher"), task.get("assignee")]: | |
| raise HTTPException(status_code=403, detail="只有发布者或接单者可以发起申诉") | |
| # 验证状态 | |
| if task.get("status") not in ["in_progress", "submitted"]: | |
| raise HTTPException(status_code=400, detail="当前状态不允许申诉") | |
| # 检查是否已有申诉 | |
| if task.get("dispute_id"): | |
| raise HTTPException(status_code=400, detail="该任务已有进行中的申诉") | |
| # 确定角色 | |
| is_publisher = current_user == task.get("publisher") | |
| role = "publisher" if is_publisher else "assignee" | |
| # 创建申诉 | |
| dispute = { | |
| "id": f"dispute_{int(time.time())}_{uuid.uuid4().hex[:6]}", | |
| "task_id": task_id, | |
| "task_title": task.get("title", ""), | |
| "publisher": task.get("publisher"), | |
| "assignee": task.get("assignee"), | |
| "initiator": current_user, | |
| "initiator_role": role, | |
| "reason": reason, | |
| "evidence": (evidence or [])[:6], | |
| "status": "pending", # pending/responded/resolved | |
| "created_at": int(time.time()), | |
| # 被申诉方回应 | |
| "response": None, | |
| "response_evidence": [], | |
| "responded_at": None, | |
| # 仲裁结果 | |
| "resolution": None, # favor_initiator/favor_respondent/split | |
| "resolution_ratio": None, # 分成比例(如果是split) | |
| "resolution_note": None, | |
| "resolved_by": None, | |
| "resolved_at": None | |
| } | |
| disputes_db.append(dispute) | |
| task["status"] = "disputed" | |
| task["dispute_id"] = dispute["id"] | |
| db.save_data("tasks.json", tasks_db) | |
| db.save_data("disputes.json", disputes_db) | |
| # 🗂️ 清除排序缓存(任务状态变为争议中) | |
| sort_cache.invalidate("tasks:") | |
| # 🔔 通知对方:已发起申诉 | |
| other_party = task.get("assignee") if is_publisher else task.get("publisher") | |
| add_notification(other_party, { | |
| "type": "task_disputed", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task.get("title", ""), | |
| "content": f"对任务《{task.get('title', '')}》发起了申诉,请回应" | |
| }) | |
| return {"status": "success", "data": dispute, "message": "申诉已提交,等待对方回应"} | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| async def get_dispute_detail(dispute_id: str): | |
| """ | |
| 获取申诉详情 | |
| """ | |
| disputes_db = db.load_data("disputes.json", default_data=[]) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| for dispute in disputes_db: | |
| if dispute["id"] == dispute_id: | |
| publisher_info = user_map.get(dispute.get("publisher"), {}) | |
| assignee_info = user_map.get(dispute.get("assignee"), {}) | |
| return { | |
| "status": "success", | |
| "data": { | |
| **dispute, | |
| "publisher_name": publisher_info.get("name", dispute.get("publisher")), | |
| "publisher_avatar": publisher_info.get("avatarDataUrl", ""), | |
| "assignee_name": assignee_info.get("name", dispute.get("assignee")), | |
| "assignee_avatar": assignee_info.get("avatarDataUrl", "") | |
| } | |
| } | |
| raise HTTPException(status_code=404, detail="申诉不存在") | |
| async def respond_dispute(dispute_id: str, response: str, evidence: list = None, current_user: str = Depends(require_auth)): | |
| """ | |
| 被申诉方回应申诉 | |
| """ | |
| disputes_db = db.load_data("disputes.json", default_data=[]) | |
| for dispute in disputes_db: | |
| if dispute["id"] == dispute_id: | |
| # 验证权限:必须是被申诉方 | |
| is_publisher = dispute.get("initiator_role") == "publisher" | |
| respondent = dispute.get("assignee") if is_publisher else dispute.get("publisher") | |
| if current_user != respondent: | |
| raise HTTPException(status_code=403, detail="只有被申诉方可以回应") | |
| if dispute.get("status") != "pending": | |
| raise HTTPException(status_code=400, detail="该申诉已回应或已解决") | |
| dispute["response"] = response | |
| dispute["response_evidence"] = (evidence or [])[:6] | |
| dispute["responded_at"] = int(time.time()) | |
| dispute["status"] = "responded" | |
| db.save_data("disputes.json", disputes_db) | |
| # 🗂️ 清除排序缓存(争议状态变化可能影响排序) | |
| sort_cache.invalidate("tasks:") | |
| # 🔔 通知申诉方:对方已回应 | |
| add_notification(dispute.get("initiator"), { | |
| "type": "dispute_responded", | |
| "from_user": current_user, | |
| "target_item_id": dispute.get("task_id"), | |
| "target_item_title": dispute.get("task_title", ""), | |
| "content": f"对方已回应您关于任务《{dispute.get('task_title', '')}》的申诉" | |
| }) | |
| return {"status": "success", "message": "回应已提交,等待管理员仲裁"} | |
| raise HTTPException(status_code=404, detail="申诉不存在") | |
| async def get_admin_disputes(status: str = None, current_user: str = Depends(require_auth)): | |
| """ | |
| 管理员获取申诉列表 | |
| - status: pending/responded/resolved | |
| """ | |
| if not is_admin(current_user): | |
| raise HTTPException(status_code=403, detail="需要管理员权限") | |
| disputes_db = db.load_data("disputes.json", default_data=[]) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| # 筛选 | |
| filtered = disputes_db | |
| if status: | |
| filtered = [d for d in filtered if d.get("status") == status] | |
| # 按时间倒序 | |
| filtered = sorted(filtered, key=lambda x: x.get("created_at", 0), reverse=True) | |
| # 附加用户信息 | |
| result = [] | |
| for dispute in filtered: | |
| publisher_info = user_map.get(dispute.get("publisher"), {}) | |
| assignee_info = user_map.get(dispute.get("assignee"), {}) | |
| result.append({ | |
| **dispute, | |
| "publisher_name": publisher_info.get("name", dispute.get("publisher")), | |
| "assignee_name": assignee_info.get("name", dispute.get("assignee")) | |
| }) | |
| return {"status": "success", "data": result} | |
| async def resolve_dispute(dispute_id: str, resolution: str, ratio: int = None, note: str = None, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): | |
| """ | |
| 管理员裁决申诉 | |
| - resolution: favor_initiator(支持申诉方) / favor_respondent(支持被申诉方) / split(按比例分成) | |
| - ratio: 分成比例(0-100),表示申诉方获得的比例。仅split时有效 | |
| - note: 裁决说明 | |
| 💳 P0改造:使用 SQL Wallet 系统处理资金,替代 JSON balance 操作 | |
| """ | |
| if not is_admin(current_user): | |
| raise HTTPException(status_code=403, detail="需要管理员权限") | |
| if resolution not in ["favor_initiator", "favor_respondent", "split"]: | |
| raise HTTPException(status_code=400, detail="无效的裁决结果") | |
| if resolution == "split" and (ratio is None or ratio < 0 or ratio > 100): | |
| raise HTTPException(status_code=400, detail="分成比例必须在0-100之间") | |
| disputes_db = db.load_data("disputes.json", default_data=[]) | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| users_db = db.load_data("users.json", default_data={}) | |
| for dispute in disputes_db: | |
| if dispute["id"] == dispute_id: | |
| if dispute.get("status") == "resolved": | |
| raise HTTPException(status_code=400, detail="该申诉已裁决") | |
| # 查找关联任务 | |
| task = next((t for t in tasks_db if t["id"] == dispute.get("task_id")), None) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="关联任务不存在") | |
| total_price = task.get("total_price", 0) | |
| deposit = task.get("deposit_amount", 0) | |
| publisher_account = dispute.get("publisher") | |
| assignee_account = dispute.get("assignee") | |
| is_publisher_initiator = dispute.get("initiator_role") == "publisher" | |
| # 💳 P0改造:使用 SQL Wallet 处理资金 | |
| try: | |
| # 获取双方钱包(带悲观锁) | |
| publisher_wallet = db_session.query(Wallet).filter(Wallet.account == publisher_account).with_for_update().first() | |
| assignee_wallet = db_session.query(Wallet).filter(Wallet.account == assignee_account).with_for_update().first() | |
| # 如果钱包不存在,创建新钱包 | |
| if not publisher_wallet: | |
| publisher_wallet = Wallet(account=publisher_account, balance=0) | |
| db_session.add(publisher_wallet) | |
| db_session.flush() | |
| if not assignee_wallet: | |
| assignee_wallet = Wallet(account=assignee_account, balance=0) | |
| db_session.add(assignee_wallet) | |
| db_session.flush() | |
| # 获取双方用户名 | |
| publisher_info = users_db.get(publisher_account, {}) | |
| assignee_info = users_db.get(assignee_account, {}) | |
| publisher_name = publisher_info.get("name", publisher_account) | |
| assignee_name = assignee_info.get("name", assignee_account) | |
| # 计算资金分配 | |
| if resolution == "favor_initiator": | |
| # 支持申诉方 | |
| if is_publisher_initiator: | |
| # 发布者胜诉:退还订金(从冻结余额释放到可用余额) | |
| publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - deposit) | |
| publisher_wallet.balance += deposit | |
| # 记录交易 | |
| create_task_transaction( | |
| db_session, publisher_account, "TASK_REFUND", | |
| deposit, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=assignee_name | |
| ) | |
| initiator_amount = deposit | |
| respondent_amount = 0 | |
| else: | |
| # 接单者胜诉:获得全款 | |
| remaining = total_price - deposit | |
| # 发布者支付尾款(从冻结余额扣除) | |
| publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining) | |
| # 接单者获得全款 | |
| assignee_wallet.balance += total_price | |
| assignee_wallet.task_balance += total_price | |
| # 记录交易 | |
| create_task_transaction( | |
| db_session, publisher_account, "TASK_PAYMENT", | |
| -remaining, related_account=assignee_account, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=assignee_name | |
| ) | |
| create_task_transaction( | |
| db_session, assignee_account, "TASK_INCOME", | |
| total_price, related_account=publisher_account, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=publisher_name | |
| ) | |
| initiator_amount = total_price | |
| respondent_amount = 0 | |
| elif resolution == "favor_respondent": | |
| # 支持被申诉方 | |
| if is_publisher_initiator: | |
| # 接单者胜诉:获得全款 | |
| remaining = total_price - deposit | |
| # 发布者支付尾款(从冻结余额扣除) | |
| publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining) | |
| # 接单者获得全款 | |
| assignee_wallet.balance += total_price | |
| assignee_wallet.task_balance += total_price | |
| # 记录交易 | |
| create_task_transaction( | |
| db_session, publisher_account, "TASK_PAYMENT", | |
| -remaining, related_account=assignee_account, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=assignee_name | |
| ) | |
| create_task_transaction( | |
| db_session, assignee_account, "TASK_INCOME", | |
| total_price, related_account=publisher_account, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=publisher_name | |
| ) | |
| initiator_amount = 0 | |
| respondent_amount = total_price | |
| else: | |
| # 发布者胜诉:退还订金(从冻结余额释放到可用余额) | |
| publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - deposit) | |
| publisher_wallet.balance += deposit | |
| # 记录交易 | |
| create_task_transaction( | |
| db_session, publisher_account, "TASK_REFUND", | |
| deposit, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=assignee_name | |
| ) | |
| initiator_amount = 0 | |
| respondent_amount = deposit | |
| else: | |
| # 按比例分成 | |
| initiator_share = int(total_price * ratio / 100) | |
| respondent_share = total_price - initiator_share | |
| remaining = total_price - deposit | |
| # 发布者支付尾款(从冻结余额扣除) | |
| publisher_wallet.frozen_balance = max(0, publisher_wallet.frozen_balance - remaining) | |
| # 发布者获得退款部分,接单者获得报酬部分 | |
| if is_publisher_initiator: | |
| # 申诉方是发布者 | |
| pub_refund = initiator_share | |
| assignee_earn = respondent_share | |
| else: | |
| # 申诉方是接单者 | |
| assignee_earn = initiator_share | |
| pub_refund = respondent_share | |
| # 分配资金 | |
| publisher_wallet.balance += pub_refund | |
| assignee_wallet.balance += assignee_earn | |
| assignee_wallet.task_balance += assignee_earn | |
| # 记录交易 | |
| create_task_transaction( | |
| db_session, publisher_account, "TASK_PAYMENT", | |
| -remaining, related_account=assignee_account, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=assignee_name | |
| ) | |
| create_task_transaction( | |
| db_session, assignee_account, "TASK_INCOME", | |
| assignee_earn, related_account=publisher_account, task_id=task.get("id"), | |
| task_title=task.get("title"), | |
| related_user_name=publisher_name | |
| ) | |
| initiator_amount = initiator_share | |
| respondent_amount = respondent_share | |
| # 提交 SQL 事务 | |
| db_session.commit() | |
| except Exception as e: | |
| db_session.rollback() | |
| logger.error(f"DISPUTE_RESOLVE_ERROR | dispute={dispute_id} | error={str(e)}") | |
| raise HTTPException(status_code=500, detail="仲裁资金处理失败,请稍后重试") | |
| # 更新申诉状态 | |
| dispute["resolution"] = resolution | |
| dispute["resolution_ratio"] = ratio | |
| dispute["resolution_note"] = note | |
| dispute["resolved_by"] = current_user | |
| dispute["resolved_at"] = int(time.time()) | |
| dispute["status"] = "resolved" | |
| # 更新任务状态 | |
| task["status"] = "completed" | |
| task["completed_at"] = int(time.time()) | |
| task["feedback"] = f"申诉裁决:{note or '无'}(结果:{resolution})" | |
| db.save_data("disputes.json", disputes_db) | |
| db.save_data("tasks.json", tasks_db) | |
| # 🗂️ 清除排序缓存(任务状态变为已完成) | |
| sort_cache.invalidate("tasks:") | |
| # 🔔 通知双方:仲裁结果 | |
| resolution_text = { | |
| "favor_initiator": "支持申诉方", | |
| "favor_respondent": "支持被申诉方", | |
| "split": f"双方协商分成({ratio}%:{100-ratio}%)" | |
| }.get(resolution, "已裁决") | |
| # 通知申诉方 | |
| add_notification(dispute.get("initiator"), { | |
| "type": "dispute_resolved", | |
| "from_user": current_user, | |
| "target_item_id": dispute.get("task_id"), | |
| "target_item_title": dispute.get("task_title", ""), | |
| "content": f"您的申诉已裁决:{resolution_text},您获得{initiator_amount}积分" | |
| }) | |
| # 通知被申诉方 | |
| respondent_account = dispute.get("assignee") if is_publisher_initiator else dispute.get("publisher") | |
| add_notification(respondent_account, { | |
| "type": "dispute_resolved", | |
| "from_user": current_user, | |
| "target_item_id": dispute.get("task_id"), | |
| "target_item_title": dispute.get("task_title", ""), | |
| "content": f"任务申诉已裁决:{resolution_text},您获得{respondent_amount}积分" | |
| }) | |
| logger.info(f"DISPUTE_RESOLVED | dispute={dispute_id} | resolution={resolution} | initiator_amount={initiator_amount} | respondent_amount={respondent_amount}") | |
| return {"status": "success", "message": f"裁决完成:{resolution_text}"} | |
| raise HTTPException(status_code=404, detail="申诉不存在") | |
| # ========================================== | |
| # 📊 我的任务 | |
| # ========================================== | |
| async def get_my_tasks(role: str = "publisher", current_user: str = Depends(require_auth)): | |
| """ | |
| 获取我的任务 | |
| - role=publisher: 我发布的任务 | |
| - role=assignee: 我接的任务 | |
| """ | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| if role == "publisher": | |
| my_tasks = [t for t in tasks_db if t.get("publisher") == current_user] | |
| else: | |
| my_tasks = [t for t in tasks_db if t.get("assignee") == current_user] | |
| # 按创建时间倒序 | |
| my_tasks = sorted(my_tasks, key=lambda x: x.get("created_at", 0), reverse=True) | |
| # 过滤敏感字段(列表接口过滤 viewed_by、liked_by、favorited_by) | |
| result = [] | |
| for task in my_tasks: | |
| task_data = {k: v for k, v in task.items() if k not in ["viewed_by", "liked_by", "favorited_by"]} | |
| result.append(task_data) | |
| return {"status": "success", "data": result} | |
| async def record_task_view(task_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 记录任务访问量 | |
| 👁️ 需要用户认证,每个用户只计算一次总访问量,日访问量每次调用都增加 | |
| """ | |
| result = record_view("tasks.json", task_id, current_user) | |
| if result is None: | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # 🗂️ 清除排序缓存(浏览量变化可能影响排序) | |
| sort_cache.invalidate("tasks:") | |
| return {"status": "success", "views": result["views"], "daily_views": result["daily_views"]} | |
| # ========================================== | |
| # ❤️ 互动接口(点赞/收藏) | |
| # ========================================== | |
| async def toggle_like(task_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 点赞/取消点赞(原子操作,并发安全) | |
| """ | |
| result_container = [None] | |
| def updater(data): | |
| for task in data: | |
| if task["id"] == task_id: | |
| liked_by = task.get("liked_by", []) | |
| if current_user in liked_by: | |
| liked_by.remove(current_user) | |
| task["likes"] = max(0, task.get("likes", 0) - 1) | |
| action = "unliked" | |
| else: | |
| liked_by.append(current_user) | |
| task["likes"] = task.get("likes", 0) + 1 | |
| action = "liked" | |
| task["liked_by"] = liked_by | |
| result_container[0] = {"status": "success", "action": action, "likes": task["likes"]} | |
| return | |
| result_container[0] = None # 未找到任务 | |
| db.atomic_update("tasks.json", updater, default_data=[]) | |
| if result_container[0] is None: | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # 🗂️ 清除排序缓存(点赞数变化可能影响排序) | |
| sort_cache.invalidate("tasks:") | |
| return result_container[0] | |
| async def toggle_favorite(task_id: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 收藏/取消收藏(原子操作,并发安全) | |
| """ | |
| result_container = [None] | |
| def updater(data): | |
| for task in data: | |
| if task["id"] == task_id: | |
| favorited_by = task.get("favorited_by", []) | |
| if current_user in favorited_by: | |
| favorited_by.remove(current_user) | |
| task["favorites"] = max(0, task.get("favorites", 0) - 1) | |
| action = "unfavorited" | |
| else: | |
| favorited_by.append(current_user) | |
| task["favorites"] = task.get("favorites", 0) + 1 | |
| action = "favorited" | |
| task["favorited_by"] = favorited_by | |
| result_container[0] = {"status": "success", "action": action, "favorites": task["favorites"]} | |
| return | |
| result_container[0] = None # 未找到任务 | |
| db.atomic_update("tasks.json", updater, default_data=[]) | |
| if result_container[0] is None: | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| # 🗂️ 清除排序缓存(收藏数变化可能影响排序) | |
| sort_cache.invalidate("tasks:") | |
| return result_container[0] | |
| # ========================================== | |
| # 🎁 打赏接口 | |
| # ========================================== | |
| async def tip_task(task_id: str, amount: int, is_anon: bool = False, current_user: str = Depends(require_auth), db_session: Session = Depends(get_db)): | |
| """ | |
| 打赏任务(原子操作,并发安全)- 使用 SQL Wallet 系统 | |
| """ | |
| if amount <= 0: | |
| raise HTTPException(status_code=400, detail="打赏金额必须大于0") | |
| result_container = [None] | |
| publisher_account = [None] # 用于在原子操作外获取发布者账号 | |
| def updater(data): | |
| # 在锁内查找任务 | |
| target_task = None | |
| for task in data: | |
| if task["id"] == task_id: | |
| target_task = task | |
| break | |
| if not target_task: | |
| result_container[0] = {"error": "not_found"} | |
| return | |
| # 不能打赏自己 | |
| if target_task.get("publisher") == current_user: | |
| result_container[0] = {"error": "self_tip"} | |
| return | |
| publisher_account[0] = target_task.get("publisher") | |
| # 更新打赏榜单 | |
| tip_board = target_task.get("tip_board", []) | |
| existing = next((t for t in tip_board if t["account"] == current_user), None) | |
| if existing: | |
| existing["amount"] += amount | |
| else: | |
| tip_board.append({"account": current_user, "amount": amount, "is_anon": is_anon}) | |
| tip_board.sort(key=lambda x: x["amount"], reverse=True) | |
| target_task["tip_board"] = tip_board | |
| result_container[0] = {"status": "success", "message": f"成功打赏 {amount} 积分"} | |
| db.atomic_update("tasks.json", updater, default_data=[]) | |
| result = result_container[0] | |
| if result is None or result.get("error") == "not_found": | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| if result.get("error") == "self_tip": | |
| raise HTTPException(status_code=400, detail="不能打赏自己的任务") | |
| # 💳 使用 SQL Wallet 系统处理余额转账 | |
| try: | |
| publisher = publisher_account[0] | |
| if not publisher: | |
| raise HTTPException(status_code=404, detail="作者账户不存在") | |
| # 🔒 P1幂等性防护:检查最近5秒内是否存在相同交易 | |
| recent_cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=5) | |
| duplicate_tx = db_session.query(Transaction).filter( | |
| Transaction.account == current_user, | |
| Transaction.tx_type == "TIP_OUT", | |
| Transaction.amount == -amount, | |
| Transaction.related_account == publisher, | |
| Transaction.created_at >= recent_cutoff | |
| ).first() | |
| if duplicate_tx: | |
| return {"status": "success", "message": "打赏已处理(重复请求)"} | |
| # 🔒 并发安全:使用悲观锁获取双方钱包 | |
| tipper_wallet = db_session.query(Wallet).filter(Wallet.account == current_user).with_for_update().first() | |
| author_wallet = db_session.query(Wallet).filter(Wallet.account == publisher).with_for_update().first() | |
| if not tipper_wallet or tipper_wallet.balance < amount: | |
| raise HTTPException(status_code=400, detail="余额不足") | |
| if not author_wallet: | |
| # 如果作者钱包不存在,创建一个 | |
| author_wallet = Wallet(account=publisher, balance=0, earn_balance=0, tip_balance=0, frozen_balance=0) | |
| db_session.add(author_wallet) | |
| # 执行转账 | |
| tipper_wallet.balance -= amount | |
| author_wallet.balance += amount # 实际收入进统一余额 | |
| author_wallet.tip_balance += amount # 累计打赏收益统计(只增不减) | |
| # 创建交易记录 | |
| tx_id_tipper = f"TIP_OUT_{int(time.time())}_{uuid.uuid4().hex[:6]}" | |
| tx_id_author = f"TIP_IN_{int(time.time())}_{uuid.uuid4().hex[:6]}" | |
| # 获取最后交易记录的哈希 | |
| last_tx_tipper = db_session.query(Transaction).filter(Transaction.account == current_user).order_by(Transaction.created_at.desc()).first() | |
| last_tx_author = db_session.query(Transaction).filter(Transaction.account == publisher).order_by(Transaction.created_at.desc()).first() | |
| prev_hash_tipper = last_tx_tipper.tx_hash if last_tx_tipper else "GENESIS_HASH" | |
| prev_hash_author = last_tx_author.tx_hash if last_tx_author else "GENESIS_HASH" | |
| # 获取用户信息和任务标题 | |
| users_db = db.load_data("users.json", default_data={}) | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| author_info = users_db.get(publisher, {}) | |
| tipper_info = users_db.get(current_user, {}) | |
| author_name = author_info.get("name", publisher) | |
| tipper_name = tipper_info.get("name", current_user) | |
| task_title = None | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| task_title = task.get("title") | |
| break | |
| # 打赏方交易记录 (TIP_OUT) | |
| tx_tipper = Transaction( | |
| tx_id=tx_id_tipper, | |
| account=current_user, | |
| tx_type="TIP_OUT", | |
| amount=-amount, | |
| related_account=publisher, | |
| item_id=task_id, | |
| prev_hash=prev_hash_tipper, | |
| tx_hash=calculate_tx_hash(tx_id_tipper, current_user, "TIP_OUT", -amount, prev_hash_tipper), | |
| description=f"打赏给 {author_name}" + (f" 的任务《{task_title}》" if task_title else ""), | |
| item_title=task_title, | |
| item_type="task", | |
| related_user_name=author_name | |
| ) | |
| # 接收方交易记录 (TIP_IN) | |
| tx_author = Transaction( | |
| tx_id=tx_id_author, | |
| account=publisher, | |
| tx_type="TIP_IN", | |
| amount=amount, | |
| related_account=current_user, | |
| item_id=task_id, | |
| prev_hash=prev_hash_author, | |
| tx_hash=calculate_tx_hash(tx_id_author, publisher, "TIP_IN", amount, prev_hash_author), | |
| description=f"收到 {tipper_name} 的任务打赏" + (f" ({task_title})" if task_title else ""), | |
| item_title=task_title, | |
| item_type="task", | |
| related_user_name=tipper_name if not is_anon else "匿名用户" | |
| ) | |
| db_session.add(tx_tipper) | |
| db_session.add(tx_author) | |
| db_session.commit() | |
| # 📝 审计日志 | |
| logger.info(f"TASK_TIP | from={current_user} | to={publisher} | amount={amount} | task={task_id} | anon={is_anon}") | |
| # 🔔 打赏通知(考虑匿名) | |
| if not is_anon: | |
| add_notification(publisher, { | |
| "type": "tip", | |
| "from_user": current_user, | |
| "target_item_id": task_id, | |
| "target_item_title": task_title or "", | |
| "content": f"您收到来自 {tipper_name} 的 {amount} 积分任务打赏" | |
| }) | |
| else: | |
| add_notification(publisher, { | |
| "type": "tip", | |
| "from_user": "anonymous", | |
| "target_item_id": task_id, | |
| "target_item_title": "", | |
| "content": f"您收到了一份 {amount} 积分的匿名任务打赏" | |
| }) | |
| # 🗂️ 清除排序缓存(打赏可能影响排序) | |
| sort_cache.invalidate("tasks:") | |
| except HTTPException: | |
| db_session.rollback() | |
| raise | |
| except Exception as e: | |
| db_session.rollback() | |
| logger.error(f"TASK_TIP_ERROR | from={current_user} | task={task_id} | amount={amount} | error={str(e)}") | |
| raise HTTPException(status_code=500, detail="打赏处理失败,请稍后重试") | |
| return result | |
| # ========================================== | |
| # 💬 评论接口(复用通用评论系统) | |
| # ========================================== | |
| async def get_task_comments(task_id: str): | |
| """ | |
| 获取任务评论 | |
| """ | |
| comments_db = db.load_data("comments.json", default_data={}) | |
| users_db = db.load_data("users.json", default_data={}) | |
| # users_db 已经是 {account: user_info} 格式,直接使用 | |
| user_map = users_db | |
| # comments_db 是 {item_id: [comments]} 格式 | |
| task_comments = comments_db.get(task_id, []) | |
| # 附加用户信息 | |
| result = [] | |
| for c in task_comments: | |
| author_info = user_map.get(c.get("author"), {}) | |
| result.append({ | |
| **c, | |
| "author_name": author_info.get("name", c.get("author")), | |
| "author_avatar": author_info.get("avatarDataUrl", "") | |
| }) | |
| return {"status": "success", "data": result} | |
| async def add_task_comment(task_id: str, content: str, current_user: str = Depends(require_auth)): | |
| """ | |
| 添加任务评论 | |
| """ | |
| if not content or not content.strip(): | |
| raise HTTPException(status_code=400, detail="评论内容不能为空") | |
| tasks_db = db.load_data("tasks.json", default_data=[]) | |
| comments_db = db.load_data("comments.json", default_data={}) | |
| # 检查任务是否存在 | |
| task_exists = any(t["id"] == task_id for t in tasks_db) | |
| if not task_exists: | |
| raise HTTPException(status_code=404, detail="任务不存在") | |
| new_comment = { | |
| "id": f"comment_{int(time.time())}_{uuid.uuid4().hex[:6]}", | |
| "author": current_user, | |
| "content": content.strip(), | |
| "created_at": int(time.time()) | |
| } | |
| # comments_db 是 {item_id: [comments]} 格式 | |
| task_comments = comments_db.get(task_id, []) | |
| task_comments.insert(0, new_comment) | |
| comments_db[task_id] = task_comments | |
| db.save_data("comments.json", comments_db) | |
| # 🗂️ 清除排序缓存(评论数变化可能影响排序) | |
| sort_cache.invalidate("tasks:") | |
| # 更新任务评论数 | |
| for task in tasks_db: | |
| if task["id"] == task_id: | |
| task["comments"] = task.get("comments", 0) + 1 | |
| break | |
| db.save_data("tasks.json", tasks_db) | |
| return {"status": "success", "data": new_comment} | |