|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from datetime import datetime |
|
from decimal import Decimal |
|
import json |
|
import logging |
|
import os |
|
import time |
|
from typing import Dict, Optional, Tuple, List |
|
|
|
from pandas import DataFrame |
|
import requests |
|
from web3 import Web3 |
|
|
|
# API key appended to the Etherscan block look-up issued by
# get_block_at_timestamp(); empty in this file.
ETHERSCAN_API_KEY = ""

# Rows whose ``address`` is in this set are filtered out by fix_apr_and_roi().
EXCLUDED_ADDRESSES = {
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7DC8",
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7CB5",
    "0x3B3AbC1604fAd139F841Da5c3Cad73a72621fee4",
}

# CoinGecko historical-price URL template; fill it in with
# ``.format(coin_id=..., date=...)``. The previous value ended in ``{date}}``,
# whose unmatched ``}`` makes ``str.format`` raise ValueError.
COINGECKO_PRICE_API_URL = (
    "https://api.coingecko.com/api/v3/coins/{coin_id}/history?date={date}"
)

# Token contract address -> (symbol, decimals). Iterated by
# calculate_final_value_from_address_and_timestamp() to value token balances.
WHITELISTED_TOKENS = {
    "0x0b2c639c533813f4aa9d7837caf62653d097ff85": ("USDC", 6),
    "0x01bff41798a0bcf287b996046ca68b395dbc1071": ("USDT0", 6),
    "0x94b008aa00579c1307b0ef2c499ad98a8ce58e58": ("USDT", 6),
    "0x7f5c764cbc14f9669b88837ca1490cca17c31607": ("USDC.e", 6),
    "0x8ae125e8653821e851f12a49f7765db9a9ce7384": ("DOLA", 18),
    "0xc40f949f8a4e094d1b49a23ea9241d289b7b2819": ("LUSD", 18),
    "0xda10009cbd5d07dd0cecc66161fc93d7c9000da1": ("DAI", 18),
    "0x087c440f251ff6cfe62b86dde1be558b95b4bb9b": ("BOLD", 18),
    "0x2e3d870790dc77a83dd1d18184acc7439a53f475": ("FRAX", 18),
    "0x2218a117083f5b482b0bb821d27056ba9c04b1d3": ("sDAI", 18),
    "0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189": ("oUSDT", 6),
    "0x4f604735c1cf31399c6e711d5962b2b3e0225ad3": ("USDGLO", 18),
    "0xFC2E6e6BCbd49ccf3A5f029c79984372DcBFE527": ("OLAS", 18)
}

# Lower-cased token symbol -> CoinGecko coin id, used when pricing the
# whitelisted token balances.
COIN_ID_MAPPING = {
    "usdc": "usd-coin",
    "alusd": "alchemix-usd",
    "usdt0": "usdt0",
    "usdt": "bridged-usdt",
    "usdc.e": "bridged-usd-coin-optimism",
    "usx": "token-dforce-usd",
    "dola": "dola-usd",
    "lusd": "liquity-usd",
    "dai": "makerdao-optimism-bridged-dai-optimism",
    "bold": "liquity-bold",
    "frax": "frax",
    "sdai": "savings-dai",
    "usd+": "overnight-fi-usd-optimism",
    "ousdt": "openusdt",
    "usdglo": "glo-dollar",
    "olas": "autonolas"
}

# Recorded as the ``token_address`` of native ETH in outgoing transfers.
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
|
|
|
|
|
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Module-wide Web3 client; the balance helpers below issue their on-chain
# reads through it.
_optimism_provider = Web3.HTTPProvider("https://rpc-gate.autonolas.tech/optimism-rpc/")
w3 = Web3(_optimism_provider)
|
|
|
|
|
def get_coin_id_from_symbol(symbol: str, chain: str) -> Optional[str]:
    """Resolve a token symbol to its CoinGecko coin id.

    Args:
        symbol: Token symbol; matched case-insensitively.
        chain: Chain name. Only ``"optimism"`` has a mapping.

    Returns:
        The CoinGecko id, or ``None`` when the chain is not ``"optimism"``,
        the symbol is unknown, or the symbol is explicitly mapped to ``None``
        (as ``MSUSD`` is).
    """
    # Guard clause: every chain other than Optimism has no mapping at all.
    if chain != "optimism":
        return None

    optimism_coin_ids = {
        "USDC": "usd-coin",
        "ALUSD": "alchemix-usd",
        "USDT0": "usdt0",
        "USDT": "bridged-usdt",
        "MSUSD": None,
        "USDC.E": "bridged-usd-coin-optimism",
        "USX": "token-dforce-usd",
        "DOLA": "dola-usd",
        "LUSD": "liquity-usd",
        "DAI": "makerdao-optimism-bridged-dai-optimism",
        "BOLD": "liquity-bold",
        "FRAX": "frax",
        "SDAI": "savings-dai",
        "USD+": "overnight-fi-usd-optimism",
        "OUSDT": "openusdt",
        "USDGLO": "glo-dollar",
        "ETH": "ethereum",
        "WETH": "ethereum",
        "WBTC": "wrapped-bitcoin",
    }
    normalized_symbol = symbol.upper()
    return optimism_coin_ids.get(normalized_symbol)
|
|
|
def load_cache(name: str) -> Dict:
    """Read the JSON cache file ``"<name>_cache.json"``.

    Args:
        name: Cache name; the file path is ``f"{name}_cache.json"``.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist or cannot be decoded as JSON.
    """
    cache_path = f"{name}_cache.json"
    if not os.path.exists(cache_path):
        return {}

    try:
        with open(cache_path, 'r') as handle:
            return json.load(handle)
    except json.JSONDecodeError:
        # An unreadable cache is treated as empty rather than as an error.
        logger.warning("Cache file corrupted, creating new cache")
        return {}
|
|
|
def save_cache(name: str, cache: Dict):
    """Write ``cache`` as indented JSON to ``"<name>_cache.json"``.

    Args:
        name: Cache name; the file path is ``f"{name}_cache.json"``.
        cache: JSON-serialisable mapping to store. The file is overwritten.
    """
    target_path = f"{name}_cache.json"
    with open(target_path, 'w') as handle:
        json.dump(cache, handle, indent=2)
|
|
|
def get_cached_price(date_str: str, token_symbol: str) -> Optional[float]:
    """Look up a price in the on-disk ``price`` cache.

    Args:
        date_str: Date key the price was stored under.
        token_symbol: Symbol key within that date.

    Returns:
        The cached price, or ``None`` when either key is missing.
    """
    prices_by_date = load_cache(name="price")
    prices_for_day = prices_by_date.get(date_str, {})
    return prices_for_day.get(token_symbol)
|
|
|
def update_price_cache(date_str: str, token_symbol: str, price: float):
    """Store one price in the on-disk ``price`` cache.

    The cache is loaded, the entry ``cache[date_str][token_symbol]`` is set
    (creating the per-date dict if needed), and the whole cache is saved back.
    """
    prices_by_date = load_cache(name="price")
    prices_by_date.setdefault(date_str, {})[token_symbol] = price
    save_cache(name="price", cache=prices_by_date)
|
|
|
def get_cached_request(cache_key: str) -> Optional[Dict]:
    """Return the response stored under ``cache_key`` in the ``request`` cache.

    Returns:
        The cached response, or ``None`` when the key is not present.
    """
    return load_cache(name="request").get(cache_key)
|
|
|
def update_request_cache(cache_key: str, response: Dict):
    """Store ``response`` under ``cache_key`` in the on-disk ``request`` cache.

    The cache is loaded, the entry is set (replacing any previous value), and
    the whole cache is saved back.
    """
    cached_responses = load_cache(name="request")
    cached_responses.update({cache_key: response})
    save_cache(name="request", cache=cached_responses)
|
|
|
def fetch_historical_eth_price(date_str: str) -> float:
    """Fetch the historical ETH price in USD from CoinGecko, with caching.

    Args:
        date_str: Day to price, sent as CoinGecko's ``date`` query parameter
            (callers in this file format it as ``DD-MM-YYYY``).

    Returns:
        The USD price. ``0.0`` is returned when the request fails or the
        response carries no ``market_data.current_price`` section; failures
        are not cached.
    """
    cached_price = get_cached_price(date_str, "ETH")
    if cached_price is not None:
        return cached_price

    try:
        url = "https://api.coingecko.com/api/v3/coins/ethereum/history"
        params = {"date": date_str, "localization": "false"}

        # Pause before every uncached call (presumably to stay under
        # CoinGecko's rate limit -- confirm the intended budget).
        time.sleep(1.2)

        # A timeout keeps an unresponsive endpoint from blocking the run
        # forever; a timeout error is handled like any other failure below.
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        if "market_data" in data and "current_price" in data["market_data"]:
            price = data["market_data"]["current_price"]["usd"]
            update_price_cache(date_str, "ETH", price)
            return price

        return 0.0

    except Exception as e:
        # Reported through the module logger, like the other fetch helpers.
        logger.error(f"Error fetching ETH price for {date_str}: {str(e)}")
        return 0.0
|
|
|
def fetch_historical_token_price(coin_id: str, date_str: str, token_symbol: str) -> float:
    """Fetch a token's historical USD price from CoinGecko, with caching.

    Args:
        coin_id: CoinGecko coin id used in the request URL.
        date_str: Day to price, sent as CoinGecko's ``date`` query parameter
            (callers in this file format it as ``DD-MM-YYYY``).
        token_symbol: Key under which the price is read from and written to
            the price cache.

    Returns:
        The USD price. ``0.0`` is returned when the request fails or the
        response carries no ``market_data.current_price`` section; failures
        are not written to the price cache.
    """
    cached_price = get_cached_price(date_str, token_symbol)
    if cached_price is not None:
        return cached_price

    try:
        success, data = request_with_retries(
            endpoint=f"https://api.coingecko.com/api/v3/coins/{coin_id}/history",
            params={"date": date_str, "localization": "false"},
        )
        if not success:
            logger.error(f"Failed to fetch historical price for {coin_id} on {date_str}")
            return 0.0

        # Pause after each successful lookup (presumably rate limiting --
        # confirm the intended budget).
        time.sleep(1.2)

        if "market_data" in data and "current_price" in data["market_data"]:
            price = data["market_data"]["current_price"]["usd"]
            update_price_cache(date_str, token_symbol, price)
            return price

        return 0.0

    except Exception as e:
        # Same channel as the failure branch above instead of a bare print.
        logger.error(f"Error fetching price for {coin_id} on {date_str}: {str(e)}")
        return 0.0
|
|
|
def get_block_at_timestamp(
    timestamp: int,
    chain: str = "optimism"
) -> Optional[int]:
    """Resolve a Unix timestamp to a block number via the Etherscan API.

    The request asks for the closest block *before* ``timestamp`` on the
    Optimism Etherscan endpoint.

    Args:
        timestamp: Unix timestamp in seconds.
        chain: Only used in the error log message; the endpoint is always the
            Optimism one.

    Returns:
        The block number, or ``None`` when the request fails or the API does
        not answer with ``status == "1"`` and a ``result``.
    """
    # The query is passed as ``params`` so that each key, ``timestamp``
    # included, is encoded by the HTTP library rather than spelled out inside
    # a hand-built URL string.
    success, res = request_with_retries(
        endpoint="https://api-optimistic.etherscan.io/api",
        params={
            "module": "block",
            "action": "getblocknobytime",
            "timestamp": timestamp,
            "closest": "before",
            "apikey": ETHERSCAN_API_KEY,
        },
    )
    if success and res.get("status") == "1" and "result" in res:
        return int(res.get("result"))

    logger.error(f"Failed to fetch block at timestamp {timestamp} for {chain}: {res.get('message', 'Unknown error')}")
    return None
|
|
|
def fetch_eth_balance(address: str, timestamp: float) -> float:
    """Return the ETH balance of ``address`` at the block for ``timestamp``.

    Raw wei balances are cached on disk in the ``eth_balance`` cache under
    the key ``"<address>_<timestamp>"``.

    Args:
        address: Account address; converted to checksum form for the call.
        timestamp: Unix timestamp (seconds) selecting the historical block.

    Returns:
        The balance in ETH (wei / 10**18).

    Raises:
        ValueError: If no block number can be resolved for ``timestamp``.
    """
    cache_name = "eth_balance"
    cache_key = f"{address}_{timestamp}"
    cache = load_cache(name=cache_name)
    if cache_key in cache:
        return cache[cache_key] / (10 ** 18)

    block_number = get_block_at_timestamp(int(timestamp))
    if block_number is None:
        # Passing ``None`` as block identifier would query the node's default
        # block and cache that balance under a historical key.
        raise ValueError(f"Could not resolve block for timestamp {timestamp}")

    balance = w3.eth.get_balance(
        account=Web3.to_checksum_address(address),
        block_identifier=block_number,
    )

    cache[cache_key] = balance
    save_cache(name=cache_name, cache=cache)
    return balance / (10 ** 18)
|
|
|
def fetch_token_balance(
    address: str,
    token_address: str,
    timestamp: int,
    decimals: int = 18
) -> Optional[float]:
    """Return an ERC20 balance of ``address`` at the block for ``timestamp``.

    Raw balances are cached on disk in the ``token_balance`` cache.

    Args:
        address: Holder address.
        token_address: Token contract address.
        timestamp: Unix timestamp (seconds) selecting the historical block.
        decimals: Token decimals used to scale the raw balance.

    Returns:
        The scaled balance, ``0.0`` for a zero balance, or ``None`` when the
        look-up fails (the error is logged).
    """
    contract = w3.eth.contract(
        address=Web3.to_checksum_address(token_address),
        abi=[
            {
                "constant": True,
                "inputs": [{"name": "_owner", "type": "address"}],
                "name": "balanceOf",
                "outputs": [{"name": "", "type": "uint256"}],
                "payable": False,
                "stateMutability": "view",
                "type": "function",
            }
        ]
    )
    try:
        cache_key = f"token_balance_{address}_{token_address}_{timestamp}"
        cache = load_cache(name="token_balance")
        if cache_key in cache:
            return cache[cache_key] / (10 ** decimals)

        block_number = get_block_at_timestamp(int(timestamp))
        if block_number is None:
            # Without a block number the call would read the node's default
            # block and cache it under a historical key.
            raise ValueError(f"Could not resolve block for timestamp {timestamp}")

        # The holder address is checksummed the same way fetch_eth_balance()
        # does it, so differently-cased inputs are handled alike.
        holder = Web3.to_checksum_address(address)
        balance = contract.functions.balanceOf(holder).call(block_identifier=block_number)
        cache[cache_key] = balance
        save_cache(name="token_balance", cache=cache)
        return balance / (10 ** decimals) if balance else 0.0
    except Exception as e:
        logger.error(f"Error fetching token balance for {address} at {timestamp}: {e}")
        return None
|
|
|
def get_datetime_from_timestamp(timestamp: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string into a ``datetime``.

    A ``Z`` suffix is rewritten to ``+00:00`` before parsing.

    Returns:
        The parsed ``datetime``, or ``None`` (after logging a warning) when
        parsing raises ``ValueError`` or ``TypeError``.
    """
    try:
        iso_text = timestamp.replace("Z", "+00:00")
        parsed = datetime.fromisoformat(iso_text)
    except (ValueError, TypeError):
        logger.warning(f"Invalid timestamp format: {timestamp}")
        return None
    return parsed
|
|
|
def request_with_retries(
    endpoint: str,
    params: Dict = None,
    headers: Dict = None,
    method: str = "GET",
    body: Dict = None,
    rate_limited_code: int = 429,
    retry_wait: int = 5,
    max_retries: int = 3,
    timeout: float = 30,
) -> Tuple[bool, Dict]:
    """Issue a cached HTTP GET/POST with retries.

    Responses are stored in the on-disk ``request`` cache, keyed by the
    endpoint plus the stringified ``params`` (GET) or ``body`` (POST). A
    cached entry is returned without touching the network. For GET, a 404 is
    cached as an empty dict, so later calls report failure immediately.

    Args:
        endpoint: URL to call.
        params: Query parameters for GET requests.
        headers: Request headers.
        method: ``"POST"`` (case-insensitive) sends ``body`` as JSON; any
            other value issues a GET.
        body: JSON body for POST requests.
        rate_limited_code: Status code that triggers a wait-and-retry.
        retry_wait: Seconds to sleep before retrying.
        max_retries: Maximum number of attempts.
        timeout: Seconds to wait for the server before the attempt fails and
            is retried like any other request error.

    Returns:
        ``(True, payload)`` for a 200 response or a non-empty cached entry;
        ``(False, {})`` otherwise (an empty cached entry is returned as
        ``(False, <cached value>)``).
    """
    for attempt in range(max_retries):
        try:
            payload = None

            if method.upper() == "POST":
                cache_key = f"POST_{endpoint}_{str(body or {})}"
                cached_response = get_cached_request(cache_key)
                if cached_response is not None:
                    return len(cached_response) > 0, cached_response

                response = requests.post(endpoint, headers=headers, json=body, timeout=timeout)

                if response.ok:
                    payload = response.json()
                    update_request_cache(cache_key, payload)
            else:
                cache_key = f"{endpoint}_{str(params or {})}"
                cached_response = get_cached_request(cache_key)
                if cached_response is not None:
                    return len(cached_response) > 0, cached_response

                response = requests.get(endpoint, headers=headers, params=params or {}, timeout=timeout)

                if response.status_code == 200:
                    payload = response.json()
                    update_request_cache(cache_key, payload)
                elif response.status_code == 404:
                    # Remember "not found" so the endpoint is not hit again.
                    update_request_cache(cache_key, {})

            if response.status_code == rate_limited_code:
                logger.warning(f"Rate limited. Waiting {retry_wait} seconds...")
                time.sleep(retry_wait)
                continue

            if response.status_code != 200:
                logger.error(f"Request failed with status {response.status_code}")
                return False, {}

            # A 200 response was already decoded (and cached) above.
            return True, payload

        except Exception as e:
            logger.error(f"Request failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(retry_wait)
                continue
            return False, {}

    # Reached when every attempt was rate limited.
    return False, {}
|
|
|
def should_include_transfer_optimism(
    from_address: str
) -> bool:
    """Decide whether a transfer from ``from_address`` should be counted.

    Returns ``False`` for empty or zero-like addresses. Otherwise the
    address's code is fetched with ``eth_getCode``: an address whose code is
    ``"0x"`` is included, and an address with code is included only when the
    Safe transaction service answers successfully for it. Any failure along
    the way yields ``False``.
    """
    if not from_address:
        return False

    zero_like_addresses = (
        "0x0000000000000000000000000000000000000000",
        "0x0",
        "",
    )
    if from_address.lower() in zero_like_addresses:
        return False

    try:
        rpc_request = {
            "jsonrpc": "2.0",
            "method": "eth_getCode",
            "params": [from_address, "latest"],
            "id": 1,
        }
        code_ok, rpc_response = request_with_retries(
            endpoint="https://mainnet.optimism.io",
            method="POST",
            headers={"Content-Type": "application/json"},
            body=rpc_request,
            rate_limited_code=429,
            retry_wait=5,
        )
        if not code_ok:
            logger.error("Failed to check contract code")
            return False

        # "0x" means no code is deployed at the address.
        if rpc_response.get("result", "0x") == "0x":
            return True

        # The address has code: accept it only if the Safe service knows it.
        is_safe, _ = request_with_retries(
            endpoint=f"https://safe-transaction-optimism.safe.global/api/v1/safes/{from_address}/",
            headers={"Accept": "application/json"},
            rate_limited_code=429,
            retry_wait=5,
        )
        if is_safe:
            return True

        logger.info(
            f"Excluding transfer from contract: {from_address}"
        )
        return False

    except Exception as e:
        logger.error(f"Error checking address {from_address}: {e}")
        return False
|
|
|
def fetch_optimism_incoming_transfers(
    address: str,
    last_timestamp: int
) -> Dict:
    """Fetch incoming transfers of a Safe on Optimism, grouped by day.

    Pages through the Safe transaction service's ``incoming-transfers``
    endpoint (at most 10 pages) and keeps ETH transfers and ERC20 transfers
    whose symbol is ``usdc`` or ``eth``, executed at or before
    ``last_timestamp``, not sent by ``address`` itself, and whose sender
    passes ``should_include_transfer_optimism``.

    Args:
        address: Safe address to query.
        last_timestamp: Unix timestamp (seconds); later transfers are skipped.

    Returns:
        ``{"YYYY-MM-DD": [transfer_dict, ...]}``. A date key is created
        before the sender is vetted, so a date may map to an empty list. An
        empty dict is returned when an unexpected error occurs.
    """
    base_url = "https://safe-transaction-optimism.safe.global/api/v1"
    all_transfers_by_date = {}

    try:
        logger.info(f"Fetching Optimism transfers for address {address}...")

        transfers_url = f"{base_url}/safes/{address}/incoming-transfers/"

        processed_count = 0
        page_count = 0
        max_pages = 10

        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching page {page_count} for address {address}")

            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5
            )

            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break

            transfers = response_json.get("results", [])
            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break

            # Raw payloads go to the debug log instead of stdout.
            logger.debug(f"incoming transfers {response_json}")

            for transfer in transfers:
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue

                tx_datetime = get_datetime_from_timestamp(timestamp)
                if not tx_datetime:
                    continue
                tx_date = tx_datetime.strftime("%Y-%m-%d")

                if tx_datetime.timestamp() > last_timestamp:
                    continue

                # A missing or null sender is treated like a self-transfer
                # (skipped) rather than crashing the whole fetch.
                from_address = transfer.get("from") or address
                transfer_type = transfer.get("type", "")

                if from_address.lower() == address.lower():
                    continue

                if tx_date not in all_transfers_by_date:
                    all_transfers_by_date[tx_date] = []

                if not should_include_transfer_optimism(from_address):
                    continue

                if transfer_type == "ERC20_TRANSFER":
                    token_info = transfer.get("tokenInfo", {})
                    token_address = transfer.get("tokenAddress", "")

                    if not token_info and not token_address:
                        continue

                    if token_info:
                        symbol = token_info.get("symbol") or "Unknown"
                        raw_decimals = token_info.get("decimals", 18) or 18
                    else:
                        symbol = "Unknown"
                        raw_decimals = 18

                    if symbol.lower() not in ("usdc", "eth"):
                        continue

                    # Malformed numbers skip this one transfer, mirroring the
                    # ETH branch, instead of discarding every transfer
                    # collected so far.
                    try:
                        decimals = int(raw_decimals)
                        value_raw = int(transfer.get("value", "0") or "0")
                        amount = value_raw / (10**decimals)
                    except (ValueError, TypeError):
                        logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
                        continue

                    transfer_data = {
                        "from_address": from_address,
                        "amount": amount,
                        "token_address": token_address,
                        "symbol": symbol,
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "token"
                    }

                elif transfer_type == "ETHER_TRANSFER":
                    try:
                        value_wei = int(transfer.get("value", "0") or "0")
                        amount_eth = value_wei / 10**18

                        if amount_eth <= 0:
                            continue
                    except (ValueError, TypeError):
                        logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
                        continue

                    transfer_data = {
                        "from_address": from_address,
                        "amount": amount_eth,
                        "token_address": "",
                        "symbol": "ETH",
                        "timestamp": timestamp,
                        "tx_hash": transfer.get("transactionHash", ""),
                        "type": "eth"
                    }

                else:
                    # Other transfer types are ignored.
                    continue

                all_transfers_by_date[tx_date].append(transfer_data)
                processed_count += 1

                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} Optimism transfers for address {address}...")

            # Follow the pagination cursor returned by the service.
            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            transfers_url = cursor

        logger.info(f"Completed Optimism transfers for address {address}: {processed_count} found")
        return all_transfers_by_date

    except Exception as e:
        logger.error(f"Error fetching Optimism transfers for address {address}: {e}")
        return {}
|
|
|
|
|
def fetch_optimism_outgoing_transfers(
    address: str,
    final_timestamp: int,
    from_address: str,
) -> Dict:
    """Fetch all outgoing transfers from the safe address on Optimism until a specific date.

    Pages through the Safe transaction service's ``transfers`` endpoint (at
    most 50 pages) and keeps transfers sent by ``address`` with a positive
    amount, executed at or before ``final_timestamp``.

    Args:
        address: The safe address to fetch transfers for
        final_timestamp: The timestamp until which to fetch transfers
        from_address: The master address to check for reversions (accepted
            for interface compatibility; not used by this function)

    Returns:
        Dict: Dictionary of transfers organized by date (``YYYY-MM-DD``).
        Empty when ``address`` is falsy or an unexpected error occurs.
    """
    all_transfers = {}

    if not address:
        logger.warning(
            "No address provided for fetching Optimism outgoing transfers"
        )
        return all_transfers

    try:
        base_url = "https://safe-transaction-optimism.safe.global/api/v1"
        transfers_url = f"{base_url}/safes/{address}/transfers/"

        processed_count = 0
        page_count = 0
        max_pages = 50

        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching outgoing transfers page {page_count} for address {address}")

            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5,
            )

            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break

            transfers = response_json.get("results", [])
            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break

            # Raw payloads go to the debug log instead of stdout.
            logger.debug(f"outgoing_transfers {response_json}")

            for transfer in transfers:
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue

                # Logs "Invalid timestamp format" and yields None on bad input.
                tx_datetime = get_datetime_from_timestamp(timestamp)
                if tx_datetime is None:
                    continue
                tx_date = tx_datetime.strftime("%Y-%m-%d")

                if tx_datetime.timestamp() > final_timestamp:
                    continue

                # Only transfers sent by the safe itself are outgoing; a
                # missing or null sender is skipped instead of raising.
                sender = transfer.get("from") or ""
                if sender.lower() != address.lower():
                    continue

                # Exactly one record is built per transfer: native ETH or
                # token, never both.
                if transfer.get("type", "") == "ETHER_TRANSFER":
                    record_from = address
                    token_address = ZERO_ADDRESS
                    symbol = "ETH"
                    raw_decimals = 18
                    record_type = "eth"
                else:
                    token_info = transfer.get("tokenInfo", {})
                    record_from = transfer.get("from")
                    token_address = transfer.get("tokenAddress", "")
                    if token_info:
                        symbol = token_info.get("symbol", "Unknown")
                        raw_decimals = token_info.get("decimals", 18) or 18
                    else:
                        symbol = "Unknown"
                        raw_decimals = 18
                    record_type = "token"

                try:
                    value_raw = int(transfer.get("value", "0") or "0")
                    amount = value_raw / (10 ** int(raw_decimals))
                except (ValueError, TypeError):
                    continue

                if amount <= 0:
                    continue

                transfer_data = {
                    "from_address": record_from,
                    "to_address": transfer.get("to"),
                    "amount": amount,
                    "token_address": token_address,
                    "symbol": symbol,
                    "timestamp": timestamp,
                    "tx_hash": transfer.get("transactionHash", ""),
                    "type": record_type,
                }

                all_transfers.setdefault(tx_date, []).append(transfer_data)
                processed_count += 1

                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} outgoing transfers for address {address}...")

            # Follow the pagination cursor returned by the service.
            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            transfers_url = cursor

        logger.info(f"Completed Optimism outgoing transfers: {processed_count} found")
        return all_transfers

    except Exception as e:
        logger.error(f"Error fetching Optimism outgoing transfers: {e}")
        return {}
|
|
|
|
|
def _sorted_transfers(transfers_by_date: Dict) -> List[Dict]:
    """Flatten a date-keyed transfer mapping and sort it by ``timestamp``."""
    flattened = [
        transfer
        for day_transfers in (transfers_by_date or {}).values()
        for transfer in day_transfers
        if isinstance(transfer, dict) and "timestamp" in transfer
    ]
    flattened.sort(key=lambda item: item["timestamp"])
    return flattened


def track_and_calculate_reversion_value(
    safe_address: str,
    chain: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    """Track ETH transfers to safe address and handle reversion logic.

    The sender of the earliest incoming ETH transfer is taken as the master
    address. Outgoing ETH transfers from ``safe_address`` back to that master
    address are "reversions"; their USD value is computed by
    ``calculate_total_reversion_value``.

    Args:
        safe_address: The safe whose transfers are analysed.
        chain: Chain name, used only in the log message.
        incoming_transfers: ``{date: [transfer, ...]}`` of incoming transfers.
        outgoing_transfers: ``{date: [transfer, ...]}`` of outgoing transfers.

    Returns:
        The total reversion value in USD; ``0.0`` when there is nothing to
        revert or an error occurs.
    """
    try:
        if not incoming_transfers:
            logger.warning(f"No transfers found for {chain} chain")
            return 0.0

        master_safe_address = None
        funding_seen = False
        eth_transfers = []

        for transfer in _sorted_transfers(incoming_transfers):
            if transfer.get("symbol") != "ETH":
                continue

            # ``or ""`` keeps an explicit null sender from raising.
            sender = (transfer.get("from_address") or "").lower()
            if not funding_seen:
                # First ETH transfer: its sender becomes the master address.
                funding_seen = True
                if sender:
                    master_safe_address = sender
                eth_transfers.append(transfer)
            elif sender == master_safe_address:
                eth_transfers.append(transfer)

        # Without any ETH funding there is nothing a reversion could refer
        # to; answer directly instead of relying on an exception downstream.
        if not eth_transfers:
            return 0.0

        safe_lower = safe_address.lower()
        reversion_transfers = [
            transfer
            for transfer in _sorted_transfers(outgoing_transfers)
            if transfer.get("symbol") == "ETH"
            and (transfer.get("to_address") or "").lower() == master_safe_address
            and (transfer.get("from_address") or "").lower() == safe_lower
        ]
        if not reversion_transfers:
            return 0.0

        return calculate_total_reversion_value(eth_transfers, reversion_transfers)

    except Exception as e:
        logger.error(f"Error tracking ETH transfers: {str(e)}")
        return 0.0
|
|
|
def calculate_total_reversion_value(
    eth_transfers: List[Dict], reversion_transfers: List[Dict]
) -> float:
    """Calculate the total reversion value from the reversion transfers.

    Each reversion transfer's ``amount`` is multiplied by the historical ETH
    price. The first reversion is priced at the date of the last entry of
    ``eth_transfers``; when that date is unavailable or unparseable, and for
    every later reversion, the transfer's own date is used.

    Args:
        eth_transfers: Funding ETH transfers; only the last one is read.
        reversion_transfers: Transfers with ISO ``timestamp`` and ``amount``.

    Returns:
        The summed USD value; ``0.0`` when there are no reversion transfers.
        Transfers whose price look-up yields a falsy price contribute nothing.
    """
    # Nothing to value; also avoids indexing into empty lists below.
    if not reversion_transfers:
        return 0.0

    def _price_date(transfer: Dict) -> str:
        iso_text = transfer.get("timestamp", "").replace('Z', '+00:00')
        return datetime.fromisoformat(iso_text).strftime("%d-%m-%Y")

    reversion_date = None
    if eth_transfers:
        try:
            timestamp = str(eth_transfers[-1].get("timestamp") or "")
            if timestamp.endswith("Z"):
                tx_datetime = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
                reversion_date = tx_datetime.strftime("%d-%m-%Y")
            else:
                # Anything not ending in "Z" is parsed as a Unix timestamp.
                reversion_date = datetime.fromtimestamp(int(timestamp)).strftime(
                    "%d-%m-%Y"
                )
        except (ValueError, TypeError) as e:
            logger.warning(f"Error parsing timestamp: {e}")

    if reversion_date is None:
        # Fall back to the first reversion transfer's own date.
        reversion_date = _price_date(reversion_transfers[0])

    reversion_value = 0.0
    for index, transfer in enumerate(reversion_transfers):
        transfer_date = _price_date(transfer)
        price_date = reversion_date if index == 0 else transfer_date
        eth_price = fetch_historical_eth_price(price_date)
        if eth_price:
            reversion_value += transfer.get("amount", 0) * eth_price

    return reversion_value
|
|
|
def calculate_initial_investment_value_from_funding_events(
    chain: str,
    address: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    """Value the incoming funding transfers in USD, net of reversions.

    Every incoming transfer with a positive amount is multiplied by the
    token's historical USD price on the transfer date. The reversion value
    from ``track_and_calculate_reversion_value`` is then subtracted.

    Args:
        chain: Chain name; only ``"optimism"`` is supported.
        address: Safe address, forwarded to the reversion calculation.
        incoming_transfers: ``{"YYYY-MM-DD": [transfer, ...]}``.
        outgoing_transfers: ``{"YYYY-MM-DD": [transfer, ...]}``.

    Returns:
        The net investment in USD, floored at ``0.0``. ``0.0`` is also
        returned when there are no incoming transfers or the chain is
        unsupported.
    """
    total_investment = 0.0

    if not incoming_transfers:
        print(f"No transfers found for {chain} chain")
        return 0.0

    if chain != "optimism":
        print(f"Unsupported chain: {chain}, skipping")
        return 0.0

    print("Using Optimism-specific transfer processing")
    for date, date_transfers in incoming_transfers.items():
        for transfer in date_transfers:
            try:
                amount = transfer.get("amount", 0)
                token_symbol = transfer.get("symbol", "").upper()

                if amount <= 0:
                    continue

                # The price helpers are called with DD-MM-YYYY dates.
                date_str = datetime.strptime(date, "%Y-%m-%d").strftime("%d-%m-%Y")

                if token_symbol == "ETH":
                    price = fetch_historical_eth_price(date_str)
                else:
                    coingecko_id = get_coin_id_from_symbol(token_symbol, chain)
                    price = (
                        fetch_historical_token_price(coingecko_id, date_str, token_symbol)
                        if coingecko_id
                        else None
                    )

                # A token without a price source is skipped explicitly
                # instead of failing on ``amount * None``.
                if price is None:
                    print(f"No price source for {token_symbol} on {date}, skipping transfer")
                    continue

                transfer_value = amount * price
                total_investment += transfer_value

                print(f"Processed transfer on {date}: {amount} {token_symbol} @ ${price} = ${transfer_value}")

            except Exception as e:
                print(f"Error processing transfer: {str(e)}")
                continue

    reversion_value = track_and_calculate_reversion_value(
        safe_address=address,
        chain=chain,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )

    logger.info(f"Total investment: {total_investment}")
    logger.info(f"Reversion value: {reversion_value}")
    total_investment = total_investment - reversion_value
    logger.info(f"Total investment after reversion: {total_investment}")

    print(f"Total initial investment from {chain} chain: ${total_investment}")
    return total_investment if total_investment > 0 else 0.0
|
|
|
def calculate_initial_value_from_address_and_timestamp(
    address: str,
    final_timestamp: int,
) -> Tuple[float, int]:
    """Compute the initial investment and the funding start time of a safe.

    Fetches the safe's incoming and outgoing Optimism transfers up to
    ``final_timestamp`` and values them with
    ``calculate_initial_investment_value_from_funding_events``.

    Returns:
        ``(initial_investment, initial_timestamp)``. ``initial_timestamp``
        starts at ``final_timestamp`` and is lowered to the timestamp of any
        earlier incoming transfer.
    """
    incoming_transfers = fetch_optimism_incoming_transfers(address, final_timestamp)
    logger.info("Fetched incoming transfers")

    # Sender of the first transfer in the first non-empty date bucket.
    from_address = next(
        (bucket[0].get('from_address') for bucket in incoming_transfers.values() if bucket),
        None,
    )
    if from_address is None:
        logger.warning("No from_address found in incoming transfers")
        from_address = ""

    outgoing_transfers = fetch_optimism_outgoing_transfers(address, final_timestamp, from_address)
    logger.info(f"Fetched outgoing transfers {outgoing_transfers}")

    initial_timestamp = final_timestamp
    for bucket in incoming_transfers.values():
        for entry in bucket:
            if "timestamp" not in entry:
                continue

            iso_text = entry["timestamp"].replace('Z', '+00:00')
            entry_epoch = datetime.fromisoformat(iso_text).timestamp()
            if entry_epoch < initial_timestamp:
                initial_timestamp = int(entry_epoch)

    initial_investment = calculate_initial_investment_value_from_funding_events(
        chain="optimism",
        address=address,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )

    return initial_investment, int(initial_timestamp)
|
|
|
def calculate_final_value_from_address_and_timestamp(
    address: str,
    timestamp: int,
) -> float:
    """
    Value a portfolio in USD at a given timestamp.

    The ETH balance and the balance of every token in ``WHITELISTED_TOKENS``
    are fetched at ``timestamp`` and multiplied by the historical price of
    that (UTC) day. Positions without a positive balance or price are left
    out. Returns ``0.0`` if the overall calculation raises.
    """
    portfolio_value = 0.0

    def _price_date() -> str:
        # Day of ``timestamp`` in UTC, formatted DD-MM-YYYY.
        return datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y")

    try:
        eth_balance = fetch_eth_balance(address, timestamp)
        if eth_balance > 0:
            eth_price = fetch_historical_eth_price(_price_date())
            if not (eth_price and eth_price > 0):
                logger.warning(f"Could not fetch ETH price for timestamp {timestamp}")
            else:
                eth_value = eth_balance * eth_price
                portfolio_value += eth_value
                logger.info(f"ETH value: {eth_balance:.6f} ETH @ ${eth_price:.2f} = ${eth_value:.2f}")

        for token_address, (symbol, decimals) in WHITELISTED_TOKENS.items():
            try:
                token_balance = fetch_token_balance(
                    address=address,
                    token_address=token_address,
                    decimals=decimals,
                    timestamp=timestamp,
                )
                if token_balance is None or not (token_balance > 0):
                    continue

                symbol_key = symbol.lower()
                token_price = fetch_historical_token_price(
                    coin_id=COIN_ID_MAPPING.get(symbol_key, symbol_key),
                    date_str=_price_date(),
                    token_symbol=symbol
                )

                if token_price is None or not (token_price > 0):
                    logger.warning(f"Could not fetch price for {symbol} at timestamp {timestamp}")
                    continue

                token_value = token_balance * token_price
                portfolio_value += token_value
                logger.info(f"{symbol} value: {token_balance:.6f} @ ${token_price:.6f} = ${token_value:.2f}")

            except Exception as e:
                # One failing token must not abort the remaining ones.
                logger.error(f"Error processing token {symbol} ({token_address}): {e}")
                continue

    except Exception as e:
        logger.error(f"Error calculating final value for address {address}: {e}")
        return 0.0

    logger.info(f"Total final value for {address}: ${portfolio_value:.2f}")
    return portfolio_value
|
|
|
def _calculate_adjusted_apr( |
|
apr: float, |
|
initial_timestamp: int, |
|
final_timestamp: int |
|
) -> float: |
|
if apr is None or apr == 0: |
|
return 0.0 |
|
|
|
intial_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(initial_timestamp).strftime("%d-%m-%Y")) |
|
final_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(final_timestamp).strftime("%d-%m-%Y")) |
|
|
|
if ( |
|
final_eth_price is not None |
|
and intial_eth_price is not None |
|
): |
|
adjustment_factor = Decimal("1") - ( |
|
Decimal(str(final_eth_price)) / Decimal(str(intial_eth_price)) |
|
) |
|
adjusted_apr = round( |
|
float(apr) |
|
+ float(adjustment_factor * Decimal("100")), |
|
2, |
|
) |
|
return adjusted_apr |
|
else: |
|
logger.warning( |
|
f"Could not fetch ETH prices for timestamps {initial_timestamp} and {final_timestamp}. Returning original APR: {apr}" |
|
) |
|
return apr |
|
|
|
def calculate_apr_and_roi(
    initial_value: float,
    final_value: float,
    initial_timestamp: int,
    final_timestamp: int
) -> Tuple[float, float, float]:
    """Compute APR, ETH-adjusted APR and ROI (all in percent).

    ROI is ``(final_value / initial_value - 1) * 100``. APR annualises the
    ROI over the elapsed hours (at least one hour, 8760 hours per year); a
    negative APR is replaced by the plain ROI. The adjusted APR comes from
    ``_calculate_adjusted_apr``.

    Args:
        initial_value: Portfolio value at the start, in USD.
        final_value: Portfolio value at the end, in USD.
        initial_timestamp: Unix timestamp (seconds) of the start.
        final_timestamp: Unix timestamp (seconds) of the end.

    Returns:
        ``(apr, adjusted_apr, roi)`` rounded to two decimals, or
        ``(0.0, 0.0, 0.0)`` when either value is non-positive.
    """
    if final_value <= 0:
        logger.warning("Final value is non-positive, returning 0.0 for APR and ROI.")
        return 0.0, 0.0, 0.0

    # A zero initial value would divide by zero and a negative one yields a
    # meaningless ratio, so it is handled like the final-value case above.
    if initial_value is None or initial_value <= 0:
        logger.warning("Initial value is non-positive, returning 0.0 for APR and ROI.")
        return 0.0, 0.0, 0.0

    roi = ((final_value / initial_value) - 1) * 100

    # Clamp to one hour so very short periods do not blow up the ratio.
    hours = max(1, (final_timestamp - int(initial_timestamp)) / 3600)

    hours_in_year = 8760
    time_ratio = hours_in_year / hours

    apr = float(roi * time_ratio)
    if apr < 0:
        # Losses are reported as-is rather than annualised.
        apr = roi

    adjust_apr = _calculate_adjusted_apr(
        apr=apr,
        initial_timestamp=initial_timestamp,
        final_timestamp=final_timestamp
    )

    return float(round(apr, 2)), float(round(adjust_apr, 2)), float(round(roi, 2))
|
|
|
|
|
def fix_apr_and_roi(df: DataFrame) -> DataFrame:
    """
    Fix APR and ROI values by recalculating them based on actual blockchain data.

    Rows are first filtered: addresses in ``EXCLUDED_ADDRESSES``, timestamps
    before 2025-06-06 and timestamps in the future are removed. Each
    remaining non-dummy row is processed once: initial and final values are
    recomputed, stored in the row's ``calculation_metrics`` and used to
    overwrite ``apr``, ``adjusted_apr`` and ``roi``. Rows that fail any step,
    or whose initial value is non-positive, are dropped.

    Args:
        df: Frame with at least the columns ``address``, ``timestamp``,
            ``is_dummy`` and ``calculation_metrics``; ``volume`` is optional.

    Returns:
        The filtered frame with recalculated values. An empty input is
        returned unchanged.
    """
    if df.empty:
        logger.info("Empty DataFrame provided to fix_apr_and_roi, returning as-is")
        return df

    logger.info(f"Starting fix_apr_and_roi with {len(df)} rows")

    original_count = len(df)
    df = df[~df['address'].isin(EXCLUDED_ADDRESSES)]
    excluded_count = original_count - len(df)
    if excluded_count > 0:
        logger.info(f"Excluded {excluded_count} rows with excluded addresses")

    original_count = len(df)
    df = df[df['timestamp'] >= '2025-06-06 00:00:00.000000']
    old_data_count = original_count - len(df)
    if old_data_count > 0:
        logger.info(f"Excluded {old_data_count} rows with timestamps before 2025-06-06")

    current_time = datetime.now().timestamp()
    # Computed once and reused for both the report and the filter.
    epoch_seconds = df['timestamp'].apply(lambda x: x.timestamp())
    future_rows = df[epoch_seconds > current_time]
    if not future_rows.empty:
        logger.warning(f"Found {len(future_rows)} rows with future timestamps, excluding them")
        for idx, row in future_rows.iterrows():
            logger.warning(f"Future timestamp found: {row['timestamp']} (timestamp: {row['timestamp'].timestamp()})")
        df = df[epoch_seconds <= current_time]

    if df.empty:
        logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
        return df

    logger.info(f"Processing {len(df)} valid rows")

    # Results are written into an explicit copy: ``df`` is the product of
    # boolean filtering, and assigning into it is chained assignment. The
    # filtered frame itself is only iterated.
    result = df.copy()
    rows_to_drop = []
    processed_count = 0

    for idx, row in df.iterrows():
        try:
            if row['is_dummy']:
                logger.debug(f"Skipping dummy row {idx}")
                continue

            processed_count += 1
            logger.info(f"Processing row {processed_count}/{len(df)} - Address: {row['address']}")

            final_timestamp = int(row['timestamp'].timestamp())
            calculation_metrics = row['calculation_metrics']

            try:
                initial_value, initial_timestamp = calculate_initial_value_from_address_and_timestamp(
                    row['address'], final_timestamp
                )
            except Exception as e:
                logger.error(f"Error calculating initial value for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue

            try:
                final_value = calculate_final_value_from_address_and_timestamp(
                    row['address'], final_timestamp
                )

                # A positive ``volume`` is counted on top of the balances.
                volume = row.get("volume", 0)
                if volume and volume > 0:
                    final_value += volume
                    logger.info(f"Added volume ${volume:.2f} to final value for address {row['address']}")

            except Exception as e:
                logger.error(f"Error calculating final value for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue

            if initial_value <= 0:
                logger.warning(f"Initial value for address {row['address']} is non-positive ({initial_value}), skipping row.")
                rows_to_drop.append(idx)
                continue

            calculation_metrics['initial_value'] = initial_value
            calculation_metrics['final_value'] = final_value
            result.at[idx, 'calculation_metrics'] = calculation_metrics

            try:
                apr, adjusted_apr, roi = calculate_apr_and_roi(
                    initial_value=initial_value,
                    final_value=final_value,
                    initial_timestamp=initial_timestamp,
                    final_timestamp=final_timestamp
                )
                result.at[idx, 'apr'] = apr
                result.at[idx, 'adjusted_apr'] = adjusted_apr
                result.at[idx, 'roi'] = roi

                logger.info(f"Successfully processed address {row['address']}: APR={apr:.2f}, ROI={roi:.2f}")

            except Exception as e:
                logger.error(f"Error calculating APR/ROI for address {row['address']}: {e}")
                rows_to_drop.append(idx)
                continue

        except Exception as e:
            logger.error(f"Unexpected error processing row {idx}: {e}")
            rows_to_drop.append(idx)
            continue

    if rows_to_drop:
        logger.info(f"Dropping {len(rows_to_drop)} rows due to errors")
        result = result.drop(rows_to_drop)

    logger.info(f"Completed fix_apr_and_roi: {len(result)} rows remaining")
    return result
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: print the (initial investment, initial timestamp)
    # pair computed for one hard-coded safe address and cut-off time.
    test_address = "0xc8E264f402Ae94f69bDEf8B1f035F7200cD2B0c7"
    test_final_timestamp = 1750711233

    initial_value_and_timestamp = calculate_initial_value_from_address_and_timestamp(
        test_address, test_final_timestamp
    )
    print(initial_value_and_timestamp)
|
|