rosacastillo committed on
Commit
03219e6
·
1 Parent(s): eac9f54

updating scripts for data collection with new tools pipeline

Browse files
scripts/get_mech_info.py CHANGED
@@ -1,14 +1,22 @@
1
- from dataclasses import dataclass
2
  from string import Template
3
  from typing import Any
4
  from datetime import datetime, timedelta, UTC
5
- from utils import SUBGRAPH_API_KEY
6
  import requests
 
 
 
 
 
 
 
 
 
7
 
8
  OLD_MECH_SUBGRAPH_URL = (
9
  "https://api.thegraph.com/subgraphs/name/stakewise/ethereum-gnosis"
10
  )
11
- # MECH_SUBGRAPH_URL = "https://api.studio.thegraph.com/query/57238/mech/0.0.2"
12
  NETWORK_SUBGRAPH_URL = Template(
13
  """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/FxV6YUix58SpYmLBwc9gEHkwjfkqwe1X5FJQjn8nKPyA"""
14
  )
@@ -17,8 +25,11 @@ SUBGRAPH_HEADERS = {
17
  "Accept": "application/json, multipart/mixed",
18
  "Content-Type": "application/json",
19
  }
 
20
  QUERY_BATCH_SIZE = 1000
21
  DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
 
 
22
  BLOCK_NUMBER = Template(
23
  """
24
  {
@@ -38,6 +49,40 @@ BLOCK_NUMBER = Template(
38
  """
39
  )
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
43
  """Get a block number by its timestamp margins."""
@@ -55,9 +100,9 @@ def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
55
  json={"query": query},
56
  timeout=300,
57
  )
58
-
59
  result_json = response.json()
60
- print(f"Response of the query={result_json}")
61
  blocks = result_json.get("data", {}).get("blocks", "")
62
  if len(blocks) == 0:
63
  raise ValueError(f"The query {query} did not return any results")
@@ -100,9 +145,19 @@ def get_mech_info_2024() -> dict[str, Any]:
100
  return MECH_TO_INFO
101
 
102
 
103
- def get_mech_info_last_60_days() -> dict[str, Any]:
104
- """Query the subgraph to get the last 60 days of information from mech."""
 
 
 
 
 
 
 
 
105
 
 
 
106
  timestamp_60_days_ago = int((DATETIME_60_DAYS_AGO).timestamp())
107
  margin = timedelta(seconds=5)
108
  timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())
@@ -117,6 +172,12 @@ def get_mech_info_last_60_days() -> dict[str, Any]:
117
 
118
  if last_month_block_number == "":
119
  raise ValueError("Could not find a valid block number for last month data")
 
 
 
 
 
 
120
 
121
  MECH_TO_INFO = {
122
  # this block number is when the creator had its first tx ever, and after this mech's creation
@@ -130,9 +191,73 @@ def get_mech_info_last_60_days() -> dict[str, Any]:
130
  last_month_block_number,
131
  ),
132
  }
 
133
  return MECH_TO_INFO
134
 
135
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  if __name__ == "__main__":
137
- result = get_mech_info_last_60_days()
138
- print(result)
 
 
 
 
1
  from string import Template
2
  from typing import Any
3
  from datetime import datetime, timedelta, UTC
4
+ from utils import SUBGRAPH_API_KEY, measure_execution_time, DATA_DIR
5
  import requests
6
+ from mech_request_utils import (
7
+ collect_all_mech_delivers,
8
+ collect_all_mech_requests,
9
+ clean_mech_delivers,
10
+ fix_duplicate_requestIds,
11
+ merge_requests_delivers,
12
+ get_ipfs_data,
13
+ only_delivers_loop,
14
+ )
15
 
16
  OLD_MECH_SUBGRAPH_URL = (
17
  "https://api.thegraph.com/subgraphs/name/stakewise/ethereum-gnosis"
18
  )
19
+
20
  NETWORK_SUBGRAPH_URL = Template(
21
  """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/FxV6YUix58SpYmLBwc9gEHkwjfkqwe1X5FJQjn8nKPyA"""
22
  )
 
25
  "Accept": "application/json, multipart/mixed",
26
  "Content-Type": "application/json",
27
  }
28
+
29
  QUERY_BATCH_SIZE = 1000
30
  DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
31
+ DATETIME_10_DAYS_AGO = datetime.now(UTC) - timedelta(days=10)
32
+ DATETIME_10_HOURS_AGO = datetime.now(UTC) - timedelta(hours=10)
33
  BLOCK_NUMBER = Template(
34
  """
35
  {
 
49
  """
50
  )
51
 
52
+ LATEST_BLOCK_QUERY = """
53
+ {
54
+ blocks(
55
+ first: 1,
56
+ orderBy: timestamp,
57
+ orderDirection: desc,
58
+ ){
59
+ id,
60
+ number,
61
+ }
62
+ }
63
+ """
64
+
65
+
66
def fetch_last_block_number() -> dict:
    """Fetch the most recent block entry from the network subgraph.

    Returns:
        The newest block as a dict with at least "id" and "number" keys.

    Raises:
        ValueError: if the subgraph returns no blocks.
    """
    network_subgraph_url = NETWORK_SUBGRAPH_URL.substitute(
        subgraph_api_key=SUBGRAPH_API_KEY
    )
    query = LATEST_BLOCK_QUERY
    response = requests.post(
        network_subgraph_url,
        headers=SUBGRAPH_HEADERS,
        json={"query": query},
        timeout=300,
    )

    result_json = response.json()
    print(f"Response of the query={result_json}")
    # default to an empty list; the original used "" which happened to work
    # with len() but gave the wrong type on the miss path
    blocks = result_json.get("data", {}).get("blocks", [])
    if len(blocks) == 0:
        raise ValueError(f"The query {query} did not return any results")
    return blocks[0]
85
+
86
 
87
  def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
88
  """Get a block number by its timestamp margins."""
 
100
  json={"query": query},
101
  timeout=300,
102
  )
103
+ # print(f"block query: {query}")
104
  result_json = response.json()
105
+ # print(f"Response of the query={result_json}")
106
  blocks = result_json.get("data", {}).get("blocks", "")
107
  if len(blocks) == 0:
108
  raise ValueError(f"The query {query} did not return any results")
 
145
  return MECH_TO_INFO
146
 
147
 
148
def get_last_block_number() -> int:
    """Return the latest block number from the subgraph as an int.

    Raises:
        ValueError: if the subgraph entry has no valid numeric "number".
    """
    last_block = fetch_last_block_number()
    # expecting only one block
    number = last_block.get("number", "")
    # BUGFIX: the original only rejected "" and silently passed any other
    # non-numeric string through as str (despite the -> int hint); the error
    # message also said "last month data" (copy-paste from the 60-day path).
    if not str(number).isdigit():
        raise ValueError("Could not find a valid block number for the latest block")
    return int(number)
158
 
159
+
160
+ def get_last_60_days_block_number() -> int:
161
  timestamp_60_days_ago = int((DATETIME_60_DAYS_AGO).timestamp())
162
  margin = timedelta(seconds=5)
163
  timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())
 
172
 
173
  if last_month_block_number == "":
174
  raise ValueError("Could not find a valid block number for last month data")
175
+ return last_month_block_number
176
+
177
+
178
+ def get_mech_info_last_60_days() -> dict[str, Any]:
179
+ """Query the subgraph to get the last 60 days of information from mech."""
180
+ last_month_block_number = get_last_60_days_block_number()
181
 
182
  MECH_TO_INFO = {
183
  # this block number is when the creator had its first tx ever, and after this mech's creation
 
191
  last_month_block_number,
192
  ),
193
  }
194
+ print(f"last 60 days block number {last_month_block_number}")
195
  return MECH_TO_INFO
196
 
197
 
198
def get_mech_info_last_10_days() -> dict[str, Any]:
    """Query the subgraph to get the last 10 days of information from mech.

    Returns:
        Mapping of mech contract address -> (abi filename, starting block).
    """
    timestamp_10_days_ago = int(DATETIME_10_DAYS_AGO.timestamp())
    # small margin so the [from, to] window contains at least one block
    margin = timedelta(seconds=5)
    timestamp_10_days_ago_plus_margin = int((DATETIME_10_DAYS_AGO + margin).timestamp())

    last_10_days_block_number = fetch_block_number(
        timestamp_10_days_ago, timestamp_10_days_ago_plus_margin
    )
    # expecting only one block
    last_10_days_block_number = last_10_days_block_number.get("number", "")
    if last_10_days_block_number.isdigit():
        last_10_days_block_number = int(last_10_days_block_number)

    if last_10_days_block_number == "":
        # BUGFIX: message said "last month data" — this is the 10-day window
        raise ValueError("Could not find a valid block number for last 10 days data")

    MECH_TO_INFO = {
        # this block number is when the creator had its first tx ever, and after this mech's creation
        "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
            "old_mech_abi.json",
            last_10_days_block_number,
        ),
        # this block number is when this mech was created
        "0x77af31de935740567cf4ff1986d04b2c964a786a": (
            "new_mech_abi.json",
            last_10_days_block_number,
        ),
    }
    print(f"last 10 days block number {last_10_days_block_number}")
    return MECH_TO_INFO
230
+
231
+
232
@measure_execution_time
def get_mech_events_last_60_days():
    """Run the full mech-events collection pipeline over the last 60 days.

    Each stage persists its output to json files as a side effect, so the
    intermediate return values are not needed here (the original bound them
    to unused locals).
    """
    earliest_block_number = get_last_60_days_block_number()
    last_block_number = get_last_block_number()

    # mech requests (saved to mech_requests.json)
    collect_all_mech_requests(
        from_block=earliest_block_number, to_block=last_block_number
    )

    # mech delivers (saved to mech_delivers.json)
    collect_all_mech_delivers(
        from_block=earliest_block_number, to_block=last_block_number
    )

    # drop delivers whose requestId has no matching request
    clean_mech_delivers()

    # resolve duplicated requestIds to their closest deliver
    fix_duplicate_requestIds()

    # merge the two files into one source
    merge_requests_delivers()

    # add ipfs contents
    get_ipfs_data()
257
+
258
+
259
if __name__ == "__main__":
    # entry point: run the full 60-day mech events pipeline
    get_mech_events_last_60_days()
scripts/mech_request_utils.py ADDED
@@ -0,0 +1,560 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # -*- coding: utf-8 -*-
2
+ # ------------------------------------------------------------------------------
3
+ #
4
+ # Copyright 2024 Valory AG
5
+ #
6
+ # Licensed under the Apache License, Version 2.0 (the "License");
7
+ # you may not use this file except in compliance with the License.
8
+ # You may obtain a copy of the License at
9
+ #
10
+ # http://www.apache.org/licenses/LICENSE-2.0
11
+ #
12
+ # Unless required by applicable law or agreed to in writing, software
13
+ # distributed under the License is distributed on an "AS IS" BASIS,
14
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
+ # See the License for the specific language governing permissions and
16
+ # limitations under the License.
17
+ #
18
+ # ------------------------------------------------------------------------------
19
+
20
+ """Script for retrieving mech requests and their delivers."""
21
+ import os
22
+ import json
23
+ import time
24
+ import pickle
25
+ from collections import defaultdict
26
+ from typing import Any, Dict, List, Tuple
27
+ from pathlib import Path
28
+ import requests
29
+ from gql import Client, gql
30
+ from gql.transport.requests import RequestsHTTPTransport
31
+ from tools import (
32
+ IPFS_POLL_INTERVAL,
33
+ GET_CONTENTS_BATCH_SIZE,
34
+ IRRELEVANT_TOOLS,
35
+ create_session,
36
+ request,
37
+ )
38
+ from tqdm import tqdm
39
+ from markets import PEARL_CREATOR, CREATOR
40
+ from concurrent.futures import ThreadPoolExecutor, as_completed
41
+
42
+ NUM_WORKERS = 10
43
+ BLOCKS_CHUNK_SIZE = 10000
44
+ TEXT_ALIGNMENT = 30
45
+ MINIMUM_WRITE_FILE_DELAY_SECONDS = 20
46
+ MECH_FROM_BLOCK_RANGE = 50000
47
+ SCRIPTS_DIR = Path(__file__).parent
48
+ ROOT_DIR = SCRIPTS_DIR.parent
49
+ JSON_DATA_DIR = ROOT_DIR / "json_data"
50
+ DATA_DIR = ROOT_DIR / "data"
51
+ IPFS_ADDRESS = "https://gateway.autonolas.tech/ipfs/"
52
+ THEGRAPH_ENDPOINT = "https://api.studio.thegraph.com/query/57238/mech/0.0.2"
53
+ last_write_time = 0.0
54
+
55
+ REQUESTS_QUERY_FILTER = """
56
+ query requests_query($sender_not_in: [Bytes!], $id_gt: Bytes, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) {
57
+ requests(where: {sender_not_in: $sender_not_in, id_gt: $id_gt, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: id, first: 1000) {
58
+ blockNumber
59
+ blockTimestamp
60
+ id
61
+ ipfsHash
62
+ requestId
63
+ sender
64
+ transactionHash
65
+ }
66
+ }
67
+ """
68
+
69
+ DELIVERS_QUERY_NO_FILTER = """
70
+ query delivers_query($id_gt: Bytes, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) {
71
+ delivers(where: {id_gt: $id_gt, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: id, first: 1000) {
72
+ blockNumber
73
+ blockTimestamp
74
+ id
75
+ ipfsHash
76
+ requestId
77
+ sender
78
+ transactionHash
79
+ }
80
+ }
81
+
82
+ """
83
+ DELIVERS_QUERY = """
84
+ query delivers_query($requestId: BigInt, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) {
85
+ delivers(where: {requestId: $requestId, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: blockNumber, first: 1000) {
86
+ blockNumber
87
+ blockTimestamp
88
+ id
89
+ ipfsHash
90
+ requestId
91
+ sender
92
+ transactionHash
93
+ }
94
+ }
95
+ """
96
+
97
+ MISSING_DELIVERS_QUERY = """
98
+ query delivers_query($requestId: BigInt, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) {
99
+ delivers(where: {requestId: $requestId, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: blockNumber, first: 1000) {
100
+ blockNumber
101
+ blockTimestamp
102
+ id
103
+ ipfsHash
104
+ requestId
105
+ sender
106
+ transactionHash
107
+ }
108
+ }
109
+ """
110
+
111
+
112
def collect_all_mech_requests(from_block: int, to_block: int) -> Tuple:
    """Paginate the subgraph for all mech requests in [from_block, to_block].

    Returns:
        (mech_requests, duplicated_req_ids): mech_requests maps entity id to
        the request entry; duplicated_req_ids lists ids seen more than once.
    """
    print(f"Fetching all mech requests from {from_block} to {to_block}")
    mech_requests = {}
    duplicated_reqIds = []
    transport = RequestsHTTPTransport(url=THEGRAPH_ENDPOINT)
    client = Client(transport=transport, fetch_schema_from_transport=True)

    id_gt = "0x00"
    while True:
        variables = {
            "sender_not_in": [CREATOR, PEARL_CREATOR],
            "id_gt": id_gt,
            # the subgraph expects BigInt values as strings
            "blockNumber_gte": str(from_block),  # str
            "blockNumber_lte": str(to_block),  # str
        }
        try:
            response = client.execute(
                gql(REQUESTS_QUERY_FILTER), variable_values=variables
            )
            items = response.get("requests", [])
        except Exception as e:
            # BUGFIX: the original fell through after an exception and
            # dereferenced `items[-1]` — a NameError on the first page or an
            # infinite loop on a stale cursor. Stop paginating instead.
            print(f"Error while getting the response: {e}")
            break

        if not items:
            break

        for mech_request in items:
            if mech_request["id"] not in mech_requests:
                mech_requests[mech_request["id"]] = mech_request
            else:
                duplicated_reqIds.append(mech_request["id"])

        # advance the id cursor to the last entity of this page
        id_gt = items[-1]["id"]
        time.sleep(IPFS_POLL_INTERVAL)
        print(f"New execution for id_gt = {id_gt}")
        if len(duplicated_reqIds) > 0:
            print(f"Number of duplicated req Ids = {len(duplicated_reqIds)}")
            # checkpoint partial progress
            save_json_file(mech_requests, "mech_requests.json")

    print(f"Number of requests = {len(mech_requests)}")
    print(f"Number of duplicated req Ids = {len(duplicated_reqIds)}")
    save_json_file(mech_requests, "mech_requests.json")
    return mech_requests, duplicated_reqIds
157
+
158
+
159
def collect_all_mech_delivers(from_block: int, to_block: int) -> Tuple:
    """Paginate the subgraph for all mech delivers in the block window.

    Returns:
        (mech_delivers, duplicated_request_ids): mech_delivers maps each
        requestId to the list of its deliver entries.
    """
    print(f"Fetching all mech delivers from {from_block} to {to_block}")
    mech_delivers = {}
    duplicated_requestIds = []
    transport = RequestsHTTPTransport(url=THEGRAPH_ENDPOINT)
    client = Client(transport=transport, fetch_schema_from_transport=True)
    # there is a delay between deliver and request, so extend the window
    to_block = to_block + MECH_FROM_BLOCK_RANGE
    id_gt = ""
    while True:
        variables = {
            "id_gt": id_gt,
            "blockNumber_gte": str(from_block),  # str
            "blockNumber_lte": str(to_block),  # str
        }
        try:
            response = client.execute(
                gql(DELIVERS_QUERY_NO_FILTER), variable_values=variables
            )
            items = response.get("delivers", [])
        except Exception as e:
            # BUGFIX: the original did a bare `return` here, handing None to
            # callers that unpack a 2-tuple (TypeError). Stop paginating and
            # return what was collected so far instead.
            print(f"Error while getting the response: {e}")
            break

        if not items:
            break

        for mech_deliver in items:
            req_id = mech_deliver["requestId"]
            if req_id not in mech_delivers:
                mech_delivers[req_id] = [mech_deliver]
            else:
                duplicated_requestIds.append(req_id)
                # duplicates are resolved later by fix_duplicate_requestIds
                mech_delivers[req_id].append(mech_deliver)

        # advance the id cursor to the last entity of this page
        id_gt = items[-1]["id"]
        time.sleep(IPFS_POLL_INTERVAL)
        print(f"New execution for id_gt = {id_gt}")
        if len(duplicated_requestIds) > 0:
            print(f"Number of duplicated request id = {len(duplicated_requestIds)}")
            # checkpoint partial progress
            save_json_file(mech_delivers, "mech_delivers.json")

    print(f"Number of delivers = {len(mech_delivers)}")
    print(f"Number of duplicated request id = {len(duplicated_requestIds)}")
    save_json_file(mech_delivers, "mech_delivers.json")
    return mech_delivers, duplicated_requestIds
206
+
207
+
208
def collect_missing_delivers(request_id: int, block_number: int) -> Dict[str, Any]:
    """Look up the deliver for a single request id in a window after its block.

    Returns the first matching deliver entry (query orders by blockNumber),
    or an empty dict when nothing is found or the query fails.
    """
    # delivers arrive some blocks after the request, so search a forward window
    to_block = block_number + MECH_FROM_BLOCK_RANGE
    print(f"Fetching all missing delivers from {block_number} to {to_block}")

    transport = RequestsHTTPTransport(url=THEGRAPH_ENDPOINT)
    client = Client(transport=transport, fetch_schema_from_transport=True)
    query_vars = {
        "requestId": request_id,
        "blockNumber_gte": str(block_number),  # str
        "blockNumber_lte": str(to_block),  # str
    }
    try:
        result = client.execute(
            gql(MISSING_DELIVERS_QUERY), variable_values=query_vars
        )
        found = result.get("delivers", [])
        # If the user sends requests with the same values (tool, prompt,
        # nonce) the same requestId is generated, so several delivers may
        # match. The closest blockNumber is assumed to be the most likely
        # deliver for this request.
        if found:
            return found[0]
    except Exception as e:
        print(f"Error while getting the response: {e}")

    return {}
237
+
238
+
239
def populate_requests_ipfs_contents(
    session: requests.Session, mech_requests: Dict[str, Any], keys_to_traverse: list
) -> dict:
    """Fetch the IPFS metadata for each mech request in keys_to_traverse.

    Returns:
        A new dict containing only the requests whose contents were fetched
        successfully and whose tool is not in IRRELEVANT_TOOLS.
    """
    updated_dict = {}
    wrong_response_count = 0
    for k in tqdm(
        keys_to_traverse,
        desc="Fetching IPFS contents for requests",
        position=1,
        unit="results",
    ):
        mech_request = mech_requests[k]

        if "ipfsContents" not in mech_request:
            ipfs_hash = mech_request["ipfsHash"]
            url = f"{IPFS_ADDRESS}{ipfs_hash}/metadata.json"
            response = request(session, url)
            if response is None:
                tqdm.write(f"Skipping {mech_request=}. because response was None")
                wrong_response_count += 1
                continue
            try:
                contents = response.json()
                # BUGFIX: contents["tool"] raised an uncaught KeyError on
                # malformed metadata and killed the whole worker batch;
                # .get() tolerates a missing key.
                if contents.get("tool") in IRRELEVANT_TOOLS:
                    continue
                mech_request["ipfsContents"] = contents
            except requests.exceptions.JSONDecodeError:
                tqdm.write(
                    f"Skipping {mech_request} because of JSONDecodeError when parsing response"
                )
                wrong_response_count += 1
                continue
        updated_dict[k] = mech_request
        # throttle IPFS gateway access
        time.sleep(IPFS_POLL_INTERVAL)

    return updated_dict
275
+
276
+
277
def populate_delivers_ipfs_contents(
    session: requests.Session, mech_requests: Dict[str, Any], keys_to_traverse: list
) -> dict:
    """Function to complete the delivers content info from ipfs"""
    updated_dict = {}
    progress = tqdm(
        keys_to_traverse,
        desc="Fetching IPFS contents for delivers",
        position=1,
        unit="results",
    )
    for key in progress:
        mech_request = mech_requests[key]
        if "deliver" not in mech_request or len(mech_request["deliver"]) == 0:
            print(f"Skipping mech request {mech_request} because of no delivers info")
            continue

        deliver = mech_request["deliver"]
        if "ipfsContents" in deliver:
            # contents already present; nothing to fetch for this entry
            updated_dict[key] = mech_request
            time.sleep(IPFS_POLL_INTERVAL)
            continue

        url = f"{IPFS_ADDRESS}{deliver['ipfsHash']}/{deliver['requestId']}"
        response = request(session, url)
        if response is None:
            tqdm.write(f"Skipping {mech_request=}.")
            continue
        try:
            contents = response.json()
            metadata = contents.get("metadata", None)
            if metadata and metadata["tool"] in IRRELEVANT_TOOLS:
                continue
            # the cost breakdown is not needed downstream
            contents.pop("cost_dict", None)
            deliver["ipfsContents"] = contents
        except requests.exceptions.JSONDecodeError:
            tqdm.write(f"Skipping {mech_request} because of JSONDecodeError")
            continue
        except Exception:
            tqdm.write(f"Skipping {mech_request} because of error parsing the response")
            continue
        updated_dict[key] = mech_request
        time.sleep(IPFS_POLL_INTERVAL)

    return updated_dict
321
+
322
+
323
def write_mech_events_to_file(
    mech_requests: Dict[str, Any],
    filename: str,
    force_write: bool = False,
) -> None:
    """Dump mech_requests to DATA_DIR/filename, rate-limited unless forced.

    Writes at most once every MINIMUM_WRITE_FILE_DELAY_SECONDS, tracked via
    the module-level last_write_time timestamp.
    """
    global last_write_time  # pylint: disable=global-statement

    if not mech_requests:
        return

    now = time.time()
    due = (now - last_write_time) >= MINIMUM_WRITE_FILE_DELAY_SECONDS
    if force_write or due:
        target = DATA_DIR / filename
        with open(target, "w", encoding="utf-8") as file:
            json.dump(mech_requests, file, indent=2)
        last_write_time = now
339
+
340
+
341
def save_final_tools_json_file(data: Dict[str, Any], filename: str):
    """Write the final tools content as pretty-printed JSON under DATA_DIR."""
    target = DATA_DIR / filename
    target.write_text(json.dumps(data, indent=2), encoding="utf-8")
345
+
346
+
347
def save_json_file(data: Dict[str, Any], filename: str):
    """Function to save the content into a json file"""
    target = JSON_DATA_DIR / filename
    target.write_text(json.dumps(data, indent=2), encoding="utf-8")
352
+
353
+
354
def clean_mech_delivers() -> None:
    """Function to remove from the delivers json file the request Ids that are not in the mech requests"""
    # read mech requests
    with open(JSON_DATA_DIR / "mech_requests.json", "r") as file:
        mech_requests = json.load(file)

    # PERF: a set gives O(1) membership tests; the original scanned a list
    # inside the loop (O(n*m)) and grew a to-delete list
    valid_req_ids = {
        mech_requests[k].get("requestId") for k in mech_requests.keys()
    }

    # remove requestIds from delivers that are not in this set
    with open(JSON_DATA_DIR / "mech_delivers.json", "r") as file:
        mech_delivers = json.load(file)

    print(f"original size of the file {len(mech_delivers)}")
    mech_delivers = {
        req_id: delivers
        for req_id, delivers in mech_delivers.items()
        if req_id in valid_req_ids
    }
    print(f"final size of the file {len(mech_delivers)}")
    save_json_file(mech_delivers, "mech_delivers.json")
378
+
379
+
380
def get_request_block_numbers(
    mech_requests: Dict[str, Any], target_req_id: int
) -> list:
    """Collect the blockNumber of every mech request matching target_req_id."""
    return [
        entry["blockNumber"]
        for entry in mech_requests.values()
        if entry["requestId"] == target_req_id
    ]
390
+
391
+
392
def update_block_request_map(block_request_id_map: dict) -> None:
    """Persist the (block_nr, request_id) -> deliver mapping as a pickle."""
    print("Saving block request id map info")
    target = JSON_DATA_DIR / "block_request_id_map.pickle"
    with open(target, "wb") as handle:
        pickle.dump(block_request_id_map, handle, protocol=pickle.HIGHEST_PROTOCOL)
396
+
397
+
398
def fix_duplicate_requestIds() -> dict:
    """Resolve requestIds that have more than one deliver.

    For every duplicated requestId, pair each request block number with the
    deliver whose blockNumber is closest, persist the mapping and return it.
    """
    with open(JSON_DATA_DIR / "mech_delivers.json", "r") as file:
        data_delivers = json.load(file)

    with open(JSON_DATA_DIR / "mech_requests.json", "r") as file:
        mech_requests = json.load(file)

    # requestIds whose deliver list holds more than one entry
    duplicated_req_ids = [
        req_id for req_id in data_delivers.keys()
        if len(data_delivers.get(req_id)) > 1
    ]
    print(len(duplicated_req_ids))

    block_request_id_map = {}
    for req_id in duplicated_req_ids:
        # the list of mech request block numbers for that requestId
        request_blocks = get_request_block_numbers(mech_requests, req_id)
        # the list of mech delivers (list of dictionaries)
        candidate_delivers = data_delivers.get(req_id)
        if len(request_blocks) > 1:
            print("More than one block number was found")
        for block_nr in request_blocks:
            # keep the deliver closest (by block distance) to this request
            closest = min(
                candidate_delivers,
                key=lambda deliver: abs(int(deliver["blockNumber"]) - int(block_nr)),
            )
            block_request_id_map[(block_nr, req_id)] = closest

    update_block_request_map(block_request_id_map)

    return block_request_id_map
432
+
433
+
434
def merge_requests_delivers() -> None:
    """Function to map requests and delivers"""
    with open(JSON_DATA_DIR / "mech_delivers.json", "r") as file:
        mech_delivers = json.load(file)

    with open(JSON_DATA_DIR / "mech_requests.json", "r") as file:
        mech_requests = json.load(file)

    # block map for duplicated requestIds:
    # key = (block_nr, req_Id), value = deliver dictionary
    with open(JSON_DATA_DIR / "block_request_id_map.pickle", "rb") as handle:
        block_request_id_map = pickle.load(handle)

    for mech_req in tqdm(
        mech_requests.values(),
        desc="Merging delivers data into the mech requests",
    ):
        if "deliver" in mech_req:
            continue

        block_number_req = mech_req["blockNumber"]
        req_id = mech_req["requestId"]
        # duplicated requestIds were resolved ahead of time
        map_key = (block_number_req, req_id)
        if map_key in block_request_id_map:
            deliver_dict = block_request_id_map[map_key]
        elif req_id in mech_delivers:
            # the stored value is a list; take its first deliver
            deliver_dict = mech_delivers.get(req_id)[0]
        else:
            print("No deliver entry found for this request Id")
            deliver_dict = collect_missing_delivers(
                request_id=req_id, block_number=int(block_number_req)
            )

        # attach the deliver info to the original mech request entry
        mech_req["deliver"] = deliver_dict

    save_json_file(mech_requests, "merged_requests.json")
    return
471
+
472
+
473
def _fanout_ipfs_contents(session, source: dict, populate_fn, desc: str) -> dict:
    """Run populate_fn over batches of source keys in a thread pool; merge results."""
    keys = list(source.keys())
    merged: dict = {}
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [
            executor.submit(
                populate_fn,
                session,
                source,
                keys[i : i + GET_CONTENTS_BATCH_SIZE],
            )
            for i in range(0, len(source), GET_CONTENTS_BATCH_SIZE)
        ]
        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc=desc,
        ):
            merged.update(future.result())
    return merged


def get_ipfs_data():
    """Complete merged_requests.json with IPFS contents for requests and delivers.

    Runs two thread-pool stages (the original duplicated this fan-out inline)
    and saves the combined result to tools_info.json after each stage.
    """
    with open(JSON_DATA_DIR / "merged_requests.json", "r") as file:
        mech_requests = json.load(file)

    session = create_session()

    # requests
    print("UPDATING IPFS CONTENTS OF REQUESTS")
    updated_mech_requests = _fanout_ipfs_contents(
        session,
        mech_requests,
        populate_requests_ipfs_contents,
        "Fetching all ipfs contents from requests ",
    )
    save_final_tools_json_file(updated_mech_requests, "tools_info.json")

    # delivers
    print("UPDATING IPFS CONTENTS OF DELIVERS")
    final_tools_content = _fanout_ipfs_contents(
        session,
        updated_mech_requests,
        populate_delivers_ipfs_contents,
        "Fetching all ipfs contents from delivers ",
    )
    save_final_tools_json_file(final_tools_content, "tools_info.json")
529
+
530
+
531
def only_delivers_loop():
    """Re-run only the delivers IPFS-content stage over tools_info.json."""
    with open(DATA_DIR / "tools_info.json", "r") as file:
        updated_mech_requests = json.load(file)

    # delivers
    session = create_session()
    print("UPDATING IPFS CONTENTS OF DELIVERS")
    keys = list(updated_mech_requests.keys())
    final_tools_content = {}
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [
            executor.submit(
                populate_delivers_ipfs_contents,
                session,
                updated_mech_requests,
                keys[start : start + GET_CONTENTS_BATCH_SIZE],
            )
            for start in range(0, len(updated_mech_requests), GET_CONTENTS_BATCH_SIZE)
        ]

        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Fetching all ipfs contents from delivers ",
        ):
            final_tools_content.update(future.result())

    save_final_tools_json_file(final_tools_content, "tools_info.json")
scripts/profitability.py CHANGED
@@ -99,6 +99,7 @@ class MarketAttribute(Enum):
99
 
100
  ALL_TRADES_STATS_DF_COLS = [
101
  "trader_address",
 
102
  "trade_id",
103
  "creation_timestamp",
104
  "title",
@@ -407,6 +408,7 @@ def analyse_trader(
407
  earnings, winner_trade = (0, False)
408
  redemption = _is_redeemed(user_json, trade)
409
  current_answer = trade["fpmm.currentAnswer"]
 
410
 
411
  # Determine market status
412
  market_status = determine_market_status(trade, current_answer)
@@ -431,9 +433,16 @@ def analyse_trader(
431
  winner_trade = True
432
 
433
  # Compute mech calls
434
- num_mech_calls = (
435
- tools_usage["prompt_request"].apply(lambda x: trade["title"] in x).sum()
436
- )
 
 
 
 
 
 
 
437
  net_earnings = (
438
  earnings
439
  - fee_amount
@@ -444,6 +453,7 @@ def analyse_trader(
444
  # Assign values to DataFrame
445
  trades_df.loc[i] = {
446
  "trader_address": trader_address,
 
447
  "trade_id": trade["id"],
448
  "market_status": market_status.name,
449
  "creation_timestamp": creation_timestamp_utc,
@@ -548,7 +558,6 @@ def run_profitability_analysis(
548
  rpc, tools_filename, trades_filename, from_timestamp
549
  )
550
  tools["trader_address"] = tools["trader_address"].str.lower()
551
- print(f"List of market creators = {trades["trader_address"].unique()}")
552
 
553
  # all trades profitability df
554
  print("Analysing trades...")
 
99
 
100
  ALL_TRADES_STATS_DF_COLS = [
101
  "trader_address",
102
+ "market_creator",
103
  "trade_id",
104
  "creation_timestamp",
105
  "title",
 
408
  earnings, winner_trade = (0, False)
409
  redemption = _is_redeemed(user_json, trade)
410
  current_answer = trade["fpmm.currentAnswer"]
411
+ market_creator = trade["market_creator"]
412
 
413
  # Determine market status
414
  market_status = determine_market_status(trade, current_answer)
 
433
  winner_trade = True
434
 
435
  # Compute mech calls
436
+ try:
437
+ num_mech_calls = (
438
+ tools_usage["prompt_request"]
439
+ .apply(lambda x: trade["title"] in x)
440
+ .sum()
441
+ )
442
+ except Exception:
443
+ print(f"Error while getting the number of mech calls")
444
+ num_mech_calls = 0 # No info
445
+
446
  net_earnings = (
447
  earnings
448
  - fee_amount
 
453
  # Assign values to DataFrame
454
  trades_df.loc[i] = {
455
  "trader_address": trader_address,
456
+ "market_creator": market_creator,
457
  "trade_id": trade["id"],
458
  "market_status": market_status.name,
459
  "creation_timestamp": creation_timestamp_utc,
 
558
  rpc, tools_filename, trades_filename, from_timestamp
559
  )
560
  tools["trader_address"] = tools["trader_address"].str.lower()
 
561
 
562
  # all trades profitability df
563
  print("Analysing trades...")
scripts/pull_data.py CHANGED
@@ -16,10 +16,13 @@ from tools import (
16
  etl as tools_etl,
17
  DEFAULT_FILENAME as TOOLS_FILENAME,
18
  update_tools_accuracy,
 
19
  )
20
  from profitability import run_profitability_analysis
21
  from utils import get_question, current_answer, RPC
22
- from get_mech_info import get_mech_info_last_60_days
 
 
23
  from update_tools_accuracy import compute_tools_accuracy
24
  import gc
25
 
@@ -122,21 +125,20 @@ def updating_timestamps(rpc: str):
122
  def weekly_analysis():
123
  """Run weekly analysis for the FPMMS project."""
124
  rpc = RPC
125
-
126
  # Run markets ETL
127
  logging.info("Running markets ETL")
128
  mkt_etl(MARKETS_FILENAME)
129
  logging.info("Markets ETL completed")
130
 
131
- # Run tools ETL
132
- logging.info("Running tools ETL")
 
 
133
 
134
- # This etl is saving already the tools parquet file
135
- tools_etl(
136
- rpcs=[rpc],
137
- mech_info=get_mech_info_last_60_days(),
138
- filename=TOOLS_FILENAME,
139
- )
140
  logging.info("Tools ETL completed")
141
 
142
  # Run profitability analysis
@@ -146,6 +148,7 @@ def weekly_analysis():
146
  run_profitability_analysis(
147
  rpc=rpc,
148
  )
 
149
  logging.info("Profitability analysis completed")
150
  add_current_answer()
151
  try:
@@ -163,3 +166,4 @@ if __name__ == "__main__":
163
  weekly_analysis()
164
  # rpc = RPC
165
  # updating_timestamps(rpc)
 
 
16
  etl as tools_etl,
17
  DEFAULT_FILENAME as TOOLS_FILENAME,
18
  update_tools_accuracy,
19
+ generate_tools_file,
20
  )
21
  from profitability import run_profitability_analysis
22
  from utils import get_question, current_answer, RPC
23
+ from get_mech_info import (
24
+ get_mech_events_last_60_days,
25
+ )
26
  from update_tools_accuracy import compute_tools_accuracy
27
  import gc
28
 
 
125
  def weekly_analysis():
126
  """Run weekly analysis for the FPMMS project."""
127
  rpc = RPC
 
128
  # Run markets ETL
129
  logging.info("Running markets ETL")
130
  mkt_etl(MARKETS_FILENAME)
131
  logging.info("Markets ETL completed")
132
 
133
+ # New tools ETL
134
+ logging.info("Generating the mech json files")
135
+ get_mech_events_last_60_days()
136
+ logging.info("Finished generating the mech json files")
137
 
138
+ # Run tools ETL
139
+ logging.info("Running new tools ETL")
140
+ get_mech_events_last_60_days()
141
+ generate_tools_file()
 
 
142
  logging.info("Tools ETL completed")
143
 
144
  # Run profitability analysis
 
148
  run_profitability_analysis(
149
  rpc=rpc,
150
  )
151
+
152
  logging.info("Profitability analysis completed")
153
  add_current_answer()
154
  try:
 
166
  weekly_analysis()
167
  # rpc = RPC
168
  # updating_timestamps(rpc)
169
+ # compute_tools_accuracy()
scripts/tools.py CHANGED
@@ -18,7 +18,7 @@
18
  # ------------------------------------------------------------------------------
19
 
20
  import os.path
21
- import re
22
  import time
23
  import random
24
  from typing import (
@@ -66,6 +66,10 @@ from utils import (
66
  HTTP,
67
  HTTPS,
68
  REQUEST_SENDER,
 
 
 
 
69
  )
70
 
71
  CONTRACTS_PATH = "contracts"
@@ -184,7 +188,7 @@ def get_events(
184
  f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
185
  )
186
  if hasattr(exc, "message"):
187
- tqdm.write(f"Error message: {exc.messge}\n")
188
  time.sleep(sleep)
189
 
190
  from_block += batch_size
@@ -201,6 +205,7 @@ def get_events(
201
 
202
 
203
  def parse_events(raw_events: List) -> List[MechEvent]:
 
204
  """Parse all the specified MechEvents."""
205
  parsed_events = []
206
  for event in raw_events:
@@ -215,6 +220,24 @@ def parse_events(raw_events: List) -> List[MechEvent]:
215
  return parsed_events
216
 
217
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218
  def create_session() -> requests.Session:
219
  """Create a session with a retry strategy."""
220
  session = requests.Session()
@@ -258,7 +281,7 @@ def parse_ipfs_response(
258
  return response.json()
259
  except requests.exceptions.JSONDecodeError:
260
  # this is a workaround because the `metadata.json` file was introduced and removed multiple times
261
- if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
262
  url = event.ipfs_request_link
263
  response = request(session, url)
264
  if response is None:
@@ -320,6 +343,54 @@ def get_contents(
320
  return pd.DataFrame(contents)
321
 
322
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
323
  def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
324
  """Transform the requests dataframe."""
325
  return clean(contents)
@@ -370,7 +441,7 @@ def store_progress(
370
  content.to_parquet(DATA_DIR / event_filename, index=False)
371
  except Exception as e:
372
  print(f"Failed to write {event_name} data: {e}")
373
- # Drop result and error columns for tools DataFrame
374
  try:
375
  if "result" in tools.columns:
376
  tools = tools.drop(columns=["result"])
@@ -487,6 +558,55 @@ def etl(
487
  return tools
488
 
489
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
490
  def update_tools_accuracy(
491
  tools_acc: pd.DataFrame, tools_df: pd.DataFrame, inc_tools: List[str]
492
  ) -> pd.DataFrame:
 
18
  # ------------------------------------------------------------------------------
19
 
20
  import os.path
21
+ import json
22
  import time
23
  import random
24
  from typing import (
 
66
  HTTP,
67
  HTTPS,
68
  REQUEST_SENDER,
69
+ get_result_values,
70
+ get_vote,
71
+ get_win_probability,
72
+ get_prediction_values,
73
  )
74
 
75
  CONTRACTS_PATH = "contracts"
 
188
  f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
189
  )
190
  if hasattr(exc, "message"):
191
+ tqdm.write(f"Error message: {exc.message}\n")
192
  time.sleep(sleep)
193
 
194
  from_block += batch_size
 
205
 
206
 
207
  def parse_events(raw_events: List) -> List[MechEvent]:
208
+ # TODO use dictionary instead of List
209
  """Parse all the specified MechEvents."""
210
  parsed_events = []
211
  for event in raw_events:
 
220
  return parsed_events
221
 
222
 
223
def parse_dict_events(events_dict: dict) -> List[MechEvent]:
    # TODO use dictionary instead of List
    """Parse every mech event in the given id -> raw-event mapping."""
    parsed_events = []
    for raw_event in events_dict.values():
        arguments = raw_event.get(EVENT_ARGUMENTS, {})
        parsed_events.append(
            MechEvent(
                raw_event.get("blockNumber", 0),
                arguments.get(REQUEST_ID, 0),
                arguments.get(DATA, b""),
                arguments.get(REQUEST_SENDER, ""),
            )
        )

    return parsed_events
239
+
240
+
241
  def create_session() -> requests.Session:
242
  """Create a session with a retry strategy."""
243
  session = requests.Session()
 
281
  return response.json()
282
  except requests.exceptions.JSONDecodeError:
283
  # this is a workaround because the `metadata.json` file was introduced and removed multiple times
284
+ if event_name == MechEvent.REQUEST and url != event.ipfs_request_link:
285
  url = event.ipfs_request_link
286
  response = request(session, url)
287
  if response is None:
 
343
  return pd.DataFrame(contents)
344
 
345
 
346
+ def parse_json_events(json_events: dict, keys_to_traverse: List[int]) -> pd.DataFrame:
347
+ """Function to parse the mech info in a json format"""
348
+ all_records = []
349
+ for key in keys_to_traverse:
350
+ try:
351
+ json_input = json_events[key]
352
+ output = {}
353
+ output["request_id"] = json_input["requestId"]
354
+ output["request_block"] = json_input["blockNumber"]
355
+ output["prompt_request"] = json_input["ipfsContents"]["prompt"]
356
+ output["tool"] = json_input["ipfsContents"]["tool"]
357
+ output["nonce"] = json_input["ipfsContents"]["nonce"]
358
+ output["trader_address"] = json_input["sender"]
359
+ output["deliver_block"] = json_input["deliver"]["blockNumber"]
360
+ error_value, error_message, prediction_params = get_result_values(
361
+ json_input["deliver"]["ipfsContents"]["result"]
362
+ )
363
+ error_message_value = json_input.get("error_message", error_message)
364
+ output["error"] = error_value
365
+ output["error_message"] = error_message_value
366
+ output["prompt_response"] = json_input["deliver"]["ipfsContents"]["prompt"]
367
+ output["mech_address"] = json_input["deliver"]["sender"]
368
+ p_yes_value, p_no_value, confidence_value, info_utility_value = (
369
+ get_prediction_values(prediction_params)
370
+ )
371
+ output["p_yes"] = p_yes_value
372
+ output["p_no"] = p_no_value
373
+ output["confidence"] = confidence_value
374
+ output["info_utility"] = info_utility_value
375
+ output["vote"] = get_vote(p_yes_value, p_no_value)
376
+ output["win_probability"] = get_win_probability(p_yes_value, p_no_value)
377
+ all_records.append(output)
378
+ except Exception as e:
379
+ print(e)
380
+ print(f"Error parsing the key ={key}. Noted as error")
381
+ output["error"] = 1
382
+ output["error_message"] = "Response parsing error"
383
+ output["p_yes"] = None
384
+ output["p_no"] = None
385
+ output["confidence"] = None
386
+ output["info_utility"] = None
387
+ output["vote"] = None
388
+ output["win_probability"] = None
389
+ all_records.append(output)
390
+
391
+ return pd.DataFrame.from_dict(all_records, orient="columns")
392
+
393
+
394
  def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
395
  """Transform the requests dataframe."""
396
  return clean(contents)
 
441
  content.to_parquet(DATA_DIR / event_filename, index=False)
442
  except Exception as e:
443
  print(f"Failed to write {event_name} data: {e}")
444
+ # Drop result columns for tools DataFrame
445
  try:
446
  if "result" in tools.columns:
447
  tools = tools.drop(columns=["result"])
 
558
  return tools
559
 
560
 
561
def parse_store_json_events_parallel(
    json_events: Dict[str, Any], filename: str = DEFAULT_FILENAME
):
    """Parse all mech json events in parallel batches and persist the result.

    :param json_events: mapping of event id -> raw mech event record.
    :param filename: name of the parquet file written under ``DATA_DIR``.
    :return: the assembled tools DataFrame (also written to disk).
    """
    total_nr_events = len(json_events)
    ids_to_traverse = list(json_events.keys())
    print(f"Parsing {total_nr_events} events")
    contents = []
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [
            executor.submit(
                parse_json_events,
                json_events,
                ids_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE],
            )
            for i in range(0, total_nr_events, GET_CONTENTS_BATCH_SIZE)
        ]

        # NOTE(review): as_completed yields in completion order, so row order
        # in the final dataframe is not deterministic across runs.
        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Fetching json contents",
        ):
            current_mech_contents = future.result()
            contents.append(current_mech_contents)

    tools = pd.concat(contents, ignore_index=True)
    print(f"Length of the contents dataframe {len(tools)}")
    print(tools.info())
    try:
        # The raw "result" payload is not needed downstream; drop it before
        # persisting to keep the parquet file small.
        if "result" in tools.columns:
            tools = tools.drop(columns=["result"])
        tools.to_parquet(DATA_DIR / filename, index=False)
    except Exception as e:
        print(f"Failed to write tools data: {e}")

    return tools
598
+
599
+
600
def generate_tools_file():
    """Function to parse the json mech events and generate the parquet tools file"""
    try:
        events_path = DATA_DIR / "tools_info.json"
        with open(events_path, "r") as json_file:
            mech_events = json.load(json_file)
        parse_store_json_events_parallel(mech_events)
    except Exception as e:
        print(f"An Exception happened while parsing the json events {e}")
608
+
609
+
610
  def update_tools_accuracy(
611
  tools_acc: pd.DataFrame, tools_df: pd.DataFrame, inc_tools: List[str]
612
  ) -> pd.DataFrame: