cyberosa commited on
Commit
5f5eb85
·
1 Parent(s): 89c034b

cleaning, new notebooks and two months data logic

Browse files
app.py CHANGED
@@ -4,25 +4,25 @@ import pandas as pd
4
  import duckdb
5
  import logging
6
  from tabs.trades import (
7
- prepare_trades,
8
- get_overall_trades,
9
  get_overall_winning_trades,
10
  plot_trades_by_week,
11
  plot_winning_trades_by_week,
12
- plot_trade_details
13
  )
14
  from tabs.tool_win import (
15
  get_tool_winning_rate,
16
  get_overall_winning_rate,
17
  plot_tool_winnings_overall,
18
- plot_tool_winnings_by_tool
19
  )
20
  from tabs.error import (
21
- get_error_data,
22
  get_error_data_overall,
23
  plot_error_data,
24
  plot_tool_error_data,
25
- plot_week_error_data
26
  )
27
  from tabs.about import about_olas_predict
28
 
@@ -33,21 +33,25 @@ def get_logger():
33
  # stream handler and formatter
34
  stream_handler = logging.StreamHandler()
35
  stream_handler.setLevel(logging.DEBUG)
36
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
 
37
  stream_handler.setFormatter(formatter)
38
  logger.addHandler(stream_handler)
39
  return logger
40
 
 
41
  logger = get_logger()
42
 
 
43
  def get_last_one_month_data():
44
  """
45
  Get the last one month data from the tools.parquet file
46
  """
47
  logger.info("Getting last one month data")
48
- con = duckdb.connect(':memory:')
49
- one_months_ago = (datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d')
50
-
51
  # Query to fetch data from all_trades_profitability.parquet
52
  query2 = f"""
53
  SELECT *
@@ -69,19 +73,47 @@ def get_last_one_month_data():
69
 
70
  return df1, df2
71
 
72
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  def prepare_data():
74
  """
75
  Prepare the data for the dashboard
76
  """
77
- tools_df, trades_df = get_last_one_month_data()
78
 
79
- tools_df['request_time'] = pd.to_datetime(tools_df['request_time'])
80
- trades_df['creation_timestamp'] = pd.to_datetime(trades_df['creation_timestamp'])
81
 
82
  trades_df = prepare_trades(trades_df)
83
  return tools_df, trades_df
84
 
 
85
  tools_df, trades_df = prepare_data()
86
 
87
 
@@ -89,53 +121,39 @@ demo = gr.Blocks()
89
 
90
 
91
  INC_TOOLS = [
92
- 'prediction-online',
93
- 'prediction-offline',
94
- 'claude-prediction-online',
95
- 'claude-prediction-offline',
96
- 'prediction-offline-sme',
97
- 'prediction-online-sme',
98
- 'prediction-request-rag',
99
- 'prediction-request-reasoning',
100
- 'prediction-url-cot-claude',
101
- 'prediction-request-rag-claude',
102
- 'prediction-request-reasoning-claude'
103
  ]
104
 
105
 
106
- error_df = get_error_data(
107
- tools_df=tools_df,
108
- inc_tools=INC_TOOLS
109
- )
110
- error_overall_df = get_error_data_overall(
111
- error_df=error_df
112
- )
113
- winning_rate_df = get_tool_winning_rate(
114
- tools_df=tools_df,
115
- inc_tools=INC_TOOLS
116
- )
117
- winning_rate_overall_df = get_overall_winning_rate(
118
- wins_df=winning_rate_df
119
- )
120
- trades_count_df = get_overall_trades(
121
- trades_df=trades_df
122
- )
123
- trades_winning_rate_df = get_overall_winning_trades(
124
- trades_df=trades_df
125
- )
126
 
127
  with demo:
128
  gr.HTML("<h1>Olas Predict Actual Performance</h1>")
129
- gr.Markdown("This app shows the actual performance of Olas Predict tools on the live market.")
 
 
130
 
131
  with gr.Tabs():
132
  with gr.TabItem("🔥Trades Dashboard"):
133
  with gr.Row():
134
  gr.Markdown("# Plot of number of trades by week")
135
  with gr.Row():
136
- trades_by_week_plot = plot_trades_by_week(
137
- trades_df=trades_count_df
138
- )
139
  with gr.Row():
140
  gr.Markdown("# Plot of winning trades by week")
141
  with gr.Row():
@@ -146,32 +164,30 @@ with demo:
146
  gr.Markdown("# Plot of trade details")
147
  with gr.Row():
148
  trade_details_selector = gr.Dropdown(
149
- label="Select a trade",
150
  choices=[
151
  "mech calls",
152
  "collateral amount",
153
  "earnings",
154
  "net earnings",
155
- "ROI"
156
  ],
157
- value="mech calls"
158
  )
159
  with gr.Row():
160
  trade_details_plot = plot_trade_details(
161
- trade_detail="mech calls",
162
- trades_df=trades_df
163
  )
164
-
165
  def update_trade_details(trade_detail):
166
  return plot_trade_details(
167
- trade_detail=trade_detail,
168
- trades_df=trades_df
169
  )
170
 
171
  trade_details_selector.change(
172
- update_trade_details,
173
- inputs=trade_details_selector,
174
- outputs=trade_details_plot
175
  )
176
 
177
  with gr.Row():
@@ -185,27 +201,25 @@ with demo:
185
 
186
  with gr.Row():
187
  winning_selector = gr.Dropdown(
188
- label="Select Metric",
189
- choices=['losses', 'wins', 'total_request', 'win_perc'],
190
- value='win_perc',
191
  )
192
 
193
  with gr.Row():
194
  winning_plot = plot_tool_winnings_overall(
195
- wins_df=winning_rate_overall_df,
196
- winning_selector="win_perc"
197
  )
198
 
199
  def update_tool_winnings_overall_plot(winning_selector):
200
  return plot_tool_winnings_overall(
201
- wins_df=winning_rate_overall_df,
202
- winning_selector=winning_selector
203
  )
204
 
205
  winning_selector.change(
206
  update_tool_winnings_overall_plot,
207
- inputs=winning_selector,
208
- outputs=winning_plot
209
  )
210
 
211
  with gr.Row():
@@ -215,30 +229,24 @@ with demo:
215
 
216
  with gr.Row():
217
  gr.Markdown("# Plot showing winning rate by tool")
218
-
219
  with gr.Row():
220
  sel_tool = gr.Dropdown(
221
- label="Select a tool",
222
- choices=INC_TOOLS,
223
- value=INC_TOOLS[0]
224
  )
225
 
226
  with gr.Row():
227
  tool_winnings_by_tool_plot = plot_tool_winnings_by_tool(
228
- wins_df=winning_rate_df,
229
- tool=INC_TOOLS[0]
230
  )
231
 
232
  def update_tool_winnings_by_tool_plot(tool):
233
- return plot_tool_winnings_by_tool(
234
- wins_df=winning_rate_df,
235
- tool=tool
236
- )
237
 
238
  sel_tool.change(
239
  update_tool_winnings_by_tool_plot,
240
- inputs=sel_tool,
241
- outputs=tool_winnings_by_tool_plot
242
  )
243
 
244
  with gr.Row():
@@ -250,35 +258,24 @@ with demo:
250
  with gr.Row():
251
  gr.Markdown("# Plot showing overall error")
252
  with gr.Row():
253
- error_overall_plot = plot_error_data(
254
- error_all_df=error_overall_df
255
- )
256
  with gr.Row():
257
  gr.Markdown("# Plot showing error by tool")
258
  with gr.Row():
259
  sel_tool = gr.Dropdown(
260
- label="Select a tool",
261
- choices=INC_TOOLS,
262
- value=INC_TOOLS[0]
263
  )
264
 
265
  with gr.Row():
266
  tool_error_plot = plot_tool_error_data(
267
- error_df=error_df,
268
- tool=INC_TOOLS[0]
269
  )
270
 
271
-
272
  def update_tool_error_plot(tool):
273
- return plot_tool_error_data(
274
- error_df=error_df,
275
- tool=tool
276
- )
277
 
278
  sel_tool.change(
279
- update_tool_error_plot,
280
- inputs=sel_tool,
281
- outputs=tool_error_plot
282
  )
283
  with gr.Row():
284
  sel_tool
@@ -289,29 +286,27 @@ with demo:
289
  gr.Markdown("# Plot showing error by week")
290
 
291
  with gr.Row():
292
- choices = error_overall_df['request_month_year_week'].unique().tolist()
293
  # sort the choices by the latest week to be on the top
294
  choices = sorted(choices)
295
  sel_week = gr.Dropdown(
296
- label="Select a week",
297
- choices=choices,
298
- value=choices[-1]
299
- )
300
 
301
  with gr.Row():
302
  week_error_plot = plot_week_error_data(
303
- error_df=error_df,
304
- week=choices[-1]
305
  )
306
 
307
  def update_week_error_plot(selected_week):
308
- return plot_week_error_data(
309
- error_df=error_df,
310
- week=selected_week
311
- )
312
 
313
- sel_tool.change(update_tool_error_plot, inputs=sel_tool, outputs=tool_error_plot)
314
- sel_week.change(update_week_error_plot, inputs=sel_week, outputs=week_error_plot)
 
 
 
 
315
 
316
  with gr.Row():
317
  sel_tool
 
4
  import duckdb
5
  import logging
6
  from tabs.trades import (
7
+ prepare_trades,
8
+ get_overall_trades,
9
  get_overall_winning_trades,
10
  plot_trades_by_week,
11
  plot_winning_trades_by_week,
12
+ plot_trade_details,
13
  )
14
  from tabs.tool_win import (
15
  get_tool_winning_rate,
16
  get_overall_winning_rate,
17
  plot_tool_winnings_overall,
18
+ plot_tool_winnings_by_tool,
19
  )
20
  from tabs.error import (
21
+ get_error_data,
22
  get_error_data_overall,
23
  plot_error_data,
24
  plot_tool_error_data,
25
+ plot_week_error_data,
26
  )
27
  from tabs.about import about_olas_predict
28
 
 
33
  # stream handler and formatter
34
  stream_handler = logging.StreamHandler()
35
  stream_handler.setLevel(logging.DEBUG)
36
+ formatter = logging.Formatter(
37
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
38
+ )
39
  stream_handler.setFormatter(formatter)
40
  logger.addHandler(stream_handler)
41
  return logger
42
 
43
+
44
  logger = get_logger()
45
 
46
+
47
  def get_last_one_month_data():
48
  """
49
  Get the last one month data from the tools.parquet file
50
  """
51
  logger.info("Getting last one month data")
52
+ con = duckdb.connect(":memory:")
53
+ one_months_ago = (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")
54
+
55
  # Query to fetch data from all_trades_profitability.parquet
56
  query2 = f"""
57
  SELECT *
 
73
 
74
  return df1, df2
75
 
76
+
77
+ def get_all_data():
78
+ """
79
+ Get all data from the tools.parquet and all_trades_profitability.parquet files
80
+ """
81
+ logger.info("Getting all data")
82
+ con = duckdb.connect(":memory:")
83
+
84
+ # Query to fetch data from all_trades_profitability.parquet
85
+ query2 = f"""
86
+ SELECT *
87
+ FROM read_parquet('./data/all_trades_profitability.parquet')
88
+ """
89
+ df2 = con.execute(query2).fetchdf()
90
+ logger.info("Got all data from all_trades_profitability.parquet")
91
+
92
+ query1 = f"""
93
+ SELECT *
94
+ FROM read_parquet('./data/tools.parquet')
95
+ """
96
+ df1 = con.execute(query1).fetchdf()
97
+ logger.info("Got all data from tools.parquet")
98
+
99
+ con.close()
100
+
101
+ return df1, df2
102
+
103
+
104
  def prepare_data():
105
  """
106
  Prepare the data for the dashboard
107
  """
108
+ tools_df, trades_df = get_all_data()
109
 
110
+ tools_df["request_time"] = pd.to_datetime(tools_df["request_time"])
111
+ trades_df["creation_timestamp"] = pd.to_datetime(trades_df["creation_timestamp"])
112
 
113
  trades_df = prepare_trades(trades_df)
114
  return tools_df, trades_df
115
 
116
+
117
  tools_df, trades_df = prepare_data()
118
 
119
 
 
121
 
122
 
123
  INC_TOOLS = [
124
+ "prediction-online",
125
+ "prediction-offline",
126
+ "claude-prediction-online",
127
+ "claude-prediction-offline",
128
+ "prediction-offline-sme",
129
+ "prediction-online-sme",
130
+ "prediction-request-rag",
131
+ "prediction-request-reasoning",
132
+ "prediction-url-cot-claude",
133
+ "prediction-request-rag-claude",
134
+ "prediction-request-reasoning-claude",
135
  ]
136
 
137
 
138
+ error_df = get_error_data(tools_df=tools_df, inc_tools=INC_TOOLS)
139
+ error_overall_df = get_error_data_overall(error_df=error_df)
140
+ winning_rate_df = get_tool_winning_rate(tools_df=tools_df, inc_tools=INC_TOOLS)
141
+ winning_rate_overall_df = get_overall_winning_rate(wins_df=winning_rate_df)
142
+ trades_count_df = get_overall_trades(trades_df=trades_df)
143
+ trades_winning_rate_df = get_overall_winning_trades(trades_df=trades_df)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
 
145
  with demo:
146
  gr.HTML("<h1>Olas Predict Actual Performance</h1>")
147
+ gr.Markdown(
148
+ "This app shows the actual performance of Olas Predict tools on the live market."
149
+ )
150
 
151
  with gr.Tabs():
152
  with gr.TabItem("🔥Trades Dashboard"):
153
  with gr.Row():
154
  gr.Markdown("# Plot of number of trades by week")
155
  with gr.Row():
156
+ trades_by_week_plot = plot_trades_by_week(trades_df=trades_count_df)
 
 
157
  with gr.Row():
158
  gr.Markdown("# Plot of winning trades by week")
159
  with gr.Row():
 
164
  gr.Markdown("# Plot of trade details")
165
  with gr.Row():
166
  trade_details_selector = gr.Dropdown(
167
+ label="Select a trade",
168
  choices=[
169
  "mech calls",
170
  "collateral amount",
171
  "earnings",
172
  "net earnings",
173
+ "ROI",
174
  ],
175
+ value="mech calls",
176
  )
177
  with gr.Row():
178
  trade_details_plot = plot_trade_details(
179
+ trade_detail="mech calls", trades_df=trades_df
 
180
  )
181
+
182
  def update_trade_details(trade_detail):
183
  return plot_trade_details(
184
+ trade_detail=trade_detail, trades_df=trades_df
 
185
  )
186
 
187
  trade_details_selector.change(
188
+ update_trade_details,
189
+ inputs=trade_details_selector,
190
+ outputs=trade_details_plot,
191
  )
192
 
193
  with gr.Row():
 
201
 
202
  with gr.Row():
203
  winning_selector = gr.Dropdown(
204
+ label="Select Metric",
205
+ choices=["losses", "wins", "total_request", "win_perc"],
206
+ value="win_perc",
207
  )
208
 
209
  with gr.Row():
210
  winning_plot = plot_tool_winnings_overall(
211
+ wins_df=winning_rate_overall_df, winning_selector="win_perc"
 
212
  )
213
 
214
  def update_tool_winnings_overall_plot(winning_selector):
215
  return plot_tool_winnings_overall(
216
+ wins_df=winning_rate_overall_df, winning_selector=winning_selector
 
217
  )
218
 
219
  winning_selector.change(
220
  update_tool_winnings_overall_plot,
221
+ inputs=winning_selector,
222
+ outputs=winning_plot,
223
  )
224
 
225
  with gr.Row():
 
229
 
230
  with gr.Row():
231
  gr.Markdown("# Plot showing winning rate by tool")
232
+
233
  with gr.Row():
234
  sel_tool = gr.Dropdown(
235
+ label="Select a tool", choices=INC_TOOLS, value=INC_TOOLS[0]
 
 
236
  )
237
 
238
  with gr.Row():
239
  tool_winnings_by_tool_plot = plot_tool_winnings_by_tool(
240
+ wins_df=winning_rate_df, tool=INC_TOOLS[0]
 
241
  )
242
 
243
  def update_tool_winnings_by_tool_plot(tool):
244
+ return plot_tool_winnings_by_tool(wins_df=winning_rate_df, tool=tool)
 
 
 
245
 
246
  sel_tool.change(
247
  update_tool_winnings_by_tool_plot,
248
+ inputs=sel_tool,
249
+ outputs=tool_winnings_by_tool_plot,
250
  )
251
 
252
  with gr.Row():
 
258
  with gr.Row():
259
  gr.Markdown("# Plot showing overall error")
260
  with gr.Row():
261
+ error_overall_plot = plot_error_data(error_all_df=error_overall_df)
 
 
262
  with gr.Row():
263
  gr.Markdown("# Plot showing error by tool")
264
  with gr.Row():
265
  sel_tool = gr.Dropdown(
266
+ label="Select a tool", choices=INC_TOOLS, value=INC_TOOLS[0]
 
 
267
  )
268
 
269
  with gr.Row():
270
  tool_error_plot = plot_tool_error_data(
271
+ error_df=error_df, tool=INC_TOOLS[0]
 
272
  )
273
 
 
274
  def update_tool_error_plot(tool):
275
+ return plot_tool_error_data(error_df=error_df, tool=tool)
 
 
 
276
 
277
  sel_tool.change(
278
+ update_tool_error_plot, inputs=sel_tool, outputs=tool_error_plot
 
 
279
  )
280
  with gr.Row():
281
  sel_tool
 
286
  gr.Markdown("# Plot showing error by week")
287
 
288
  with gr.Row():
289
+ choices = error_overall_df["request_month_year_week"].unique().tolist()
290
  # sort the choices by the latest week to be on the top
291
  choices = sorted(choices)
292
  sel_week = gr.Dropdown(
293
+ label="Select a week", choices=choices, value=choices[-1]
294
+ )
 
 
295
 
296
  with gr.Row():
297
  week_error_plot = plot_week_error_data(
298
+ error_df=error_df, week=choices[-1]
 
299
  )
300
 
301
  def update_week_error_plot(selected_week):
302
+ return plot_week_error_data(error_df=error_df, week=selected_week)
 
 
 
303
 
304
+ sel_tool.change(
305
+ update_tool_error_plot, inputs=sel_tool, outputs=tool_error_plot
306
+ )
307
+ sel_week.change(
308
+ update_week_error_plot, inputs=sel_week, outputs=week_error_plot
309
+ )
310
 
311
  with gr.Row():
312
  sel_tool
notebooks/confidence_analysis.ipynb ADDED
The diff for this file is too large to render. See raw diff
 
scripts/get_mech_info.py ADDED
@@ -0,0 +1,89 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from dataclasses import dataclass
2
+ from string import Template
3
+ from typing import Any
4
+ from datetime import datetime, timedelta, UTC
5
+ import requests
6
+
7
+ MECH_SUBGRAPH_URL = "https://api.thegraph.com/subgraphs/name/stakewise/ethereum-gnosis"
8
+ SUBGRAPH_HEADERS = {
9
+ "Accept": "application/json, multipart/mixed",
10
+ "Content-Type": "application/json",
11
+ }
12
+ QUERY_BATCH_SIZE = 1000
13
+ DATETIME_60_DAYS_AGO = datetime.now(UTC) - timedelta(days=60)
14
+ BLOCK_NUMBER = Template(
15
+ """
16
+ {
17
+ blocks(
18
+ first: 1,
19
+ orderBy: timestamp,
20
+ orderDirection: asc,
21
+ where: {
22
+ timestamp_gte: "${timestamp_from}",
23
+ timestamp_lte: "${timestamp_to}"
24
+ }
25
+ ){
26
+ id
27
+ }
28
+ }
29
+ """
30
+ )
31
+
32
+
33
+ def fetch_block_number(timestamp_from: int, timestamp_to: int) -> dict:
34
+ """Get a block number by its timestamp margins."""
35
+
36
+ query = BLOCK_NUMBER.substitute(
37
+ timestamp_from=timestamp_from, timestamp_to=timestamp_to
38
+ )
39
+ # print(f"Sending query for the subgraph = {query}")
40
+
41
+ response = requests.post(
42
+ MECH_SUBGRAPH_URL,
43
+ headers=SUBGRAPH_HEADERS,
44
+ json={"query": query},
45
+ timeout=300,
46
+ )
47
+
48
+ result_json = response.json()
49
+ print(f"Response of the query={result_json}")
50
+ blocks = result_json.get("data", {}).get("blocks", "")
51
+ return blocks[0]
52
+
53
+
54
+ def get_mech_info_last_60_days() -> dict[str, Any]:
55
+ """Query the subgraph to get the last 60 days of information from mech."""
56
+
57
+ timestamp_60_days_ago = int((DATETIME_60_DAYS_AGO).timestamp())
58
+ margin = timedelta(seconds=5)
59
+ timestamp_60_days_ago_plus_margin = int((DATETIME_60_DAYS_AGO + margin).timestamp())
60
+
61
+ last_month_block_number = fetch_block_number(
62
+ timestamp_60_days_ago, timestamp_60_days_ago_plus_margin
63
+ )
64
+ # expecting only one block
65
+ last_month_block_number = last_month_block_number.get("id", "")
66
+ if last_month_block_number.isdigit():
67
+ last_month_block_number = int(last_month_block_number)
68
+
69
+ if last_month_block_number == "":
70
+ raise ValueError("Could not find a valid block number for last month data")
71
+
72
+ MECH_TO_INFO = {
73
+ # this block number is when the creator had its first tx ever, and after this mech's creation
74
+ "0xff82123dfb52ab75c417195c5fdb87630145ae81": (
75
+ "old_mech_abi.json",
76
+ last_month_block_number,
77
+ ),
78
+ # this block number is when this mech was created
79
+ "0x77af31de935740567cf4ff1986d04b2c964a786a": (
80
+ "new_mech_abi.json",
81
+ last_month_block_number,
82
+ ),
83
+ }
84
+ return MECH_TO_INFO
85
+
86
+
87
+ if __name__ == "__main__":
88
+ result = get_mech_info_last_60_days()
89
+ print(result)
scripts/profitability.py CHANGED
@@ -28,6 +28,7 @@ from enum import Enum
28
  from tqdm import tqdm
29
  import numpy as np
30
  from pathlib import Path
 
31
 
32
  IRRELEVANT_TOOLS = [
33
  "openai-text-davinci-002",
@@ -59,6 +60,7 @@ SCRIPTS_DIR = Path(__file__).parent
59
  ROOT_DIR = SCRIPTS_DIR.parent
60
  DATA_DIR = ROOT_DIR / "data"
61
 
 
62
  class MarketState(Enum):
63
  """Market state"""
64
 
@@ -343,7 +345,6 @@ def wei_to_unit(wei: int) -> float:
343
  def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
344
  """Returns whether the user has redeemed the position."""
345
  user_positions = user_json["data"]["user"]["userPositions"]
346
- outcomes_tokens_traded = int(fpmmTrade["outcomeTokensTraded"])
347
  condition_id = fpmmTrade["fpmm.condition.id"]
348
 
349
  for position in user_positions:
@@ -358,12 +359,12 @@ def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
358
  return False
359
 
360
 
361
- def create_fpmmTrades(rpc: str):
362
  """Create fpmmTrades for all trades."""
363
  trades_json = _query_omen_xdai_subgraph(
364
- from_timestamp=DEFAULT_FROM_TIMESTAMP,
365
  to_timestamp=DEFAULT_TO_TIMESTAMP,
366
- fpmm_from_timestamp=DEFAULT_FROM_TIMESTAMP,
367
  fpmm_to_timestamp=DEFAULT_TO_TIMESTAMP,
368
  )
369
 
@@ -384,18 +385,14 @@ def create_fpmmTrades(rpc: str):
384
  # change creator to creator_address
385
  df.rename(columns={"creator": "trader_address"}, inplace=True)
386
 
387
- # save to csv
388
- df.to_parquet(DATA_DIR / "fpmmTrades.parquet", index=False)
389
-
390
  return df
391
 
392
 
393
  def prepare_profitalibity_data(rpc: str):
394
  """Prepare data for profitalibity analysis."""
395
 
396
- # Check if tools.py is in the same directory
397
  try:
398
- # load tools.csv
399
  tools = pd.read_parquet(DATA_DIR / "tools.parquet")
400
 
401
  # make sure creator_address is in the columns
@@ -412,16 +409,18 @@ def prepare_profitalibity_data(rpc: str):
412
  print("tools.parquet not found. Please run tools.py first.")
413
  return
414
 
415
- # Check if fpmmTrades.csv is in the same directory
416
  try:
417
- # load fpmmTrades.csv
418
  fpmmTrades = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
419
  print("fpmmTrades.parquet loaded")
420
  except FileNotFoundError:
421
  print("fpmmTrades.parquet not found. Creating fpmmTrades.parquet...")
422
- fpmmTrades = create_fpmmTrades(rpc)
 
 
423
  fpmmTrades.to_parquet(DATA_DIR / "fpmmTrades.parquet", index=False)
424
- fpmmTrades = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
 
425
 
426
  # make sure trader_address is in the columns
427
  assert "trader_address" in fpmmTrades.columns, "trader_address column not found"
@@ -468,7 +467,7 @@ def analyse_trader(
468
  # Iterate over the trades
469
  for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
470
  try:
471
- if not trade['fpmm.currentAnswer']:
472
  print(f"Skipping trade {i} because currentAnswer is NaN")
473
  continue
474
  # Parsing and computing shared values
@@ -535,7 +534,8 @@ def analyse_trader(
535
  "num_mech_calls": num_mech_calls,
536
  "mech_fee_amount": num_mech_calls * DEFAULT_MECH_FEE,
537
  "net_earnings": net_earnings,
538
- "roi": net_earnings / (collateral_amount + fee_amount + num_mech_calls * DEFAULT_MECH_FEE),
 
539
  }
540
 
541
  except Exception as e:
@@ -613,7 +613,7 @@ def run_profitability_analysis(rpc):
613
  # load dfs from csv for analysis
614
  print("Preparing data...")
615
  fpmmTrades, tools = prepare_profitalibity_data(rpc)
616
- tools['trader_address'] = tools['trader_address'].str.lower()
617
 
618
  # all trades profitability df
619
  print("Analysing trades...")
 
28
  from tqdm import tqdm
29
  import numpy as np
30
  from pathlib import Path
31
+ from get_mech_info import DATETIME_60_DAYS_AGO
32
 
33
  IRRELEVANT_TOOLS = [
34
  "openai-text-davinci-002",
 
60
  ROOT_DIR = SCRIPTS_DIR.parent
61
  DATA_DIR = ROOT_DIR / "data"
62
 
63
+
64
  class MarketState(Enum):
65
  """Market state"""
66
 
 
345
  def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
346
  """Returns whether the user has redeemed the position."""
347
  user_positions = user_json["data"]["user"]["userPositions"]
 
348
  condition_id = fpmmTrade["fpmm.condition.id"]
349
 
350
  for position in user_positions:
 
359
  return False
360
 
361
 
362
+ def create_fpmmTrades(rpc: str, from_timestamp: float = DEFAULT_FROM_TIMESTAMP):
363
  """Create fpmmTrades for all trades."""
364
  trades_json = _query_omen_xdai_subgraph(
365
+ from_timestamp=from_timestamp,
366
  to_timestamp=DEFAULT_TO_TIMESTAMP,
367
+ fpmm_from_timestamp=from_timestamp,
368
  fpmm_to_timestamp=DEFAULT_TO_TIMESTAMP,
369
  )
370
 
 
385
  # change creator to creator_address
386
  df.rename(columns={"creator": "trader_address"}, inplace=True)
387
 
 
 
 
388
  return df
389
 
390
 
391
  def prepare_profitalibity_data(rpc: str):
392
  """Prepare data for profitalibity analysis."""
393
 
394
+ # Check if tools.parquet is in the same directory
395
  try:
 
396
  tools = pd.read_parquet(DATA_DIR / "tools.parquet")
397
 
398
  # make sure creator_address is in the columns
 
409
  print("tools.parquet not found. Please run tools.py first.")
410
  return
411
 
412
+ # Check if fpmmTrades.parquet is in the same directory
413
  try:
 
414
  fpmmTrades = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
415
  print("fpmmTrades.parquet loaded")
416
  except FileNotFoundError:
417
  print("fpmmTrades.parquet not found. Creating fpmmTrades.parquet...")
418
+ # Prepare the same time window as used for the tools
419
+ timestamp_60_days_ago = (DATETIME_60_DAYS_AGO).timestamp()
420
+ fpmmTrades = create_fpmmTrades(rpc, from_timestamp=timestamp_60_days_ago)
421
  fpmmTrades.to_parquet(DATA_DIR / "fpmmTrades.parquet", index=False)
422
+ # This is not needed
423
+ # fpmmTrades = pd.read_parquet(DATA_DIR / "fpmmTrades.parquet")
424
 
425
  # make sure trader_address is in the columns
426
  assert "trader_address" in fpmmTrades.columns, "trader_address column not found"
 
467
  # Iterate over the trades
468
  for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
469
  try:
470
+ if not trade["fpmm.currentAnswer"]:
471
  print(f"Skipping trade {i} because currentAnswer is NaN")
472
  continue
473
  # Parsing and computing shared values
 
534
  "num_mech_calls": num_mech_calls,
535
  "mech_fee_amount": num_mech_calls * DEFAULT_MECH_FEE,
536
  "net_earnings": net_earnings,
537
+ "roi": net_earnings
538
+ / (collateral_amount + fee_amount + num_mech_calls * DEFAULT_MECH_FEE),
539
  }
540
 
541
  except Exception as e:
 
613
  # load dfs from csv for analysis
614
  print("Preparing data...")
615
  fpmmTrades, tools = prepare_profitalibity_data(rpc)
616
+ tools["trader_address"] = tools["trader_address"].str.lower()
617
 
618
  # all trades profitability df
619
  print("Analysing trades...")
scripts/pull_data.py CHANGED
@@ -19,6 +19,7 @@ from tools import (
19
  DEFAULT_FILENAME as TOOLS_FILENAME,
20
  )
21
  from profitability import run_profitability_analysis
 
22
  import gc
23
 
24
  logging.basicConfig(level=logging.INFO)
@@ -27,6 +28,7 @@ SCRIPTS_DIR = Path(__file__).parent
27
  ROOT_DIR = SCRIPTS_DIR.parent
28
  DATA_DIR = ROOT_DIR / "data"
29
 
 
30
  def get_question(text: str) -> str:
31
  """Get the question from a text."""
32
  # Regex to find text within double quotes
@@ -43,24 +45,26 @@ def get_question(text: str) -> str:
43
 
44
  def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
45
  """Get the current answer for a question."""
46
- row = fpmms[fpmms['title'] == text]
47
  if row.shape[0] == 0:
48
  return None
49
- return row['currentAnswer'].values[0]
50
 
51
 
52
  def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
53
  """Convert a block number to a timestamp."""
54
  block = web3.eth.get_block(block_number)
55
- timestamp = datetime.utcfromtimestamp(block['timestamp'])
56
- return timestamp.strftime('%Y-%m-%d %H:%M:%S')
57
 
58
 
59
  def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
60
  """Parallelize the timestamp conversion."""
61
- block_numbers = df['request_block'].tolist()
62
  with ThreadPoolExecutor(max_workers=10) as executor:
63
- results = list(tqdm(executor.map(function, block_numbers), total=len(block_numbers)))
 
 
64
  return results
65
 
66
 
@@ -76,10 +80,11 @@ def weekly_analysis():
76
 
77
  # Run tools ETL
78
  logging.info("Running tools ETL")
 
 
79
  tools_etl(
80
  rpcs=[rpc],
81
  filename=TOOLS_FILENAME,
82
- full_contents=True,
83
  )
84
  logging.info("Tools ETL completed")
85
 
@@ -98,35 +103,48 @@ def weekly_analysis():
98
 
99
  # Get the question from the tools
100
  logging.info("Getting the question and current answer for the tools")
101
- tools['title'] = tools['prompt_request'].apply(lambda x: get_question(x))
102
- tools['currentAnswer'] = tools['title'].apply(lambda x: current_answer(x, fpmms))
103
 
104
- tools['currentAnswer'] = tools['currentAnswer'].str.replace('yes', 'Yes')
105
- tools['currentAnswer'] = tools['currentAnswer'].str.replace('no', 'No')
106
 
107
  # Convert block number to timestamp
108
  logging.info("Converting block number to timestamp")
109
  t_map = pickle.load(open(DATA_DIR / "t_map.pkl", "rb"))
110
- tools['request_time'] = tools['request_block'].map(t_map)
111
 
112
  # Identify tools with missing request_time and fill them
113
- missing_time_indices = tools[tools['request_time'].isna()].index
114
  if not missing_time_indices.empty:
115
- partial_block_number_to_timestamp = partial(block_number_to_timestamp, web3=web3)
116
- missing_timestamps = parallelize_timestamp_conversion(tools.loc[missing_time_indices], partial_block_number_to_timestamp)
117
-
 
 
 
 
118
  # Update the original DataFrame with the missing timestamps
119
  for i, timestamp in zip(missing_time_indices, missing_timestamps):
120
- tools.at[i, 'request_time'] = timestamp
121
 
122
- tools['request_month_year'] = pd.to_datetime(tools['request_time']).dt.strftime('%Y-%m')
123
- tools['request_month_year_week'] = pd.to_datetime(tools['request_time']).dt.to_period('W').astype(str)
 
 
 
 
124
 
125
- # Save the tools
126
  tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)
127
 
128
  # Update t_map with new timestamps
129
- new_timestamps = tools[['request_block', 'request_time']].dropna().set_index('request_block').to_dict()['request_time']
 
 
 
 
 
130
  t_map.update(new_timestamps)
131
 
132
  with open(DATA_DIR / "t_map.pkl", "wb") as f:
@@ -142,4 +160,3 @@ def weekly_analysis():
142
 
143
  if __name__ == "__main__":
144
  weekly_analysis()
145
-
 
19
  DEFAULT_FILENAME as TOOLS_FILENAME,
20
  )
21
  from profitability import run_profitability_analysis
22
+
23
  import gc
24
 
25
  logging.basicConfig(level=logging.INFO)
 
28
  ROOT_DIR = SCRIPTS_DIR.parent
29
  DATA_DIR = ROOT_DIR / "data"
30
 
31
+
32
  def get_question(text: str) -> str:
33
  """Get the question from a text."""
34
  # Regex to find text within double quotes
 
45
 
46
  def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
47
  """Get the current answer for a question."""
48
+ row = fpmms[fpmms["title"] == text]
49
  if row.shape[0] == 0:
50
  return None
51
+ return row["currentAnswer"].values[0]
52
 
53
 
54
  def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
55
  """Convert a block number to a timestamp."""
56
  block = web3.eth.get_block(block_number)
57
+ timestamp = datetime.utcfromtimestamp(block["timestamp"])
58
+ return timestamp.strftime("%Y-%m-%d %H:%M:%S")
59
 
60
 
61
  def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
62
  """Parallelize the timestamp conversion."""
63
+ block_numbers = df["request_block"].tolist()
64
  with ThreadPoolExecutor(max_workers=10) as executor:
65
+ results = list(
66
+ tqdm(executor.map(function, block_numbers), total=len(block_numbers))
67
+ )
68
  return results
69
 
70
 
 
80
 
81
  # Run tools ETL
82
  logging.info("Running tools ETL")
83
+
84
+ # This etl is saving already the tools parquet file
85
  tools_etl(
86
  rpcs=[rpc],
87
  filename=TOOLS_FILENAME,
 
88
  )
89
  logging.info("Tools ETL completed")
90
 
 
103
 
104
  # Get the question from the tools
105
  logging.info("Getting the question and current answer for the tools")
106
+ tools["title"] = tools["prompt_request"].apply(lambda x: get_question(x))
107
+ tools["currentAnswer"] = tools["title"].apply(lambda x: current_answer(x, fpmms))
108
 
109
+ tools["currentAnswer"] = tools["currentAnswer"].str.replace("yes", "Yes")
110
+ tools["currentAnswer"] = tools["currentAnswer"].str.replace("no", "No")
111
 
112
  # Convert block number to timestamp
113
  logging.info("Converting block number to timestamp")
114
  t_map = pickle.load(open(DATA_DIR / "t_map.pkl", "rb"))
115
+ tools["request_time"] = tools["request_block"].map(t_map)
116
 
117
  # Identify tools with missing request_time and fill them
118
+ missing_time_indices = tools[tools["request_time"].isna()].index
119
  if not missing_time_indices.empty:
120
+ partial_block_number_to_timestamp = partial(
121
+ block_number_to_timestamp, web3=web3
122
+ )
123
+ missing_timestamps = parallelize_timestamp_conversion(
124
+ tools.loc[missing_time_indices], partial_block_number_to_timestamp
125
+ )
126
+
127
  # Update the original DataFrame with the missing timestamps
128
  for i, timestamp in zip(missing_time_indices, missing_timestamps):
129
+ tools.at[i, "request_time"] = timestamp
130
 
131
+ tools["request_month_year"] = pd.to_datetime(tools["request_time"]).dt.strftime(
132
+ "%Y-%m"
133
+ )
134
+ tools["request_month_year_week"] = (
135
+ pd.to_datetime(tools["request_time"]).dt.to_period("W").astype(str)
136
+ )
137
 
138
+ # Save the tools data after the updates on the content
139
  tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)
140
 
141
  # Update t_map with new timestamps
142
+ new_timestamps = (
143
+ tools[["request_block", "request_time"]]
144
+ .dropna()
145
+ .set_index("request_block")
146
+ .to_dict()["request_time"]
147
+ )
148
  t_map.update(new_timestamps)
149
 
150
  with open(DATA_DIR / "t_map.pkl", "wb") as f:
 
160
 
161
  if __name__ == "__main__":
162
  weekly_analysis()
 
scripts/tools.py CHANGED
@@ -20,22 +20,17 @@
20
  import json
21
  import os.path
22
  import re
23
- import sys
24
  import time
25
  import random
26
  from dataclasses import dataclass
27
  from enum import Enum
28
- from io import StringIO
29
  from typing import (
30
  Optional,
31
  List,
32
  Dict,
33
  Any,
34
  Union,
35
- Callable,
36
- Tuple,
37
  )
38
-
39
  import pandas as pd
40
  import requests
41
  from json.decoder import JSONDecodeError
@@ -56,8 +51,18 @@ from web3 import Web3, HTTPProvider
56
  from web3.exceptions import MismatchedABI
57
  from web3.types import BlockParams
58
  from concurrent.futures import ThreadPoolExecutor, as_completed
59
- from pathlib import Path
60
-
 
 
 
 
 
 
 
 
 
 
61
 
62
  CONTRACTS_PATH = "contracts"
63
  MECH_TO_INFO = {
@@ -71,14 +76,11 @@ LATEST_BLOCK: Optional[int] = None
71
  LATEST_BLOCK_NAME: BlockParams = "latest"
72
  BLOCK_DATA_NUMBER = "number"
73
  BLOCKS_CHUNK_SIZE = 10_000
74
- REDUCE_FACTOR = 0.25
75
  EVENT_ARGUMENTS = "args"
76
  DATA = "data"
77
  REQUEST_ID = "requestId"
78
- REQUEST_ID_FIELD = "request_id"
79
  REQUEST_SENDER = "sender"
80
  PROMPT_FIELD = "prompt"
81
- BLOCK_FIELD = "block"
82
  CID_PREFIX = "f01701220"
83
  HTTP = "http://"
84
  HTTPS = HTTP[:4] + "s" + HTTP[4:]
@@ -89,7 +91,6 @@ STATUS_FORCELIST = [404, 500, 502, 503, 504]
89
  DEFAULT_FILENAME = "tools.parquet"
90
  RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
91
  ABI_ERROR = "The event signature did not match the provided ABI"
92
- SLEEP = 0.5
93
  HTTP_TIMEOUT = 10
94
  N_IPFS_RETRIES = 1
95
  N_RPC_RETRIES = 100
@@ -109,13 +110,12 @@ IRRELEVANT_TOOLS = [
109
  "deepmind-optimization",
110
  ]
111
  # this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
112
- # for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search, we also store the snapshot
 
113
  SNAPSHOT_RATE = 10
114
  NUM_WORKERS = 10
115
  GET_CONTENTS_BATCH_SIZE = 1000
116
- SCRIPTS_DIR = Path(__file__).parent
117
- ROOT_DIR = SCRIPTS_DIR.parent
118
- DATA_DIR = ROOT_DIR / "data"
119
 
120
  class MechEventName(Enum):
121
  """The mech's event names."""
@@ -289,31 +289,6 @@ EVENT_TO_MECH_STRUCT = {
289
  }
290
 
291
 
292
- def parse_args() -> str:
293
- """Parse the arguments and return the RPC."""
294
- if len(sys.argv) != 2:
295
- raise ValueError("Expected the RPC as a positional argument.")
296
- return sys.argv[1]
297
-
298
-
299
- def read_abi(abi_path: str) -> str:
300
- """Read and return the wxDAI contract's ABI."""
301
- with open(abi_path) as abi_file:
302
- return abi_file.read()
303
-
304
-
305
- def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
306
- """Dynamically reduce the batch size window."""
307
- keep_fraction = 1 - REDUCE_FACTOR
308
- events_filter = contract_instance.events[event].build_filter()
309
- events_filter.fromBlock = from_block
310
- batch_size = int(batch_size * keep_fraction)
311
- events_filter.toBlock = min(from_block + batch_size, latest_block)
312
- tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
313
- time.sleep(SLEEP)
314
- return events_filter, batch_size
315
-
316
-
317
  def get_events(
318
  w3: Web3,
319
  event: str,
@@ -442,13 +417,6 @@ def request(
442
  return None
443
 
444
 
445
- def limit_text(text: str, limit: int = 200) -> str:
446
- """Limit the given text"""
447
- if len(text) > limit:
448
- return f"{text[:limit]}..."
449
- return text
450
-
451
-
452
  def parse_ipfs_response(
453
  session: requests.Session,
454
  url: str,
@@ -523,38 +491,12 @@ def get_contents(
523
  return pd.DataFrame(contents)
524
 
525
 
526
- def check_for_dicts(df: pd.DataFrame) -> List[str]:
527
- """Check for columns that contain dictionaries."""
528
- dict_columns = []
529
- for column in df.columns:
530
- if df[column].apply(lambda x: isinstance(x, dict)).any():
531
- dict_columns.append(column)
532
- return dict_columns
533
-
534
-
535
- def drop_dict_rows(df: pd.DataFrame,
536
- dict_columns: List[str]) -> pd.DataFrame:
537
- """Drop rows that contain dictionaries."""
538
- for column in dict_columns:
539
- df = df[~df[column].apply(lambda x: isinstance(x, dict))]
540
- return df
541
-
542
-
543
- def clean(df: pd.DataFrame) -> pd.DataFrame:
544
- """Clean the dataframe."""
545
- dict_columns = check_for_dicts(df)
546
- df = drop_dict_rows(df, dict_columns)
547
- cleaned = df.drop_duplicates()
548
- cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
549
- return cleaned
550
-
551
-
552
  def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
553
  """Transform the requests dataframe."""
554
  return clean(contents)
555
 
556
 
557
- def transform_deliver(contents: pd.DataFrame, full_contents=False) -> pd.DataFrame:
558
  """Transform the delivers dataframe."""
559
  unpacked_result = pd.json_normalize(contents.result)
560
  # # drop result column if it exists
@@ -578,55 +520,27 @@ def transform_deliver(contents: pd.DataFrame, full_contents=False) -> pd.DataFra
578
  return clean(contents)
579
 
580
 
581
- def gen_event_filename(event_name: MechEventName) -> str:
582
- """Generate the filename of an event."""
583
- return f"{event_name.value.lower()}s.parquet"
584
-
585
-
586
- def read_n_last_lines(filename: str, n: int = 1) -> str:
587
- """Return the `n` last lines' content of a file."""
588
- num_newlines = 0
589
- with open(filename, "rb") as f:
590
- try:
591
- f.seek(-2, os.SEEK_END)
592
- while num_newlines < n:
593
- f.seek(-2, os.SEEK_CUR)
594
- if f.read(1) == b"\n":
595
- num_newlines += 1
596
- except OSError:
597
- f.seek(0)
598
- last_line = f.readline().decode()
599
- return last_line
600
-
601
-
602
- def get_earliest_block(event_name: MechEventName) -> int:
603
- """Get the earliest block number to use when filtering for events."""
604
- filename = gen_event_filename(event_name)
605
- if not os.path.exists(DATA_DIR / filename):
606
- return 0
607
-
608
- df = pd.read_parquet(DATA_DIR / filename)
609
- block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
610
- return int(df[block_field].max())
611
-
612
-
613
  def store_progress(
614
  filename: str,
615
  event_to_contents: Dict[str, pd.DataFrame],
616
  tools: pd.DataFrame,
617
  ) -> None:
618
  """Store the given progress."""
619
- print("starting")
620
  if filename:
621
  DATA_DIR.mkdir(parents=True, exist_ok=True) # Ensure the directory exists
622
  for event_name, content in event_to_contents.items():
623
- event_filename = gen_event_filename(event_name) # Ensure this function returns a valid filename string
 
 
624
  try:
625
  if "result" in content.columns:
626
- content = content.drop(columns=["result"]) # Avoid in-place modification
 
 
627
  content.to_parquet(DATA_DIR / event_filename, index=False)
628
  except Exception as e:
629
- print(f"Failed to write {event_name}: {e}")
630
  # Drop result and error columns for tools DataFrame
631
  try:
632
  if "result" in tools.columns:
@@ -637,7 +551,8 @@ def store_progress(
637
 
638
 
639
  def etl(
640
- rpcs: List[str], filename: Optional[str] = None, full_contents: bool = True
 
641
  ) -> pd.DataFrame:
642
  """Fetch from on-chain events, process, store and return the tools' results on all the questions as a Dataframe."""
643
  w3s = [Web3(HTTPProvider(r)) for r in rpcs]
@@ -646,13 +561,15 @@ def etl(
646
  MechEventName.REQUEST: transform_request,
647
  MechEventName.DELIVER: transform_deliver,
648
  }
 
649
  mech_to_info = {
650
  to_checksum_address(address): (
651
  os.path.join(CONTRACTS_PATH, filename),
652
  earliest_block,
653
  )
654
- for address, (filename, earliest_block) in MECH_TO_INFO.items()
655
  }
 
656
  event_to_contents = {}
657
 
658
  latest_block = LATEST_BLOCK
@@ -663,17 +580,13 @@ def etl(
663
 
664
  # Loop through events in event_to_transformer
665
  for event_name, transformer in event_to_transformer.items():
666
- if next_start_block is None:
667
- next_start_block_base = get_earliest_block(event_name)
668
 
669
  # Loop through mech addresses in mech_to_info
670
  events = []
671
  for address, (abi, earliest_block) in mech_to_info.items():
672
- if next_start_block_base == 0:
673
- next_start_block = earliest_block
674
- else:
675
- next_start_block = next_start_block_base
676
-
677
  print(
678
  f"Searching for {event_name.value} events for mech {address} from block {next_start_block} to {latest_block}."
679
  )
@@ -704,6 +617,7 @@ def etl(
704
  current_mech_events = future.result()
705
  events.extend(current_mech_events)
706
 
 
707
  parsed = parse_events(events)
708
 
709
  contents = []
@@ -729,31 +643,28 @@ def etl(
729
 
730
  contents = pd.concat(contents, ignore_index=True)
731
 
732
- full_contents = True
733
- if event_name == MechEventName.REQUEST:
734
- transformed = transformer(contents)
735
- elif event_name == MechEventName.DELIVER:
736
- transformed = transformer(contents, full_contents=full_contents)
737
-
738
- events_filename = gen_event_filename(event_name)
739
 
740
- if os.path.exists(DATA_DIR / events_filename):
741
- old = pd.read_parquet(DATA_DIR / events_filename)
 
742
 
743
- # Reset index to avoid index conflicts
744
- old.reset_index(drop=True, inplace=True)
745
- transformed.reset_index(drop=True, inplace=True)
746
 
747
- # Concatenate DataFrames
748
- transformed = pd.concat([old, transformed], ignore_index=True)
749
 
750
- # Drop duplicates if necessary
751
- transformed.drop_duplicates(subset=REQUEST_ID_FIELD, inplace=True)
752
 
753
  event_to_contents[event_name] = transformed.copy()
754
 
755
  # Store progress
756
  tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
 
 
757
  store_progress(filename, event_to_contents, tools)
758
 
759
  return tools
@@ -763,5 +674,6 @@ if __name__ == "__main__":
763
  RPCs = [
764
  "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
765
  ]
 
766
 
767
- tools = etl(rpcs=RPCs, filename=DEFAULT_FILENAME, full_contents=True)
 
20
  import json
21
  import os.path
22
  import re
 
23
  import time
24
  import random
25
  from dataclasses import dataclass
26
  from enum import Enum
 
27
  from typing import (
28
  Optional,
29
  List,
30
  Dict,
31
  Any,
32
  Union,
 
 
33
  )
 
34
  import pandas as pd
35
  import requests
36
  from json.decoder import JSONDecodeError
 
51
  from web3.exceptions import MismatchedABI
52
  from web3.types import BlockParams
53
  from concurrent.futures import ThreadPoolExecutor, as_completed
54
+ from get_mech_info import get_mech_info_last_60_days
55
+ from utils import (
56
+ clean,
57
+ BLOCK_FIELD,
58
+ gen_event_filename,
59
+ read_abi,
60
+ SLEEP,
61
+ reduce_window,
62
+ limit_text,
63
+ DATA_DIR,
64
+ REQUEST_ID_FIELD,
65
+ )
66
 
67
  CONTRACTS_PATH = "contracts"
68
  MECH_TO_INFO = {
 
76
  LATEST_BLOCK_NAME: BlockParams = "latest"
77
  BLOCK_DATA_NUMBER = "number"
78
  BLOCKS_CHUNK_SIZE = 10_000
 
79
  EVENT_ARGUMENTS = "args"
80
  DATA = "data"
81
  REQUEST_ID = "requestId"
 
82
  REQUEST_SENDER = "sender"
83
  PROMPT_FIELD = "prompt"
 
84
  CID_PREFIX = "f01701220"
85
  HTTP = "http://"
86
  HTTPS = HTTP[:4] + "s" + HTTP[4:]
 
91
  DEFAULT_FILENAME = "tools.parquet"
92
  RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
93
  ABI_ERROR = "The event signature did not match the provided ABI"
 
94
  HTTP_TIMEOUT = 10
95
  N_IPFS_RETRIES = 1
96
  N_RPC_RETRIES = 100
 
110
  "deepmind-optimization",
111
  ]
112
  # this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
113
+ # for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search,
114
+ # we also store the snapshot
115
  SNAPSHOT_RATE = 10
116
  NUM_WORKERS = 10
117
  GET_CONTENTS_BATCH_SIZE = 1000
118
+
 
 
119
 
120
  class MechEventName(Enum):
121
  """The mech's event names."""
 
289
  }
290
 
291
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
292
  def get_events(
293
  w3: Web3,
294
  event: str,
 
417
  return None
418
 
419
 
 
 
 
 
 
 
 
420
  def parse_ipfs_response(
421
  session: requests.Session,
422
  url: str,
 
491
  return pd.DataFrame(contents)
492
 
493
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
494
  def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
495
  """Transform the requests dataframe."""
496
  return clean(contents)
497
 
498
 
499
+ def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame:
500
  """Transform the delivers dataframe."""
501
  unpacked_result = pd.json_normalize(contents.result)
502
  # # drop result column if it exists
 
520
  return clean(contents)
521
 
522
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
523
  def store_progress(
524
  filename: str,
525
  event_to_contents: Dict[str, pd.DataFrame],
526
  tools: pd.DataFrame,
527
  ) -> None:
528
  """Store the given progress."""
529
+ print("storing given progress")
530
  if filename:
531
  DATA_DIR.mkdir(parents=True, exist_ok=True) # Ensure the directory exists
532
  for event_name, content in event_to_contents.items():
533
+ event_filename = gen_event_filename(
534
+ event_name
535
+ ) # Ensure this function returns a valid filename string
536
  try:
537
  if "result" in content.columns:
538
+ content = content.drop(
539
+ columns=["result"]
540
+ ) # Avoid in-place modification
541
  content.to_parquet(DATA_DIR / event_filename, index=False)
542
  except Exception as e:
543
+ print(f"Failed to write {event_name} data: {e}")
544
  # Drop result and error columns for tools DataFrame
545
  try:
546
  if "result" in tools.columns:
 
551
 
552
 
553
  def etl(
554
+ rpcs: List[str],
555
+ filename: Optional[str] = None,
556
  ) -> pd.DataFrame:
557
  """Fetch from on-chain events, process, store and return the tools' results on all the questions as a Dataframe."""
558
  w3s = [Web3(HTTPProvider(r)) for r in rpcs]
 
561
  MechEventName.REQUEST: transform_request,
562
  MechEventName.DELIVER: transform_deliver,
563
  }
564
+
565
  mech_to_info = {
566
  to_checksum_address(address): (
567
  os.path.join(CONTRACTS_PATH, filename),
568
  earliest_block,
569
  )
570
+ for address, (filename, earliest_block) in get_mech_info_last_60_days().items()
571
  }
572
+
573
  event_to_contents = {}
574
 
575
  latest_block = LATEST_BLOCK
 
580
 
581
  # Loop through events in event_to_transformer
582
  for event_name, transformer in event_to_transformer.items():
583
+ # if next_start_block is None:
584
+ # next_start_block_base = get_earliest_block(event_name)
585
 
586
  # Loop through mech addresses in mech_to_info
587
  events = []
588
  for address, (abi, earliest_block) in mech_to_info.items():
589
+ next_start_block = earliest_block
 
 
 
 
590
  print(
591
  f"Searching for {event_name.value} events for mech {address} from block {next_start_block} to {latest_block}."
592
  )
 
617
  current_mech_events = future.result()
618
  events.extend(current_mech_events)
619
 
620
+ print("Parsing events")
621
  parsed = parse_events(events)
622
 
623
  contents = []
 
643
 
644
  contents = pd.concat(contents, ignore_index=True)
645
 
646
+ transformed = transformer(contents)
 
 
 
 
 
 
647
 
648
+ # Remove appending data, always new files
649
+ # if os.path.exists(DATA_DIR / events_filename):
650
+ # old = pd.read_parquet(DATA_DIR / events_filename)
651
 
652
+ # # Reset index to avoid index conflicts
653
+ # old.reset_index(drop=True, inplace=True)
654
+ # transformed.reset_index(drop=True, inplace=True)
655
 
656
+ # # Concatenate DataFrames
657
+ # transformed = pd.concat([old, transformed], ignore_index=True)
658
 
659
+ # # Drop duplicates if necessary
660
+ # transformed.drop_duplicates(subset=REQUEST_ID_FIELD, inplace=True)
661
 
662
  event_to_contents[event_name] = transformed.copy()
663
 
664
  # Store progress
665
  tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
666
+ print(tools.info())
667
+
668
  store_progress(filename, event_to_contents, tools)
669
 
670
  return tools
 
674
  RPCs = [
675
  "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
676
  ]
677
+ filename = DEFAULT_FILENAME
678
 
679
+ tools = etl(rpcs=RPCs, filename=filename)
scripts/utils.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
+ import os
3
+ import time
4
+ from tqdm import tqdm
5
+ from tools import MechEventName
6
+ from typing import List
7
+ import pandas as pd
8
+ import gc
9
+ from pathlib import Path
10
+
11
+ REDUCE_FACTOR = 0.25
12
+ SLEEP = 0.5
13
+ REQUEST_ID_FIELD = "request_id"
14
+ SCRIPTS_DIR = Path(__file__).parent
15
+ ROOT_DIR = SCRIPTS_DIR.parent
16
+ DATA_DIR = ROOT_DIR / "data"
17
+ BLOCK_FIELD = "block"
18
+
19
+
20
+ def parse_args() -> str:
21
+ """Parse the arguments and return the RPC."""
22
+ if len(sys.argv) != 2:
23
+ raise ValueError("Expected the RPC as a positional argument.")
24
+ return sys.argv[1]
25
+
26
+
27
+ def read_abi(abi_path: str) -> str:
28
+ """Read and return the wxDAI contract's ABI."""
29
+ with open(abi_path) as abi_file:
30
+ return abi_file.read()
31
+
32
+
33
+ def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
34
+ """Dynamically reduce the batch size window."""
35
+ keep_fraction = 1 - REDUCE_FACTOR
36
+ events_filter = contract_instance.events[event].build_filter()
37
+ events_filter.fromBlock = from_block
38
+ batch_size = int(batch_size * keep_fraction)
39
+ events_filter.toBlock = min(from_block + batch_size, latest_block)
40
+ tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
41
+ time.sleep(SLEEP)
42
+ return events_filter, batch_size
43
+
44
+
45
+ def limit_text(text: str, limit: int = 200) -> str:
46
+ """Limit the given text"""
47
+ if len(text) > limit:
48
+ return f"{text[:limit]}..."
49
+ return text
50
+
51
+
52
+ def check_for_dicts(df: pd.DataFrame) -> List[str]:
53
+ """Check for columns that contain dictionaries."""
54
+ dict_columns = []
55
+ for column in df.columns:
56
+ if df[column].apply(lambda x: isinstance(x, dict)).any():
57
+ dict_columns.append(column)
58
+ return dict_columns
59
+
60
+
61
+ def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
62
+ """Drop rows that contain dictionaries."""
63
+ for column in dict_columns:
64
+ df = df[~df[column].apply(lambda x: isinstance(x, dict))]
65
+ return df
66
+
67
+
68
+ def clean(df: pd.DataFrame) -> pd.DataFrame:
69
+ """Clean the dataframe."""
70
+ dict_columns = check_for_dicts(df)
71
+ df = drop_dict_rows(df, dict_columns)
72
+ cleaned = df.drop_duplicates()
73
+ cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
74
+ return cleaned
75
+
76
+
77
+ def gen_event_filename(event_name: MechEventName) -> str:
78
+ """Generate the filename of an event."""
79
+ return f"{event_name.value.lower()}s.parquet"
80
+
81
+
82
+ def read_n_last_lines(filename: str, n: int = 1) -> str:
83
+ """Return the `n` last lines' content of a file."""
84
+ num_newlines = 0
85
+ with open(filename, "rb") as f:
86
+ try:
87
+ f.seek(-2, os.SEEK_END)
88
+ while num_newlines < n:
89
+ f.seek(-2, os.SEEK_CUR)
90
+ if f.read(1) == b"\n":
91
+ num_newlines += 1
92
+ except OSError:
93
+ f.seek(0)
94
+ last_line = f.readline().decode()
95
+ return last_line
96
+
97
+
98
+ def get_earliest_block(event_name: MechEventName) -> int:
99
+ """Get the earliest block number to use when filtering for events."""
100
+ filename = gen_event_filename(event_name)
101
+ if not os.path.exists(DATA_DIR / filename):
102
+ return 0
103
+
104
+ df = pd.read_parquet(DATA_DIR / filename)
105
+ block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
106
+ earliest_block = int(df[block_field].max())
107
+ # clean and release all memory
108
+ del df
109
+ gc.collect()
110
+ return earliest_block