Spaces:
Runtime error
Runtime error
import json | |
import os | |
import time | |
from typing import List, Any, Optional, Union, Tuple | |
import numpy as np | |
import pandas as pd | |
import gc | |
import re | |
from dataclasses import dataclass | |
from datetime import datetime, timezone | |
from pathlib import Path | |
from enum import Enum | |
from string import Template | |
from json.decoder import JSONDecodeError | |
# --- Numeric defaults -------------------------------------------------------
DEFAULT_MECH_FEE = 0.01
REDUCE_FACTOR = 0.25
SLEEP = 0.5

# --- Field / key names ------------------------------------------------------
REQUEST_ID_FIELD = "request_id"
BLOCK_FIELD = "block"
REQUEST_ID = "requestId"
REQUEST_SENDER = "sender"
PROMPT_FIELD = "prompt"

# --- Project directories, all resolved relative to this file ----------------
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"
JSON_DATA_DIR = ROOT_DIR / "json_data"
HIST_DIR = ROOT_DIR / "historical_data"
TMP_DIR = ROOT_DIR / "tmp"

# --- IPFS / protocol values -------------------------------------------------
CID_PREFIX = "f01701220"
HTTP = "http://"
# Same scheme string with an "s" inserted after "http", i.e. "https://".
HTTPS = f"{HTTP[:4]}s{HTTP[4:]}"
FORMAT_UPDATE_BLOCK_NUMBER = 30411638
INVALID_ANSWER_HEX = (
    "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
)
OLD_IPFS_ADDRESS = "https://gateway.autonolas.tech/ipfs/"
IPFS_ADDRESS = "https://gateway.gcp.autonolas.tech/ipfs/"

# --- Tool names -------------------------------------------------------------
INC_TOOLS = [
    "prediction-online",
    "prediction-offline",
    "claude-prediction-online",
    "claude-prediction-offline",
    "prediction-offline-sme",
    "prediction-online-sme",
    "prediction-request-rag",
    "prediction-request-reasoning",
    "prediction-url-cot-claude",
    "prediction-request-rag-claude",
    "prediction-request-reasoning-claude",
    "superforcaster",
]

# --- Subgraph endpoints -----------------------------------------------------
# Each template expects a `subgraph_api_key` substitution.
SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/7s9rGBffUTL8kDZuxvvpuc46v44iuDarbrADBFw5uVp2"
)
OMEN_SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"
)
NETWORK_SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/FxV6YUix58SpYmLBwc9gEHkwjfkqwe1X5FJQjn8nKPyA"
)
# THEGRAPH_ENDPOINT = (
# "https://api.studio.thegraph.com/query/78829/mech-predict/version/latest"
# )
MECH_SUBGRAPH_URL = Template(
    "https://gateway.thegraph.com/api/${subgraph_api_key}/subgraphs/id/4YGoX3iXUni1NBhWJS5xyKcntrAzssfytJK7PQxxQk5g"
)

# --- Environment-provided settings (None when the variable is unset) --------
SUBGRAPH_API_KEY = os.environ.get("SUBGRAPH_API_KEY")
RPC = os.environ.get("RPC")
class MechEventName(Enum):
    """Names of the mech events handled by this module.

    Each member's value is the event name as a capitalised string.
    """

    REQUEST = "Request"
    DELIVER = "Deliver"
@dataclass
class MechEvent:
    """A mech's on-chain event representation.

    The class is a dataclass so that its four declared fields can actually be
    supplied at construction time (positionally or by keyword).

    Attributes:
        for_block: block number, compared against FORMAT_UPDATE_BLOCK_NUMBER
            when choosing the request link format.
        requestId: identifier appended to the deliver link.
        data: raw bytes whose hex form is appended to the CID prefix.
        sender: sender of the event (not used by the methods of this class).
    """

    for_block: int
    requestId: int
    data: bytes
    sender: str

    def _ipfs_link(self) -> Optional[str]:
        """Get the ipfs link for the data."""
        return f"{IPFS_ADDRESS}{CID_PREFIX}{self.data.hex()}"

    def ipfs_request_link(self) -> Optional[str]:
        """Get the IPFS link for the request (base link + ``/metadata.json``)."""
        return f"{self._ipfs_link()}/metadata.json"

    def ipfs_deliver_link(self) -> Optional[str]:
        """Get the IPFS link for the deliver, or None if there is no request id."""
        if self.requestId is None:
            return None
        return f"{self._ipfs_link()}/{self.requestId}"

    def ipfs_link(self, event_name: MechEventName) -> Optional[str]:
        """Get the ipfs link based on the event.

        :param event_name: the kind of event the link is needed for.
        :return: the link as a string, or None for an unsupported event name
            (or for a deliver event without a request id).
        """
        if event_name == MechEventName.REQUEST:
            # Events before the format-update block use the bare link.
            if self.for_block < FORMAT_UPDATE_BLOCK_NUMBER:
                return self._ipfs_link()
            # Call the method: returning the attribute would hand back a
            # bound method object instead of the link string.
            return self.ipfs_request_link()
        if event_name == MechEventName.DELIVER:
            return self.ipfs_deliver_link()
        return None
class MechRequest:
    """A structure for a request to a mech.

    Built from arbitrary keyword arguments; only the recognised keys are
    consumed and everything else is ignored.
    """

    request_id: Optional[int]
    request_block: Optional[int]
    prompt_request: Optional[str]
    tool: Optional[str]
    nonce: Optional[str]
    trader_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the request ignoring extra keys.

        The request id and block default to 0 and are coerced with `int`;
        the remaining fields default to None when their key is absent.
        """
        # Numeric fields first, in the same order as they are declared.
        raw_request_id = kwargs.pop(REQUEST_ID, 0)
        self.request_id = int(raw_request_id)
        raw_block = kwargs.pop(BLOCK_FIELD, 0)
        self.request_block = int(raw_block)

        # Free-form fields are stored as provided.
        self.prompt_request = kwargs.pop(PROMPT_FIELD, None)
        self.tool = kwargs.pop("tool", None)
        self.nonce = kwargs.pop("nonce", None)
        # The request's "sender" is stored as the trader address.
        self.trader_address = kwargs.pop("sender", None)
class PredictionResponse:
    """A response of a prediction.

    Attributes:
        p_yes: probability assigned to "Yes", within [0, 1].
        p_no: probability assigned to "No", within [0, 1].
        confidence: value within [0, 1].
        info_utility: value within [0, 1].
        vote: "Yes", "No", or None when both probabilities are equal.
        win_probability: the larger of `p_yes` and `p_no`.
    """

    # Absolute tolerance used when checking that p_yes + p_no equals 1, so
    # that valid pairs are not rejected because of float rounding.
    _SUM_TOLERANCE = 1e-9

    p_yes: float
    p_no: float
    confidence: float
    info_utility: float
    vote: Optional[str]
    win_probability: Optional[float]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's prediction ignoring extra keys.

        :param kwargs: must contain `p_yes`, `p_no`, `confidence` and
            `info_utility`, each convertible with `float`.
        :raises KeyError: if one of the required keys is missing.
        :raises ValueError: if a value cannot be converted, lies outside
            [0, 1], or if `p_yes + p_no` is not 1 (within a small tolerance).
        """
        try:
            self.p_yes = float(kwargs.pop("p_yes"))
            self.p_no = float(kwargs.pop("p_no"))
            self.confidence = float(kwargs.pop("confidence"))
            self.info_utility = float(kwargs.pop("info_utility"))

            # Validate probabilities
            probabilities = {
                "p_yes": self.p_yes,
                "p_no": self.p_no,
                "confidence": self.confidence,
                "info_utility": self.info_utility,
            }
            for name, prob in probabilities.items():
                # NaN fails this chained comparison and is rejected as well.
                if not 0 <= prob <= 1:
                    raise ValueError(f"{name} probability is out of bounds: {prob}")

            # Compare with a tolerance: an exact `!= 1` test rejects pairs
            # whose float sum is off by one rounding step.
            if abs(self.p_yes + self.p_no - 1) > self._SUM_TOLERANCE:
                raise ValueError(
                    f"Sum of p_yes and p_no is not 1: {self.p_yes} + {self.p_no}"
                )

            self.vote = self.get_vote()
            self.win_probability = self.get_win_probability()
        except KeyError as e:
            raise KeyError(f"Missing key in PredictionResponse: {e}") from e
        except ValueError as e:
            raise ValueError(f"Invalid value in PredictionResponse: {e}") from e

    def get_vote(self) -> Optional[str]:
        """Return the vote: "Yes", "No", or None on a tie."""
        if self.p_no == self.p_yes:
            return None
        if self.p_no > self.p_yes:
            return "No"
        return "Yes"

    def get_win_probability(self) -> Optional[float]:
        """Return the probability estimation for winning with vote."""
        return max(self.p_no, self.p_yes)
class MechResponse:
    """A structure for the response of a mech.

    The raw `result` string is parsed as JSON and turned into a
    `PredictionResponse`; `error` / `error_message` record how that went.
    """

    request_id: int
    deliver_block: Optional[int]
    result: Optional[PredictionResponse]
    error: Optional[str]
    error_message: Optional[str]
    prompt_response: Optional[str]
    mech_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's response ignoring extra keys.

        `error` ends up as 0 after a successful parse and 1 after a failed
        one or an "Invalid response" result.
        """
        # NOTE(review): the nesting below was reconstructed from source whose
        # indentation was lost; the parse/build/flag steps are assumed to all
        # sit inside the `isinstance(..., str)` guard - confirm.
        self.error = kwargs.get("error", None)
        self.request_id = int(kwargs.get(REQUEST_ID, 0))
        self.deliver_block = int(kwargs.get(BLOCK_FIELD, 0))
        self.result = kwargs.get("result", None)
        self.prompt_response = kwargs.get(PROMPT_FIELD, None)
        self.mech_address = kwargs.get("sender", None)

        if self.result == "Invalid response":
            # The tool itself reported that it could not answer.
            self.error_message = "Invalid response from tool"
            self.error = 1
            self.result = None
            return

        self.error_message = kwargs.get("error_message", None)
        try:
            if isinstance(self.result, str):
                prediction_fields = json.loads(self.result)
                self.result = PredictionResponse(**prediction_fields)
                self.error = 0
        except JSONDecodeError:
            self.error_message = "Response parsing error"
            self.error = 1
        except Exception as e:
            # Anything raised while building the prediction (missing key,
            # out-of-range value, non-mapping JSON, ...) is recorded as text.
            self.error_message = str(e)
            self.error = 1
# Maps each event name to the class used to represent that event's payload.
EVENT_TO_MECH_STRUCT = {
    MechEventName.REQUEST: MechRequest,
    MechEventName.DELIVER: MechResponse,
}
def transform_to_datetime(x):
    """Convert a Unix timestamp (anything `int()` accepts) to a UTC-aware datetime."""
    seconds = int(x)
    return datetime.fromtimestamp(seconds, timezone.utc)
def measure_execution_time(func):
    """Decorate `func` so that every successful call prints how long it took.

    The wrapper forwards all positional and keyword arguments, returns the
    wrapped function's result unchanged, and keeps its metadata
    (`__name__`, `__doc__`, ...) through `functools.wraps`.
    Nothing is printed if the wrapped call raises.
    """
    import functools  # local import: keeps this block self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic clock intended for measuring intervals,
        # so the reading cannot jump if the system time is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"Execution time: {execution_time:.6f} seconds")
        return result

    return wrapper
def limit_text(text: str, limit: int = 200) -> str:
    """Truncate `text` to `limit` characters, appending "..." when it was cut."""
    fits = len(text) <= limit
    return text if fits else f"{text[:limit]}..."
def check_for_dicts(df: pd.DataFrame) -> List[str]:
    """List the columns of `df` holding at least one `dict` value, in column order."""

    def _is_dict(value: Any) -> bool:
        return isinstance(value, dict)

    return [column for column in df.columns if df[column].apply(_is_dict).any()]
def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
    """Return `df` without the rows that hold a `dict` in any of `dict_columns`.

    The columns are filtered one after another; the input frame is returned
    as-is when `dict_columns` is empty.
    """
    remaining = df
    for name in dict_columns:
        holds_dict = remaining[name].apply(lambda value: isinstance(value, dict))
        remaining = remaining[~holds_dict]
    return remaining
def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe.

    Steps, in order: drop the rows holding dictionaries, drop duplicated
    rows, and cast the request-id column to strings.
    """
    columns_with_dicts = check_for_dicts(df)
    without_dicts = drop_dict_rows(df, columns_with_dicts)
    cleaned = without_dicts.drop_duplicates()
    request_ids_as_text = cleaned[REQUEST_ID_FIELD].astype("str")
    cleaned[REQUEST_ID_FIELD] = request_ids_as_text
    return cleaned
def gen_event_filename(event_name: MechEventName) -> str:
    """Build the parquet filename for an event: lowercased value + "s.parquet"."""
    stem = event_name.value.lower()
    return f"{stem}s.parquet"
def read_n_last_lines(filename: str, n: int = 1) -> str:
    """Return the line that starts `n` lines before the end of a file.

    For the default `n=1` this is the last line. The file is scanned
    backwards from its end, so its whole content is never loaded.

    :param filename: path of the file to read.
    :param n: how many line starts to walk back from the end of the file.
    :return: the decoded line (including its newline, if any). The first line
        is returned when the file has fewer than `n` lines, and an empty
        string when the file is empty.
    """
    num_newlines = 0
    with open(filename, "rb") as f:
        try:
            # Start on the byte just before the final one, so that a trailing
            # newline is not counted as a line separator.
            f.seek(-2, os.SEEK_END)
            while True:
                # Inspect every byte while walking backwards; skipping bytes
                # here would miss the separator of a very short last line.
                if f.read(1) == b"\n":
                    num_newlines += 1
                    if num_newlines >= n:
                        break
                # Step back over the byte just read plus one more.
                f.seek(-2, os.SEEK_CUR)
        except OSError:
            # Seeking before the start of the file: fall back to line one.
            f.seek(0)
        return f.readline().decode()
def get_question(text: str) -> Optional[str]:
    """Get the question from a text.

    The question is taken to be the first substring enclosed in double
    quotes.

    :param text: the text to search.
    :return: the content between the first pair of double quotes (possibly an
        empty string), or None when the text has no double-quoted part.
    """
    # Only the first match is needed, so stop searching as soon as it is found.
    match = re.search(r'"([^"]*)"', text)
    return match.group(1) if match else None
def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
    """Look up `currentAnswer` for the first row of `fpmms` whose `title` equals `text`.

    Returns None when no row has that title.
    """
    matching = fpmms[fpmms["title"] == text]
    if len(matching.index) == 0:
        return None
    answers = matching["currentAnswer"].values
    return answers[0]
def convert_hex_to_int(x: Union[str, float]) -> Union[int, float]:
    """Convert a hex string to an int.

    Floats map to NaN, the invalid-answer marker maps to -1, and any other
    string is parsed as base-16. Values of any other type yield None.
    """
    if isinstance(x, float):
        return np.nan
    if not isinstance(x, str):
        return None
    return -1 if x == INVALID_ANSWER_HEX else int(x, 16)
def wei_to_unit(wei: int) -> float:
    """Convert an amount in wei to whole currency units (1 unit = 10**18 wei)."""
    wei_per_unit = 10**18
    return wei / wei_per_unit
def get_vote(p_yes, p_no) -> Optional[str]:
    """Return "Yes" or "No" for the larger probability, or None on a tie."""
    if p_no == p_yes:
        return None
    return "No" if p_no > p_yes else "Yes"
def get_win_probability(p_yes, p_no) -> Optional[float]:
    """Return the larger of the two probabilities, i.e. the one backing the vote."""
    return p_yes if p_yes > p_no else p_no
def get_result_values(result: str) -> Tuple:
    """Parse a raw tool result into `(error_value, error_message, params)`.

    - "Invalid response" gives `(1, "Invalid response from tool", None)`.
    - A string that parses as JSON gives `(0, None, <parsed value>)`.
    - A string that does not parse gives `(1, <message>, None)`.
    - A non-string result is not parsed and gives `(0, None, None)`.
    """
    if result == "Invalid response":
        return 1, "Invalid response from tool", None
    if not isinstance(result, str):
        return 0, None, None
    try:
        parsed = json.loads(result)
    except JSONDecodeError:
        return 1, "Response parsing error", None
    except Exception as exc:
        return 1, str(exc), None
    return 0, None, parsed
def get_prediction_values(params: dict) -> Tuple:
    """Pop the four prediction fields from `params` and return them as floats.

    The result is `(p_yes, p_no, confidence, info_utility)`. The popped keys
    are removed from `params`; a missing key raises `KeyError`.
    """
    field_order = ("p_yes", "p_no", "confidence", "info_utility")
    return tuple(float(params.pop(field)) for field in field_order)
def to_content(q: str) -> dict[str, Any]:
    """Wrap the query string `q` in a payload dictionary.

    The query goes under the `query` key; `variables` and the `headers`
    entry of `extensions` are both None.
    """
    return {
        "query": q,
        "variables": None,
        "extensions": {"headers": None},
    }
def read_parquet_files(
    tools_filename: str, trades_filename: str
) -> Optional[Tuple[pd.DataFrame, pd.DataFrame]]:
    """Load the tools and trades parquet files from the data directory.

    In both frames the `trader_address` column is lowercased and stripped;
    duplicated rows are additionally dropped from the tools frame.

    :param tools_filename: name of the tools parquet file inside DATA_DIR.
    :param trades_filename: name of the trades parquet file inside DATA_DIR.
    :return: `(tools, fpmmTrades)`, or None when either file is missing
        (a message naming the missing file is printed).
    :raises AssertionError: if the tools file has no `trader_address` column.
    """
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)
    except FileNotFoundError:
        print(f"{tools_filename} not found. Please run tools.py first.")
        return None

    # Explicit check instead of `assert`, which is removed under `python -O`.
    # AssertionError is kept so existing callers see the same exception type.
    if "trader_address" not in tools.columns:
        raise AssertionError("trader_address column not found")

    # Normalise the addresses so both frames use the same form.
    tools["trader_address"] = tools["trader_address"].str.lower().str.strip()
    tools.drop_duplicates(inplace=True)
    print(f"{tools_filename} loaded")

    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"{trades_filename} not found.")
        return None

    fpmmTrades["trader_address"] = (
        fpmmTrades["trader_address"].str.lower().str.strip()
    )
    return tools, fpmmTrades