# MEP/node/test_three_markets.py
# WUAIBING
# prepare
# a25490a
import asyncio
import json
import requests
import websockets
import time
import urllib.parse
from identity import MEPIdentity
import uuid
from typing import Optional
# Base HTTP endpoint of the hub; every REST call in this file is built on it.
HUB_URL = "http://localhost:8000"
# Base WebSocket endpoint; the node id and auth query string are appended to it.
WS_URL = "ws://localhost:8000/ws"
def get_auth_url(identity: MEPIdentity):
    """Build the authenticated WebSocket URL for ``identity``.

    The URL has the form
    ``{WS_URL}/{node_id}?timestamp={ts}&signature={sig}``, where ``ts`` is the
    current Unix time in whole seconds and ``sig`` is whatever
    ``identity.sign(node_id, ts)`` returns, percent-encoded for use in a URL.

    Args:
        identity: Node identity providing ``node_id`` and ``sign``.

    Returns:
        The WebSocket URL as a string.
    """
    node_id = identity.node_id
    timestamp = str(int(time.time()))
    signature = identity.sign(node_id, timestamp)
    # The signature is percent-encoded before being placed in the query string.
    encoded_signature = urllib.parse.quote(signature)
    return f"{WS_URL}/{node_id}?timestamp={timestamp}&signature={encoded_signature}"
def submit_task(
    identity: MEPIdentity,
    payload: str,
    bounty: float,
    target: Optional[str] = None,
    timeout: float = 10.0,
):
    """Submit a signed task to the hub and return the decoded JSON reply.

    Args:
        identity: Consumer identity; supplies ``node_id`` and the auth headers.
        payload: Task payload string.
        bounty: Bounty sent as-is (callers in this file pass positive, zero
            and negative values).
        target: Optional node id; when truthy it is sent as ``target_node``.
        timeout: Seconds ``requests`` may wait on the hub before raising.
            Without a limit a stalled hub would block the caller forever, and
            because this is a blocking call it would also freeze any asyncio
            event loop it is invoked from.

    Returns:
        The hub's response body parsed as JSON.

    Raises:
        requests.exceptions.Timeout: If the hub does not answer in time.
    """
    data = {
        "consumer_id": identity.node_id,
        "payload": payload,
        "bounty": bounty,
    }
    if target:
        data["target_node"] = target

    # Serialize once: the exact string that is signed must be the one sent.
    payload_str = json.dumps(data)
    headers = identity.get_auth_headers(payload_str)
    headers["Content-Type"] = "application/json"

    r = requests.post(
        f"{HUB_URL}/tasks/submit",
        data=payload_str,
        headers=headers,
        timeout=timeout,
    )
    return r.json()
def place_bid(identity: MEPIdentity, task_id: str, timeout: float = 10.0):
    """Place a signed bid on ``task_id`` and return the decoded JSON reply.

    Args:
        identity: Provider identity; supplies ``node_id`` and the auth headers.
        task_id: Id of the task being bid on.
        timeout: Seconds ``requests`` may wait on the hub before raising.
            Without a limit a stalled hub would block the caller forever, and
            because this is a blocking call it would also freeze any asyncio
            event loop it is invoked from.

    Returns:
        The hub's response body parsed as JSON.

    Raises:
        requests.exceptions.Timeout: If the hub does not answer in time.
    """
    data = {
        "task_id": task_id,
        "provider_id": identity.node_id,
    }

    # Serialize once: the exact string that is signed must be the one sent.
    payload_str = json.dumps(data)
    headers = identity.get_auth_headers(payload_str)
    headers["Content-Type"] = "application/json"

    r = requests.post(
        f"{HUB_URL}/tasks/bid",
        data=payload_str,
        headers=headers,
        timeout=timeout,
    )
    return r.json()
def complete_task(identity: MEPIdentity, task_id: str, result: str, timeout: float = 10.0):
    """Report ``task_id`` as completed and return the decoded JSON reply.

    Args:
        identity: Provider identity; supplies ``node_id`` and the auth headers.
        task_id: Id of the task being completed.
        result: Result string, sent to the hub as ``result_payload``.
        timeout: Seconds ``requests`` may wait on the hub before raising.
            Without a limit a stalled hub would block the caller forever, and
            because this is a blocking call it would also freeze any asyncio
            event loop it is invoked from.

    Returns:
        The hub's response body parsed as JSON.

    Raises:
        requests.exceptions.Timeout: If the hub does not answer in time.
    """
    data = {
        "task_id": task_id,
        "provider_id": identity.node_id,
        "result_payload": result,
    }

    # Serialize once: the exact string that is signed must be the one sent.
    payload_str = json.dumps(data)
    headers = identity.get_auth_headers(payload_str)
    headers["Content-Type"] = "application/json"

    r = requests.post(
        f"{HUB_URL}/tasks/complete",
        data=payload_str,
        headers=headers,
        timeout=timeout,
    )
    return r.json()
def get_balance(identity: MEPIdentity, timeout: float = 10.0):
    """Fetch the hub-side balance for ``identity``.

    Args:
        identity: Node identity whose ``node_id`` is looked up.
        timeout: Seconds ``requests`` may wait on the hub before raising, so a
            stalled hub cannot block the caller indefinitely.

    Returns:
        The ``balance_seconds`` field of the hub's JSON reply, or ``0.0`` when
        the field is absent.

    Raises:
        requests.exceptions.Timeout: If the hub does not answer in time.
    """
    r = requests.get(f"{HUB_URL}/balance/{identity.node_id}", timeout=timeout)
    return r.json().get("balance_seconds", 0.0)
async def test_three_markets():
    """Walk two fresh identities through the three MEP markets on a live hub.

    Alice (consumer) and Bob (provider) are registered with the hub at
    ``HUB_URL`` and both open authenticated WebSocket connections. Alice then
    submits, in order:

    1. a compute task with a positive bounty (+5.0),
    2. a zero-bounty task targeted directly at Bob,
    3. a broadcast task with a negative bounty (-2.0).

    Bob reacts to the matching hub events by bidding and/or completing the
    task. Balances are printed before and after; nothing is asserted, so the
    outcome has to be read from the output.

    NOTE: the HTTP helpers used here are synchronous ``requests`` calls, so
    the event loop is blocked for the duration of each one.

    Raises:
        asyncio.TimeoutError: If either side does not receive an expected
            WebSocket message in time.
    """
    # Alice's per-message wait.
    alice_recv_timeout = 6.0
    # Bob can legitimately be waiting while Alice is still inside her own
    # bounded wait, so his limit is deliberately longer than hers. Bounding it
    # at all is what keeps a missing hub message from hanging the run forever.
    bob_recv_timeout = 10.0

    print("=" * 60)
    print("Testing the 3 MEP Markets (+, 0, -)")
    print("=" * 60)

    # Random suffixes keep the key file names of separate runs apart.
    alice = MEPIdentity(f"alice_{uuid.uuid4().hex[:6]}.pem")
    bob = MEPIdentity(f"bob_{uuid.uuid4().hex[:6]}.pem")

    requests.post(f"{HUB_URL}/register", json={"pubkey": alice.pub_pem})
    requests.post(f"{HUB_URL}/register", json={"pubkey": bob.pub_pem})

    print(f"👩 Alice (Consumer): {alice.node_id} | Starting Bal: {get_balance(alice)}")
    print(f"👦 Bob (Provider): {bob.node_id} | Starting Bal: {get_balance(bob)}\n")

    async def next_event(ws):
        """Receive one hub message with a bounded wait and decode it as JSON."""
        raw = await asyncio.wait_for(ws.recv(), timeout=bob_recv_timeout)
        return json.loads(raw)

    async def bob_listener():
        """Provider side: handle one event per market, in the order Alice sends them."""
        async with websockets.connect(get_auth_url(bob)) as ws:
            # 1. Wait for Compute Market RFC (+5.0)
            data = await next_event(ws)
            if data["event"] == "rfc" and data["data"]["bounty"] > 0:
                task_id = data["data"]["id"]
                print(f"👦 Bob: Received Compute RFC {task_id[:8]} for +{data['data']['bounty']} SECONDS")
                bid_res = place_bid(bob, task_id)
                if bid_res["status"] == "accepted":
                    print("👦 Bob: Won Compute Bid! Completing task...")
                    complete_task(bob, task_id, "Here is the code you requested.")
                    print("👦 Bob: Compute task done.\n")

            # 2. Wait for Cyberspace Direct Message (0.0)
            data = await next_event(ws)
            if data["event"] == "new_task" and data["data"]["bounty"] == 0.0:
                task_id = data["data"]["id"]
                print(f"👦 Bob: Received Cyberspace DM {task_id[:8]} from Alice (0.0 SECONDS)")
                print(f"👦 Bob: Message = '{data['data']['payload']}'")
                # Targeted tasks are completed directly, without a bid step.
                complete_task(bob, task_id, "Yes Alice, I am free.")
                print("👦 Bob: Sent free reply.\n")

            # 3. Wait for Data Market RFC (-2.0)
            data = await next_event(ws)
            if data["event"] == "rfc" and data["data"]["bounty"] < 0:
                task_id = data["data"]["id"]
                cost = data["data"]["bounty"]
                print(f"👦 Bob: Received Data Market RFC {task_id[:8]} costing {cost} SECONDS")

                # Bob's local configuration allows him to spend up to 5.0 SECONDS.
                # Costs are negative bounties, so "within budget" means the
                # bounty is no lower than -5.0.
                max_purchase_price = -5.0
                if cost >= max_purchase_price:
                    print("👦 Bob: Budget allows it! Bidding on premium data...")
                    bid_res = place_bid(bob, task_id)
                    if bid_res["status"] == "accepted":
                        print(f"👦 Bob: Paid {abs(cost)} SECONDS to download premium data: '{bid_res['payload']}'")
                        complete_task(bob, task_id, "Data received successfully.")
                        print("👦 Bob: Premium data acquisition complete.\n")
                else:
                    print("👦 Bob: Too expensive. Ignored.")

            # Short pause before the connection is closed.
            await asyncio.sleep(0.5)

    async def alice_sender():
        """Consumer side: submit the three tasks, waiting for one hub message after each."""
        # Let Bob connect
        await asyncio.sleep(0.5)
        async with websockets.connect(get_auth_url(alice)) as ws:
            # Market 1: Compute Market (+5.0)
            print("👩 Alice: Submitting Compute Task (+5.0 SECONDS)...")
            submit_task(alice, "Write me a python script", 5.0)
            await asyncio.wait_for(ws.recv(), timeout=alice_recv_timeout)

            # Market 2: Cyberspace Market (0.0)
            print("👩 Alice: Sending Cyberspace DM to Bob (0.0 SECONDS)...")
            submit_task(alice, "Are you free to chat?", 0.0, target=bob.node_id)
            await asyncio.wait_for(ws.recv(), timeout=alice_recv_timeout)

            # Market 3: Data Market (-2.0)
            print("👩 Alice: Broadcasting Premium Dataset (-2.0 SECONDS)...")
            submit_task(alice, "SECRET_TRADING_ALGO_V9", -2.0)
            await asyncio.wait_for(ws.recv(), timeout=alice_recv_timeout)

            await asyncio.sleep(0.5)

    await asyncio.gather(bob_listener(), alice_sender())

    print("=" * 60)
    print("Final Balances:")
    print(f"👩 Alice (Started 10.0): {get_balance(alice)} (Paid 5.0, Earned 2.0 = Expected 7.0)")
    print(f"👦 Bob (Started 10.0): {get_balance(bob)} (Earned 5.0, Paid 2.0 = Expected 13.0)")
    print("=" * 60)
if __name__ == "__main__":
    # Run the scenario only when executed as a script, not on import.
    scenario = test_three_markets()
    asyncio.run(scenario)