# -*- coding: utf-8 -*- # ------------------------------------------------------------------------------ # # Copyright 2023 Valory AG # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # ------------------------------------------------------------------------------ import json from typing import ( Optional, List, Dict, Union, Any, ) import pandas as pd import requests from datetime import datetime from gnosis_timestamps import transform_timestamp_to_datetime from requests.adapters import HTTPAdapter from tqdm import tqdm from urllib3 import Retry from markets import add_market_creator from concurrent.futures import ThreadPoolExecutor, as_completed from web3_utils import ( N_IPFS_RETRIES, ) from utils import ( clean, BLOCK_FIELD, limit_text, DATA_DIR, JSON_DATA_DIR, MechEvent, MechEventName, MechRequest, MechResponse, EVENT_TO_MECH_STRUCT, REQUEST_ID, HTTP, HTTPS, get_result_values, get_vote, get_win_probability, get_prediction_values, ) CONTRACTS_PATH = "contracts" MECH_TO_INFO = { # this block number is when the creator had its first tx ever, and after this mech's creation "0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547), # this block number is when this mech was created "0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879), } # optionally set the latest block to stop searching for the delivered events EVENT_ARGUMENTS = "args" DATA = "data" IPFS_LINKS_SERIES_NAME = "ipfs_links" BACKOFF_FACTOR = 1 STATUS_FORCELIST = [404, 500, 502, 503, 504] DEFAULT_FILENAME = "tools.parquet" ABI_ERROR = "The event signature did not match the provided ABI" # HTTP_TIMEOUT = 10 # Increasing when ipfs is slow HTTP_TIMEOUT = 15 IRRELEVANT_TOOLS = [ "openai-text-davinci-002", "openai-text-davinci-003", "openai-gpt-3.5-turbo", "openai-gpt-4", "stabilityai-stable-diffusion-v1-5", "stabilityai-stable-diffusion-xl-beta-v2-2-2", "stabilityai-stable-diffusion-512-v2-1", "stabilityai-stable-diffusion-768-v2-1", "deepmind-optimization-strong", "deepmind-optimization", ] # this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches # for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search, # we also store the snapshot SNAPSHOT_RATE = 10 NUM_WORKERS = 10 GET_CONTENTS_BATCH_SIZE = 1000 class TimestampedRetry(Retry): def increment(self, *args, **kwargs): print(f"Retry attempt at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") return super().increment(*args, **kwargs) def create_session() -> requests.Session: """Create a session with a retry strategy.""" session = requests.Session() retry_strategy = TimestampedRetry( total=N_IPFS_RETRIES, backoff_factor=BACKOFF_FACTOR, status_forcelist=STATUS_FORCELIST, ) adapter = HTTPAdapter(max_retries=retry_strategy) for protocol in (HTTP, HTTPS): session.mount(protocol, adapter) return session def request( session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT ) -> Optional[requests.Response]: """Perform a request with a session.""" try: response = session.get(url, timeout=timeout) response.raise_for_status() except requests.exceptions.HTTPError as exc: tqdm.write(f"HTTP error occurred: {exc}.") except Exception as exc: tqdm.write(f"Unexpected error occurred: {exc}.") else: return response return None def parse_ipfs_response( session: requests.Session, url: str, event: MechEvent, event_name: MechEventName, response: requests.Response, ) -> Optional[Dict[str, str]]: """Parse a response from IPFS.""" try: return response.json() except requests.exceptions.JSONDecodeError: # this is a workaround because the `metadata.json` file was introduced and removed multiple times if event_name == MechEvent.REQUEST and url != event.ipfs_request_link: url = event.ipfs_request_link response = request(session, url) if response is None: tqdm.write(f"Skipping {event=}.") return None try: return response.json() except requests.exceptions.JSONDecodeError: pass tqdm.write(f"Failed to parse response into json for {url=}.") return None def parse_ipfs_tools_content( raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName ) -> Optional[Union[MechRequest, MechResponse]]: """Parse tools content from IPFS.""" struct = EVENT_TO_MECH_STRUCT.get(event_name) raw_content[REQUEST_ID] = str(event.requestId) raw_content[BLOCK_FIELD] = str(event.for_block) raw_content["sender"] = str(event.sender) try: mech_response = struct(**raw_content) except (ValueError, TypeError, KeyError): tqdm.write(f"Could not parse {limit_text(str(raw_content))}") return None if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS: return None return mech_response def parse_json_events(json_events: dict, keys_to_traverse: List[int]) -> pd.DataFrame: """Function to parse the mech info in a json format""" all_records = [] for key in keys_to_traverse: try: json_input = json_events[key] output = {} output["request_id"] = json_input["requestId"] output["request_block"] = json_input["blockNumber"] output["request_time"] = transform_timestamp_to_datetime( int(json_input["blockTimestamp"]) ) output["tx_hash"] = json_input["transactionHash"] output["prompt_request"] = json_input["ipfsContents"]["prompt"] output["tool"] = json_input["ipfsContents"]["tool"] output["nonce"] = json_input["ipfsContents"]["nonce"] output["trader_address"] = json_input["sender"] output["deliver_block"] = json_input["deliver"]["blockNumber"] error_value, error_message, prediction_params = get_result_values( json_input["deliver"]["ipfsContents"]["result"] ) error_message_value = json_input.get("error_message", error_message) output["error"] = error_value output["error_message"] = error_message_value output["prompt_response"] = json_input["deliver"]["ipfsContents"]["prompt"] output["mech_address"] = json_input["deliver"]["sender"] p_yes_value, p_no_value, confidence_value, info_utility_value = ( get_prediction_values(prediction_params) ) output["p_yes"] = p_yes_value output["p_no"] = p_no_value output["confidence"] = confidence_value output["info_utility"] = info_utility_value output["vote"] = get_vote(p_yes_value, p_no_value) output["win_probability"] = get_win_probability(p_yes_value, p_no_value) all_records.append(output) except Exception as e: print(e) print(f"Error parsing the key ={key}. Noted as error") output["error"] = 1 output["error_message"] = "Response parsing error" output["p_yes"] = None output["p_no"] = None output["confidence"] = None output["info_utility"] = None output["vote"] = None output["win_probability"] = None all_records.append(output) return pd.DataFrame.from_dict(all_records, orient="columns") def transform_request(contents: pd.DataFrame) -> pd.DataFrame: """Transform the requests dataframe.""" return clean(contents) def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame: """Transform the delivers dataframe.""" unpacked_result = pd.json_normalize(contents.result) # # drop result column if it exists if "result" in unpacked_result.columns: unpacked_result.drop(columns=["result"], inplace=True) # drop prompt column if it exists if "prompt" in unpacked_result.columns: unpacked_result.drop(columns=["prompt"], inplace=True) # rename prompt column to prompt_deliver unpacked_result.rename(columns={"prompt": "prompt_deliver"}, inplace=True) contents = pd.concat((contents, unpacked_result), axis=1) if "result" in contents.columns: contents.drop(columns=["result"], inplace=True) if "prompt" in contents.columns: contents.drop(columns=["prompt"], inplace=True) return clean(contents) def parse_store_json_events_parallel(json_events: Dict[str, Any], output_filename: str): total_nr_events = len(json_events) ids_to_traverse = list(json_events.keys()) print(f"Parsing {total_nr_events} events") contents = [] with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: futures = [] for i in range(0, total_nr_events, GET_CONTENTS_BATCH_SIZE): futures.append( executor.submit( parse_json_events, json_events, ids_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE], ) ) for future in tqdm( as_completed(futures), total=len(futures), desc=f"Fetching json contents", ): current_mech_contents = future.result() contents.append(current_mech_contents) tools = pd.concat(contents, ignore_index=True) print(f"Adding market creators info. Length of the tools file = {len(tools)}") tools = add_market_creator(tools) print( f"Length of the tools dataframe after adding market creators info= {len(tools)}" ) print(tools.info()) try: if "result" in tools.columns: tools = tools.drop(columns=["result"]) tools.to_parquet(DATA_DIR / output_filename, index=False) except Exception as e: print(f"Failed to write tools data: {e}") return tools def generate_tools_file(input_filename: str, output_filename: str): """Function to parse the json mech events and generate the parquet tools file""" try: with open(JSON_DATA_DIR / input_filename, "r") as file: file_contents = json.load(file) parse_store_json_events_parallel(file_contents, output_filename) except Exception as e: print(f"An Exception happened while parsing the json events {e}") if __name__ == "__main__": generate_tools_file()