arshy's picture
initial commit
c892f97
raw
history blame
25.8 kB
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import json
import os.path
import re
import sys
import time
import random
from dataclasses import dataclass
from enum import Enum
from io import StringIO
from typing import (
Optional,
List,
Dict,
Any,
Union,
Callable,
Tuple,
)
import pandas as pd
import requests
from json.decoder import JSONDecodeError
from eth_typing import ChecksumAddress
from eth_utils import to_checksum_address
from requests.adapters import HTTPAdapter
from requests.exceptions import (
ReadTimeout as RequestsReadTimeoutError,
HTTPError as RequestsHTTPError,
)
from tqdm import tqdm
from urllib3 import Retry
from urllib3.exceptions import (
ReadTimeoutError as Urllib3ReadTimeoutError,
HTTPError as Urllib3HTTPError,
)
from web3 import Web3, HTTPProvider
from web3.exceptions import MismatchedABI
from web3.types import BlockParams
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
CONTRACTS_PATH = "contracts"
MECH_TO_INFO = {
# this block number is when the creator had its first tx ever, and after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}
# optionally set the latest block to stop searching for the delivered events
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
REDUCE_FACTOR = 0.25
EVENT_ARGUMENTS = "args"
DATA = "data"
REQUEST_ID = "requestId"
REQUEST_ID_FIELD = "request_id"
REQUEST_SENDER = "sender"
PROMPT_FIELD = "prompt"
BLOCK_FIELD = "block"
CID_PREFIX = "f01701220"
HTTP = "http://"
HTTPS = HTTP[:4] + "s" + HTTP[4:]
IPFS_ADDRESS = f"{HTTPS}gateway.autonolas.tech/ipfs/"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.csv"
RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
ABI_ERROR = "The event signature did not match the provided ABI"
SLEEP = 0.5
HTTP_TIMEOUT = 10
N_IPFS_RETRIES = 1
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
IPFS_POLL_INTERVAL = 0.05
FORMAT_UPDATE_BLOCK_NUMBER = 30411638
IRRELEVANT_TOOLS = [
"openai-text-davinci-002",
"openai-text-davinci-003",
"openai-gpt-3.5-turbo",
"openai-gpt-4",
"stabilityai-stable-diffusion-v1-5",
"stabilityai-stable-diffusion-xl-beta-v2-2-2",
"stabilityai-stable-diffusion-512-v2-1",
"stabilityai-stable-diffusion-768-v2-1",
"deepmind-optimization-strong",
"deepmind-optimization",
]
# this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
# for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search, we also store the snapshot
SNAPSHOT_RATE = 10
NUM_WORKERS = 10
GET_CONTENTS_BATCH_SIZE = 1000
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"
class MechEventName(Enum):
"""The mech's event names."""
REQUEST = "Request"
DELIVER = "Deliver"
@dataclass
class MechEvent:
"""A mech's on-chain event representation."""
for_block: int
requestId: int
data: bytes
sender: str
def _ipfs_link(self) -> Optional[str]:
"""Get the ipfs link for the data."""
return f"{IPFS_ADDRESS}{CID_PREFIX}{self.data.hex()}"
@property
def ipfs_request_link(self) -> Optional[str]:
"""Get the IPFS link for the request."""
return f"{self._ipfs_link()}/metadata.json"
@property
def ipfs_deliver_link(self) -> Optional[str]:
"""Get the IPFS link for the deliver."""
if self.requestId is None:
return None
return f"{self._ipfs_link()}/{self.requestId}"
def ipfs_link(self, event_name: MechEventName) -> Optional[str]:
"""Get the ipfs link based on the event."""
if event_name == MechEventName.REQUEST:
if self.for_block < FORMAT_UPDATE_BLOCK_NUMBER:
return self._ipfs_link()
return self.ipfs_request_link
if event_name == MechEventName.DELIVER:
return self.ipfs_deliver_link
return None
@dataclass(init=False)
class MechRequest:
"""A structure for a request to a mech."""
request_id: Optional[int]
request_block: Optional[int]
prompt_request: Optional[str]
tool: Optional[str]
nonce: Optional[str]
trader_address: Optional[str]
def __init__(self, **kwargs: Any) -> None:
"""Initialize the request ignoring extra keys."""
self.request_id = int(kwargs.pop(REQUEST_ID, 0))
self.request_block = int(kwargs.pop(BLOCK_FIELD, 0))
self.prompt_request = kwargs.pop(PROMPT_FIELD, None)
self.tool = kwargs.pop("tool", None)
self.nonce = kwargs.pop("nonce", None)
self.trader_address = kwargs.pop("sender", None)
@dataclass(init=False)
class PredictionResponse:
"""A response of a prediction."""
p_yes: float
p_no: float
confidence: float
info_utility: float
vote: Optional[str]
win_probability: Optional[float]
def __init__(self, **kwargs: Any) -> None:
"""Initialize the mech's prediction ignoring extra keys."""
try:
self.p_yes = float(kwargs.pop("p_yes"))
self.p_no = float(kwargs.pop("p_no"))
self.confidence = float(kwargs.pop("confidence"))
self.info_utility = float(kwargs.pop("info_utility"))
self.win_probability = 0
# Validate probabilities
probabilities = {
"p_yes": self.p_yes,
"p_no": self.p_no,
"confidence": self.confidence,
"info_utility": self.info_utility,
}
for name, prob in probabilities.items():
if not 0 <= prob <= 1:
raise ValueError(f"{name} probability is out of bounds: {prob}")
if self.p_yes + self.p_no != 1:
raise ValueError(
f"Sum of p_yes and p_no is not 1: {self.p_yes} + {self.p_no}"
)
self.vote = self.get_vote()
self.win_probability = self.get_win_probability()
except KeyError as e:
raise KeyError(f"Missing key in PredictionResponse: {e}")
except ValueError as e:
raise ValueError(f"Invalid value in PredictionResponse: {e}")
def get_vote(self) -> Optional[str]:
"""Return the vote."""
if self.p_no == self.p_yes:
return None
if self.p_no > self.p_yes:
return "No"
return "Yes"
def get_win_probability(self) -> Optional[float]:
"""Return the probability estimation for winning with vote."""
return max(self.p_no, self.p_yes)
@dataclass(init=False)
class MechResponse:
"""A structure for the response of a mech."""
request_id: int
deliver_block: Optional[int]
result: Optional[PredictionResponse]
error: Optional[str]
error_message: Optional[str]
prompt_response: Optional[str]
mech_address: Optional[str]
def __init__(self, **kwargs: Any) -> None:
"""Initialize the mech's response ignoring extra keys."""
self.error = kwargs.get("error", None)
self.request_id = int(kwargs.get(REQUEST_ID, 0))
self.deliver_block = int(kwargs.get(BLOCK_FIELD, 0))
self.result = kwargs.get("result", None)
self.prompt_response = kwargs.get(PROMPT_FIELD, None)
self.mech_address = kwargs.get("sender", None)
if self.result != "Invalid response":
self.error_message = kwargs.get("error_message", None)
try:
if isinstance(self.result, str):
kwargs = json.loads(self.result)
self.result = PredictionResponse(**kwargs)
self.error = str(False)
except JSONDecodeError:
self.error_message = "Response parsing error"
self.error = str(True)
except Exception as e:
self.error_message = str(e)
self.error = str(True)
else:
self.error_message = "Invalid response from tool"
self.error = str(True)
self.result = None
EVENT_TO_MECH_STRUCT = {
MechEventName.REQUEST: MechRequest,
MechEventName.DELIVER: MechResponse,
}
def parse_args() -> str:
"""Parse the arguments and return the RPC."""
if len(sys.argv) != 2:
raise ValueError("Expected the RPC as a positional argument.")
return sys.argv[1]
def read_abi(abi_path: str) -> str:
"""Read and return the wxDAI contract's ABI."""
with open(abi_path) as abi_file:
return abi_file.read()
def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
"""Dynamically reduce the batch size window."""
keep_fraction = 1 - REDUCE_FACTOR
events_filter = contract_instance.events[event].build_filter()
events_filter.fromBlock = from_block
batch_size = int(batch_size * keep_fraction)
events_filter.toBlock = min(from_block + batch_size, latest_block)
tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
time.sleep(SLEEP)
return events_filter, batch_size
def get_events(
w3: Web3,
event: str,
mech_address: ChecksumAddress,
mech_abi_path: str,
earliest_block: int,
latest_block: int,
) -> List:
"""Get the delivered events."""
abi = read_abi(mech_abi_path)
contract_instance = w3.eth.contract(address=mech_address, abi=abi)
events = []
from_block = earliest_block
batch_size = BLOCKS_CHUNK_SIZE
with tqdm(
total=latest_block - from_block,
desc=f"Searching {event} events for mech {mech_address}",
unit="blocks",
) as pbar:
while from_block < latest_block:
events_filter = contract_instance.events[event].build_filter()
events_filter.fromBlock = from_block
events_filter.toBlock = min(from_block + batch_size, latest_block)
entries = None
retries = 0
while entries is None:
try:
entries = events_filter.deploy(w3).get_all_entries()
retries = 0
except (RequestsHTTPError, Urllib3HTTPError) as exc:
if "Request Entity Too Large" in exc.args[0]:
events_filter, batch_size = reduce_window(
contract_instance,
event,
from_block,
batch_size,
latest_block,
)
except (Urllib3ReadTimeoutError, RequestsReadTimeoutError):
events_filter, batch_size = reduce_window(
contract_instance, event, from_block, batch_size, latest_block
)
except Exception as exc:
retries += 1
if retries == N_RPC_RETRIES:
tqdm.write(
f"Skipping events for blocks {events_filter.fromBlock} - {events_filter.toBlock} "
f"as the retries have been exceeded."
)
break
sleep = SLEEP * retries
if (
(
isinstance(exc, ValueError)
and re.match(
RE_RPC_FILTER_ERROR, exc.args[0].get("message", "")
)
is None
)
and not isinstance(exc, ValueError)
and not isinstance(exc, MismatchedABI)
):
tqdm.write(
f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
)
time.sleep(sleep)
from_block += batch_size
pbar.update(batch_size)
if entries is None:
continue
chunk = list(entries)
events.extend(chunk)
time.sleep(RPC_POLL_INTERVAL)
return events
def parse_events(raw_events: List) -> List[MechEvent]:
"""Parse all the specified MechEvents."""
parsed_events = []
for event in raw_events:
for_block = event.get("blockNumber", 0)
args = event.get(EVENT_ARGUMENTS, {})
request_id = args.get(REQUEST_ID, 0)
data = args.get(DATA, b"")
sender = args.get(REQUEST_SENDER, "")
parsed_event = MechEvent(for_block, request_id, data, sender)
parsed_events.append(parsed_event)
return parsed_events
def create_session() -> requests.Session:
"""Create a session with a retry strategy."""
session = requests.Session()
retry_strategy = Retry(
total=N_IPFS_RETRIES + 1,
backoff_factor=BACKOFF_FACTOR,
status_forcelist=STATUS_FORCELIST,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
for protocol in (HTTP, HTTPS):
session.mount(protocol, adapter)
return session
def request(
session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
"""Perform a request with a session."""
try:
response = session.get(url, timeout=timeout)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
tqdm.write(f"HTTP error occurred: {exc}.")
except Exception as exc:
tqdm.write(f"Unexpected error occurred: {exc}.")
else:
return response
return None
def limit_text(text: str, limit: int = 200) -> str:
"""Limit the given text"""
if len(text) > limit:
return f"{text[:limit]}..."
return text
def parse_ipfs_response(
session: requests.Session,
url: str,
event: MechEvent,
event_name: MechEventName,
response: requests.Response,
) -> Optional[Dict[str, str]]:
"""Parse a response from IPFS."""
try:
return response.json()
except requests.exceptions.JSONDecodeError:
# this is a workaround because the `metadata.json` file was introduced and removed multiple times
if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
url = event.ipfs_request_link
response = request(session, url)
if response is None:
tqdm.write(f"Skipping {event=}.")
return None
try:
return response.json()
except requests.exceptions.JSONDecodeError:
pass
tqdm.write(f"Failed to parse response into json for {url=}.")
return None
def parse_ipfs_tools_content(
raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
"""Parse tools content from IPFS."""
struct = EVENT_TO_MECH_STRUCT.get(event_name)
raw_content[REQUEST_ID] = str(event.requestId)
raw_content[BLOCK_FIELD] = str(event.for_block)
raw_content["sender"] = str(event.sender)
try:
mech_response = struct(**raw_content)
except (ValueError, TypeError, KeyError):
tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
return None
if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
return None
return mech_response
def get_contents(
session: requests.Session, events: List[MechEvent], event_name: MechEventName
) -> pd.DataFrame:
"""Fetch the tools' responses."""
contents = []
for event in tqdm(events, desc=f"Tools' results", unit="results"):
url = event.ipfs_link(event_name)
response = request(session, url)
if response is None:
tqdm.write(f"Skipping {event=}.")
continue
raw_content = parse_ipfs_response(session, url, event, event_name, response)
if raw_content is None:
continue
mech_response = parse_ipfs_tools_content(raw_content, event, event_name)
if mech_response is None:
continue
contents.append(mech_response)
time.sleep(IPFS_POLL_INTERVAL)
return pd.DataFrame(contents)
def check_for_dicts(df: pd.DataFrame) -> List[str]:
"""Check for columns that contain dictionaries."""
dict_columns = []
for column in df.columns:
if df[column].apply(lambda x: isinstance(x, dict)).any():
dict_columns.append(column)
return dict_columns
def drop_dict_rows(df: pd.DataFrame,
dict_columns: List[str]) -> pd.DataFrame:
"""Drop rows that contain dictionaries."""
for column in dict_columns:
df = df[~df[column].apply(lambda x: isinstance(x, dict))]
return df
def clean(df: pd.DataFrame) -> pd.DataFrame:
"""Clean the dataframe."""
dict_columns = check_for_dicts(df)
df = drop_dict_rows(df, dict_columns)
cleaned = df.drop_duplicates()
cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
return cleaned
def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
"""Transform the requests dataframe."""
return clean(contents)
def transform_deliver(contents: pd.DataFrame, full_contents=False) -> pd.DataFrame:
"""Transform the delivers dataframe."""
unpacked_result = pd.json_normalize(contents.result)
# # drop result column if it exists
if "result" in unpacked_result.columns:
unpacked_result.drop(columns=["result"], inplace=True)
# drop prompt column if it exists
if "prompt" in unpacked_result.columns:
unpacked_result.drop(columns=["prompt"], inplace=True)
# rename prompt column to prompt_deliver
unpacked_result.rename(columns={"prompt": "prompt_deliver"}, inplace=True)
contents = pd.concat((contents, unpacked_result), axis=1)
if "result" in contents.columns:
contents.drop(columns=["result"], inplace=True)
if "prompt" in contents.columns:
contents.drop(columns=["prompt"], inplace=True)
return clean(contents)
def gen_event_filename(event_name: MechEventName) -> str:
"""Generate the filename of an event."""
return f"{event_name.value.lower()}s.csv"
def read_n_last_lines(filename: str, n: int = 1) -> str:
"""Return the `n` last lines' content of a file."""
num_newlines = 0
with open(filename, "rb") as f:
try:
f.seek(-2, os.SEEK_END)
while num_newlines < n:
f.seek(-2, os.SEEK_CUR)
if f.read(1) == b"\n":
num_newlines += 1
except OSError:
f.seek(0)
last_line = f.readline().decode()
return last_line
def get_earliest_block(event_name: MechEventName) -> int:
"""Get the earliest block number to use when filtering for events."""
filename = gen_event_filename(event_name)
if not os.path.exists(DATA_DIR / filename):
return 0
cols = pd.read_csv(DATA_DIR / filename, index_col=0, nrows=0).columns.tolist()
last_line_buff = StringIO(read_n_last_lines(DATA_DIR/filename))
last_line_series = pd.read_csv(last_line_buff, names=cols)
block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
return int(last_line_series[block_field].values[0])
def store_progress(
filename: str,
event_to_contents: Dict[MechEventName, pd.DataFrame],
tools: pd.DataFrame,
) -> None:
"""Store the given progress."""
if filename:
for event_name, content in event_to_contents.items():
event_filename = gen_event_filename(event_name)
if "result" in content.columns:
content.drop(columns=["result"], inplace=True)
content.to_csv(DATA_DIR / event_filename, index=False, escapechar="\\")
# drop result and error columns
if "result" in tools.columns:
tools.drop(columns=["result"], inplace=True)
tools.to_csv(DATA_DIR / filename, index=False, escapechar="\\")
def etl(
rpcs: List[str], filename: Optional[str] = None, full_contents: bool = True
) -> pd.DataFrame:
"""Fetch from on-chain events, process, store and return the tools' results on all the questions as a Dataframe."""
w3s = [Web3(HTTPProvider(r)) for r in rpcs]
session = create_session()
event_to_transformer = {
MechEventName.REQUEST: transform_request,
MechEventName.DELIVER: transform_deliver,
}
mech_to_info = {
to_checksum_address(address): (
os.path.join(CONTRACTS_PATH, filename),
earliest_block,
)
for address, (filename, earliest_block) in MECH_TO_INFO.items()
}
event_to_contents = {}
latest_block = LATEST_BLOCK
if latest_block is None:
latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER]
next_start_block = None
# Loop through events in event_to_transformer
for event_name, transformer in event_to_transformer.items():
if next_start_block is None:
next_start_block_base = get_earliest_block(event_name)
# Loop through mech addresses in mech_to_info
events = []
for address, (abi, earliest_block) in mech_to_info.items():
if next_start_block_base == 0:
next_start_block = earliest_block
else:
next_start_block = next_start_block_base
print(
f"Searching for {event_name.value} events for mech {address} from block {next_start_block} to {latest_block}."
)
# parallelize the fetching of events
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(
next_start_block, latest_block, BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE
):
futures.append(
executor.submit(
get_events,
random.choice(w3s),
event_name.value,
address,
abi,
i,
min(i + BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE, latest_block),
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching {event_name.value} Events",
):
current_mech_events = future.result()
events.extend(current_mech_events)
parsed = parse_events(events)
contents = []
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(0, len(parsed), GET_CONTENTS_BATCH_SIZE):
futures.append(
executor.submit(
get_contents,
session,
parsed[i : i + GET_CONTENTS_BATCH_SIZE],
event_name,
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching {event_name.value} Contents",
):
current_mech_contents = future.result()
contents.append(current_mech_contents)
contents = pd.concat(contents, ignore_index=True)
full_contents = True
if event_name == MechEventName.REQUEST:
transformed = transformer(contents)
elif event_name == MechEventName.DELIVER:
transformed = transformer(contents, full_contents=full_contents)
events_filename = gen_event_filename(event_name)
if os.path.exists(DATA_DIR / events_filename):
old = pd.read_csv(DATA_DIR / events_filename)
# Reset index to avoid index conflicts
old.reset_index(drop=True, inplace=True)
transformed.reset_index(drop=True, inplace=True)
# Concatenate DataFrames
transformed = pd.concat([old, transformed], ignore_index=True)
# Drop duplicates if necessary
transformed.drop_duplicates(subset=REQUEST_ID_FIELD, inplace=True)
event_to_contents[event_name] = transformed.copy()
# Store progress
tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
store_progress(filename, event_to_contents, tools)
return tools
if __name__ == "__main__":
RPCs = [
"https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
]
tools = etl(rpcs=RPCs, filename=DEFAULT_FILENAME, full_contents=True)