# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
import os.path
import re
import time
import random
from typing import (
Optional,
List,
Dict,
Union,
Any,
)
import pandas as pd
import requests
from eth_typing import ChecksumAddress
from eth_utils import to_checksum_address
from requests.adapters import HTTPAdapter
from requests.exceptions import (
ReadTimeout as RequestsReadTimeoutError,
HTTPError as RequestsHTTPError,
)
from tqdm import tqdm
from urllib3 import Retry
from urllib3.exceptions import (
ReadTimeoutError as Urllib3ReadTimeoutError,
HTTPError as Urllib3HTTPError,
)
from web3 import Web3, HTTPProvider
from web3.exceptions import MismatchedABI
from web3.types import BlockParams
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils import (
clean,
BLOCK_FIELD,
gen_event_filename,
read_abi,
SLEEP,
reduce_window,
limit_text,
DATA_DIR,
REQUEST_ID_FIELD,
MechEvent,
MechEventName,
MechRequest,
MechResponse,
EVENT_TO_MECH_STRUCT,
REQUEST_ID,
HTTP,
HTTPS,
REQUEST_SENDER,
)
CONTRACTS_PATH = "contracts"
MECH_TO_INFO = {
# this block number is when the creator had its first tx ever, and after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}
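# Each entry maps a mech contract address to a tuple of
# (ABI filename inside CONTRACTS_PATH, earliest block to scan for its events),
# which is how `etl` below interprets the mapping.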
# optionally set the latest block to stop searching for the delivered events
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
EVENT_ARGUMENTS = "args"
DATA = "data"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.parquet"
RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
ABI_ERROR = "The event signature did not match the provided ABI"
HTTP_TIMEOUT = 10
N_IPFS_RETRIES = 1
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
IPFS_POLL_INTERVAL = 0.05
IRRELEVANT_TOOLS = [
"openai-text-davinci-002",
"openai-text-davinci-003",
"openai-gpt-3.5-turbo",
"openai-gpt-4",
"stabilityai-stable-diffusion-v1-5",
"stabilityai-stable-diffusion-xl-beta-v2-2-2",
"stabilityai-stable-diffusion-512-v2-1",
"stabilityai-stable-diffusion-768-v2-1",
"deepmind-optimization-strong",
"deepmind-optimization",
]
# this is how frequently we keep a snapshot of the progress so far, in terms of batches of blocks;
# for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search,
# we also store a snapshot
SNAPSHOT_RATE = 10
NUM_WORKERS = 10
GET_CONTENTS_BATCH_SIZE = 1000
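# Worked example of how these constants interact (illustrative numbers only):
# `etl` splits each mech's block range into windows of
# BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE = 100_000 blocks and submits one window per
# worker, while `get_events` scans its window in BLOCKS_CHUNK_SIZE steps,
# shrinking the step via `reduce_window` whenever the RPC rejects a request as
# too large. IPFS payloads are later fetched in GET_CONTENTS_BATCH_SIZE batches.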
def get_events(
w3: Web3,
event: str,
mech_address: ChecksumAddress,
mech_abi_path: str,
earliest_block: int,
latest_block: int,
) -> List:
"""Get the delivered events."""
abi = read_abi(mech_abi_path)
contract_instance = w3.eth.contract(address=mech_address, abi=abi)
events = []
from_block = earliest_block
batch_size = BLOCKS_CHUNK_SIZE
with tqdm(
total=latest_block - from_block,
desc=f"Searching {event} events for mech {mech_address}",
unit="blocks",
) as pbar:
while from_block < latest_block:
events_filter = contract_instance.events[event].build_filter()
events_filter.fromBlock = from_block
events_filter.toBlock = min(from_block + batch_size, latest_block)
entries = None
retries = 0
while entries is None:
try:
entries = events_filter.deploy(w3).get_all_entries()
retries = 0
except (RequestsHTTPError, Urllib3HTTPError) as exc:
if "Request Entity Too Large" in exc.args[0]:
events_filter, batch_size = reduce_window(
contract_instance,
event,
from_block,
batch_size,
latest_block,
)
except (Urllib3ReadTimeoutError, RequestsReadTimeoutError):
events_filter, batch_size = reduce_window(
contract_instance, event, from_block, batch_size, latest_block
)
except Exception as exc:
retries += 1
if retries == N_RPC_RETRIES:
tqdm.write(
f"Skipping events for blocks {events_filter.fromBlock} - {events_filter.toBlock} "
f"as the retries have been exceeded."
)
break
sleep = SLEEP * retries
# error_message = ""
# if isinstance(exc.args[0], str):
# error_message = exc.args[0]
# elif isinstance(exc, ValueError):
# error_message = exc.args[0].get("message", "")
# if (
# (
# isinstance(exc, ValueError)
# and re.match(RE_RPC_FILTER_ERROR, error_message) is None
# )
# and not isinstance(exc, ValueError)
# and not isinstance(exc, MismatchedABI)
# ):
tqdm.write(
f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
)
if hasattr(exc, "message"):
tqdm.write(f"Error message: {exc.messge}\n")
time.sleep(sleep)
from_block += batch_size
pbar.update(batch_size)
if entries is None:
continue
chunk = list(entries)
events.extend(chunk)
time.sleep(RPC_POLL_INTERVAL)
return events
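# `parse_events` below only relies on a few fields of each raw entry returned by
# `get_events`: the "blockNumber" plus the event arguments stored under
# EVENT_ARGUMENTS, from which it reads REQUEST_ID, DATA and REQUEST_SENDER
# (the concrete key strings are defined in `utils`).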
def parse_events(raw_events: List) -> List[MechEvent]:
"""Parse all the specified MechEvents."""
parsed_events = []
for event in raw_events:
for_block = event.get("blockNumber", 0)
args = event.get(EVENT_ARGUMENTS, {})
request_id = args.get(REQUEST_ID, 0)
data = args.get(DATA, b"")
sender = args.get(REQUEST_SENDER, "")
parsed_event = MechEvent(for_block, request_id, data, sender)
parsed_events.append(parsed_event)
return parsed_events
def create_session() -> requests.Session:
"""Create a session with a retry strategy."""
session = requests.Session()
retry_strategy = Retry(
total=N_IPFS_RETRIES + 1,
backoff_factor=BACKOFF_FACTOR,
status_forcelist=STATUS_FORCELIST,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
for protocol in (HTTP, HTTPS):
session.mount(protocol, adapter)
return session
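# The session above retries GETs on the status codes in STATUS_FORCELIST with
# exponential backoff (scaled by BACKOFF_FACTOR), so `request` below only needs
# to handle the final failure; it is created once in `etl` and reused for every
# IPFS fetch.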
def request(
session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
"""Perform a request with a session."""
try:
response = session.get(url, timeout=timeout)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
tqdm.write(f"HTTP error occurred: {exc}.")
except Exception as exc:
tqdm.write(f"Unexpected error occurred: {exc}.")
else:
return response
return None
def parse_ipfs_response(
session: requests.Session,
url: str,
event: MechEvent,
event_name: MechEventName,
response: requests.Response,
) -> Optional[Dict[str, str]]:
"""Parse a response from IPFS."""
try:
return response.json()
except requests.exceptions.JSONDecodeError:
# this is a workaround because the `metadata.json` file was introduced and removed multiple times
if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
url = event.ipfs_request_link
response = request(session, url)
if response is None:
tqdm.write(f"Skipping {event=}.")
return None
try:
return response.json()
except requests.exceptions.JSONDecodeError:
pass
tqdm.write(f"Failed to parse response into json for {url=}.")
return None
def parse_ipfs_tools_content(
raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
"""Parse tools content from IPFS."""
struct = EVENT_TO_MECH_STRUCT.get(event_name)
raw_content[REQUEST_ID] = str(event.requestId)
raw_content[BLOCK_FIELD] = str(event.for_block)
raw_content["sender"] = str(event.sender)
try:
mech_response = struct(**raw_content)
except (ValueError, TypeError, KeyError):
tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
return None
if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
return None
return mech_response
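# EVENT_TO_MECH_STRUCT (defined in `utils`) maps each MechEventName to the
# struct used to validate the raw IPFS payload above (presumably MechRequest
# for requests and MechResponse for delivers); request rows produced by any of
# the IRRELEVANT_TOOLS are filtered out at this stage.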
def get_contents(
session: requests.Session, events: List[MechEvent], event_name: MechEventName
) -> pd.DataFrame:
"""Fetch the tools' responses."""
contents = []
    for event in tqdm(events, desc="Tools' results", unit="results"):
url = event.ipfs_link(event_name)
response = request(session, url)
if response is None:
tqdm.write(f"Skipping {event=}.")
continue
raw_content = parse_ipfs_response(session, url, event, event_name, response)
if raw_content is None:
continue
mech_response = parse_ipfs_tools_content(raw_content, event, event_name)
if mech_response is None:
continue
contents.append(mech_response)
time.sleep(IPFS_POLL_INTERVAL)
return pd.DataFrame(contents)
def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
"""Transform the requests dataframe."""
return clean(contents)
def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame:
"""Transform the delivers dataframe."""
unpacked_result = pd.json_normalize(contents.result)
    # drop result column if it exists
if "result" in unpacked_result.columns:
unpacked_result.drop(columns=["result"], inplace=True)
# drop prompt column if it exists
if "prompt" in unpacked_result.columns:
unpacked_result.drop(columns=["prompt"], inplace=True)
# rename prompt column to prompt_deliver
unpacked_result.rename(columns={"prompt": "prompt_deliver"}, inplace=True)
contents = pd.concat((contents, unpacked_result), axis=1)
if "result" in contents.columns:
contents.drop(columns=["result"], inplace=True)
if "prompt" in contents.columns:
contents.drop(columns=["prompt"], inplace=True)
return clean(contents)
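# Illustrative effect of the normalisation above (hypothetical payload): a
# deliver row whose `result` field holds {"tool": "x", "vote": "Yes", ...} gains
# top-level "tool" and "vote" columns, while the nested `result` and `prompt`
# columns are dropped so large payloads are not duplicated in the output.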
def store_progress(
filename: str,
event_to_contents: Dict[str, pd.DataFrame],
tools: pd.DataFrame,
) -> None:
"""Store the given progress."""
print("storing given progress")
if filename:
DATA_DIR.mkdir(parents=True, exist_ok=True) # Ensure the directory exists
for event_name, content in event_to_contents.items():
event_filename = gen_event_filename(
event_name
) # Ensure this function returns a valid filename string
try:
if "result" in content.columns:
content = content.drop(
columns=["result"]
) # Avoid in-place modification
content.to_parquet(DATA_DIR / event_filename, index=False)
except Exception as e:
print(f"Failed to write {event_name} data: {e}")
# Drop result and error columns for tools DataFrame
try:
if "result" in tools.columns:
tools = tools.drop(columns=["result"])
tools.to_parquet(DATA_DIR / filename, index=False)
except Exception as e:
print(f"Failed to write tools data: {e}")
def etl(
rpcs: List[str],
    mech_info: Dict[str, Any],
filename: Optional[str] = None,
) -> pd.DataFrame:
"""Fetch from on-chain events, process, store and return the tools' results on
all the questions as a Dataframe."""
w3s = [Web3(HTTPProvider(r)) for r in rpcs]
session = create_session()
event_to_transformer = {
MechEventName.REQUEST: transform_request,
MechEventName.DELIVER: transform_deliver,
}
mech_to_info = {
to_checksum_address(address): (
os.path.join(CONTRACTS_PATH, filename),
earliest_block,
)
for address, (filename, earliest_block) in mech_info.items()
}
event_to_contents = {}
latest_block = LATEST_BLOCK
if latest_block is None:
latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER]
next_start_block = None
# Loop through events in event_to_transformer
for event_name, transformer in event_to_transformer.items():
# if next_start_block is None:
# next_start_block_base = get_earliest_block(event_name)
# Loop through mech addresses in mech_to_info
events = []
for address, (abi, earliest_block) in mech_to_info.items():
next_start_block = earliest_block
print(
f"Searching for {event_name.value} events for mech {address} from block {next_start_block} to {latest_block}."
)
# parallelize the fetching of events
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(
next_start_block, latest_block, BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE
):
futures.append(
executor.submit(
get_events,
random.choice(w3s),
event_name.value,
address,
abi,
i,
min(i + BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE, latest_block),
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching {event_name.value} Events",
):
current_mech_events = future.result()
events.extend(current_mech_events)
print("Parsing events")
parsed = parse_events(events)
contents = []
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(0, len(parsed), GET_CONTENTS_BATCH_SIZE):
futures.append(
executor.submit(
get_contents,
session,
parsed[i : i + GET_CONTENTS_BATCH_SIZE],
event_name,
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching {event_name.value} Contents",
):
current_mech_contents = future.result()
contents.append(current_mech_contents)
contents = pd.concat(contents, ignore_index=True)
transformed = transformer(contents)
event_to_contents[event_name] = transformed.copy()
# Store progress
tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
print(tools.info())
store_progress(filename, event_to_contents, tools)
return tools
def update_tools_accuracy(
tools_acc: pd.DataFrame, tools_df: pd.DataFrame, inc_tools: List[str]
) -> pd.DataFrame:
"""To compute/update the latest accuracy information for the different mech tools"""
# computation of the accuracy information
tools_inc = tools_df[tools_df["tool"].isin(inc_tools)]
# filtering errors
    tools_non_error = tools_inc[tools_inc["error"] != 1].copy()
tools_non_error.loc[:, "currentAnswer"] = tools_non_error["currentAnswer"].replace(
{"no": "No", "yes": "Yes"}
)
tools_non_error = tools_non_error[
tools_non_error["currentAnswer"].isin(["Yes", "No"])
]
tools_non_error = tools_non_error[tools_non_error["vote"].isin(["Yes", "No"])]
tools_non_error["win"] = (
tools_non_error["currentAnswer"] == tools_non_error["vote"]
).astype(int)
tools_non_error.columns = tools_non_error.columns.astype(str)
print("Tools dataset after filtering")
print(tools_non_error.head())
wins = tools_non_error.groupby(["tool", "win"]).size().unstack().fillna(0)
wins["tool_accuracy"] = (wins[1] / (wins[0] + wins[1])) * 100
wins.reset_index(inplace=True)
wins["total_requests"] = wins[0] + wins[1]
wins.columns = wins.columns.astype(str)
wins = wins[["tool", "tool_accuracy", "total_requests"]]
print("Wins dataset")
print(wins.head())
no_timeline_info = False
try:
timeline = tools_non_error.groupby(["tool"])["request_time"].agg(["min", "max"])
print("timeline dataset")
print(timeline.head())
acc_info = wins.merge(timeline, how="left", on="tool")
    except Exception:
print("NO REQUEST TIME INFORMATION AVAILABLE")
no_timeline_info = True
acc_info = wins
if tools_acc is None:
print("Creating accuracy file for the first time")
return acc_info
# update the old information
print("Updating accuracy information")
tools_to_update = list(acc_info["tool"].values)
print("tools to update")
print(tools_to_update)
existing_tools = list(tools_acc["tool"].values)
for tool in tools_to_update:
if tool in existing_tools:
new_accuracy = acc_info[acc_info["tool"] == tool]["tool_accuracy"].values[0]
new_volume = acc_info[acc_info["tool"] == tool]["total_requests"].values[0]
if no_timeline_info:
new_min_timeline = None
new_max_timeline = None
else:
new_min_timeline = acc_info[acc_info["tool"] == tool]["min"].values[0]
new_max_timeline = acc_info[acc_info["tool"] == tool]["max"].values[0]
tools_acc.loc[tools_acc["tool"] == tool, "tool_accuracy"] = new_accuracy
tools_acc.loc[tools_acc["tool"] == tool, "total_requests"] = new_volume
tools_acc.loc[tools_acc["tool"] == tool, "min"] = new_min_timeline
tools_acc.loc[tools_acc["tool"] == tool, "max"] = new_max_timeline
print(tools_acc)
return tools_acc
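# The accuracy dataframe returned above holds one row per tool with the columns
# "tool", "tool_accuracy" (percentage of wins), "total_requests" and, when
# request timestamps are available, the "min"/"max" request times.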
if __name__ == "__main__":
RPCs = [
"https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
]
filename = DEFAULT_FILENAME
    tools = etl(rpcs=RPCs, mech_info=MECH_TO_INFO, filename=filename)