Optimus-Agent-Performance / initial_value_fixer.py
gauravlochab
fix: initial value correction and adding OLAS transcations
8cfb88e
# Sample CSV values:
# apr,adjusted_apr,timestamp,portfolio_snapshot,calculation_metrics,roi,agent_id,is_dummy,address,agent_name,metric_type,first_investment_timestamp,agent_hash,volume,trading_type,selected_protocols
# -0.03,1.75,2025-05-15 21:37:27.000000,"{'portfolio': {'portfolio_value': 29.34506065817397, 'allocations': [{'chain': 'optimism', 'type': 'velodrome', 'id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'assets': ['FRAX', 'alUSD'], 'apr': 11.9, 'details': 'Velodrome Pool', 'ratio': 100.0, 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}], 'portfolio_breakdown': [{'asset': 'FRAX', 'address': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'balance': 12.498312351563191, 'price': 0.999924, 'value_usd': 12.497362479824472, 'ratio': 0.425876}, {'asset': 'alUSD', 'address': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'balance': 17.023792285753334, 'price': 0.989656, 'value_usd': 16.8476981783495, 'ratio': 0.574124}], 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}, 'positons': [{'chain': 'optimism', 'pool_address': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'dex_type': 'velodrome', 'token0': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'token1': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'token0_symbol': 'FRAX', 'token1_symbol': 'alUSD', 'apr': 11.901789131732096, 'pool_id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'is_stable': True, 'is_cl_pool': False, 'amount0': 12549523370531409633, 'amount1': 16972223462662011900, 'timestamp': 1747319387, 'status': 'open', 'tx_hash': '0xb487bb4a45bcd7bb3b9e9e3fabe76bf6594828091598ffab69704754b4c8bea8'}]}","{'initial_value': 29.353178464538146, 'final_value': 29.34506065817397, 'f_i_ratio': -0.0002765562977782299, 'last_investment_timestamp': 1747319387, 'time_ratio': 5380.753851502806}",-0.0002765562977782299,86,False,0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758,nusus-tayar25,APR,,,,,
# Parse the optimus_apr_values.csv file
# Iterate on the rows: For each row:
# Parse address, final_value
# Compute initial_value using the parsed address similar to an Optimus function
# Compute the APR and ROI similar to an Optimus function
# Write the row with initial_value, APR, and ROI to a new CSV file
from datetime import datetime
from decimal import Decimal
import json
import logging
import os
import time
from typing import Dict, Optional, Tuple, List
from pandas import DataFrame
import requests
from web3 import Web3
ETHERSCAN_API_KEY = ""
EXCLUDED_ADDRESSES = { # Testnet agents of Gaurav, Divya, and Priyanshu
"0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7DC8", # agent_id 84
"0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7CB5", # agent_id 86
"0x3B3AbC1604fAd139F841Da5c3Cad73a72621fee4", # agent_id 102
}
COINGECKO_PRICE_API_URL = "https://api.coingecko.com/api/v3/coins/{coin_id}/history?date={date}}"
WHITELISTED_TOKENS = {
# Optimism tokens - stablecoins
"0x0b2c639c533813f4aa9d7837caf62653d097ff85": ("USDC", 6),
"0x01bff41798a0bcf287b996046ca68b395dbc1071": ("USDT0", 6),
"0x94b008aa00579c1307b0ef2c499ad98a8ce58e58": ("USDT", 6),
"0x7f5c764cbc14f9669b88837ca1490cca17c31607": ("USDC.e", 6),
"0x8ae125e8653821e851f12a49f7765db9a9ce7384": ("DOLA", 18),
"0xc40f949f8a4e094d1b49a23ea9241d289b7b2819": ("LUSD", 18),
"0xda10009cbd5d07dd0cecc66161fc93d7c9000da1": ("DAI", 18),
"0x087c440f251ff6cfe62b86dde1be558b95b4bb9b": ("BOLD", 18),
"0x2e3d870790dc77a83dd1d18184acc7439a53f475": ("FRAX", 18),
"0x2218a117083f5b482b0bb821d27056ba9c04b1d3": ("sDAI", 18),
"0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189": ("oUSDT", 6),
"0x4f604735c1cf31399c6e711d5962b2b3e0225ad3": ("USDGLO", 18),
"0xFC2E6e6BCbd49ccf3A5f029c79984372DcBFE527": ("OLAS", 18)
}
COIN_ID_MAPPING = {
"usdc": "usd-coin",
"alusd": "alchemix-usd",
"usdt0": "usdt0",
"usdt": "bridged-usdt",
"usdc.e": "bridged-usd-coin-optimism",
"usx": "token-dforce-usd",
"dola": "dola-usd",
"lusd": "liquity-usd",
"dai": "makerdao-optimism-bridged-dai-optimism",
"bold": "liquity-bold",
"frax": "frax",
"sdai": "savings-dai",
"usd+": "overnight-fi-usd-optimism",
"ousdt": "openusdt",
"usdglo": "glo-dollar",
"olas": "autonolas"
}
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
w3 = Web3(Web3.HTTPProvider("https://rpc-gate.autonolas.tech/optimism-rpc/"))
def get_coin_id_from_symbol(symbol: str, chain: str) -> Optional[str]:
"""Map token symbol to CoinGecko ID."""
if chain == "optimism":
coin_id_map = {
"USDC": "usd-coin",
"ALUSD": "alchemix-usd",
"USDT0": "usdt0",
"USDT": "bridged-usdt",
"MSUSD": None,
"USDC.E": "bridged-usd-coin-optimism",
"USX": "token-dforce-usd",
"DOLA": "dola-usd",
"LUSD": "liquity-usd",
"DAI": "makerdao-optimism-bridged-dai-optimism",
"BOLD": "liquity-bold",
"FRAX": "frax",
"SDAI": "savings-dai",
"USD+": "overnight-fi-usd-optimism",
"OUSDT": "openusdt",
"USDGLO": "glo-dollar",
"ETH": "ethereum",
"WETH": "ethereum",
"WBTC": "wrapped-bitcoin",
}
return coin_id_map.get(symbol.upper())
return None
def load_cache(name: str) -> Dict:
"""Load price cache from JSON file."""
cache_file = f"{name}_cache.json"
if os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
return json.load(f)
except json.JSONDecodeError:
logger.warning("Cache file corrupted, creating new cache")
return {}
return {}
def save_cache(name: str, cache: Dict):
"""Save price cache to JSON file."""
cache_file = f"{name}_cache.json"
with open(cache_file, 'w') as f:
json.dump(cache, f, indent=2)
def get_cached_price(date_str: str, token_symbol: str) -> Optional[float]:
"""Get price from cache if available."""
cache = load_cache(name="price")
return cache.get(date_str, {}).get(token_symbol)
def update_price_cache(date_str: str, token_symbol: str, price: float):
"""Update price cache with new value."""
cache = load_cache(name="price")
if date_str not in cache:
cache[date_str] = {}
cache[date_str][token_symbol] = price
save_cache(name="price", cache=cache)
def get_cached_request(cache_key: str) -> Optional[Dict]:
"""Get cached request response if available."""
cache = load_cache(name="request")
return cache.get(cache_key)
def update_request_cache(cache_key: str, response: Dict):
"""Update request cache with new response."""
cache = load_cache(name="request")
cache[cache_key] = response
save_cache(name="request", cache=cache)
def fetch_historical_eth_price(date_str: str) -> float:
"""Fetch historical ETH price from CoinGecko with caching."""
# Check cache first
cached_price = get_cached_price(date_str, "ETH")
if cached_price is not None:
return cached_price
try:
url = "https://api.coingecko.com/api/v3/coins/ethereum/history"
params = {"date": date_str, "localization": "false"}
# Add delay to respect rate limits
time.sleep(1.2)
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
if "market_data" in data and "current_price" in data["market_data"]:
price = data["market_data"]["current_price"]["usd"]
# Update cache
update_price_cache(date_str, "ETH", price)
return price
return 0.0
except Exception as e:
print(f"Error fetching ETH price for {date_str}: {str(e)}")
return 0.0
def fetch_historical_token_price(coin_id: str, date_str: str, token_symbol: str) -> float:
"""Fetch historical token price from CoinGecko with caching."""
# Check cache first
cached_price = get_cached_price(date_str, token_symbol)
if cached_price is not None:
return cached_price
try:
success, data = request_with_retries(
endpoint=f"https://api.coingecko.com/api/v3/coins/{coin_id}/history",
params={"date": date_str, "localization": "false"},
)
if not success:
logger.error(f"Failed to fetch historical price for {coin_id} on {date_str}")
return 0.0
# Add delay to respect rate limits
time.sleep(1.2)
if "market_data" in data and "current_price" in data["market_data"]:
price = data["market_data"]["current_price"]["usd"]
# Update cache
update_price_cache(date_str, token_symbol, price)
return price
return 0.0
except Exception as e:
print(f"Error fetching price for {coin_id} on {date_str}: {str(e)}")
return 0.0
def get_block_at_timestamp(
timestamp: int,
chain: str = "optimism"
) -> Optional[int]:
success, res = request_with_retries(
endpoint=f"https://api-optimistic.etherscan.io/api?module=block&action=getblocknobytime&timestamp={timestamp}&closest=before&apikey={ETHERSCAN_API_KEY}",
)
if success and res.get("status") == "1" and "result" in res:
return int(res.get("result"))
else:
logger.error(f"Failed to fetch block at timestamp {timestamp} for {chain}: {res.get('message', 'Unknown error')}")
return None
def fetch_eth_balance(address: str, timestamp: float) -> float:
key = "eth_balance"
cache = load_cache(name=key)
if f"{address}_{timestamp}" in cache:
return cache[f"{address}_{timestamp}"] / (10 ** 18)
balance = w3.eth.get_balance(
account=Web3.to_checksum_address(address),
block_identifier=get_block_at_timestamp(int(timestamp))
)
cache[f"{address}_{timestamp}"] = balance
save_cache(name=key, cache=cache)
return balance / (10 ** 18)
def fetch_token_balance(
address: str,
token_address: str,
timestamp: int,
decimals: int = 18
) -> Optional[float]:
contract = w3.eth.contract(
address=Web3.to_checksum_address(token_address),
abi=[
{
"constant": True,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function",
}
]
)
try:
cache_key = f"token_balance_{address}_{token_address}_{timestamp}"
cache = load_cache(name="token_balance")
if cache_key in cache:
return cache[cache_key] / (10 ** decimals)
balance = contract.functions.balanceOf(address).call(block_identifier=get_block_at_timestamp(int(timestamp)))
cache[cache_key] = balance
save_cache(name="token_balance", cache=cache)
return balance / (10 ** decimals) if balance else 0.0
except Exception as e:
logger.error(f"Error fetching token balance for {address} at {timestamp}: {e}")
return None
def get_datetime_from_timestamp(timestamp: str) -> Optional[datetime]:
"""Convert timestamp string to datetime object."""
try:
return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
except (ValueError, TypeError):
logger.warning(f"Invalid timestamp format: {timestamp}")
return None
def request_with_retries(
endpoint: str,
params: Dict = None,
headers: Dict = None,
method: str = "GET",
body: Dict = None,
rate_limited_code: int = 429,
retry_wait: int = 5,
max_retries: int = 3
) -> Tuple[bool, Dict]:
for attempt in range(max_retries):
try:
if method.upper() == "POST":
cache_key = f"POST_{endpoint}_{str(body or {})}"
cached_response = get_cached_request(cache_key)
if cached_response is not None:
return len(cached_response) > 0, cached_response
response = requests.post(endpoint, headers=headers, json=body)
if response.ok:
update_request_cache(cache_key, response.json())
else:
# Check cache first for GET requests
cache_key = f"{endpoint}_{str(params or {})}"
cached_response = get_cached_request(cache_key)
if cached_response is not None:
return len(cached_response) > 0, cached_response
response = requests.get(endpoint, headers=headers, params=params or {})
# Cache successful responses
if response.status_code == 200:
update_request_cache(cache_key, response.json())
elif response.status_code == 404:
update_request_cache(cache_key, {})
if response.status_code == rate_limited_code:
logger.warning(f"Rate limited. Waiting {retry_wait} seconds...")
time.sleep(retry_wait)
continue
if response.status_code != 200:
logger.error(f"Request failed with status {response.status_code}")
return False, {}
return True, response.json()
except Exception as e:
logger.error(f"Request failed: {str(e)}")
if attempt < max_retries - 1:
time.sleep(retry_wait)
continue
return False, {}
return False, {}
def should_include_transfer_optimism(
from_address: str
) -> bool:
"""Determine if an Optimism transfer should be included based on from address type."""
if not from_address:
return False
# Exclude zero address
if from_address.lower() in [
"0x0000000000000000000000000000000000000000",
"0x0",
"",
]:
return False
try:
# Use Optimism RPC to check if address is a contract
payload = {
"jsonrpc": "2.0",
"method": "eth_getCode",
"params": [from_address, "latest"],
"id": 1,
}
success, result = request_with_retries(
endpoint="https://mainnet.optimism.io",
method="POST",
headers={"Content-Type": "application/json"},
body=payload,
rate_limited_code=429,
retry_wait=5,
)
if not success:
logger.error("Failed to check contract code")
return False
code = result.get("result", "0x")
# If code is '0x', it's an EOA
if code == "0x":
return True
# If it has code, check if it's a GnosisSafe
safe_check_url = f"https://safe-transaction-optimism.safe.global/api/v1/safes/{from_address}/"
success, _ = request_with_retries(
endpoint=safe_check_url,
headers={"Accept": "application/json"},
rate_limited_code=429,
retry_wait=5,
)
if success:
return True
logger.info(
f"Excluding transfer from contract: {from_address}"
)
return False
except Exception as e:
logger.error(f"Error checking address {from_address}: {e}")
return False
def fetch_optimism_incoming_transfers(
address: str,
last_timestamp: int
) -> Dict:
"""
Fetch Optimism transfers for a given address with improved error handling and rate limiting.
"""
base_url = "https://safe-transaction-optimism.safe.global/api/v1"
all_transfers_by_date = {}
try:
logger.info(f"Fetching Optimism transfers for address {address}...")
# Fetch incoming transfers
transfers_url = f"{base_url}/safes/{address}/incoming-transfers/"
processed_count = 0
page_count = 0
max_pages = 10 # Limit to prevent infinite loops
while page_count < max_pages:
page_count += 1
logger.info(f"Fetching page {page_count} for address {address}")
success, response_json = request_with_retries(
endpoint=transfers_url,
headers={"Accept": "application/json"},
rate_limited_code=429,
retry_wait=5
)
if not success:
logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
break
transfers = response_json.get("results", [])
if not transfers:
logger.info(f"No more transfers found for address {address} on page {page_count}")
break
print("incoming transfers",response_json)
for transfer in transfers:
# Parse timestamp
timestamp = transfer.get("executionDate")
if not timestamp:
continue
tx_datetime = get_datetime_from_timestamp(timestamp)
tx_date = tx_datetime.strftime("%Y-%m-%d") if tx_datetime else None
if not tx_date:
continue
if tx_datetime.timestamp() > last_timestamp:
continue
# Process the transfer
from_address = transfer.get("from", address)
transfer_type = transfer.get("type", "")
if from_address.lower() == address.lower():
continue
# Initialize date in transfers dict if not exists
if tx_date not in all_transfers_by_date:
all_transfers_by_date[tx_date] = []
should_include = should_include_transfer_optimism(from_address)
if not should_include:
continue
# Process different transfer types
if transfer_type == "ERC20_TRANSFER":
# Token transfer
token_info = transfer.get("tokenInfo", {})
token_address = transfer.get("tokenAddress", "")
if not token_info:
if not token_address:
continue
# You might want to add token decimal and symbol fetching here
symbol = "Unknown"
decimals = 18
else:
symbol = token_info.get("symbol", "Unknown")
decimals = int(token_info.get("decimals", 18) or 18)
if symbol.lower() not in ["usdc", "eth"]:
continue
value_raw = int(transfer.get("value", "0") or "0")
amount = value_raw / (10**decimals)
transfer_data = {
"from_address": from_address,
"amount": amount,
"token_address": token_address,
"symbol": symbol,
"timestamp": timestamp,
"tx_hash": transfer.get("transactionHash", ""),
"type": "token"
}
elif transfer_type == "ETHER_TRANSFER":
# ETH transfer
try:
value_wei = int(transfer.get("value", "0") or "0")
amount_eth = value_wei / 10**18
if amount_eth <= 0:
continue
except (ValueError, TypeError):
logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
continue
transfer_data = {
"from_address": from_address,
"amount": amount_eth,
"token_address": "",
"symbol": "ETH",
"timestamp": timestamp,
"tx_hash": transfer.get("transactionHash", ""),
"type": "eth"
}
else:
# Skip other transfer types
continue
all_transfers_by_date[tx_date].append(transfer_data)
processed_count += 1
# Show progress
if processed_count % 50 == 0:
logger.info(f"Processed {processed_count} Optimism transfers for address {address}...")
# Check for next page
cursor = response_json.get("next")
if not cursor:
logger.info(f"No more pages for address {address}")
break
else:
transfers_url = cursor # Update URL for next page
logger.info(f"Completed Optimism transfers for address {address}: {processed_count} found")
return all_transfers_by_date
except Exception as e:
logger.error(f"Error fetching Optimism transfers for address {address}: {e}")
return {}
def fetch_optimism_outgoing_transfers(
address: str,
final_timestamp: int,
from_address: str,
) -> Dict:
"""Fetch all outgoing transfers from the safe address on Optimism until a specific date.
Args:
address: The safe address to fetch transfers for
final_timestamp: The timestamp until which to fetch transfers
from_address: The master address to check for reversions
Returns:
Dict: Dictionary of transfers organized by date
"""
all_transfers = {}
if not address:
logger.warning(
"No address provided for fetching Optimism outgoing transfers"
)
return all_transfers
try:
# Use SafeGlobal API for Optimism transfers
base_url = "https://safe-transaction-optimism.safe.global/api/v1"
transfers_url = f"{base_url}/safes/{address}/transfers/"
processed_count = 0
page_count = 0
max_pages = 50 # Increased limit to handle more transactions
while page_count < max_pages:
page_count += 1
logger.info(f"Fetching outgoing transfers page {page_count} for address {address}")
success, response_json = request_with_retries(
endpoint=transfers_url,
headers={"Accept": "application/json"},
rate_limited_code=429,
retry_wait=5,
)
if not success:
logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
break
transfers = response_json.get("results", [])
if not transfers:
logger.info(f"No more transfers found for address {address} on page {page_count}")
break
print("outgoing_transfers", response_json)
for transfer in transfers:
# Parse timestamp
timestamp = transfer.get("executionDate")
if not timestamp:
continue
# Handle ISO format timestamp
try:
tx_datetime = datetime.fromisoformat(
timestamp.replace("Z", "+00:00")
)
tx_date = tx_datetime.strftime("%Y-%m-%d")
except (ValueError, TypeError):
logger.warning(
f"Invalid timestamp format: {timestamp}"
)
continue
if tx_datetime.timestamp() > final_timestamp:
continue
# Only process outgoing transfers (where from address is equal to our safe address)
# OR transfers going back to the master address (reversions)
if transfer.get("from").lower() == address.lower():
transfer_type = transfer.get("type", "")
if transfer_type == "ETHER_TRANSFER":
try:
value_wei = int(transfer.get("value", "0") or "0")
amount_eth = value_wei / 10**18
if amount_eth <= 0:
continue
except (ValueError, TypeError):
continue
transfer_data = {
"from_address": address,
"to_address": transfer.get("to"),
"amount": amount_eth,
"token_address": ZERO_ADDRESS,
"symbol": "ETH",
"timestamp": timestamp,
"tx_hash": transfer.get("transactionHash", ""),
"type": "eth",
}
if tx_date not in all_transfers:
all_transfers[tx_date] = []
all_transfers[tx_date].append(transfer_data)
processed_count += 1
# Also process ERC20 transfers for completeness
token_info = transfer.get("tokenInfo", {})
token_address = transfer.get("tokenAddress", "")
if token_info:
symbol = token_info.get("symbol", "Unknown")
decimals = int(token_info.get("decimals", 18) or 18)
else:
symbol = "Unknown"
decimals = 18
try:
value_raw = int(transfer.get("value", "0") or "0")
amount = value_raw / (10**decimals)
if amount <= 0:
continue
except (ValueError, TypeError):
continue
transfer_data = {
"from_address": transfer.get("from"),
"to_address": transfer.get("to"),
"amount": amount,
"token_address": token_address,
"symbol": symbol,
"timestamp": timestamp,
"tx_hash": transfer.get("transactionHash", ""),
"type": "token",
}
if tx_date not in all_transfers:
all_transfers[tx_date] = []
all_transfers[tx_date].append(transfer_data)
processed_count += 1
# Show progress
if processed_count % 50 == 0:
logger.info(f"Processed {processed_count} outgoing transfers for address {address}...")
# Check for next page
cursor = response_json.get("next")
if not cursor:
logger.info(f"No more pages for address {address}")
break
else:
transfers_url = cursor # Update URL for next page
logger.info(f"Completed Optimism outgoing transfers: {processed_count} found")
return all_transfers
except Exception as e:
logger.error(f"Error fetching Optimism outgoing transfers: {e}")
return {}
def track_and_calculate_reversion_value(
safe_address: str,
chain: str,
incoming_transfers: Dict,
outgoing_transfers: Dict,
) -> float:
"""Track ETH transfers to safe address and handle reversion logic."""
try:
if not incoming_transfers:
logger.warning(f"No transfers found for {chain} chain")
return 0.0
# Track ETH transfers
eth_transfers = []
initial_funding = None
master_safe_address = None
reversion_transfers = []
reversion_value = 0.0
# Sort transfers by timestamp
sorted_incoming_transfers = []
for _, transfers in incoming_transfers.items():
for transfer in transfers:
if isinstance(transfer, dict) and "timestamp" in transfer:
sorted_incoming_transfers.append(transfer)
sorted_incoming_transfers.sort(key=lambda x: x["timestamp"])
sorted_outgoing_transfers = []
for _, transfers in outgoing_transfers.items():
for transfer in transfers:
if isinstance(transfer, dict) and "timestamp" in transfer:
sorted_outgoing_transfers.append(transfer)
sorted_outgoing_transfers.sort(key=lambda x: x["timestamp"])
# Process transfers
for transfer in sorted_incoming_transfers:
# Check if it's an ETH transfer
if transfer.get("symbol") == "ETH":
# If this is the first transfer, store it as initial funding
if not initial_funding:
initial_funding = {
"amount": transfer.get("amount", 0),
"from_address": transfer.get("from_address"),
"timestamp": transfer.get("timestamp"),
}
if transfer.get("from_address"):
master_safe_address = transfer.get("from_address").lower()
eth_transfers.append(transfer)
# If it's from the same address as initial funding
elif (
transfer.get("from_address", "").lower() == master_safe_address
):
eth_transfers.append(transfer)
for transfer in sorted_outgoing_transfers:
if transfer.get("symbol") == "ETH":
if (
transfer.get("to_address", "").lower() == master_safe_address
and transfer.get("from_address", "").lower()
== safe_address.lower()
):
reversion_transfers.append(transfer)
reversion_value = calculate_total_reversion_value(
eth_transfers, reversion_transfers
)
return reversion_value
except Exception as e:
logger.error(f"Error tracking ETH transfers: {str(e)}")
return 0.0
def calculate_total_reversion_value(
eth_transfers: List[Dict], reversion_transfers: List[Dict]
) -> float:
"""Calculate the total reversion value from the reversion transfers."""
reversion_amount = 0.0
reversion_date = None
reversion_value = 0.0
last_transfer = eth_transfers[-1]
try:
# Handle ISO format timestamp
timestamp = last_transfer.get("timestamp", "")
if timestamp.endswith("Z"):
# Convert ISO format to datetime
tx_datetime = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
reversion_date = tx_datetime.strftime("%d-%m-%Y")
else:
# Try parsing as Unix timestamp
reversion_date = datetime.fromtimestamp(int(timestamp)).strftime(
"%d-%m-%Y"
)
except (ValueError, TypeError) as e:
logger.warning(f"Error parsing timestamp: {e}")
# Use current date as fallback
transfer = reversion_transfers[0]
reversion_date = datetime.fromisoformat(transfer.get("timestamp", "").replace('Z', '+00:00')).strftime("%d-%m-%Y")
for index, transfer in enumerate(reversion_transfers):
transfer_date = datetime.fromisoformat(transfer.get("timestamp", "").replace('Z', '+00:00')).strftime("%d-%m-%Y")
if index == 0:
eth_price = fetch_historical_eth_price(reversion_date)
else:
eth_price = fetch_historical_eth_price(transfer_date)
if eth_price:
reversion_amount = transfer.get("amount", 0)
reversion_value += reversion_amount * eth_price
return reversion_value
def calculate_initial_investment_value_from_funding_events(
chain: str,
address: str,
incoming_transfers: Dict,
outgoing_transfers: Dict,
) -> float:
total_investment = 0.0
if not incoming_transfers:
print(f"No transfers found for {chain} chain")
return 0.0
if chain == "optimism":
print("Using Optimism-specific transfer processing")
for date, date_transfers in incoming_transfers.items():
for transfer in date_transfers:
try:
amount = transfer.get("amount", 0)
token_symbol = transfer.get("symbol", "").upper()
if amount <= 0:
continue
# Get historical price for the transfer date
date_str = datetime.strptime(date, "%Y-%m-%d").strftime("%d-%m-%Y")
if token_symbol == "ETH": # nosec B105
price = fetch_historical_eth_price(date_str)
else:
coingecko_id = get_coin_id_from_symbol(token_symbol, chain)
if coingecko_id:
price = fetch_historical_token_price(
coingecko_id, date_str, token_symbol
)
else:
price = None
transfer_value = amount * price
total_investment += transfer_value
print(f"Processed transfer on {date}: {amount} {token_symbol} @ ${price} = ${transfer_value}")
except Exception as e:
print(f"Error processing transfer: {str(e)}")
continue
else:
print(f"Unsupported chain: {chain}, skipping")
return 0.0
reversion_value = track_and_calculate_reversion_value(
safe_address=address,
chain=chain,
incoming_transfers=incoming_transfers,
outgoing_transfers=outgoing_transfers,
)
logger.info(f"Total investment: {total_investment}")
logger.info(f"Reversion value: {reversion_value}")
total_investment = total_investment - reversion_value
logger.info(f"Total investment after reversion: {total_investment}")
print(f"Total initial investment from {chain} chain: ${total_investment}")
return total_investment if total_investment > 0 else 0.0
def calculate_initial_value_from_address_and_timestamp(
address: str,
final_timestamp: int,
) -> Tuple[float, int]:
# First fetch the transfers
incoming_transfers = fetch_optimism_incoming_transfers(address, final_timestamp)
logger.info("Fetched incoming transfers")
# Find the first transfer to get the from_address
from_address = None
for date_transfers in incoming_transfers.values():
if date_transfers: # Check if the list is not empty
from_address = date_transfers[0].get('from_address')
break
if from_address is None:
logger.warning("No from_address found in incoming transfers")
from_address = ""
outgoing_transfers = fetch_optimism_outgoing_transfers(address, final_timestamp, from_address)
logger.info(f"Fetched outgoing transfers {outgoing_transfers}")
initial_timestamp = final_timestamp
for _transfers in incoming_transfers.values():
for _transfer in _transfers:
if "timestamp" not in _transfer:
continue
transfer_timestamp = datetime.fromisoformat(_transfer["timestamp"].replace('Z', '+00:00')).timestamp()
if transfer_timestamp < initial_timestamp:
initial_timestamp = int(transfer_timestamp)
# Then calculate initial investment
initial_investment = calculate_initial_investment_value_from_funding_events(
chain="optimism",
address=address,
incoming_transfers=incoming_transfers,
outgoing_transfers=outgoing_transfers,
)
return initial_investment, int(initial_timestamp)
def calculate_final_value_from_address_and_timestamp(
address: str,
timestamp: int,
) -> float:
"""
Calculate the final portfolio value at a specific timestamp by fetching
ETH and token balances and multiplying by historical prices.
"""
final_value = 0.0
try:
# Get ETH balance and price
eth_balance = fetch_eth_balance(address, timestamp)
if eth_balance > 0:
eth_price = fetch_historical_eth_price(
datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y")
)
if eth_price and eth_price > 0:
eth_value = eth_balance * eth_price
final_value += eth_value
logger.info(f"ETH value: {eth_balance:.6f} ETH @ ${eth_price:.2f} = ${eth_value:.2f}")
else:
logger.warning(f"Could not fetch ETH price for timestamp {timestamp}")
# Get token balances and prices
for token_address, (symbol, decimals) in WHITELISTED_TOKENS.items():
try:
token_balance = fetch_token_balance(
address=address,
token_address=token_address,
decimals=decimals,
timestamp=timestamp,
)
if token_balance is not None and token_balance > 0:
token_price = fetch_historical_token_price(
coin_id=COIN_ID_MAPPING.get(symbol.lower(), symbol.lower()),
date_str=datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y"),
token_symbol=symbol
)
if token_price is not None and token_price > 0:
token_value = token_balance * token_price
final_value += token_value
logger.info(f"{symbol} value: {token_balance:.6f} @ ${token_price:.6f} = ${token_value:.2f}")
else:
logger.warning(f"Could not fetch price for {symbol} at timestamp {timestamp}")
except Exception as e:
logger.error(f"Error processing token {symbol} ({token_address}): {e}")
continue
except Exception as e:
logger.error(f"Error calculating final value for address {address}: {e}")
return 0.0
logger.info(f"Total final value for {address}: ${final_value:.2f}")
return final_value
def _calculate_adjusted_apr(
apr: float,
initial_timestamp: int,
final_timestamp: int
) -> float:
if apr is None or apr == 0:
return 0.0
intial_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(initial_timestamp).strftime("%d-%m-%Y"))
final_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(final_timestamp).strftime("%d-%m-%Y"))
if (
final_eth_price is not None
and intial_eth_price is not None
):
adjustment_factor = Decimal("1") - (
Decimal(str(final_eth_price)) / Decimal(str(intial_eth_price))
)
adjusted_apr = round(
float(apr)
+ float(adjustment_factor * Decimal("100")),
2,
)
return adjusted_apr
else:
logger.warning(
f"Could not fetch ETH prices for timestamps {initial_timestamp} and {final_timestamp}. Returning original APR: {apr}"
)
return apr
def calculate_apr_and_roi(
initial_value: float,
final_value: float,
initial_timestamp: int,
final_timestamp: int
) -> Tuple[float, float, float]:
if final_value <= 0:
logger.warning("Final value is non-positive, returning 0.0 for APR and ROI.")
return 0.0, 0.0, 0.0
# Calculate ROI (Return on Investment)
roi = ((final_value / initial_value) - 1) * 100
# Calculate hours since investment
hours = max(1, (final_timestamp - int(initial_timestamp)) / 3600)
# Calculate time ratio (hours in a year / hours since investment)
hours_in_year = 8760
time_ratio = hours_in_year / hours
# Calculate APR (Annualized ROI)
apr = float(roi * time_ratio)
if apr < 0:
apr = roi
adjust_apr = _calculate_adjusted_apr(
apr=apr,
initial_timestamp=initial_timestamp,
final_timestamp=final_timestamp
)
return float(round(apr, 2)), float(round(adjust_apr, 2)), float(round(roi, 2))
def fix_apr_and_roi(df: DataFrame) -> DataFrame:
"""
Fix APR and ROI values by recalculating them based on actual blockchain data.
This function processes each row only once and includes proper error handling.
"""
if df.empty:
logger.info("Empty DataFrame provided to fix_apr_and_roi, returning as-is")
return df
logger.info(f"Starting fix_apr_and_roi with {len(df)} rows")
# Remove rows with excluded addresses
original_count = len(df)
df = df[~df['address'].isin(EXCLUDED_ADDRESSES)]
excluded_count = original_count - len(df)
if excluded_count > 0:
logger.info(f"Excluded {excluded_count} rows with excluded addresses")
# Remove rows with timestamps before 2025-06-06
original_count = len(df)
df = df[df['timestamp'] >= '2025-06-06 00:00:00.000000']
old_data_count = original_count - len(df)
if old_data_count > 0:
logger.info(f"Excluded {old_data_count} rows with timestamps before 2025-06-06")
# Check for future timestamps and filter them out
current_time = datetime.now().timestamp()
future_rows = df[df['timestamp'].apply(lambda x: x.timestamp()) > current_time]
if not future_rows.empty:
logger.warning(f"Found {len(future_rows)} rows with future timestamps, excluding them")
for idx, row in future_rows.iterrows():
logger.warning(f"Future timestamp found: {row['timestamp']} (timestamp: {row['timestamp'].timestamp()})")
df = df[df['timestamp'].apply(lambda x: x.timestamp()) <= current_time]
if df.empty:
logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
return df
logger.info(f"Processing {len(df)} valid rows")
# Create a copy to avoid modifying the original DataFrame during iteration
df_copy = df.copy()
rows_to_drop = []
processed_count = 0
for idx, row in df_copy.iterrows():
try:
if row['is_dummy']:
logger.debug(f"Skipping dummy row {idx}")
continue
processed_count += 1
logger.info(f"Processing row {processed_count}/{len(df_copy)} - Address: {row['address']}")
final_timestamp = int(row['timestamp'].timestamp())
# Validate timestamp is not in the future
if final_timestamp > current_time:
logger.warning(f"Skipping row {idx} with future timestamp: {final_timestamp}")
rows_to_drop.append(idx)
continue
calculation_metrics = row['calculation_metrics']
# Calculate initial value and timestamp with error handling
try:
initial_value, initial_timestamp = calculate_initial_value_from_address_and_timestamp(
row['address'], final_timestamp
)
except Exception as e:
logger.error(f"Error calculating initial value for address {row['address']}: {e}")
rows_to_drop.append(idx)
continue
# Calculate final value with error handling
try:
final_value = calculate_final_value_from_address_and_timestamp(
row['address'], final_timestamp
)
# Add volume if it exists and is positive
volume = row.get("volume", 0)
if volume and volume > 0:
final_value += volume
logger.info(f"Added volume ${volume:.2f} to final value for address {row['address']}")
except Exception as e:
logger.error(f"Error calculating final value for address {row['address']}: {e}")
rows_to_drop.append(idx)
continue
if initial_value <= 0:
logger.warning(f"Initial value for address {row['address']} is non-positive ({initial_value}), skipping row.")
rows_to_drop.append(idx)
continue
# Update calculation metrics
calculation_metrics['initial_value'] = initial_value
calculation_metrics['final_value'] = final_value
df.at[idx, 'calculation_metrics'] = calculation_metrics
# Calculate APR and ROI with error handling
try:
apr, adjusted_apr, roi = calculate_apr_and_roi(
initial_value=initial_value,
final_value=final_value,
initial_timestamp=initial_timestamp,
final_timestamp=final_timestamp
)
df.at[idx, 'apr'] = apr
df.at[idx, 'adjusted_apr'] = adjusted_apr
df.at[idx, 'roi'] = roi
logger.info(f"Successfully processed address {row['address']}: APR={apr:.2f}, ROI={roi:.2f}")
except Exception as e:
logger.error(f"Error calculating APR/ROI for address {row['address']}: {e}")
rows_to_drop.append(idx)
continue
except Exception as e:
logger.error(f"Unexpected error processing row {idx}: {e}")
rows_to_drop.append(idx)
continue
# Drop rows that had errors
if rows_to_drop:
logger.info(f"Dropping {len(rows_to_drop)} rows due to errors")
df = df.drop(rows_to_drop)
logger.info(f"Completed fix_apr_and_roi: {len(df)} rows remaining")
return df
if __name__ == "__main__":
test_address = "0xc8E264f402Ae94f69bDEf8B1f035F7200cD2B0c7"
test_final_timestamp = 1750711233
v = calculate_initial_value_from_address_and_timestamp(
test_address,
test_final_timestamp
)
print(v)