# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# Copyright 2023 Valory AG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ------------------------------------------------------------------------------
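"""ETL helpers for the mech tools data: fetch Request/Deliver events from the mech
contracts, resolve their IPFS contents, and store the parsed results as parquet files."""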
import os.path
import json
import time
import random
from typing import (
Optional,
List,
Dict,
Union,
Any,
)
import pandas as pd
import requests
from eth_typing import ChecksumAddress
from eth_utils import to_checksum_address
from requests.adapters import HTTPAdapter
from requests.exceptions import (
ReadTimeout as RequestsReadTimeoutError,
HTTPError as RequestsHTTPError,
)
from tqdm import tqdm
from urllib3 import Retry
from urllib3.exceptions import (
ReadTimeoutError as Urllib3ReadTimeoutError,
HTTPError as Urllib3HTTPError,
)
from web3 import Web3, HTTPProvider
from web3.exceptions import MismatchedABI
from markets import add_market_creator
from web3.types import BlockParams
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils import (
clean,
BLOCK_FIELD,
gen_event_filename,
read_abi,
SLEEP,
reduce_window,
limit_text,
DATA_DIR,
JSON_DATA_DIR,
REQUEST_ID_FIELD,
MechEvent,
MechEventName,
MechRequest,
MechResponse,
EVENT_TO_MECH_STRUCT,
REQUEST_ID,
HTTP,
HTTPS,
REQUEST_SENDER,
get_result_values,
get_vote,
get_win_probability,
get_prediction_values,
)
CONTRACTS_PATH = "contracts"
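# maps each mech contract address to (ABI filename under CONTRACTS_PATH, earliest block to scan from)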
MECH_TO_INFO = {
# this block number is when the creator had its first tx ever, and after this mech's creation
"0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
# this block number is when this mech was created
"0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}
# optionally set the latest block to stop searching for the delivered events
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
EVENT_ARGUMENTS = "args"
DATA = "data"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.parquet"
RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
ABI_ERROR = "The event signature did not match the provided ABI"
HTTP_TIMEOUT = 10
N_IPFS_RETRIES = 1
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
IPFS_POLL_INTERVAL = 0.05
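# tools whose Request events are discarded when parsing IPFS contents (see `parse_ipfs_tools_content`)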
IRRELEVANT_TOOLS = [
"openai-text-davinci-002",
"openai-text-davinci-003",
"openai-gpt-3.5-turbo",
"openai-gpt-4",
"stabilityai-stable-diffusion-v1-5",
"stabilityai-stable-diffusion-xl-beta-v2-2-2",
"stabilityai-stable-diffusion-512-v2-1",
"stabilityai-stable-diffusion-768-v2-1",
"deepmind-optimization-strong",
"deepmind-optimization",
]
# this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
# for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search,
# we also store the snapshot
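# e.g. with SNAPSHOT_RATE = 10 and BLOCKS_CHUNK_SIZE = 10_000, each parallel worker
# in `etl` is assigned a window of 100_000 blocks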
SNAPSHOT_RATE = 10
NUM_WORKERS = 10  # parallel threads used for fetching events and IPFS contents
GET_CONTENTS_BATCH_SIZE = 1000  # number of parsed events handled per worker when fetching IPFS contents
def get_events(
w3: Web3,
event: str,
mech_address: ChecksumAddress,
mech_abi_path: str,
earliest_block: int,
latest_block: int,
) -> List:
"""Get the delivered events."""
abi = read_abi(mech_abi_path)
contract_instance = w3.eth.contract(address=mech_address, abi=abi)
events = []
from_block = earliest_block
batch_size = BLOCKS_CHUNK_SIZE
with tqdm(
total=latest_block - from_block,
desc=f"Searching {event} events for mech {mech_address}",
unit="blocks",
) as pbar:
while from_block < latest_block:
events_filter = contract_instance.events[event].build_filter()
events_filter.fromBlock = from_block
events_filter.toBlock = min(from_block + batch_size, latest_block)
entries = None
retries = 0
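            # retry fetching the current window: shrink it on "entity too large"/timeout
            # errors, otherwise back off linearly up to N_RPC_RETRIES attempts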
while entries is None:
try:
entries = events_filter.deploy(w3).get_all_entries()
retries = 0
except (RequestsHTTPError, Urllib3HTTPError) as exc:
if "Request Entity Too Large" in exc.args[0]:
events_filter, batch_size = reduce_window(
contract_instance,
event,
from_block,
batch_size,
latest_block,
)
except (Urllib3ReadTimeoutError, RequestsReadTimeoutError):
events_filter, batch_size = reduce_window(
contract_instance, event, from_block, batch_size, latest_block
)
except Exception as exc:
retries += 1
if retries == N_RPC_RETRIES:
tqdm.write(
f"Skipping events for blocks {events_filter.fromBlock} - {events_filter.toBlock} "
f"as the retries have been exceeded."
)
break
sleep = SLEEP * retries
# error_message = ""
# if isinstance(exc.args[0], str):
# error_message = exc.args[0]
# elif isinstance(exc, ValueError):
# error_message = exc.args[0].get("message", "")
# if (
# (
# isinstance(exc, ValueError)
# and re.match(RE_RPC_FILTER_ERROR, error_message) is None
# )
# and not isinstance(exc, ValueError)
# and not isinstance(exc, MismatchedABI)
# ):
tqdm.write(
f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
)
if hasattr(exc, "message"):
tqdm.write(f"Error message: {exc.message}\n")
time.sleep(sleep)
from_block += batch_size
pbar.update(batch_size)
if entries is None:
continue
chunk = list(entries)
events.extend(chunk)
time.sleep(RPC_POLL_INTERVAL)
return events
def parse_events(raw_events: List) -> List[MechEvent]:
# TODO use dictionary instead of List
"""Parse all the specified MechEvents."""
parsed_events = []
for event in raw_events:
for_block = event.get("blockNumber", 0)
args = event.get(EVENT_ARGUMENTS, {})
request_id = args.get(REQUEST_ID, 0)
data = args.get(DATA, b"")
sender = args.get(REQUEST_SENDER, "")
parsed_event = MechEvent(for_block, request_id, data, sender)
parsed_events.append(parsed_event)
return parsed_events
def parse_dict_events(events_dict: dict) -> List[MechEvent]:
# TODO use dictionary instead of List
"""Parse all the specified MechEvents."""
parsed_events = []
list_ids = list(events_dict.keys())
for mech_id in list_ids:
event = events_dict[mech_id]
for_block = event.get("blockNumber", 0)
args = event.get(EVENT_ARGUMENTS, {})
request_id = args.get(REQUEST_ID, 0)
data = args.get(DATA, b"")
sender = args.get(REQUEST_SENDER, "")
parsed_event = MechEvent(for_block, request_id, data, sender)
parsed_events.append(parsed_event)
return parsed_events
def create_session() -> requests.Session:
"""Create a session with a retry strategy."""
session = requests.Session()
retry_strategy = Retry(
total=N_IPFS_RETRIES + 1,
backoff_factor=BACKOFF_FACTOR,
status_forcelist=STATUS_FORCELIST,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
for protocol in (HTTP, HTTPS):
session.mount(protocol, adapter)
return session
def request(
session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
"""Perform a request with a session."""
try:
response = session.get(url, timeout=timeout)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
tqdm.write(f"HTTP error occurred: {exc}.")
except Exception as exc:
tqdm.write(f"Unexpected error occurred: {exc}.")
else:
return response
return None
def parse_ipfs_response(
session: requests.Session,
url: str,
event: MechEvent,
event_name: MechEventName,
response: requests.Response,
) -> Optional[Dict[str, str]]:
"""Parse a response from IPFS."""
try:
return response.json()
except requests.exceptions.JSONDecodeError:
# this is a workaround because the `metadata.json` file was introduced and removed multiple times
        if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
url = event.ipfs_request_link
response = request(session, url)
if response is None:
tqdm.write(f"Skipping {event=}.")
return None
try:
return response.json()
except requests.exceptions.JSONDecodeError:
pass
tqdm.write(f"Failed to parse response into json for {url=}.")
return None
def parse_ipfs_tools_content(
raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
"""Parse tools content from IPFS."""
struct = EVENT_TO_MECH_STRUCT.get(event_name)
raw_content[REQUEST_ID] = str(event.requestId)
raw_content[BLOCK_FIELD] = str(event.for_block)
raw_content["sender"] = str(event.sender)
try:
mech_response = struct(**raw_content)
except (ValueError, TypeError, KeyError):
tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
return None
if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
return None
return mech_response
def get_contents(
session: requests.Session, events: List[MechEvent], event_name: MechEventName
) -> pd.DataFrame:
"""Fetch the tools' responses."""
contents = []
for event in tqdm(events, desc=f"Tools' results", unit="results"):
url = event.ipfs_link(event_name)
response = request(session, url)
if response is None:
tqdm.write(f"Skipping {event=}.")
continue
raw_content = parse_ipfs_response(session, url, event, event_name, response)
if raw_content is None:
continue
mech_response = parse_ipfs_tools_content(raw_content, event, event_name)
if mech_response is None:
continue
contents.append(mech_response)
time.sleep(IPFS_POLL_INTERVAL)
return pd.DataFrame(contents)
def parse_json_events(json_events: dict, keys_to_traverse: List[int]) -> pd.DataFrame:
"""Function to parse the mech info in a json format"""
all_records = []
    for key in keys_to_traverse:
        # initialise the record before the try block so the except handler never
        # references an undefined (or stale) dict from a previous iteration
        output = {}
        try:
            json_input = json_events[key]
output["request_id"] = json_input["requestId"]
output["request_block"] = json_input["blockNumber"]
output["prompt_request"] = json_input["ipfsContents"]["prompt"]
output["tool"] = json_input["ipfsContents"]["tool"]
output["nonce"] = json_input["ipfsContents"]["nonce"]
output["trader_address"] = json_input["sender"]
output["deliver_block"] = json_input["deliver"]["blockNumber"]
error_value, error_message, prediction_params = get_result_values(
json_input["deliver"]["ipfsContents"]["result"]
)
error_message_value = json_input.get("error_message", error_message)
output["error"] = error_value
output["error_message"] = error_message_value
output["prompt_response"] = json_input["deliver"]["ipfsContents"]["prompt"]
output["mech_address"] = json_input["deliver"]["sender"]
p_yes_value, p_no_value, confidence_value, info_utility_value = (
get_prediction_values(prediction_params)
)
output["p_yes"] = p_yes_value
output["p_no"] = p_no_value
output["confidence"] = confidence_value
output["info_utility"] = info_utility_value
output["vote"] = get_vote(p_yes_value, p_no_value)
output["win_probability"] = get_win_probability(p_yes_value, p_no_value)
all_records.append(output)
except Exception as e:
print(e)
print(f"Error parsing the key ={key}. Noted as error")
output["error"] = 1
output["error_message"] = "Response parsing error"
output["p_yes"] = None
output["p_no"] = None
output["confidence"] = None
output["info_utility"] = None
output["vote"] = None
output["win_probability"] = None
all_records.append(output)
    return pd.DataFrame(all_records)
def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
"""Transform the requests dataframe."""
return clean(contents)
def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame:
"""Transform the delivers dataframe."""
    unpacked_result = pd.json_normalize(contents.result)
    # drop the nested "result" column if it exists
    if "result" in unpacked_result.columns:
        unpacked_result.drop(columns=["result"], inplace=True)
    # drop the nested "prompt" column if it exists
    if "prompt" in unpacked_result.columns:
        unpacked_result.drop(columns=["prompt"], inplace=True)
contents = pd.concat((contents, unpacked_result), axis=1)
if "result" in contents.columns:
contents.drop(columns=["result"], inplace=True)
if "prompt" in contents.columns:
contents.drop(columns=["prompt"], inplace=True)
return clean(contents)
def store_progress(
filename: str,
event_to_contents: Dict[str, pd.DataFrame],
tools: pd.DataFrame,
) -> None:
"""Store the given progress."""
print("storing given progress")
if filename:
DATA_DIR.mkdir(parents=True, exist_ok=True) # Ensure the directory exists
for event_name, content in event_to_contents.items():
event_filename = gen_event_filename(
event_name
) # Ensure this function returns a valid filename string
try:
if "result" in content.columns:
content = content.drop(
columns=["result"]
) # Avoid in-place modification
content.to_parquet(DATA_DIR / event_filename, index=False)
except Exception as e:
print(f"Failed to write {event_name} data: {e}")
# Drop result columns for tools DataFrame
try:
if "result" in tools.columns:
tools = tools.drop(columns=["result"])
tools.to_parquet(DATA_DIR / filename, index=False)
except Exception as e:
print(f"Failed to write tools data: {e}")
def etl(
    rpcs: List[str],
    mech_info: Dict[str, Any],
    filename: Optional[str] = None,
) -> pd.DataFrame:
    """Fetch the on-chain events, process and store them, and return the tools'
    results on all the questions as a dataframe."""
w3s = [Web3(HTTPProvider(r)) for r in rpcs]
session = create_session()
event_to_transformer = {
MechEventName.REQUEST: transform_request,
MechEventName.DELIVER: transform_deliver,
}
mech_to_info = {
to_checksum_address(address): (
os.path.join(CONTRACTS_PATH, filename),
earliest_block,
)
for address, (filename, earliest_block) in mech_info.items()
}
event_to_contents = {}
latest_block = LATEST_BLOCK
if latest_block is None:
latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER]
next_start_block = None
# Loop through events in event_to_transformer
for event_name, transformer in event_to_transformer.items():
# if next_start_block is None:
# next_start_block_base = get_earliest_block(event_name)
# Loop through mech addresses in mech_to_info
events = []
for address, (abi, earliest_block) in mech_to_info.items():
next_start_block = earliest_block
print(
f"Searching for {event_name.value} events for mech {address} from block {next_start_block} to {latest_block}."
)
# parallelize the fetching of events
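            # each worker fetches one window of BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE blocks,
            # using an RPC endpoint picked at random from the configured list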
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(
next_start_block, latest_block, BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE
):
futures.append(
executor.submit(
get_events,
random.choice(w3s),
event_name.value,
address,
abi,
i,
min(i + BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE, latest_block),
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching {event_name.value} Events",
):
current_mech_events = future.result()
events.extend(current_mech_events)
print("Parsing events")
parsed = parse_events(events)
contents = []
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(0, len(parsed), GET_CONTENTS_BATCH_SIZE):
futures.append(
executor.submit(
get_contents,
session,
parsed[i : i + GET_CONTENTS_BATCH_SIZE],
event_name,
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching {event_name.value} Contents",
):
current_mech_contents = future.result()
contents.append(current_mech_contents)
contents = pd.concat(contents, ignore_index=True)
transformed = transformer(contents)
event_to_contents[event_name] = transformed.copy()
# Store progress
tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
print(tools.info())
store_progress(filename, event_to_contents, tools)
return tools
def parse_store_json_events_parallel(
    json_events: Dict[str, Any], filename: str = DEFAULT_FILENAME
) -> pd.DataFrame:
    """Parse the JSON mech events in parallel and store the resulting tools dataframe."""
    total_nr_events = len(json_events)
ids_to_traverse = list(json_events.keys())
print(f"Parsing {total_nr_events} events")
contents = []
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = []
for i in range(0, total_nr_events, GET_CONTENTS_BATCH_SIZE):
futures.append(
executor.submit(
parse_json_events,
json_events,
ids_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE],
)
)
for future in tqdm(
as_completed(futures),
total=len(futures),
desc=f"Fetching json contents",
):
current_mech_contents = future.result()
contents.append(current_mech_contents)
tools = pd.concat(contents, ignore_index=True)
print(f"Adding market creators info. Length of the tools file = {tools}")
tools = add_market_creator(tools)
print(
f"Length of the tools dataframe after adding market creators info= {len(tools)}"
)
print(tools.info())
try:
if "result" in tools.columns:
tools = tools.drop(columns=["result"])
tools.to_parquet(DATA_DIR / filename, index=False)
except Exception as e:
print(f"Failed to write tools data: {e}")
return tools
def generate_tools_file() -> None:
    """Parse the JSON mech events and generate the parquet tools file."""
try:
with open(JSON_DATA_DIR / "tools_info.json", "r") as file:
file_contents = json.load(file)
parse_store_json_events_parallel(file_contents)
except Exception as e:
print(f"An Exception happened while parsing the json events {e}")
if __name__ == "__main__":
RPCs = [
"https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
]
filename = DEFAULT_FILENAME
    tools = etl(rpcs=RPCs, mech_info=MECH_TO_INFO, filename=filename)