# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------
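"""ETL for mech tools data.

Fetches mech `Request` and `Deliver` events from the configured RPCs, resolves the
linked IPFS contents, and stores the combined results as a `tools` parquet file.
"""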
import os.path
import json
import time
import random
from typing import (
    Optional,
    List,
    Dict,
    Union,
    Any,
)

import pandas as pd
import requests
from eth_typing import ChecksumAddress
from eth_utils import to_checksum_address
from requests.adapters import HTTPAdapter
from requests.exceptions import (
    ReadTimeout as RequestsReadTimeoutError,
    HTTPError as RequestsHTTPError,
)
from tqdm import tqdm
from urllib3 import Retry
from urllib3.exceptions import (
    ReadTimeoutError as Urllib3ReadTimeoutError,
    HTTPError as Urllib3HTTPError,
)
from web3 import Web3, HTTPProvider
from web3.exceptions import MismatchedABI
from markets import add_market_creator
from web3.types import BlockParams
from concurrent.futures import ThreadPoolExecutor, as_completed

from utils import (
    clean,
    BLOCK_FIELD,
    gen_event_filename,
    read_abi,
    SLEEP,
    reduce_window,
    limit_text,
    DATA_DIR,
    JSON_DATA_DIR,
    REQUEST_ID_FIELD,
    MechEvent,
    MechEventName,
    MechRequest,
    MechResponse,
    EVENT_TO_MECH_STRUCT,
    REQUEST_ID,
    HTTP,
    HTTPS,
    REQUEST_SENDER,
    get_result_values,
    get_vote,
    get_win_probability,
    get_prediction_values,
)
CONTRACTS_PATH = "contracts"
MECH_TO_INFO = {
    # this block number is when the creator had its first tx ever, and after this mech's creation
    "0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
    # this block number is when this mech was created
    "0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}
# optionally set the latest block to stop searching for the delivered events
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
EVENT_ARGUMENTS = "args"
DATA = "data"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.parquet"
RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
ABI_ERROR = "The event signature did not match the provided ABI"
HTTP_TIMEOUT = 10
N_IPFS_RETRIES = 1
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
# IPFS_POLL_INTERVAL = 0.05  # low speed
IPFS_POLL_INTERVAL = 0.2  # high speed
IRRELEVANT_TOOLS = [
    "openai-text-davinci-002",
    "openai-text-davinci-003",
    "openai-gpt-3.5-turbo",
    "openai-gpt-4",
    "stabilityai-stable-diffusion-v1-5",
    "stabilityai-stable-diffusion-xl-beta-v2-2-2",
    "stabilityai-stable-diffusion-512-v2-1",
    "stabilityai-stable-diffusion-768-v2-1",
    "deepmind-optimization-strong",
    "deepmind-optimization",
]
# this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
# for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search,
# we also store the snapshot
SNAPSHOT_RATE = 10
NUM_WORKERS = 10
GET_CONTENTS_BATCH_SIZE = 1000
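
# The event fetching below works in fixed-size block windows (`BLOCKS_CHUNK_SIZE`),
# shrinking the window via `reduce_window` when the RPC rejects a request as too large
# or times out, and retrying other RPC errors up to `N_RPC_RETRIES` times with a
# linearly increasing sleep (`SLEEP * retries`).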
def get_events(
    w3: Web3,
    event: str,
    mech_address: ChecksumAddress,
    mech_abi_path: str,
    earliest_block: int,
    latest_block: int,
) -> List:
    """Get the delivered events."""
    abi = read_abi(mech_abi_path)
    contract_instance = w3.eth.contract(address=mech_address, abi=abi)

    events = []
    from_block = earliest_block
    batch_size = BLOCKS_CHUNK_SIZE
    with tqdm(
        total=latest_block - from_block,
        desc=f"Searching {event} events for mech {mech_address}",
        unit="blocks",
    ) as pbar:
        while from_block < latest_block:
            events_filter = contract_instance.events[event].build_filter()
            events_filter.fromBlock = from_block
            events_filter.toBlock = min(from_block + batch_size, latest_block)

            entries = None
            retries = 0
            while entries is None:
                try:
                    entries = events_filter.deploy(w3).get_all_entries()
                    retries = 0
                except (RequestsHTTPError, Urllib3HTTPError) as exc:
                    if "Request Entity Too Large" in exc.args[0]:
                        events_filter, batch_size = reduce_window(
                            contract_instance,
                            event,
                            from_block,
                            batch_size,
                            latest_block,
                        )
                except (Urllib3ReadTimeoutError, RequestsReadTimeoutError):
                    events_filter, batch_size = reduce_window(
                        contract_instance, event, from_block, batch_size, latest_block
                    )
                except Exception as exc:
                    retries += 1
                    if retries == N_RPC_RETRIES:
                        tqdm.write(
                            f"Skipping events for blocks {events_filter.fromBlock} - {events_filter.toBlock} "
                            f"as the retries have been exceeded."
                        )
                        break

                    sleep = SLEEP * retries
                    # error_message = ""
                    # if isinstance(exc.args[0], str):
                    #     error_message = exc.args[0]
                    # elif isinstance(exc, ValueError):
                    #     error_message = exc.args[0].get("message", "")
                    # if (
                    #     (
                    #         isinstance(exc, ValueError)
                    #         and re.match(RE_RPC_FILTER_ERROR, error_message) is None
                    #     )
                    #     and not isinstance(exc, ValueError)
                    #     and not isinstance(exc, MismatchedABI)
                    # ):
                    tqdm.write(
                        f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
                    )
                    if hasattr(exc, "message"):
                        tqdm.write(f"Error message: {exc.message}\n")
                    time.sleep(sleep)

            from_block += batch_size
            pbar.update(batch_size)

            if entries is None:
                continue

            chunk = list(entries)
            events.extend(chunk)
            time.sleep(RPC_POLL_INTERVAL)

    return events

def parse_events(raw_events: List) -> List[MechEvent]:
    # TODO use dictionary instead of List
    """Parse all the specified MechEvents."""
    parsed_events = []
    for event in raw_events:
        for_block = event.get("blockNumber", 0)
        args = event.get(EVENT_ARGUMENTS, {})
        request_id = args.get(REQUEST_ID, 0)
        data = args.get(DATA, b"")
        sender = args.get(REQUEST_SENDER, "")
        parsed_event = MechEvent(for_block, request_id, data, sender)
        parsed_events.append(parsed_event)

    return parsed_events

def parse_dict_events(events_dict: dict) -> List[MechEvent]:
    # TODO use dictionary instead of List
    """Parse all the specified MechEvents."""
    parsed_events = []
    list_ids = list(events_dict.keys())
    for mech_id in list_ids:
        event = events_dict[mech_id]
        for_block = event.get("blockNumber", 0)
        args = event.get(EVENT_ARGUMENTS, {})
        request_id = args.get(REQUEST_ID, 0)
        data = args.get(DATA, b"")
        sender = args.get(REQUEST_SENDER, "")
        parsed_event = MechEvent(for_block, request_id, data, sender)
        parsed_events.append(parsed_event)

    return parsed_events

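# IPFS gateway requests go through a shared `requests.Session` that retries responses
# with the status codes in `STATUS_FORCELIST`, using urllib3's exponential backoff
# (`BACKOFF_FACTOR`), for both HTTP and HTTPS.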
def create_session() -> requests.Session:
    """Create a session with a retry strategy."""
    session = requests.Session()
    retry_strategy = Retry(
        total=N_IPFS_RETRIES + 1,
        backoff_factor=BACKOFF_FACTOR,
        status_forcelist=STATUS_FORCELIST,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    for protocol in (HTTP, HTTPS):
        session.mount(protocol, adapter)

    return session

def request(
    session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
    """Perform a request with a session."""
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        tqdm.write(f"HTTP error occurred: {exc}.")
    except Exception as exc:
        tqdm.write(f"Unexpected error occurred: {exc}.")
    else:
        return response
    return None

def parse_ipfs_response(
    session: requests.Session,
    url: str,
    event: MechEvent,
    event_name: MechEventName,
    response: requests.Response,
) -> Optional[Dict[str, str]]:
    """Parse a response from IPFS."""
    try:
        return response.json()
    except requests.exceptions.JSONDecodeError:
        # this is a workaround because the `metadata.json` file was introduced and removed multiple times
        if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
            url = event.ipfs_request_link
            response = request(session, url)
            if response is None:
                tqdm.write(f"Skipping {event=}.")
                return None

            try:
                return response.json()
            except requests.exceptions.JSONDecodeError:
                pass

    tqdm.write(f"Failed to parse response into json for {url=}.")
    return None

def parse_ipfs_tools_content(
    raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
    """Parse tools content from IPFS."""
    struct = EVENT_TO_MECH_STRUCT.get(event_name)
    raw_content[REQUEST_ID] = str(event.requestId)
    raw_content[BLOCK_FIELD] = str(event.for_block)
    raw_content["sender"] = str(event.sender)

    try:
        mech_response = struct(**raw_content)
    except (ValueError, TypeError, KeyError):
        tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
        return None

    if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
        return None

    return mech_response

def get_contents(
    session: requests.Session, events: List[MechEvent], event_name: MechEventName
) -> pd.DataFrame:
    """Fetch the tools' responses."""
    contents = []
    for event in tqdm(events, desc="Tools' results", unit="results"):
        url = event.ipfs_link(event_name)
        response = request(session, url)
        if response is None:
            tqdm.write(f"Skipping {event=}.")
            continue

        raw_content = parse_ipfs_response(session, url, event, event_name, response)
        if raw_content is None:
            continue

        mech_response = parse_ipfs_tools_content(raw_content, event, event_name)
        if mech_response is None:
            continue
        contents.append(mech_response)
        time.sleep(IPFS_POLL_INTERVAL)

    return pd.DataFrame(contents)

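# `parse_json_events` expects each entry of `json_events` to look roughly like
# (as implied by the keys accessed below):
# {
#     "requestId": ..., "blockNumber": ..., "sender": ...,
#     "ipfsContents": {"prompt": ..., "tool": ..., "nonce": ...},
#     "deliver": {"blockNumber": ..., "sender": ..., "ipfsContents": {"result": ..., "prompt": ...}},
# }
# Entries that fail to parse are kept and marked as errors.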
def parse_json_events(json_events: dict, keys_to_traverse: List[int]) -> pd.DataFrame:
    """Parse the mech info in JSON format into a dataframe of records."""
    all_records = []
    for key in keys_to_traverse:
        output = {}
        try:
            json_input = json_events[key]
            output["request_id"] = json_input["requestId"]
            output["request_block"] = json_input["blockNumber"]
            output["prompt_request"] = json_input["ipfsContents"]["prompt"]
            output["tool"] = json_input["ipfsContents"]["tool"]
            output["nonce"] = json_input["ipfsContents"]["nonce"]
            output["trader_address"] = json_input["sender"]
            output["deliver_block"] = json_input["deliver"]["blockNumber"]
            error_value, error_message, prediction_params = get_result_values(
                json_input["deliver"]["ipfsContents"]["result"]
            )
            error_message_value = json_input.get("error_message", error_message)
            output["error"] = error_value
            output["error_message"] = error_message_value
            output["prompt_response"] = json_input["deliver"]["ipfsContents"]["prompt"]
            output["mech_address"] = json_input["deliver"]["sender"]
            p_yes_value, p_no_value, confidence_value, info_utility_value = (
                get_prediction_values(prediction_params)
            )
            output["p_yes"] = p_yes_value
            output["p_no"] = p_no_value
            output["confidence"] = confidence_value
            output["info_utility"] = info_utility_value
            output["vote"] = get_vote(p_yes_value, p_no_value)
            output["win_probability"] = get_win_probability(p_yes_value, p_no_value)
            all_records.append(output)
        except Exception as e:
            print(e)
            print(f"Error parsing the key={key}. Noted as error.")
            output["error"] = 1
            output["error_message"] = "Response parsing error"
            output["p_yes"] = None
            output["p_no"] = None
            output["confidence"] = None
            output["info_utility"] = None
            output["vote"] = None
            output["win_probability"] = None
            all_records.append(output)

    return pd.DataFrame.from_dict(all_records, orient="columns")

def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
    """Transform the requests dataframe."""
    return clean(contents)

def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame:
    """Transform the delivers dataframe."""
    unpacked_result = pd.json_normalize(contents.result)

    # drop result column if it exists
    if "result" in unpacked_result.columns:
        unpacked_result.drop(columns=["result"], inplace=True)

    # drop prompt column if it exists
    if "prompt" in unpacked_result.columns:
        unpacked_result.drop(columns=["prompt"], inplace=True)

    # rename prompt column to prompt_deliver
    unpacked_result.rename(columns={"prompt": "prompt_deliver"}, inplace=True)
    contents = pd.concat((contents, unpacked_result), axis=1)

    if "result" in contents.columns:
        contents.drop(columns=["result"], inplace=True)

    if "prompt" in contents.columns:
        contents.drop(columns=["prompt"], inplace=True)

    return clean(contents)

def store_progress(
    filename: str,
    event_to_contents: Dict[str, pd.DataFrame],
    tools: pd.DataFrame,
) -> None:
    """Store the given progress."""
    print("storing given progress")
    if filename:
        DATA_DIR.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists

        for event_name, content in event_to_contents.items():
            event_filename = gen_event_filename(
                event_name
            )  # Ensure this function returns a valid filename string

            try:
                if "result" in content.columns:
                    content = content.drop(
                        columns=["result"]
                    )  # Avoid in-place modification
                content.to_parquet(DATA_DIR / event_filename, index=False)
            except Exception as e:
                print(f"Failed to write {event_name} data: {e}")

        # Drop result columns for tools DataFrame
        try:
            if "result" in tools.columns:
                tools = tools.drop(columns=["result"])
            tools.to_parquet(DATA_DIR / filename, index=False)
        except Exception as e:
            print(f"Failed to write tools data: {e}")

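# High-level flow of `etl`:
#   1. for each event type (Request, Deliver) and each known mech, fetch the raw
#      events in parallel block ranges,
#   2. download and parse the linked IPFS contents in batches,
#   3. transform each event type's dataframe and merge Request/Deliver rows on the
#      request id,
#   4. store the per-event dataframes and the merged `tools` dataframe as parquet.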
def etl(
    rpcs: List[str],
    mech_info: Dict[str, Any],
    filename: Optional[str] = None,
) -> pd.DataFrame:
    """Fetch from on-chain events, process, store and return the tools' results on
    all the questions as a Dataframe."""
    w3s = [Web3(HTTPProvider(r)) for r in rpcs]
    session = create_session()
    event_to_transformer = {
        MechEventName.REQUEST: transform_request,
        MechEventName.DELIVER: transform_deliver,
    }
    mech_to_info = {
        to_checksum_address(address): (
            os.path.join(CONTRACTS_PATH, abi_filename),
            earliest_block,
        )
        for address, (abi_filename, earliest_block) in mech_info.items()
    }
    event_to_contents = {}

    latest_block = LATEST_BLOCK
    if latest_block is None:
        latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER]

    next_start_block = None

    # Loop through events in event_to_transformer
    for event_name, transformer in event_to_transformer.items():
        # if next_start_block is None:
        #     next_start_block_base = get_earliest_block(event_name)

        # Loop through mech addresses in mech_to_info
        events = []
        for address, (abi, earliest_block) in mech_to_info.items():
            next_start_block = earliest_block
            print(
                f"Searching for {event_name.value} events for mech {address} from block {next_start_block} to {latest_block}."
            )

            # parallelize the fetching of events
            with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
                futures = []
                for i in range(
                    next_start_block, latest_block, BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE
                ):
                    futures.append(
                        executor.submit(
                            get_events,
                            random.choice(w3s),
                            event_name.value,
                            address,
                            abi,
                            i,
                            min(i + BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE, latest_block),
                        )
                    )

                for future in tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc=f"Fetching {event_name.value} Events",
                ):
                    current_mech_events = future.result()
                    events.extend(current_mech_events)

        print("Parsing events")
        parsed = parse_events(events)

        contents = []
        with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
            futures = []
            for i in range(0, len(parsed), GET_CONTENTS_BATCH_SIZE):
                futures.append(
                    executor.submit(
                        get_contents,
                        session,
                        parsed[i : i + GET_CONTENTS_BATCH_SIZE],
                        event_name,
                    )
                )

            for future in tqdm(
                as_completed(futures),
                total=len(futures),
                desc=f"Fetching {event_name.value} Contents",
            ):
                current_mech_contents = future.result()
                contents.append(current_mech_contents)

        contents = pd.concat(contents, ignore_index=True)

        transformed = transformer(contents)
        event_to_contents[event_name] = transformed.copy()

    # Store progress
    tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
    print(tools.info())
    store_progress(filename, event_to_contents, tools)

    return tools

def parse_store_json_events_parallel(
    json_events: Dict[str, Any], output_filename: str
) -> pd.DataFrame:
    """Parse the JSON mech events in parallel and store them as a parquet file."""
    total_nr_events = len(json_events)
    ids_to_traverse = list(json_events.keys())
    print(f"Parsing {total_nr_events} events")
    contents = []
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = []
        for i in range(0, total_nr_events, GET_CONTENTS_BATCH_SIZE):
            futures.append(
                executor.submit(
                    parse_json_events,
                    json_events,
                    ids_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE],
                )
            )

        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Fetching json contents",
        ):
            current_mech_contents = future.result()
            contents.append(current_mech_contents)

    tools = pd.concat(contents, ignore_index=True)
    print(f"Adding market creators info. Length of the tools file = {len(tools)}")
    tools = add_market_creator(tools)
    print(
        f"Length of the tools dataframe after adding market creators info = {len(tools)}"
    )
    print(tools.info())

    try:
        if "result" in tools.columns:
            tools = tools.drop(columns=["result"])
        tools.to_parquet(DATA_DIR / output_filename, index=False)
    except Exception as e:
        print(f"Failed to write tools data: {e}")

    return tools

def generate_tools_file(input_filename: str, output_filename: str):
    """Parse the JSON mech events and generate the parquet tools file."""
    try:
        with open(JSON_DATA_DIR / input_filename, "r") as file:
            file_contents = json.load(file)
        parse_store_json_events_parallel(file_contents, output_filename)
    except Exception as e:
        print(f"An Exception happened while parsing the json events: {e}")

if __name__ == "__main__":
    RPCs = [
        "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
    ]
    filename = DEFAULT_FILENAME

    tools = etl(rpcs=RPCs, mech_info=MECH_TO_INFO, filename=filename)