eths / pages /1_🔍_批量查询铭文状态.py
Ethscriptions's picture
Add application file
6802a9c
import base64
import hashlib
import json
import requests
import streamlit as st
import re
import os
import sqlite3
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
# 获取当前文件目录
current_dir = os.path.dirname(__file__)
parent_dir = os.path.dirname(current_dir)
name_set_file = os.path.join(parent_dir, 'data', 'name_set.db')
name_set_conn = sqlite3.connect(name_set_file)
name_set_cursor = name_set_conn.cursor()
content_set = ''
check_type = ''
name_not_yet_total = 0
etherscan_api_key = os.environ.get('etherscan_api_key', '')
# 图片Base64
def image_to_base64(img_path):
with open(img_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode()
# 把文字转换成 16 进制
def text_to_hex(text):
return '0x' + ''.join(format(byte, '02x') for byte in text.encode('utf-8'))
# str SHA256
def sha256(input_string):
sha256 = hashlib.sha256()
sha256.update(input_string.encode('utf-8'))
return sha256.hexdigest()
# 使用 SHA256 值检查内容是否已被 ethscribed
def check_content_exists(sha):
endpoint = f"/ethscriptions/exists/{sha}"
try:
response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint, timeout=5)
response.raise_for_status() # 如果响应状态码不是200,这会引发HTTPError
return response.json()['result']
except requests.RequestException: # 捕获所有与requests相关的异常
return "未知"
# 处理数据
def orginize_data(index):
global name_not_yet_total
sha_value = sha256(names[index])
response_data = check_content_exists(sha_value)
if hide_inscribed:
if not response_data:
result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
if response_data == '未知':
result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
else:
if response_data:
result.append({"铭文": names[index], "状态": '已题写', "十六进制数据": all_items[index][2:]})
elif response_data == '未知':
result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
else:
result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
# 确定索引器是否落后
def get_block_status():
endpoint = "/block_status"
response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint)
try:
data = response.json()
return data.get('blocks_behind', None)
except json.JSONDecodeError:
print("Failed to decode JSON. Response content:")
print(response.text)
return None
# 获取题写铭文的矿工费
def estimate_transaction_fee(text_length: int, speed: str = 'Propose') -> float:
"""
Estimate the transaction fee for an Ethereum transaction based on text length using Etherscan API.
:param api_key: Etherscan API key.
:param text_length: Length of the input text.
:param speed: Desired confirmation speed. Can be 'Safe', 'Propose', or 'Fast'.
:param request_timeout: Timeout for the HTTP request in seconds.
:return: Estimated transaction fee in ETH or None in case of errors.
"""
try:
# Get current gas prices from Etherscan
base_url = "https://api.etherscan.io/api"
params = {
"module": "gastracker",
"action": "gasoracle",
"apikey": etherscan_api_key
}
response = requests.get(base_url, params=params, timeout=5)
response.raise_for_status()
data = response.json()
if data['status'] != '1':
return None
if speed not in ['Safe', 'Propose', 'Fast']:
return None
selected_gas_price = int(data['result'][f"{speed}GasPrice"])
# Calculate gas for the input text
NON_ZERO_BYTE_GAS = 68
ZERO_BYTE_GAS = 4
# Assuming every character is 1 byte (for simplification)
# In reality, certain UTF-8 characters may be more than 1 byte
data_gas = text_length * NON_ZERO_BYTE_GAS
# Calculate total gas and fee
BASE_GAS = 21000
total_gas = BASE_GAS + data_gas
total_fee_eth = total_gas * selected_gas_price * 1e-9
return total_fee_eth
except (requests.Timeout, requests.RequestException):
return None
# 查询 ETH token 价格
def get_eth_price():
try:
# 使用Etherscan的API endpoint获取以太坊的价格
url = f"https://api.etherscan.io/api?module=stats&action=ethprice&apikey=1I2ERGUVJZNJAFKY7MG4S317DUGRPXEQPF"
response = requests.get(url)
response.raise_for_status() # 将引发HTTPError,如果HTTP请求返回了不成功的状态码
data = response.json()
if data["message"] == "OK":
eth_price = float(data["result"]["ethusd"])
return eth_price
else:
print("Error fetching the ETH price")
return None
except requests.RequestException as e:
print(f"Error while accessing the API: {e}")
return None
st.set_page_config(page_title="EthPen - 批量查询铭文状态", page_icon="🔍", layout='centered', initial_sidebar_state='auto')
st.markdown(f'# <img src="data:image/svg+xml;base64,{image_to_base64(os.path.join("img", "ethpen_logo.svg"))}" alt="Image" width="64px" height="64px" style="border-radius: 8px; box-shadow: 3px 3px 10px rgba(0,0,0,0.1);"/> :rainbow[批量查询铭文状态]', unsafe_allow_html=True)
st.subheader(r' ', anchor=False, divider='rainbow')
st.write("<style>div.row-widget.stRadio > div{background-color:white;border: 1px solid #e6e9ef;border-radius:8px;padding:10px;box-shadow: 2px 2px 10px #e6e9ef;}</style>", unsafe_allow_html=True)
st.markdown(f'### 💖 查找你心仪的铭文')
name_selected_option = st.radio("选择查询的类型:",
['🔤纯文本', '🪙代币铭文', '🆔.eths', '🆔.eth', '🌲.tree', '🦛.honk', '🔄.etch', '🌐.com', '🥳.club'], index=0,
horizontal=True)
if name_selected_option == '🔤纯文本':
check_type = ''
if name_selected_option == '🪙代币铭文':
token_check = True
token_str = st.text_input('填写代币铭文文本(变动的部分文本如 id 请用 "@#" 代替):', 'data:,{"p":"erc-20","op":"mint","tick":"eths","id":"@#","amt":"1000"}')
col1, col2 = st.columns(2)
with col1:
token_min_id = st.number_input('填写最小 ID:', min_value=1, value=1, step=1)
with col2:
token_max_id = st.number_input('填写最大 ID:', min_value=2, value=21000, step=1)
token_str_list = []
if token_str != '' and '@#' in token_str:
for token_id in range(token_min_id, token_max_id + 1):
token_str_list.append(token_str.replace('@#', str(token_id)))
content_set = '\n'.join(token_str_list)
st.code(token_str_list[0], language="json", line_numbers=False)
content = st.text_area('铭文列表:', value=content_set)
else:
token_check = False
if name_selected_option == '🆔.eths':
check_type = '.eths'
if name_selected_option == '🆔.eth':
check_type = '.eth'
if name_selected_option == '🌲.tree':
check_type = '.tree'
if name_selected_option == '🦛.honk':
check_type = '.honk'
if name_selected_option == '🔄.etch':
check_type = '.etch'
if name_selected_option == '🌐.com':
check_type = '.com'
if name_selected_option == '🥳.club':
check_type = '.club'
if not token_check:
# 执行SQL查询,获取所有表名
name_set_cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
# 检索查询结果
tables = name_set_cursor.fetchall()
name_srt_tables_options = []
name_srt_tables_options.append('⌨️ 直接输入')
for table in tables:
name_srt_tables_options.append(table[0])
name_srt_tables_options.append('📃 导入文件')
name_set_table = st.radio("选择系列:", name_srt_tables_options, index=0, disabled=token_check, horizontal=True)
if name_set_table and name_set_table != '⌨️ 直接输入' and name_set_table != '📃 导入文件':
name_set_tables_name_options = []
# 执行SQL查询,获取指定表的所有 'name' 列的值
name_set_cursor.execute(f"SELECT name FROM {name_set_table}")
# 检索查询结果
names = name_set_cursor.fetchall()
# 打印所有 'name' 列的值
for name in names:
name_set_tables_name_options.append(name[0])
name_set_tables_name_options.sort()
name_set_table_name = st.radio("系列", name_set_tables_name_options, index=0, disabled=token_check, horizontal=True,
label_visibility="collapsed")
if name_set_table_name:
# 执行SQL查询,获取指定表的 'name' 和 'data' 列的值
name_set_cursor.execute(f"SELECT data FROM {name_set_table} WHERE name='{name_set_table_name}'")
# 检索查询结果
all_data = name_set_cursor.fetchall()
content_set = f'{all_data[0][0]}'
if name_set_table == "⌨️ 直接输入":
content_set = ''
if name_set_table == "📃 导入文件":
uploaded_file = st.file_uploader("上传文件", type=['txt', 'csv', 'xlsx'])
if uploaded_file is not None:
if uploaded_file.type not in ['text/plain', 'text/csv', 'application/vnd.ms-excel']:
st.error('文件类型不支持')
else:
# 读取上传的文件内容
uploaded_file_contents = uploaded_file.read().decode('utf-8')
# 将文件内容分割成列表,使用空格、换行符和逗号作为分隔符
separators = [' ', '\n', ',']
uploaded_file_contents_list = re.split('|'.join(map(re.escape, separators)), uploaded_file_contents)
# 在这里,file_contents_list 包含了分割后的文本元素
# 去除重复元素,将列表转换为集合,然后再转回列表
uploaded_unique_contents_list = list(set(uploaded_file_contents_list))
content_set = ' '.join(uploaded_unique_contents_list)
if not token_check:
custom_suffix = st.toggle('自定义前后缀')
if custom_suffix:
col1, col2 = st.columns(2)
with col1:
custom_prefix = st.text_input("自定义前缀")
with col2:
if check_type == '':
suffix_check = False
else:
suffix_check = True
custom_suffix = st.text_input("自定义后缀", disabled=suffix_check)
else:
custom_prefix, custom_suffix = '', ''
if not token_check:
filter = st.toggle('过滤器')
if filter:
col1, col2, col3, col4, col5, col6 = st.columns(6)
with col1:
start_with = st.text_input("以*开头", key='过滤器(可选)1')
with col2:
end_with = st.text_input("以*结尾", key='过滤器(可选)2')
with col3:
contain = st.text_input("包含*", key='过滤器(可选)3')
with col4:
no_contain = st.text_input("不包含*", key='过滤器(可选)4')
with col5:
min_length = st.text_input("最少长度", key='过滤器(可选)5')
with col6:
max_length = st.text_input("最大长度", key='过滤器(可选)6')
# 将过滤条件应用到原始 name_set 字符串列表
filtered_list = content_set.split() # 使用空格、换行符和逗号分隔字符串,得到列表
if start_with:
filtered_list = [item for item in filtered_list if item.startswith(start_with)]
if end_with:
filtered_list = [item for item in filtered_list if item.endswith(end_with)]
if contain:
filtered_list = [item for item in filtered_list if contain in item]
if no_contain:
filtered_list = [item for item in filtered_list if no_contain not in item]
if min_length:
filtered_list = [item for item in filtered_list if len(item) >= int(min_length)]
if max_length:
filtered_list = [item for item in filtered_list if len(item) <= int(max_length)]
# 将处理后的列表重新合并成字符串
content_set = ' '.join(filtered_list)
if not token_check:
content = st.text_area('铭文列表:', value=content_set, key=1)
hide_inscribed = st.toggle('隐藏已题写')
enable_api = st.toggle('启用 Ethscriptions.com API (v1)')
if enable_api:
print(get_block_status())
st.success(f"铭文数据来自 [Ethscriptions.com](https://ethscriptions.com/) 官方网站,当前索引器状态落后: {get_block_status()} 个区块。我们将以 100 个线程进行网络请求查询。尽管查询速度稍显缓慢,但所得数据确保了真实性与准确性。")
if st.button('🔎 查询', key='🔎 查询'):
st.markdown(f'### 📄 查询结果')
all_items = []
names = []
if token_check:
for i in token_str_list:
names.append(i)
all_items.append(text_to_hex(i))
else:
if check_type == '':
names = re.split(r"\s+", content)
# names = list(set(names))
names = [custom_prefix + name + custom_suffix for name in names]
names = ["data:," + name if not name.startswith("data:,") else name for name in names]
for i in names:
if names != 'data:,':
all_items.append(text_to_hex(i))
else:
names = re.split(r"\s+", content)
# names = list(set(names))
names = [custom_prefix + name + check_type for name in names]
names = ["data:," + name if not name.startswith("data:,") else name for name in names]
for i in names:
if names != 'data:,':
all_items.append(text_to_hex(i))
ethscrptions_db_file = os.path.join(parent_dir, 'data', 'ethscriptions_data.db')
ethscriptions_conn = sqlite3.connect(ethscrptions_db_file)
ethscriptions_cursor = ethscriptions_conn.cursor()
names_total = len(names)
result = []
if enable_api:
with ThreadPoolExecutor(max_workers=100) as executor:
for index in range(len(names)):
executor.submit(orginize_data, index) # Submit task to thread pool
else:
for index in range(len(names)):
ethscriptions_cursor.execute("SELECT * FROM data WHERE data=?", (all_items[index],))
row = ethscriptions_cursor.fetchone()
if hide_inscribed:
if not row:
result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
else:
if row:
result.append({"铭文": names[index], "状态": "已题写", "十六进制数据": all_items[index][2:]})
else:
result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]})
name_not_yet_total += 1
st.progress(1 - (name_not_yet_total / names_total), f'题写进度 {names_total - name_not_yet_total}/{names_total} ({(1 - (name_not_yet_total / names_total)) * 100:.0f}%):')
name_not_yet_total = 0
if result:
# 将结果转换为Pandas DataFrame
result_df = pd.DataFrame(result)
# print(result_df)
st.dataframe(result_df, use_container_width=True, hide_index=True)
# 使用列表解析获取所有"未题写"状态的"十六进制数据"
not_written_data = [item['十六进制数据'] for item in result if item['状态'] == '未题写']
# 获取未题写数据的个数和总长度
num_not_written = len(not_written_data)
total_length = sum(len(data) for data in not_written_data)
gas_spent = 0 if num_not_written == 0 else estimate_transaction_fee(total_length // num_not_written)
eth_price = get_eth_price()
st.success(
f'题写上面未题写的铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 {gas_spent * eth_price:.2f}USD;题写全部未题写铭文最低需要花费 {gas_spent * num_not_written:.10f} ETH,约为 {gas_spent * num_not_written * eth_price:.2f}USD。')
# Convert DataFrame to CSV with proper encoding
csv_export = result_df.to_csv(index=False, encoding='utf-8')
# Add a download button for the DataFrame
st.download_button(
label="⏬ 下载查询结果",
data=csv_export,
file_name="ethpen_result_data.csv",
mime="text/csv"
)
else:
st.markdown(f'### ☹️ 你来迟了~')
#
# print(len(content_set))
# gas_spent = 0 if len(content_set.split()) == 0 else estimate_transaction_fee(len(content_set) // len(content_set.split()))
# st.success(f'题写上面铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 ${gas_spent * get_eth_price():.2f}。')