import os import requests from flask import Flask, request from simpleeval import simple_eval app = Flask(__name__) GROUP_ID = os.environ.get("GROUP_ID") LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN") message_list = {} group_id = "" @app.route('/', methods=['GET']) def index(): return {}, 200 @app.route("/api/", methods=["POST"]) def api(): global message_list try: payload = get_payload_dict(request.get_json()) print(group_id) if group_id != GROUP_ID: raise ValueError("Invalid Group") if payload.get("unsend_msg_id"): unsend_msg_id = payload.get("unsend_msg_id") message_list.pop(unsend_msg_id, None) message_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") != unsend_msg_id} raise ValueError("Unsend Success") if "$$$" in payload.get("msg_text"): if "清除" in payload.get("msg_text"): message_list = {} return "", 200 if "結算" in payload.get("msg_text"): users_number = get_users_number() users = list({item["user_id"] for item in message_list.values()}) if len(users) != users_number: users.append("others") print(users) matrix = [[0.0 for _ in range(len(users))] for _ in range(len(users))] should_checkout = True for msg_id, data in message_list.items(): quoted_msg_id = data.get("quoted_msg_id") quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id} if not quoted_msg_id and len(quoted_msg_list) != 0: # 檢查是否付清 amount: float = data.get("amount") paid: float = 0.0 for _, value in quoted_msg_list.items(): paid += value.get("amount") if amount-paid > 1: send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token")) should_checkout = False break if not quoted_msg_id and len(quoted_msg_list) == 0: # 要平分的情況 to = users.index(data.get("user_id")) for row in matrix: row[to] += (data.get("amount") / users_number) if quoted_msg_id: fr = users.index(data.get("user_id")) to = users.index(message_list.get(quoted_msg_id).get("user_id")) matrix[fr][to] += data.get("amount") print(matrix) if not should_checkout: return "", 200 matrix_copy = [[0 for _ in range(len(matrix))] for _ in range(len(matrix))] for i in range(len(matrix_copy)): for j in range(len(matrix_copy)): if i < j: matrix_copy[i][j] = matrix[i][j] - matrix[j][i] result = [] for i in range(len(matrix_copy)): for j in range(len(matrix_copy)): amount = matrix_copy[i][j] if amount > 0: result.append({"from": get_username(users[i]), "to": get_username(users[j]), "amount": amount}) if amount < 0: result.append({"from": get_username(users[j]), "to": get_username(users[i]), "amount": -amount}) if result: sorted_result = sorted(result, key=lambda x: x["from"]) bubble = get_checkout_bubble(sorted_result) send_flex_text(payload.get("token"), bubble) message_list = {} raise ValueError("Action Success") if "$" not in payload.get("msg_text"): if "計算" in payload.get("msg_text"): try: text = payload.get("msg_text") number = float(simple_eval(text.split("計算")[1])) send_text(payload.get("token"), f"$ {round(number,2)}", payload.get("quote_token")) except: raise ValueError("Calculation Error") raise ValueError("Calculation Successful") raise ValueError("Keyword not Found") if get_amount(payload.get("msg_text")) == None: raise ValueError("Amount is None") if payload.get("quoted_msg_id"): message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"), "amount": get_amount(payload.get("msg_text")), "quoted_msg_id": payload.get("quoted_msg_id")} else: message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"), "amount": get_amount(payload.get("msg_text")), "quote_token": payload.get("quote_token"), "msg_text": payload.get("msg_text")} for msg_id, data in message_list.items(): quoted_msg_id = data.get("quoted_msg_id") quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id} if not quoted_msg_id and len(quoted_msg_list) != 0: amount: float = data.get("amount") paid: float = 0.0 for _, value in quoted_msg_list.items(): paid += value.get("amount") if amount-paid <= 1 and payload.get("quoted_msg_id") == msg_id: borrowers = [] for _, q_data in quoted_msg_list.items(): borrowers.append({"name": get_username(q_data.get("user_id")), "amount": q_data.get("amount")}) bubble = get_summary_bubble(data.get("msg_text"), get_username(data.get("user_id")), borrowers) send_flex_text(payload.get("token"), bubble) break if amount-paid > 1 and payload.get("quoted_msg_id") != msg_id: send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token")) break except Exception as e: print(f"An error occurred: {e}") print("="*20) for key in message_list: print(message_list[key]) print("="*20, end="\n\n") return "", 200 def get_payload_dict(raw_payload) -> dict: # print(raw_payload) global group_id events = raw_payload.get("events", [{}])[0] group_id = events.get("source", {}).get("groupId") return {"token": events.get("replyToken"), "quote_token": events.get("message", {}).get("quoteToken"), "group_id": events.get("source", {}).get("groupId"), "user_id": events.get("source", {}).get("userId"), "msg_type": events.get("message", {}).get("type"), "msg_id": events.get("message", {}).get("id"), "msg_text": events.get("message", {}).get("text"), "quoted_msg_id": events.get("message", {}).get("quotedMessageId"), "unsend_msg_id": events.get("unsend", {}).get("messageId")} def send_text(token: str, text: str, quote_token: str | None = None): requests.post("https://api.line.me/v2/bot/message/reply", headers={ "Content-Type": "application/json; charset=UTF-8", "Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN }, json={ "replyToken": token, "messages": [{"type": "text", "text": text, "quoteToken": quote_token}] }) def send_flex_text(token, bubble): requests.post("https://api.line.me/v2/bot/message/reply", headers={ "Content-Type": "application/json; charset=UTF-8", "Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN }, json={ "replyToken": token, "messages": [{ "type": "flex", "altText": "您有一則新訊息", "contents": bubble }] }) def get_username(user_id: str): if user_id == "others": return "其他人" url = f"https://api.line.me/v2/bot/group/{group_id}/member/{user_id}" try: res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json() return res_json.get("displayName") except: return "未知" def get_users_number() -> int: global group_id url = f"https://api.line.me/v2/bot/group/{group_id}/members/count" try: res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json() return int(res_json.get("count")) except: return 0 def get_amount(text: str) -> float | None: try: after_dollar = text.split("$")[1] rows = after_dollar.split("\n") number = float(simple_eval(rows[0])) divisor = 1 if len(rows) > 1 and "/" in rows[1]: divisor = int(rows[1].replace("/", "")) if divisor % 3 == 0: number += 0.3 if divisor == 0: raise ValueError("divisor = 0") return round(number/divisor, 2) except: return None def get_summary_bubble(title: str, payer: str, borrowers: list): total = 0.0 borrower_contents = [] for borrower in borrowers: amount = float(borrower.get("amount")) total += amount borrower_contents.append({ "type": "box", "layout": "horizontal", "contents": [ { "type": "text", "text": borrower.get("name"), "size": "sm", "color": "#555555", "flex": 0 }, { "type": "text", "text": f"${round(amount,2)}", "size": "sm", "color": "#111111", "align": "end" } ] }) return { "type": "bubble", "body": { "type": "box", "layout": "vertical", "contents": [ { "type": "text", "text": "對帳明細", "weight": "bold", "color": "#1DB446", "size": "sm" }, { "type": "text", "text": title, "weight": "bold", "size": "xxl", "wrap": True, "margin": "md" }, { "type": "text", "text": f"由 {payer} 付款", "size": "sm", "color": "#aaaaaa", "wrap": True, "weight": "bold" }, { "type": "separator", "margin": "lg" }, { "type": "box", "layout": "vertical", "margin": "lg", "spacing": "sm", "contents": borrower_contents }, { "type": "separator", "margin": "lg" }, { "type": "box", "layout": "horizontal", "contents": [ { "type": "text", "text": "合計", "size": "sm", "color": "#555555" }, { "type": "text", "text": f"${round(total,2)}", "size": "sm", "color": "#111111", "align": "end", "weight": "bold" } ], "margin": "lg" } ] }, "styles": { "footer": { "separator": True } } } def get_checkout_bubble(checkout_list: list): carousel_contents = [] checkout_total = 0.0 checkout_contents = [] checkout_items = {key: value for key, value in message_list.items() if not value.get("quoted_msg_id")} for _, value in checkout_items.items(): msg_text = value.get("msg_text") amount = round(float(value.get("amount")), 2) checkout_total += amount if msg_text: msg_text = msg_text.split("$")[0] checkout_contents.append({ "type": "box", "layout": "horizontal", "contents": [ { "type": "text", "text": msg_text, "size": "sm", "color": "#555555", "flex": 0 }, { "type": "text", "text": f"${amount}", "size": "sm", "color": "#111111", "align": "end" } ], "margin": "sm" }) carousel_contents.append({ "type": "bubble", "body": { "type": "box", "layout": "vertical", "contents": [ { "type": "text", "text": "結算明細", "weight": "bold", "color": "#1DB446", "size": "sm" }, { "type": "text", "text": "此次結算包含:", "size": "sm", "color": "#aaaaaa", "wrap": True, "margin": "lg" }, { "type": "box", "layout": "vertical", "margin": "md", "spacing": "sm", "contents": checkout_contents}, { "type": "separator", "margin": "lg" }, { "type": "box", "layout": "horizontal", "contents": [ { "type": "text", "text": "合計", "size": "sm", "color": "#555555" }, { "type": "text", "text": f"${round(checkout_total,2)}", "size": "sm", "color": "#111111", "align": "end", "weight": "bold" } ], "margin": "lg" } ] }, "styles": { "footer": { "separator": True } } } ) current_user = checkout_list[0].get("from") current_total = 0.0 current_pay_to_contents = [] checkout_list.append({"from": None, "to": None, "amount": 0}) for item in checkout_list: if current_user != item.get("from"): carousel_contents.append({ "type": "bubble", "body": { "type": "box", "layout": "vertical", "contents": [ { "type": "text", "text": "結算明細", "weight": "bold", "color": "#1DB446", "size": "sm" }, { "type": "text", "text": current_user, "weight": "bold", "size": "xxl", "wrap": True, "margin": "md" }, { "type": "text", "text": "需付款給:", "size": "sm", "color": "#aaaaaa", "wrap": True, "margin": "lg" }, { "type": "box", "layout": "vertical", "margin": "md", "spacing": "sm", "contents": current_pay_to_contents}, { "type": "separator", "margin": "lg" }, { "type": "box", "layout": "horizontal", "contents": [ { "type": "text", "text": "合計", "size": "sm", "color": "#555555" }, { "type": "text", "text": f"${round(current_total,2)}", "size": "sm", "color": "#111111", "align": "end", "weight": "bold" } ], "margin": "lg" } ] }, "styles": { "footer": { "separator": True } } } ) current_user = item.get("from") current_total = 0.0 current_pay_to_contents = [] amount = round(float(item.get("amount")), 2) current_total += amount current_pay_to_contents.append({ "type": "box", "layout": "horizontal", "contents": [ { "type": "text", "text": item.get("to"), "size": "sm", "color": "#555555", "flex": 0 }, { "type": "text", "text": f"${amount}", "size": "sm", "color": "#111111", "align": "end" } ] },) return { "type": "carousel", "contents": carousel_contents } if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)