import base64
import hashlib
import json
import requests
import streamlit as st
import re
import os
import sqlite3
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
# 获取当前文件目录
current_dir = os.path.dirname(__file__)
parent_dir = os.path.dirname(current_dir)
name_set_file = os.path.join(parent_dir, 'data', 'name_set.db')
name_set_conn = sqlite3.connect(name_set_file)
name_set_cursor = name_set_conn.cursor()
content_set = ''
check_type = ''
name_not_yet_total = 0
etherscan_api_key = os.environ.get('etherscan_api_key', '')
# 图片Base64
def image_to_base64(img_path):
with open(img_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode()
# 把文字转换成 16 进制
def text_to_hex(text):
return '0x' + ''.join(format(byte, '02x') for byte in text.encode('utf-8'))
# str SHA256
def sha256(input_string):
sha256 = hashlib.sha256()
sha256.update(input_string.encode('utf-8'))
return sha256.hexdigest()
# 使用 SHA256 值检查内容是否已被 ethscribed
def check_content_exists(sha):
endpoint = f"/ethscriptions/exists/{sha}"
try:
response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint, timeout=5)
response.raise_for_status() # 如果响应状态码不是200,这会引发HTTPError
return response.json()['result']
except requests.RequestException: # 捕获所有与requests相关的异常
return "未知"
# 处理数据
def orginize_data(index):
global name_not_yet_total
sha_value = sha256(names[index])
response_data = check_content_exists(sha_value)
if hide_inscribed:
if not response_data:
result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
if response_data == '未知':
result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
else:
if response_data:
result.append({"铭文": names[index], "状态": '已题写', "十六进制数据": all_items[index][2:]})
elif response_data == '未知':
result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
else:
result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
# 确定索引器是否落后
def get_block_status():
endpoint = "/block_status"
response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint)
try:
data = response.json()
return data.get('blocks_behind', None)
except json.JSONDecodeError:
print("Failed to decode JSON. Response content:")
print(response.text)
return None
# 获取题写铭文的矿工费
def estimate_transaction_fee(text_length: int, speed: str = 'Propose') -> float:
"""
Estimate the transaction fee for an Ethereum transaction based on text length using Etherscan API.
:param api_key: Etherscan API key.
:param text_length: Length of the input text.
:param speed: Desired confirmation speed. Can be 'Safe', 'Propose', or 'Fast'.
:param request_timeout: Timeout for the HTTP request in seconds.
:return: Estimated transaction fee in ETH or None in case of errors.
"""
try:
# Get current gas prices from Etherscan
base_url = "https://api.etherscan.io/api"
params = {
"module": "gastracker",
"action": "gasoracle",
"apikey": etherscan_api_key
}
response = requests.get(base_url, params=params, timeout=5)
response.raise_for_status()
data = response.json()
if data['status'] != '1':
return None
if speed not in ['Safe', 'Propose', 'Fast']:
return None
selected_gas_price = int(data['result'][f"{speed}GasPrice"])
# Calculate gas for the input text
NON_ZERO_BYTE_GAS = 68
ZERO_BYTE_GAS = 4
# Assuming every character is 1 byte (for simplification)
# In reality, certain UTF-8 characters may be more than 1 byte
data_gas = text_length * NON_ZERO_BYTE_GAS
# Calculate total gas and fee
BASE_GAS = 21000
total_gas = BASE_GAS + data_gas
total_fee_eth = total_gas * selected_gas_price * 1e-9
return total_fee_eth
except (requests.Timeout, requests.RequestException):
return None
# 查询 ETH token 价格
def get_eth_price():
try:
# 使用Etherscan的API endpoint获取以太坊的价格
url = f"https://api.etherscan.io/api?module=stats&action=ethprice&apikey=1I2ERGUVJZNJAFKY7MG4S317DUGRPXEQPF"
response = requests.get(url)
response.raise_for_status() # 将引发HTTPError,如果HTTP请求返回了不成功的状态码
data = response.json()
if data["message"] == "OK":
eth_price = float(data["result"]["ethusd"])
return eth_price
else:
print("Error fetching the ETH price")
return None
except requests.RequestException as e:
print(f"Error while accessing the API: {e}")
return None
st.set_page_config(page_title="EthPen - 批量查询铭文状态", page_icon="🔍", layout='centered', initial_sidebar_state='auto')
st.markdown(f'# :rainbow[批量查询铭文状态]', unsafe_allow_html=True)
st.subheader(r' ', anchor=False, divider='rainbow')
st.write("", unsafe_allow_html=True)
st.markdown(f'### 💖 查找你心仪的铭文')
name_selected_option = st.radio("选择查询的类型:",
['🔤纯文本', '🪙代币铭文', '🆔.eths', '🆔.eth', '🌲.tree', '🦛.honk', '🔄.etch', '🌐.com', '🥳.club'], index=0,
horizontal=True)
if name_selected_option == '🔤纯文本':
check_type = ''
if name_selected_option == '🪙代币铭文':
token_check = True
token_str = st.text_input('填写代币铭文文本(变动的部分文本如 id 请用 "@#" 代替):', 'data:,{"p":"erc-20","op":"mint","tick":"eths","id":"@#","amt":"1000"}')
col1, col2 = st.columns(2)
with col1:
token_min_id = st.number_input('填写最小 ID:', min_value=1, value=1, step=1)
with col2:
token_max_id = st.number_input('填写最大 ID:', min_value=2, value=21000, step=1)
token_str_list = []
if token_str != '' and '@#' in token_str:
for token_id in range(token_min_id, token_max_id + 1):
token_str_list.append(token_str.replace('@#', str(token_id)))
content_set = '\n'.join(token_str_list)
st.code(token_str_list[0], language="json", line_numbers=False)
content = st.text_area('铭文列表:', value=content_set)
else:
token_check = False
if name_selected_option == '🆔.eths':
check_type = '.eths'
if name_selected_option == '🆔.eth':
check_type = '.eth'
if name_selected_option == '🌲.tree':
check_type = '.tree'
if name_selected_option == '🦛.honk':
check_type = '.honk'
if name_selected_option == '🔄.etch':
check_type = '.etch'
if name_selected_option == '🌐.com':
check_type = '.com'
if name_selected_option == '🥳.club':
check_type = '.club'
if not token_check:
# 执行SQL查询,获取所有表名
name_set_cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
# 检索查询结果
tables = name_set_cursor.fetchall()
name_srt_tables_options = []
name_srt_tables_options.append('⌨️ 直接输入')
for table in tables:
name_srt_tables_options.append(table[0])
name_srt_tables_options.append('📃 导入文件')
name_set_table = st.radio("选择系列:", name_srt_tables_options, index=0, disabled=token_check, horizontal=True)
if name_set_table and name_set_table != '⌨️ 直接输入' and name_set_table != '📃 导入文件':
name_set_tables_name_options = []
# 执行SQL查询,获取指定表的所有 'name' 列的值
name_set_cursor.execute(f"SELECT name FROM {name_set_table}")
# 检索查询结果
names = name_set_cursor.fetchall()
# 打印所有 'name' 列的值
for name in names:
name_set_tables_name_options.append(name[0])
name_set_tables_name_options.sort()
name_set_table_name = st.radio("系列", name_set_tables_name_options, index=0, disabled=token_check, horizontal=True,
label_visibility="collapsed")
if name_set_table_name:
# 执行SQL查询,获取指定表的 'name' 和 'data' 列的值
name_set_cursor.execute(f"SELECT data FROM {name_set_table} WHERE name='{name_set_table_name}'")
# 检索查询结果
all_data = name_set_cursor.fetchall()
content_set = f'{all_data[0][0]}'
if name_set_table == "⌨️ 直接输入":
content_set = ''
if name_set_table == "📃 导入文件":
uploaded_file = st.file_uploader("上传文件", type=['txt', 'csv', 'xlsx'])
if uploaded_file is not None:
if uploaded_file.type not in ['text/plain', 'text/csv', 'application/vnd.ms-excel']:
st.error('文件类型不支持')
else:
# 读取上传的文件内容
uploaded_file_contents = uploaded_file.read().decode('utf-8')
# 将文件内容分割成列表,使用空格、换行符和逗号作为分隔符
separators = [' ', '\n', ',']
uploaded_file_contents_list = re.split('|'.join(map(re.escape, separators)), uploaded_file_contents)
# 在这里,file_contents_list 包含了分割后的文本元素
# 去除重复元素,将列表转换为集合,然后再转回列表
uploaded_unique_contents_list = list(set(uploaded_file_contents_list))
content_set = ' '.join(uploaded_unique_contents_list)
if not token_check:
custom_suffix = st.toggle('自定义前后缀')
if custom_suffix:
col1, col2 = st.columns(2)
with col1:
custom_prefix = st.text_input("自定义前缀")
with col2:
if check_type == '':
suffix_check = False
else:
suffix_check = True
custom_suffix = st.text_input("自定义后缀", disabled=suffix_check)
else:
custom_prefix, custom_suffix = '', ''
if not token_check:
filter = st.toggle('过滤器')
if filter:
col1, col2, col3, col4, col5, col6 = st.columns(6)
with col1:
start_with = st.text_input("以*开头", key='过滤器(可选)1')
with col2:
end_with = st.text_input("以*结尾", key='过滤器(可选)2')
with col3:
contain = st.text_input("包含*", key='过滤器(可选)3')
with col4:
no_contain = st.text_input("不包含*", key='过滤器(可选)4')
with col5:
min_length = st.text_input("最少长度", key='过滤器(可选)5')
with col6:
max_length = st.text_input("最大长度", key='过滤器(可选)6')
# 将过滤条件应用到原始 name_set 字符串列表
filtered_list = content_set.split() # 使用空格、换行符和逗号分隔字符串,得到列表
if start_with:
filtered_list = [item for item in filtered_list if item.startswith(start_with)]
if end_with:
filtered_list = [item for item in filtered_list if item.endswith(end_with)]
if contain:
filtered_list = [item for item in filtered_list if contain in item]
if no_contain:
filtered_list = [item for item in filtered_list if no_contain not in item]
if min_length:
filtered_list = [item for item in filtered_list if len(item) >= int(min_length)]
if max_length:
filtered_list = [item for item in filtered_list if len(item) <= int(max_length)]
# 将处理后的列表重新合并成字符串
content_set = ' '.join(filtered_list)
if not token_check:
content = st.text_area('铭文列表:', value=content_set, key=1)
hide_inscribed = st.toggle('隐藏已题写')
enable_api = st.toggle('启用 Ethscriptions.com API (v1)')
if enable_api:
print(get_block_status())
st.success(f"铭文数据来自 [Ethscriptions.com](https://ethscriptions.com/) 官方网站,当前索引器状态落后: {get_block_status()} 个区块。我们将以 100 个线程进行网络请求查询。尽管查询速度稍显缓慢,但所得数据确保了真实性与准确性。")
if st.button('🔎 查询', key='🔎 查询'):
st.markdown(f'### 📄 查询结果')
all_items = []
names = []
if token_check:
for i in token_str_list:
names.append(i)
all_items.append(text_to_hex(i))
else:
if check_type == '':
names = re.split(r"\s+", content)
# names = list(set(names))
names = [custom_prefix + name + custom_suffix for name in names]
names = ["data:," + name if not name.startswith("data:,") else name for name in names]
for i in names:
if names != 'data:,':
all_items.append(text_to_hex(i))
else:
names = re.split(r"\s+", content)
# names = list(set(names))
names = [custom_prefix + name + check_type for name in names]
names = ["data:," + name if not name.startswith("data:,") else name for name in names]
for i in names:
if names != 'data:,':
all_items.append(text_to_hex(i))
ethscrptions_db_file = os.path.join(parent_dir, 'data', 'ethscriptions_data.db')
ethscriptions_conn = sqlite3.connect(ethscrptions_db_file)
ethscriptions_cursor = ethscriptions_conn.cursor()
names_total = len(names)
result = []
if enable_api:
with ThreadPoolExecutor(max_workers=100) as executor:
for index in range(len(names)):
executor.submit(orginize_data, index) # Submit task to thread pool
else:
for index in range(len(names)):
ethscriptions_cursor.execute("SELECT * FROM data WHERE data=?", (all_items[index],))
row = ethscriptions_cursor.fetchone()
if hide_inscribed:
if not row:
result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
else:
if row:
result.append({"铭文": names[index], "状态": "已题写", "十六进制数据": all_items[index][2:]})
else:
result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
st.progress(1 - (name_not_yet_total / names_total), f'题写进度 {names_total - name_not_yet_total}/{names_total} ({(1 - (name_not_yet_total / names_total)) * 100:.0f}%):')
name_not_yet_total = 0
if result:
# 将结果转换为Pandas DataFrame
result_df = pd.DataFrame(result)
# print(result_df)
st.dataframe(result_df, use_container_width=True, hide_index=True)
# 使用列表解析获取所有"未题写"状态的"十六进制数据"
not_written_data = [item['十六进制数据'] for item in result if item['状态'] == '未题写']
# 获取未题写数据的个数和总长度
num_not_written = len(not_written_data)
total_length = sum(len(data) for data in not_written_data)
gas_spent = 0 if num_not_written == 0 else estimate_transaction_fee(total_length // num_not_written)
eth_price = get_eth_price()
st.success(
f'题写上面未题写的铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 {gas_spent * eth_price:.2f}USD;题写全部未题写铭文最低需要花费 {gas_spent * num_not_written:.10f} ETH,约为 {gas_spent * num_not_written * eth_price:.2f}USD。')
# Convert DataFrame to CSV with proper encoding
csv_export = result_df.to_csv(index=False, encoding='utf-8')
# Add a download button for the DataFrame
st.download_button(
label="⏬ 下载查询结果",
data=csv_export,
file_name="ethpen_result_data.csv",
mime="text/csv"
)
else:
st.markdown(f'### ☹️ 你来迟了~')
#
# print(len(content_set))
# gas_spent = 0 if len(content_set.split()) == 0 else estimate_transaction_fee(len(content_set) // len(content_set.split()))
# st.success(f'题写上面铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 ${gas_spent * get_eth_price():.2f}。')