cgae-server / cgae_engine /wallet.py
rb125
final demo cleanup
a556b6c
"""
CGAE Wallet Manager — Real ETH wallet integration for agents.
Each agent gets a unique Ethereum keypair on registration.
A treasury wallet funds agents and disburses ETH on successful contract completion.
Usage:
from cgae_engine.wallet import WalletManager
wm = WalletManager(rpc_url="https://evmrpc-testnet.0g.ai",
treasury_private_key=os.getenv("PRIVATE_KEY"))
wallet = wm.create_agent_wallet("agent_abc123")
tx = wm.disburse(wallet.address, amount_wei=10**15) # 0.001 ETH
"""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from eth_account import Account
from web3 import Web3
logger = logging.getLogger(__name__)
@dataclass
class AgentWallet:
"""An agent's Ethereum wallet."""
agent_id: str
address: str
private_key: str # hex with 0x prefix
def to_dict(self, redact_key: bool = True) -> dict:
return {
"agent_id": self.agent_id,
"address": self.address,
"private_key": self.private_key[:10] + "..." if redact_key else self.private_key,
}
class WalletManager:
"""
Manages ETH wallets for CGAE agents.
- Creates a unique keypair per agent
- Sends real ETH from a treasury wallet on contract settlement
- Tracks all disbursements for audit trail
"""
def __init__(
self,
rpc_url: Optional[str] = None,
treasury_private_key: Optional[str] = None,
dry_run: bool = False,
wallet_store_path: Optional[str] = None,
):
self.rpc_url = rpc_url or os.getenv("ZG_RPC_URL", "https://evmrpc-testnet.0g.ai")
self._treasury_key = treasury_private_key or os.getenv("PRIVATE_KEY")
self.dry_run = dry_run
self.w3 = Web3(Web3.HTTPProvider(self.rpc_url))
self._wallets: dict[str, AgentWallet] = {} # agent_id -> wallet
self._disbursements: list[dict] = []
self._store_path = Path(wallet_store_path) if wallet_store_path else Path("server/live_results/wallets.json")
if self._treasury_key:
key = self._treasury_key if self._treasury_key.startswith("0x") else f"0x{self._treasury_key}"
self._treasury_account = Account.from_key(key)
self.treasury_address = self._treasury_account.address
else:
self._treasury_account = None
self.treasury_address = None
# Load persisted wallets from disk
self._model_wallets: dict[str, AgentWallet] = {} # model_name -> wallet
self._load_wallets()
@property
def is_live(self) -> bool:
"""True if we can send real transactions."""
return self._treasury_account is not None and not self.dry_run
def create_agent_wallet(self, agent_id: str, model_name: str = "") -> AgentWallet:
"""Get existing wallet for this model or generate a new keypair."""
if agent_id in self._wallets:
return self._wallets[agent_id]
# Reuse persisted wallet for this model if it exists
if model_name and model_name in self._model_wallets:
wallet = AgentWallet(
agent_id=agent_id,
address=self._model_wallets[model_name].address,
private_key=self._model_wallets[model_name].private_key,
)
self._wallets[agent_id] = wallet
logger.info(f" [wallet] Loaded existing wallet for {agent_id}: {wallet.address}")
return wallet
acct = Account.create()
wallet = AgentWallet(
agent_id=agent_id,
address=acct.address,
private_key=acct.key.hex() if isinstance(acct.key, bytes) else acct.key,
)
self._wallets[agent_id] = wallet
if model_name:
self._model_wallets[model_name] = wallet
self._save_wallets()
logger.info(f" [wallet] Created wallet for {agent_id}: {wallet.address}")
return wallet
def _load_wallets(self):
"""Load persisted model->wallet mapping from disk."""
if self._store_path.exists():
try:
data = json.loads(self._store_path.read_text())
for model_name, w in data.items():
self._model_wallets[model_name] = AgentWallet(
agent_id=w.get("agent_id", ""),
address=w["address"],
private_key=w["private_key"],
)
logger.info(f" [wallet] Loaded {len(self._model_wallets)} persisted wallets")
except Exception as e:
logger.warning(f" [wallet] Could not load wallets: {e}")
def _save_wallets(self):
"""Persist model->wallet mapping to disk (unredacted keys)."""
self._store_path.parent.mkdir(parents=True, exist_ok=True)
data = {
model: {"agent_id": w.agent_id, "address": w.address, "private_key": w.private_key}
for model, w in self._model_wallets.items()
}
self._store_path.write_text(json.dumps(data, indent=2))
def get_wallet(self, agent_id: str) -> Optional[AgentWallet]:
return self._wallets.get(agent_id)
def get_balance(self, address: str) -> float:
"""Get balance in ETH."""
try:
wei = self.w3.eth.get_balance(Web3.to_checksum_address(address))
return float(Web3.from_wei(wei, "ether"))
except Exception as e:
logger.warning(f" [wallet] Balance check failed for {address}: {e}")
return 0.0
def get_treasury_balance(self) -> float:
if not self.treasury_address:
return 0.0
return self.get_balance(self.treasury_address)
def disburse(self, to_address: str, amount_eth: float, reason: str = "") -> Optional[dict]:
"""
Send ETH from treasury to an agent wallet.
Returns tx receipt dict or None if dry_run / failed.
"""
amount_wei = Web3.to_wei(amount_eth, "ether")
record = {
"to": to_address,
"amount_eth": amount_eth,
"reason": reason,
"tx_hash": None,
"status": "pending",
}
if not self.is_live:
record["status"] = "dry_run"
self._disbursements.append(record)
logger.info(f" [wallet] DRY RUN disburse {amount_eth:.6f} ETH → {to_address} ({reason})")
return record
try:
nonce = self.w3.eth.get_transaction_count(self.treasury_address)
tx = {
"from": self.treasury_address,
"to": Web3.to_checksum_address(to_address),
"value": amount_wei,
"gas": 21000,
"gasPrice": self.w3.eth.gas_price,
"nonce": nonce,
"chainId": self.w3.eth.chain_id,
}
signed = self._treasury_account.sign_transaction(tx)
tx_hash = self.w3.eth.send_raw_transaction(signed.raw_transaction)
receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash, timeout=60)
record["tx_hash"] = tx_hash.hex()
record["status"] = "confirmed" if receipt["status"] == 1 else "failed"
self._disbursements.append(record)
logger.info(
f" [wallet] Disbursed {amount_eth:.6f} ETH → {to_address} "
f"(tx: {tx_hash.hex()[:16]}… status: {record['status']})"
)
return record
except Exception as e:
record["status"] = f"error: {e}"
self._disbursements.append(record)
logger.error(f" [wallet] Disburse failed: {e}")
return record
def fund_agent(self, agent_id: str, amount_eth: float) -> Optional[dict]:
"""Convenience: disburse ETH to an agent by agent_id."""
wallet = self._wallets.get(agent_id)
if not wallet:
logger.warning(f" [wallet] No wallet for {agent_id}")
return None
return self.disburse(wallet.address, amount_eth, reason=f"fund:{agent_id}")
def disburse_reward(self, agent_id: str, amount_eth: float, contract_id: str = "") -> Optional[dict]:
"""Disburse contract reward to agent."""
wallet = self._wallets.get(agent_id)
if not wallet:
return None
return self.disburse(wallet.address, amount_eth, reason=f"reward:{contract_id}")
@property
def disbursements(self) -> list[dict]:
return list(self._disbursements)
def summary(self) -> dict:
total_disbursed = sum(d["amount_eth"] for d in self._disbursements if d["status"] in ("confirmed", "dry_run"))
return {
"treasury_address": self.treasury_address,
"treasury_balance_eth": self.get_treasury_balance() if self.is_live else None,
"agents_with_wallets": len(self._wallets),
"total_disbursements": len(self._disbursements),
"total_eth_disbursed": total_disbursed,
"is_live": self.is_live,
"rpc_url": self.rpc_url,
}
def export_wallets(self, path: str, redact_keys: bool = True):
"""Export wallet mapping to JSON."""
data = {aid: w.to_dict(redact_key=redact_keys) for aid, w in self._wallets.items()}
Path(path).write_text(json.dumps(data, indent=2))