Spaces:
Running
Running
import base64 | |
import hashlib | |
import json | |
import requests | |
import streamlit as st | |
import re | |
import os | |
import sqlite3 | |
import pandas as pd | |
from concurrent.futures import ThreadPoolExecutor | |
# 获取当前文件目录 | |
current_dir = os.path.dirname(__file__) | |
parent_dir = os.path.dirname(current_dir) | |
name_set_file = os.path.join(parent_dir, 'data', 'name_set.db') | |
name_set_conn = sqlite3.connect(name_set_file) | |
name_set_cursor = name_set_conn.cursor() | |
content_set = '' | |
check_type = '' | |
name_not_yet_total = 0 | |
etherscan_api_key = os.environ.get('etherscan_api_key', '') | |
# 图片Base64 | |
def image_to_base64(img_path): | |
with open(img_path, "rb") as image_file: | |
return base64.b64encode(image_file.read()).decode() | |
# 把文字转换成 16 进制 | |
def text_to_hex(text): | |
return '0x' + ''.join(format(byte, '02x') for byte in text.encode('utf-8')) | |
# str SHA256 | |
def sha256(input_string): | |
sha256 = hashlib.sha256() | |
sha256.update(input_string.encode('utf-8')) | |
return sha256.hexdigest() | |
# 使用 SHA256 值检查内容是否已被 ethscribed | |
def check_content_exists(sha): | |
endpoint = f"/ethscriptions/exists/{sha}" | |
try: | |
response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint, timeout=5) | |
response.raise_for_status() # 如果响应状态码不是200,这会引发HTTPError | |
return response.json()['result'] | |
except requests.RequestException: # 捕获所有与requests相关的异常 | |
return "未知" | |
# 处理数据 | |
def orginize_data(index): | |
global name_not_yet_total | |
sha_value = sha256(names[index]) | |
response_data = check_content_exists(sha_value) | |
if hide_inscribed: | |
if not response_data: | |
result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]}) | |
name_not_yet_total += 1 | |
if response_data == '未知': | |
result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]}) | |
name_not_yet_total += 1 | |
else: | |
if response_data: | |
result.append({"铭文": names[index], "状态": '已题写', "十六进制数据": all_items[index][2:]}) | |
elif response_data == '未知': | |
result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]}) | |
name_not_yet_total += 1 | |
else: | |
result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]}) | |
name_not_yet_total += 1 | |
# 确定索引器是否落后 | |
def get_block_status(): | |
endpoint = "/block_status" | |
response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint) | |
try: | |
data = response.json() | |
return data.get('blocks_behind', None) | |
except json.JSONDecodeError: | |
print("Failed to decode JSON. Response content:") | |
print(response.text) | |
return None | |
# 获取题写铭文的矿工费 | |
def estimate_transaction_fee(text_length: int, speed: str = 'Propose') -> float: | |
""" | |
Estimate the transaction fee for an Ethereum transaction based on text length using Etherscan API. | |
:param api_key: Etherscan API key. | |
:param text_length: Length of the input text. | |
:param speed: Desired confirmation speed. Can be 'Safe', 'Propose', or 'Fast'. | |
:param request_timeout: Timeout for the HTTP request in seconds. | |
:return: Estimated transaction fee in ETH or None in case of errors. | |
""" | |
try: | |
# Get current gas prices from Etherscan | |
base_url = "https://api.etherscan.io/api" | |
params = { | |
"module": "gastracker", | |
"action": "gasoracle", | |
"apikey": etherscan_api_key | |
} | |
response = requests.get(base_url, params=params, timeout=5) | |
response.raise_for_status() | |
data = response.json() | |
if data['status'] != '1': | |
return None | |
if speed not in ['Safe', 'Propose', 'Fast']: | |
return None | |
selected_gas_price = int(data['result'][f"{speed}GasPrice"]) | |
# Calculate gas for the input text | |
NON_ZERO_BYTE_GAS = 68 | |
ZERO_BYTE_GAS = 4 | |
# Assuming every character is 1 byte (for simplification) | |
# In reality, certain UTF-8 characters may be more than 1 byte | |
data_gas = text_length * NON_ZERO_BYTE_GAS | |
# Calculate total gas and fee | |
BASE_GAS = 21000 | |
total_gas = BASE_GAS + data_gas | |
total_fee_eth = total_gas * selected_gas_price * 1e-9 | |
return total_fee_eth | |
except (requests.Timeout, requests.RequestException): | |
return None | |
# 查询 ETH token 价格 | |
def get_eth_price(): | |
try: | |
# 使用Etherscan的API endpoint获取以太坊的价格 | |
url = f"https://api.etherscan.io/api?module=stats&action=ethprice&apikey=1I2ERGUVJZNJAFKY7MG4S317DUGRPXEQPF" | |
response = requests.get(url) | |
response.raise_for_status() # 将引发HTTPError,如果HTTP请求返回了不成功的状态码 | |
data = response.json() | |
if data["message"] == "OK": | |
eth_price = float(data["result"]["ethusd"]) | |
return eth_price | |
else: | |
print("Error fetching the ETH price") | |
return None | |
except requests.RequestException as e: | |
print(f"Error while accessing the API: {e}") | |
return None | |
st.set_page_config(page_title="EthPen - 批量查询铭文状态", page_icon="🔍", layout='centered', initial_sidebar_state='auto') | |
st.markdown(f'# <img src="data:image/svg+xml;base64,{image_to_base64(os.path.join("img", "ethpen_logo.svg"))}" alt="Image" width="64px" height="64px" style="border-radius: 8px; box-shadow: 3px 3px 10px rgba(0,0,0,0.1);"/> :rainbow[批量查询铭文状态]', unsafe_allow_html=True) | |
st.subheader(r' ', anchor=False, divider='rainbow') | |
st.write("<style>div.row-widget.stRadio > div{background-color:white;border: 1px solid #e6e9ef;border-radius:8px;padding:10px;box-shadow: 2px 2px 10px #e6e9ef;}</style>", unsafe_allow_html=True) | |
st.markdown(f'### 💖 查找你心仪的铭文') | |
name_selected_option = st.radio("选择查询的类型:", | |
['🔤纯文本', '🪙代币铭文', '🆔.eths', '🆔.eth', '🌲.tree', '🦛.honk', '🔄.etch', '🌐.com', '🥳.club'], index=0, | |
horizontal=True) | |
if name_selected_option == '🔤纯文本': | |
check_type = '' | |
if name_selected_option == '🪙代币铭文': | |
token_check = True | |
token_str = st.text_input('填写代币铭文文本(变动的部分文本如 id 请用 "@#" 代替):', 'data:,{"p":"erc-20","op":"mint","tick":"eths","id":"@#","amt":"1000"}') | |
col1, col2 = st.columns(2) | |
with col1: | |
token_min_id = st.number_input('填写最小 ID:', min_value=1, value=1, step=1) | |
with col2: | |
token_max_id = st.number_input('填写最大 ID:', min_value=2, value=21000, step=1) | |
token_str_list = [] | |
if token_str != '' and '@#' in token_str: | |
for token_id in range(token_min_id, token_max_id + 1): | |
token_str_list.append(token_str.replace('@#', str(token_id))) | |
content_set = '\n'.join(token_str_list) | |
st.code(token_str_list[0], language="json", line_numbers=False) | |
content = st.text_area('铭文列表:', value=content_set) | |
else: | |
token_check = False | |
if name_selected_option == '🆔.eths': | |
check_type = '.eths' | |
if name_selected_option == '🆔.eth': | |
check_type = '.eth' | |
if name_selected_option == '🌲.tree': | |
check_type = '.tree' | |
if name_selected_option == '🦛.honk': | |
check_type = '.honk' | |
if name_selected_option == '🔄.etch': | |
check_type = '.etch' | |
if name_selected_option == '🌐.com': | |
check_type = '.com' | |
if name_selected_option == '🥳.club': | |
check_type = '.club' | |
if not token_check: | |
# 执行SQL查询,获取所有表名 | |
name_set_cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") | |
# 检索查询结果 | |
tables = name_set_cursor.fetchall() | |
name_srt_tables_options = [] | |
name_srt_tables_options.append('⌨️ 直接输入') | |
for table in tables: | |
name_srt_tables_options.append(table[0]) | |
name_srt_tables_options.append('📃 导入文件') | |
name_set_table = st.radio("选择系列:", name_srt_tables_options, index=0, disabled=token_check, horizontal=True) | |
if name_set_table and name_set_table != '⌨️ 直接输入' and name_set_table != '📃 导入文件': | |
name_set_tables_name_options = [] | |
# 执行SQL查询,获取指定表的所有 'name' 列的值 | |
name_set_cursor.execute(f"SELECT name FROM {name_set_table}") | |
# 检索查询结果 | |
names = name_set_cursor.fetchall() | |
# 打印所有 'name' 列的值 | |
for name in names: | |
name_set_tables_name_options.append(name[0]) | |
name_set_tables_name_options.sort() | |
name_set_table_name = st.radio("系列", name_set_tables_name_options, index=0, disabled=token_check, horizontal=True, | |
label_visibility="collapsed") | |
if name_set_table_name: | |
# 执行SQL查询,获取指定表的 'name' 和 'data' 列的值 | |
name_set_cursor.execute(f"SELECT data FROM {name_set_table} WHERE name='{name_set_table_name}'") | |
# 检索查询结果 | |
all_data = name_set_cursor.fetchall() | |
content_set = f'{all_data[0][0]}' | |
if name_set_table == "⌨️ 直接输入": | |
content_set = '' | |
if name_set_table == "📃 导入文件": | |
uploaded_file = st.file_uploader("上传文件", type=['txt', 'csv', 'xlsx']) | |
if uploaded_file is not None: | |
if uploaded_file.type not in ['text/plain', 'text/csv', 'application/vnd.ms-excel']: | |
st.error('文件类型不支持') | |
else: | |
# 读取上传的文件内容 | |
uploaded_file_contents = uploaded_file.read().decode('utf-8') | |
# 将文件内容分割成列表,使用空格、换行符和逗号作为分隔符 | |
separators = [' ', '\n', ','] | |
uploaded_file_contents_list = re.split('|'.join(map(re.escape, separators)), uploaded_file_contents) | |
# 在这里,file_contents_list 包含了分割后的文本元素 | |
# 去除重复元素,将列表转换为集合,然后再转回列表 | |
uploaded_unique_contents_list = list(set(uploaded_file_contents_list)) | |
content_set = ' '.join(uploaded_unique_contents_list) | |
if not token_check: | |
custom_suffix = st.toggle('自定义前后缀') | |
if custom_suffix: | |
col1, col2 = st.columns(2) | |
with col1: | |
custom_prefix = st.text_input("自定义前缀") | |
with col2: | |
if check_type == '': | |
suffix_check = False | |
else: | |
suffix_check = True | |
custom_suffix = st.text_input("自定义后缀", disabled=suffix_check) | |
else: | |
custom_prefix, custom_suffix = '', '' | |
if not token_check: | |
filter = st.toggle('过滤器') | |
if filter: | |
col1, col2, col3, col4, col5, col6 = st.columns(6) | |
with col1: | |
start_with = st.text_input("以*开头", key='过滤器(可选)1') | |
with col2: | |
end_with = st.text_input("以*结尾", key='过滤器(可选)2') | |
with col3: | |
contain = st.text_input("包含*", key='过滤器(可选)3') | |
with col4: | |
no_contain = st.text_input("不包含*", key='过滤器(可选)4') | |
with col5: | |
min_length = st.text_input("最少长度", key='过滤器(可选)5') | |
with col6: | |
max_length = st.text_input("最大长度", key='过滤器(可选)6') | |
# 将过滤条件应用到原始 name_set 字符串列表 | |
filtered_list = content_set.split() # 使用空格、换行符和逗号分隔字符串,得到列表 | |
if start_with: | |
filtered_list = [item for item in filtered_list if item.startswith(start_with)] | |
if end_with: | |
filtered_list = [item for item in filtered_list if item.endswith(end_with)] | |
if contain: | |
filtered_list = [item for item in filtered_list if contain in item] | |
if no_contain: | |
filtered_list = [item for item in filtered_list if no_contain not in item] | |
if min_length: | |
filtered_list = [item for item in filtered_list if len(item) >= int(min_length)] | |
if max_length: | |
filtered_list = [item for item in filtered_list if len(item) <= int(max_length)] | |
# 将处理后的列表重新合并成字符串 | |
content_set = ' '.join(filtered_list) | |
if not token_check: | |
content = st.text_area('铭文列表:', value=content_set, key=1) | |
hide_inscribed = st.toggle('隐藏已题写') | |
enable_api = st.toggle('启用 Ethscriptions.com API (v1)') | |
if enable_api: | |
print(get_block_status()) | |
st.success(f"铭文数据来自 [Ethscriptions.com](https://ethscriptions.com/) 官方网站,当前索引器状态落后: {get_block_status()} 个区块。我们将以 100 个线程进行网络请求查询。尽管查询速度稍显缓慢,但所得数据确保了真实性与准确性。") | |
if st.button('🔎 查询', key='🔎 查询'): | |
st.markdown(f'### 📄 查询结果') | |
all_items = [] | |
names = [] | |
if token_check: | |
for i in token_str_list: | |
names.append(i) | |
all_items.append(text_to_hex(i)) | |
else: | |
if check_type == '': | |
names = re.split(r"\s+", content) | |
# names = list(set(names)) | |
names = [custom_prefix + name + custom_suffix for name in names] | |
names = ["data:," + name if not name.startswith("data:,") else name for name in names] | |
for i in names: | |
if names != 'data:,': | |
all_items.append(text_to_hex(i)) | |
else: | |
names = re.split(r"\s+", content) | |
# names = list(set(names)) | |
names = [custom_prefix + name + check_type for name in names] | |
names = ["data:," + name if not name.startswith("data:,") else name for name in names] | |
for i in names: | |
if names != 'data:,': | |
all_items.append(text_to_hex(i)) | |
ethscrptions_db_file = os.path.join(parent_dir, 'data', 'ethscriptions_data.db') | |
ethscriptions_conn = sqlite3.connect(ethscrptions_db_file) | |
ethscriptions_cursor = ethscriptions_conn.cursor() | |
names_total = len(names) | |
result = [] | |
if enable_api: | |
with ThreadPoolExecutor(max_workers=100) as executor: | |
for index in range(len(names)): | |
executor.submit(orginize_data, index) # Submit task to thread pool | |
else: | |
for index in range(len(names)): | |
ethscriptions_cursor.execute("SELECT * FROM data WHERE data=?", (all_items[index],)) | |
row = ethscriptions_cursor.fetchone() | |
if hide_inscribed: | |
if not row: | |
result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]}) | |
name_not_yet_total += 1 | |
else: | |
if row: | |
result.append({"铭文": names[index], "状态": "已题写", "十六进制数据": all_items[index][2:]}) | |
else: | |
result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]}) | |
name_not_yet_total += 1 | |
st.progress(1 - (name_not_yet_total / names_total), f'题写进度 {names_total - name_not_yet_total}/{names_total} ({(1 - (name_not_yet_total / names_total)) * 100:.0f}%):') | |
name_not_yet_total = 0 | |
if result: | |
# 将结果转换为Pandas DataFrame | |
result_df = pd.DataFrame(result) | |
# print(result_df) | |
st.dataframe(result_df, use_container_width=True, hide_index=True) | |
# 使用列表解析获取所有"未题写"状态的"十六进制数据" | |
not_written_data = [item['十六进制数据'] for item in result if item['状态'] == '未题写'] | |
# 获取未题写数据的个数和总长度 | |
num_not_written = len(not_written_data) | |
total_length = sum(len(data) for data in not_written_data) | |
gas_spent = 0 if num_not_written == 0 else estimate_transaction_fee(total_length // num_not_written) | |
eth_price = get_eth_price() | |
st.success( | |
f'题写上面未题写的铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 {gas_spent * eth_price:.2f}USD;题写全部未题写铭文最低需要花费 {gas_spent * num_not_written:.10f} ETH,约为 {gas_spent * num_not_written * eth_price:.2f}USD。') | |
# Convert DataFrame to CSV with proper encoding | |
csv_export = result_df.to_csv(index=False, encoding='utf-8') | |
# Add a download button for the DataFrame | |
st.download_button( | |
label="⏬ 下载查询结果", | |
data=csv_export, | |
file_name="ethpen_result_data.csv", | |
mime="text/csv" | |
) | |
else: | |
st.markdown(f'### ☹️ 你来迟了~') | |
# | |
# print(len(content_set)) | |
# gas_spent = 0 if len(content_set.split()) == 0 else estimate_transaction_fee(len(content_set) // len(content_set.split())) | |
# st.success(f'题写上面铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 ${gas_spent * get_eth_price():.2f}。') |