File size: 6,880 Bytes
fa6e3b2 bf00a95 fa6e3b2 822d179 fa6e3b2 8300a7c 6a17841 d5e4b99 6a17841 fa6e3b2 77bb999 2ba3143 eb1cacb fa6e3b2 2ba3143 c915bab da3b0f7 35130c5 53558ec da3b0f7 6abbd80 da3b0f7 2445a33 cac428c 7555506 cac428c 2445a33 da3b0f7 2445a33 da3b0f7 2445a33 9065332 2ba3143 5c18dcf fa6e3b2 553c037 fa6e3b2 c915bab 379cd26 c915bab 553c037 c915bab fa6e3b2 eb1cacb 6a17841 fa6e3b2 e619c58 af688b0 c915bab fa6e3b2 fb4f1a7 fa6e3b2 6a17841 fb4f1a7 6a17841 4bc235e fa6e3b2 13edd55 0a39678 1bb9b5d 9da9263 fa6e3b2 35130c5 fa6e3b2 9da9263 fa6e3b2 553c037 fa6e3b2 27407e5 fa6e3b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import os
import re
import json
import requests
from flask import Flask, request
from simpleeval import simple_eval
app = Flask(__name__)
GROUP_ID = os.environ.get("GROUP_ID")
LINE_CHANNEL_ACCESS_TOKEN = os.environ.get("LINE_CHANNEL_ACCESS_TOKEN")
message_list = {}
@app.route('/', methods=['GET'])
def index():
return {}, 200
@app.route("/api/", methods=["POST"])
def api():
global message_list
try:
payload = get_payload_dict(request.get_json())
if payload.get("group_id") != GROUP_ID:
raise ValueError("Invalid Group")
if payload.get("unsend_msg_id"):
unsend_msg_id = payload.get("unsend_msg_id")
message_list.pop(unsend_msg_id, None)
message_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") != unsend_msg_id}
raise ValueError("Unsend Success")
if "$$$" in payload.get("msg_text"):
if "結算" in payload.get("msg_text"):
users_number = get_users_number()
users = list({item["user_id"] for item in message_list.values()})
if len(users) != users_number:
users.append("others")
print(users)
matrix = [[0 for _ in range(len(users))] for _ in range(len(users))]
for msg_id, data in message_list.items():
quoted_msg_id = data.get("quoted_msg_id")
quoted_number = len([key for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id])
if quoted_msg_id:
fr = users.index(data.get("user_id"))
to = users.index(message_list.get(quoted_msg_id).get("user_id"))
matrix[fr][to] += data.get("amount")
if not quoted_msg_id and quoted_number == 0: # 要平分的情況
to = users.index(data.get("user_id"))
for row in matrix:
row[to] += (data.get("amount") / users_number)
print(matrix)
# users.index("b")
# username_list = get_username_list()
# print(username_list)
# for user_id in username_list:
# print(get_username(user_id))
raise ValueError("Action Success")
if "$" not in payload.get("msg_text"):
raise ValueError("Keyword not Found")
if get_amount(payload.get("msg_text")) == None:
raise ValueError("Amount is None")
if payload.get("quoted_msg_id"):
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text")),
"quoted_msg_id": payload.get("quoted_msg_id")}
else:
message_list[payload.get("msg_id")] = {"user_id": payload.get("user_id"),
"amount": get_amount(payload.get("msg_text")),
"quote_token": get_amount(payload.get("quote_token"))}
for msg_id, data in message_list.items():
quoted_msg_id = data.get("quoted_msg_id")
quoted_msg_list = {key: value for key, value in message_list.items() if value.get("quoted_msg_id") == msg_id}
if not quoted_msg_id and len(quoted_msg_list) != 0 and payload.get("quoted_msg_id") != msg_id:
amount: float = data.get("amount")
paid: float = 0.0
for _, value in quoted_msg_list.items():
paid += value.get("amount")
print(amount, paid)
if (amount-paid > 1):
send_text(payload.get("token"), f"$ {amount-paid} 未付清", data.get("quote_token"))
break
# if quoted_msg_id:
# fr = users.index(data.get("user_id"))
# to = users.index(message_list.get(quoted_msg_id).get("user_id"))
# matrix[fr][to] += data.get("amount")
# if not quoted_msg_id and quoted_number == 0: # 要平分的情況
# to = users.index(data.get("user_id"))
# for row in matrix:
# row[to] += (data.get("amount") / users_number)
# username = get_username(payload)
# text = payload.get("msg_text")
# print(username)
# send_text(payload.get("token"), f"{username}:{text}")
except Exception as e:
print(f"An error occurred: {e}")
print(message_list)
print()
return "", 200
def get_payload_dict(raw_payload) -> dict:
print(raw_payload)
events = raw_payload.get("events", [{}])[0]
return {"token": events.get("replyToken"),
"quote_token": events.get("message", {}).get("quoteToken"),
"group_id": events.get("source", {}).get("groupId"),
"user_id": events.get("source", {}).get("userId"),
"msg_type": events.get("message", {}).get("type"),
"msg_id": events.get("message", {}).get("id"),
"msg_text": events.get("message", {}).get("text"),
"quoted_msg_id": events.get("message", {}).get("quotedMessageId"),
"unsend_msg_id": events.get("unsend", {}).get("messageId")}
def send_text(token: str, text: str, quote_token: str | None = None):
requests.post("https://api.line.me/v2/bot/message/reply", headers={
"Content-Type": "application/json; charset=UTF-8",
"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN
}, json={
"replyToken": token,
"messages": [{"type": "text", "text": text, "quoteToken": quote_token}]
})
def get_username(user_id: str):
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/member/{user_id}"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return res_json.get("displayName")
except:
return "Unknow"
def get_users_number() -> int:
url = f"https://api.line.me/v2/bot/group/{GROUP_ID}/members/count"
try:
res_json = requests.get(url, headers={"Authorization": "Bearer " + LINE_CHANNEL_ACCESS_TOKEN}).json()
return int(res_json.get("count"))
except:
return 0
def get_amount(text: str) -> float | None:
try:
after_dollar = text.split("$")[1]
rows = after_dollar.split("\n")
number_string = rows[0]
divisor = 1
if len(rows) > 1 and "/" in rows[1]:
divisor = int(rows[1].replace("/", ""))
return round(float(simple_eval(number_string))/divisor, 2)
except:
return None
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)
|