import streamlit as st
import requests
import os
from web3 import Web3
import sqlite3
import time
import threading
import base64
import configparser
import pandas as pd
import json
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
my_style = '''
'''
# 获取 pushover api
pushover_api_token = os.environ.get('pushover_api_token', '')
pushover_user_token = os.environ.get('pushover_user_token', '')
# 获取当前文件目录
current_dir = os.path.dirname(__file__)
parent_dir = os.path.dirname(current_dir)
config_ini_file = os.path.join(parent_dir, 'data', 'config.ini') # 程序配置文件
push_messages_db_file = os.path.join(parent_dir, 'data', 'push_messages.db')
all_ethscription_db_file = os.path.join(parent_dir, 'data', 'all_ethscription.db')
# 设置数据库
push_messages_conn = sqlite3.connect(push_messages_db_file)
push_messages_cur = push_messages_conn.cursor()
# 使用你的Ethereum节点的RPC地址
infura_api_key_eths = os.environ.get('infura_api_key_eths', '')
w3 = Web3(Web3.HTTPProvider(infura_api_key_eths))
# 设置 telegram api
telegram_bot_token = os.environ.get('telegram_bot_token', '')
telegram_channel_id = "@ethspush"
telegram_base_url = f"https://api.telegram.org/bot{telegram_bot_token}"
thread_id = 0
executor = ThreadPoolExecutor(max_workers=10)
# 配置合约 ABI 文件
with open(os.path.join(parent_dir, 'data', 'batch_contract_abi.json'), "r") as batch:
contract_abi = json.load(batch)
batch_contract = w3.eth.contract(abi=contract_abi)
with open(os.path.join(parent_dir, 'data', 'single_contract_abi.json'), "r") as single:
contract_abi = json.load(single)
single_contract = w3.eth.contract(abi=contract_abi)
with open(os.path.join(parent_dir, 'data', 'single_contract_old_abi.json'), "r") as single_old:
contract_abi = json.load(single_old)
single_old_contract = w3.eth.contract(abi=contract_abi)
# 图片 Base64
def image_to_base64(img_path):
with open(img_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode()
# 通过 ID 从 all_ethscription 获取铭文内容
def get_ethscription_by_id(ethscription_id, all_ethscription_cur):
# 执行查询语句
all_ethscription_cur.execute("SELECT name FROM id_content WHERE ethscriptionId=?", (ethscription_id,))
result = all_ethscription_cur.fetchone()
# 返回结果
if result:
return result[0]
else:
return None
# 获取单个 ethscription 数据
def get_specific_ethscription(ethscription_identifier):
endpoint = f"/ethscriptions/{ethscription_identifier}"
try:
response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint, timeout=3)
response.raise_for_status() # Will raise an HTTPError if the HTTP request returned an unsuccessful status code
if response.status_code == 200:
# Assuming the Content-Type is JSON
return response.json()
else:
print(f'Unexpected status code {response.status_code} for identifier: {ethscription_identifier}')
return None
except requests.Timeout:
print(f"Request timed out for identifier: {ethscription_identifier}")
except requests.ConnectionError:
print(f"Connection error for identifier: {ethscription_identifier}")
except requests.TooManyRedirects:
print(f"Too many redirects for identifier: {ethscription_identifier}")
except requests.HTTPError as http_err:
print(f"HTTP error for identifier {ethscription_identifier}: {http_err}")
except requests.RequestException as req_err:
print(f"General error for identifier {ethscription_identifier}: {req_err}")
except Exception as e:
# Catch-all for any other unforeseen exceptions
print(f"An unexpected error occurred for identifier {ethscription_identifier}: {e}")
return None # Ensure the function always returns a value even in the case of an error
def send_message_to_channel(text):
url = f"{telegram_base_url}/sendMessage"
payload = {
"chat_id": telegram_channel_id,
"text": text
}
response = requests.post(url, data=payload)
if response.status_code == 200:
return response.json()
else:
return None
def send_pushover_notification(token, user_key, message, title):
url = "https://api.pushover.net/1/messages.json"
data = {
"token": token,
"user": user_key,
"message": message,
"title": title
}
response = requests.post(url, data=data)
if response.status_code == 200:
return response.json()
else:
return None
def send_message_to_channel_wrapper(text):
return send_message_to_channel(text)
def send_pushover_notification_wrapper(token, user_key, message, title):
return send_pushover_notification(token, user_key, message, title)
def get_push_data():
push_messages_w_conn = sqlite3.connect(push_messages_db_file)
push_messages_w_cur = push_messages_w_conn.cursor()
all_ethscription_conn = sqlite3.connect(all_ethscription_db_file)
all_ethscription_cur = all_ethscription_conn.cursor()
with ThreadPoolExecutor() as executor:
while True:
block_number = w3.eth.block_number
# block_number = 18205797
block = w3.eth.get_block(block_number, full_transactions=True)
transactions = block['transactions']
for tx in transactions:
query = "SELECT EXISTS(SELECT 1 FROM messages WHERE hash=?)"
push_messages_w_cur.execute(query, (str(tx['hash'].hex()),))
exists = push_messages_w_cur.fetchone()[0]
if not exists:
if tx['to'] == '0x57b8792c775D34Aa96092400983c3e112fCbC296':
if tx['value'] >= 1:
if tx['input'][:10] == '0xd2234424' or tx['input'][:10] == '0xd92a1740':
try:
if tx['input'][:10] == '0xd2234424':
data = single_old_contract.decode_function_input(tx['input'])
if tx['input'][:10] == '0xd92a1740':
data = single_contract.decode_function_input(tx['input'])
params = data[1]
# Handle bytes data
for key, value in params.items():
if isinstance(value, bytes):
params[key] = value.hex()
# 获取并处理订单数据
order = params.get('order', {})
if order: # 确保order不为空
for key in ['ethscriptionId', 'r', 's', 'params']:
if key in order: # 确保键在order中
order[key] = order[key].hex()
ethscription = get_ethscription_by_id(f'0x{order["ethscriptionId"]}', all_ethscription_cur)
if ethscription:
ethscription = get_specific_ethscription()
if ethscription['collections']:
name = ethscription['collections'][0]['name']
else:
name = ethscription['content_uri']
if len(name) > 128:
name = ethscription['ethscription_number']
msg_title = f"ETCH new order: {float(order['price']) / 1e18} ETH"
msg_text = f"""
Etch 新的单笔大额订单
时间:{datetime.fromtimestamp(block['timestamp'])}
铭文:{name}
数量:{order['quantity']}
单价:{float(order['price']) / 1e18 / order['quantity']}
金额:{float(order['price']) / 1e18}
买家:{tx['from']}
卖家:{order['signer']}
链接:{'https://etherscan.io/tx/' + tx['hash'].hex()}
"""
# response = send_message_to_channel(msg_text)
telegram_future = executor.submit(send_message_to_channel_wrapper, msg_text)
pushover_future = executor.submit(send_pushover_notification_wrapper, pushover_api_token, pushover_user_token, msg_text, msg_title)
telegram_response = telegram_future.result()
pushover_response = pushover_future.result()
push_messages_w_cur.execute('''
INSERT INTO messages (time, title, hash) VALUES (?, ?, ?)
''', (str(datetime.fromtimestamp(block['timestamp'])), str(msg_title), str(tx['hash'].hex())))
push_messages_w_conn.commit()
except Exception as e:
print(f"Error processing transaction {tx['hash'].hex()}: {e}")
if tx['to'] == '0x941Bc2E04A776d436E183Fe4204Bb84FeBA564D3':
if tx['value'] >= 1:
if tx['input'][:10] == '0x3bb23351':
try:
data = batch_contract.decode_function_input(tx['input'])
params = data[1]
# Handle bytes data
for key, value in params.items():
if isinstance(value, bytes):
params[key] = value.hex()
# 获取并处理订单数据
orders_data = params.get('orders', [])
for order in orders_data:
for key in ['ethscriptionId', 'r', 's', 'params']:
order[key] = order[key].hex()
msg_title = f"ETCH new order: {float(tx['value']) / 1e18} ETH"
msg_text = f"""
Etch 新的批量大额订单
时间:{datetime.fromtimestamp(block['timestamp'])}
金额:{float(tx['value']) / 1e18}
买家:{tx['from']}
链接:{'https://etherscan.io/tx/' + tx['hash'].hex()}
"""
# response = send_message_to_channel(msg_text)
telegram_future = executor.submit(send_message_to_channel_wrapper, msg_text)
pushover_future = executor.submit(send_pushover_notification_wrapper, pushover_api_token, pushover_user_token, msg_text, msg_title)
telegram_response = telegram_future.result()
pushover_response = pushover_future.result()
push_messages_w_cur.execute('''
INSERT INTO messages (time, title, hash) VALUES (?, ?, ?)
''', (
str(datetime.fromtimestamp(block['timestamp'])), str(msg_title), str(tx['hash'].hex())))
push_messages_w_conn.commit()
except Exception as e:
print(f"Error processing transaction {tx['hash'].hex()}: {e}")
time.sleep(10)
# Streamlit app layout
st.set_page_config(page_title="EthPen - 推送通知服务", page_icon="📢", layout='centered', initial_sidebar_state='auto')
st.markdown(f'#
:rainbow[推送通知服务]', unsafe_allow_html=True)
st.subheader(r'', anchor=False, divider='rainbow')
st.error('开发中,仅供参考...')
st.error('若您有更好的建议或想法,请随时与我取得联系。')
st.markdown('### 订阅服务')
push_service_option = st.radio("选择通知的服务:", ['Telegram', 'Pushover', 'Bark', 'Server 酱', 'PushDeer', '钉钉', '企业微信', '飞书', 'Discord', '短信', '电话'], index=0,
horizontal=True, disabled=True)
col1, col2 = st.columns(2)
# 在第一个列中添加文本输入框
text_input = col1.text_input("API Keys:", label_visibility='collapsed', disabled=True)
# 在第二个列中添加按钮
button_clicked = col2.button("订阅", disabled=True)
st.markdown(f'''{my_style}
Pushover - EthPen.com
@ethspush
''', unsafe_allow_html=True)
st.markdown('考虑到通知的时效性及操作的便捷性,暂不支持个人定制服务,且推送的触发条件为合约交易金额大于等于 1ETH。目前我们只开放了 Telegram 频道和 Pushover 作为通知方式。欢迎各位订阅并体验推送服务。')
st.markdown('### 最近消息')
with sqlite3.connect(push_messages_db_file) as conn:
recent_messages = pd.read_sql_query("SELECT time, title, hash FROM messages ORDER BY time DESC LIMIT 10;", conn)
st.dataframe(recent_messages, use_container_width=True, hide_index=True)
config = configparser.ConfigParser()
config.read(config_ini_file)
thread_start_state_value = config['push_thread_start_state']['state']
if thread_start_state_value == '0':
print('消息推送线程未启动。')
config['push_thread_start_state']['state'] = '1'
with open(config_ini_file, 'w') as configfile:
config.write(configfile)
# 创建线程对象
push_thread = threading.Thread(target=get_push_data)
push_thread.start()
push_thread.join()
else:
print('消息推送线程已经启动。')