import base64 import hashlib import json import requests import streamlit as st import re import os import sqlite3 import pandas as pd from concurrent.futures import ThreadPoolExecutor # 获取当前文件目录 current_dir = os.path.dirname(__file__) parent_dir = os.path.dirname(current_dir) name_set_file = os.path.join(parent_dir, 'data', 'name_set.db') name_set_conn = sqlite3.connect(name_set_file) name_set_cursor = name_set_conn.cursor() content_set = '' check_type = '' name_not_yet_total = 0 etherscan_api_key = os.environ.get('etherscan_api_key', '') # 图片Base64 def image_to_base64(img_path): with open(img_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode() # 把文字转换成 16 进制 def text_to_hex(text): return '0x' + ''.join(format(byte, '02x') for byte in text.encode('utf-8')) # str SHA256 def sha256(input_string): sha256 = hashlib.sha256() sha256.update(input_string.encode('utf-8')) return sha256.hexdigest() # 使用 SHA256 值检查内容是否已被 ethscribed def check_content_exists(sha): endpoint = f"/ethscriptions/exists/{sha}" try: response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint, timeout=5) response.raise_for_status() # 如果响应状态码不是200,这会引发HTTPError return response.json()['result'] except requests.RequestException: # 捕获所有与requests相关的异常 return "未知" # 处理数据 def orginize_data(index): global name_not_yet_total sha_value = sha256(names[index]) response_data = check_content_exists(sha_value) if hide_inscribed: if not response_data: result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]}) name_not_yet_total += 1 if response_data == '未知': result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]}) name_not_yet_total += 1 else: if response_data: result.append({"铭文": names[index], "状态": '已题写', "十六进制数据": all_items[index][2:]}) elif response_data == '未知': result.append({"铭文": names[index], "状态": '未知', "十六进制数据": all_items[index][2:]}) name_not_yet_total += 1 else: result.append({"铭文": names[index], "状态": '未题写', "十六进制数据": all_items[index][2:]}) name_not_yet_total += 1 # 确定索引器是否落后 def get_block_status(): endpoint = "/block_status" response = requests.get("https://mainnet-api.ethscriptions.com/api" + endpoint) try: data = response.json() return data.get('blocks_behind', None) except json.JSONDecodeError: print("Failed to decode JSON. Response content:") print(response.text) return None # 获取题写铭文的矿工费 def estimate_transaction_fee(text_length: int, speed: str = 'Propose') -> float: """ Estimate the transaction fee for an Ethereum transaction based on text length using Etherscan API. :param api_key: Etherscan API key. :param text_length: Length of the input text. :param speed: Desired confirmation speed. Can be 'Safe', 'Propose', or 'Fast'. :param request_timeout: Timeout for the HTTP request in seconds. :return: Estimated transaction fee in ETH or None in case of errors. """ try: # Get current gas prices from Etherscan base_url = "https://api.etherscan.io/api" params = { "module": "gastracker", "action": "gasoracle", "apikey": etherscan_api_key } response = requests.get(base_url, params=params, timeout=5) response.raise_for_status() data = response.json() if data['status'] != '1': return None if speed not in ['Safe', 'Propose', 'Fast']: return None selected_gas_price = int(data['result'][f"{speed}GasPrice"]) # Calculate gas for the input text NON_ZERO_BYTE_GAS = 68 ZERO_BYTE_GAS = 4 # Assuming every character is 1 byte (for simplification) # In reality, certain UTF-8 characters may be more than 1 byte data_gas = text_length * NON_ZERO_BYTE_GAS # Calculate total gas and fee BASE_GAS = 21000 total_gas = BASE_GAS + data_gas total_fee_eth = total_gas * selected_gas_price * 1e-9 return total_fee_eth except (requests.Timeout, requests.RequestException): return None # 查询 ETH token 价格 def get_eth_price(): try: # 使用Etherscan的API endpoint获取以太坊的价格 url = f"https://api.etherscan.io/api?module=stats&action=ethprice&apikey=1I2ERGUVJZNJAFKY7MG4S317DUGRPXEQPF" response = requests.get(url) response.raise_for_status() # 将引发HTTPError,如果HTTP请求返回了不成功的状态码 data = response.json() if data["message"] == "OK": eth_price = float(data["result"]["ethusd"]) return eth_price else: print("Error fetching the ETH price") return None except requests.RequestException as e: print(f"Error while accessing the API: {e}") return None st.set_page_config(page_title="EthPen - 批量查询铭文状态", page_icon="🔍", layout='centered', initial_sidebar_state='auto') st.markdown(f'# Image :rainbow[批量查询铭文状态]', unsafe_allow_html=True) st.subheader(r' ', anchor=False, divider='rainbow') st.write("", unsafe_allow_html=True) st.markdown(f'### 💖 查找你心仪的铭文') name_selected_option = st.radio("选择查询的类型:", ['🔤纯文本', '🪙代币铭文', '🆔.eths', '🆔.eth', '🌲.tree', '🦛.honk', '🔄.etch', '🌐.com', '🥳.club'], index=0, horizontal=True) if name_selected_option == '🔤纯文本': check_type = '' if name_selected_option == '🪙代币铭文': token_check = True token_str = st.text_input('填写代币铭文文本(变动的部分文本如 id 请用 "@#" 代替):', 'data:,{"p":"erc-20","op":"mint","tick":"eths","id":"@#","amt":"1000"}') col1, col2 = st.columns(2) with col1: token_min_id = st.number_input('填写最小 ID:', min_value=1, value=1, step=1) with col2: token_max_id = st.number_input('填写最大 ID:', min_value=2, value=21000, step=1) token_str_list = [] if token_str != '' and '@#' in token_str: for token_id in range(token_min_id, token_max_id + 1): token_str_list.append(token_str.replace('@#', str(token_id))) content_set = '\n'.join(token_str_list) st.code(token_str_list[0], language="json", line_numbers=False) content = st.text_area('铭文列表:', value=content_set) else: token_check = False if name_selected_option == '🆔.eths': check_type = '.eths' if name_selected_option == '🆔.eth': check_type = '.eth' if name_selected_option == '🌲.tree': check_type = '.tree' if name_selected_option == '🦛.honk': check_type = '.honk' if name_selected_option == '🔄.etch': check_type = '.etch' if name_selected_option == '🌐.com': check_type = '.com' if name_selected_option == '🥳.club': check_type = '.club' if not token_check: # 执行SQL查询,获取所有表名 name_set_cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") # 检索查询结果 tables = name_set_cursor.fetchall() name_srt_tables_options = [] name_srt_tables_options.append('⌨️ 直接输入') for table in tables: name_srt_tables_options.append(table[0]) name_srt_tables_options.append('📃 导入文件') name_set_table = st.radio("选择系列:", name_srt_tables_options, index=0, disabled=token_check, horizontal=True) if name_set_table and name_set_table != '⌨️ 直接输入' and name_set_table != '📃 导入文件': name_set_tables_name_options = [] # 执行SQL查询,获取指定表的所有 'name' 列的值 name_set_cursor.execute(f"SELECT name FROM {name_set_table}") # 检索查询结果 names = name_set_cursor.fetchall() # 打印所有 'name' 列的值 for name in names: name_set_tables_name_options.append(name[0]) name_set_tables_name_options.sort() name_set_table_name = st.radio("系列", name_set_tables_name_options, index=0, disabled=token_check, horizontal=True, label_visibility="collapsed") if name_set_table_name: # 执行SQL查询,获取指定表的 'name' 和 'data' 列的值 name_set_cursor.execute(f"SELECT data FROM {name_set_table} WHERE name='{name_set_table_name}'") # 检索查询结果 all_data = name_set_cursor.fetchall() content_set = f'{all_data[0][0]}' if name_set_table == "⌨️ 直接输入": content_set = '' if name_set_table == "📃 导入文件": uploaded_file = st.file_uploader("上传文件", type=['txt', 'csv', 'xlsx']) if uploaded_file is not None: if uploaded_file.type not in ['text/plain', 'text/csv', 'application/vnd.ms-excel']: st.error('文件类型不支持') else: # 读取上传的文件内容 uploaded_file_contents = uploaded_file.read().decode('utf-8') # 将文件内容分割成列表,使用空格、换行符和逗号作为分隔符 separators = [' ', '\n', ','] uploaded_file_contents_list = re.split('|'.join(map(re.escape, separators)), uploaded_file_contents) # 在这里,file_contents_list 包含了分割后的文本元素 # 去除重复元素,将列表转换为集合,然后再转回列表 uploaded_unique_contents_list = list(set(uploaded_file_contents_list)) content_set = ' '.join(uploaded_unique_contents_list) if not token_check: custom_suffix = st.toggle('自定义前后缀') if custom_suffix: col1, col2 = st.columns(2) with col1: custom_prefix = st.text_input("自定义前缀") with col2: if check_type == '': suffix_check = False else: suffix_check = True custom_suffix = st.text_input("自定义后缀", disabled=suffix_check) else: custom_prefix, custom_suffix = '', '' if not token_check: filter = st.toggle('过滤器') if filter: col1, col2, col3, col4, col5, col6 = st.columns(6) with col1: start_with = st.text_input("以*开头", key='过滤器(可选)1') with col2: end_with = st.text_input("以*结尾", key='过滤器(可选)2') with col3: contain = st.text_input("包含*", key='过滤器(可选)3') with col4: no_contain = st.text_input("不包含*", key='过滤器(可选)4') with col5: min_length = st.text_input("最少长度", key='过滤器(可选)5') with col6: max_length = st.text_input("最大长度", key='过滤器(可选)6') # 将过滤条件应用到原始 name_set 字符串列表 filtered_list = content_set.split() # 使用空格、换行符和逗号分隔字符串,得到列表 if start_with: filtered_list = [item for item in filtered_list if item.startswith(start_with)] if end_with: filtered_list = [item for item in filtered_list if item.endswith(end_with)] if contain: filtered_list = [item for item in filtered_list if contain in item] if no_contain: filtered_list = [item for item in filtered_list if no_contain not in item] if min_length: filtered_list = [item for item in filtered_list if len(item) >= int(min_length)] if max_length: filtered_list = [item for item in filtered_list if len(item) <= int(max_length)] # 将处理后的列表重新合并成字符串 content_set = ' '.join(filtered_list) if not token_check: content = st.text_area('铭文列表:', value=content_set, key=1) hide_inscribed = st.toggle('隐藏已题写') enable_api = st.toggle('启用 Ethscriptions.com API (v1)') if enable_api: print(get_block_status()) st.success(f"铭文数据来自 [Ethscriptions.com](https://ethscriptions.com/) 官方网站,当前索引器状态落后: {get_block_status()} 个区块。我们将以 100 个线程进行网络请求查询。尽管查询速度稍显缓慢,但所得数据确保了真实性与准确性。") if st.button('🔎 查询', key='🔎 查询'): st.markdown(f'### 📄 查询结果') all_items = [] names = [] if token_check: for i in token_str_list: names.append(i) all_items.append(text_to_hex(i)) else: if check_type == '': names = re.split(r"\s+", content) # names = list(set(names)) names = [custom_prefix + name + custom_suffix for name in names] names = ["data:," + name if not name.startswith("data:,") else name for name in names] for i in names: if names != 'data:,': all_items.append(text_to_hex(i)) else: names = re.split(r"\s+", content) # names = list(set(names)) names = [custom_prefix + name + check_type for name in names] names = ["data:," + name if not name.startswith("data:,") else name for name in names] for i in names: if names != 'data:,': all_items.append(text_to_hex(i)) ethscrptions_db_file = os.path.join(parent_dir, 'data', 'ethscriptions_data.db') ethscriptions_conn = sqlite3.connect(ethscrptions_db_file) ethscriptions_cursor = ethscriptions_conn.cursor() names_total = len(names) result = [] if enable_api: with ThreadPoolExecutor(max_workers=100) as executor: for index in range(len(names)): executor.submit(orginize_data, index) # Submit task to thread pool else: for index in range(len(names)): ethscriptions_cursor.execute("SELECT * FROM data WHERE data=?", (all_items[index],)) row = ethscriptions_cursor.fetchone() if hide_inscribed: if not row: result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]}) name_not_yet_total += 1 else: if row: result.append({"铭文": names[index], "状态": "已题写", "十六进制数据": all_items[index][2:]}) else: result.append({"铭文": names[index], "状态": "未题写", "十六进制数据": all_items[index][2:]}) name_not_yet_total += 1 st.progress(1 - (name_not_yet_total / names_total), f'题写进度 {names_total - name_not_yet_total}/{names_total} ({(1 - (name_not_yet_total / names_total)) * 100:.0f}%):') name_not_yet_total = 0 if result: # 将结果转换为Pandas DataFrame result_df = pd.DataFrame(result) # print(result_df) st.dataframe(result_df, use_container_width=True, hide_index=True) # 使用列表解析获取所有"未题写"状态的"十六进制数据" not_written_data = [item['十六进制数据'] for item in result if item['状态'] == '未题写'] # 获取未题写数据的个数和总长度 num_not_written = len(not_written_data) total_length = sum(len(data) for data in not_written_data) gas_spent = 0 if num_not_written == 0 else estimate_transaction_fee(total_length // num_not_written) eth_price = get_eth_price() st.success( f'题写上面未题写的铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 {gas_spent * eth_price:.2f}USD;题写全部未题写铭文最低需要花费 {gas_spent * num_not_written:.10f} ETH,约为 {gas_spent * num_not_written * eth_price:.2f}USD。') # Convert DataFrame to CSV with proper encoding csv_export = result_df.to_csv(index=False, encoding='utf-8') # Add a download button for the DataFrame st.download_button( label="⏬ 下载查询结果", data=csv_export, file_name="ethpen_result_data.csv", mime="text/csv" ) else: st.markdown(f'### ☹️ 你来迟了~') # # print(len(content_set)) # gas_spent = 0 if len(content_set.split()) == 0 else estimate_transaction_fee(len(content_set) // len(content_set.split())) # st.success(f'题写上面铭文平均每个最低花费 {gas_spent:.10f} ETH,约为 ${gas_spent * get_eth_price():.2f}。')