ldhldh commited on
Commit
93475ae
1 Parent(s): 8463451

Delete p2p_utils.py

Browse files
Files changed (1) hide show
  1. p2p_utils.py +0 -67
p2p_utils.py DELETED
@@ -1,67 +0,0 @@
1
- import re
2
- import asyncio
3
- import requests
4
- import hivemind
5
- import functools
6
- from async_timeout import timeout
7
- from petals.server.handler import TransformerConnectionHandler
8
-
9
- info_cache = hivemind.TimedStorage()
10
-
11
-
12
- async def check_reachability(peer_id, _, node, *, fetch_info=False, connect_timeout=5, expiration=300, use_cache=True):
13
- if use_cache:
14
- entry = info_cache.get(peer_id)
15
- if entry is not None:
16
- return entry.value
17
-
18
- try:
19
- with timeout(connect_timeout):
20
- if fetch_info: # For Petals servers
21
- stub = TransformerConnectionHandler.get_stub(node.p2p, peer_id)
22
- response = await stub.rpc_info(hivemind.proto.runtime_pb2.ExpertUID())
23
- rpc_info = hivemind.MSGPackSerializer.loads(response.serialized_info)
24
- rpc_info["ok"] = True
25
- else: # For DHT-only bootstrap peers
26
- await node.p2p._client.connect(peer_id, [])
27
- await node.p2p._client.disconnect(peer_id)
28
- rpc_info = {"ok": True}
29
- except Exception as e:
30
- # Actual connection error
31
- if not isinstance(e, asyncio.TimeoutError):
32
- message = str(e) if str(e) else repr(e)
33
- if message == "protocol not supported":
34
- # This may be returned when a server is joining, see https://github.com/petals-infra/health.petals.dev/issues/1
35
- return {"ok": True}
36
- else:
37
- message = f"Failed to connect in {connect_timeout:.0f} sec. Firewall may be blocking connections"
38
- rpc_info = {"ok": False, "error": message}
39
-
40
- info_cache.store(peer_id, rpc_info, hivemind.get_dht_time() + expiration)
41
- return rpc_info
42
-
43
-
44
- async def check_reachability_parallel(peer_ids, dht, node, *, fetch_info=False):
45
- rpc_infos = await asyncio.gather(
46
- *[check_reachability(peer_id, dht, node, fetch_info=fetch_info) for peer_id in peer_ids]
47
- )
48
- return dict(zip(peer_ids, rpc_infos))
49
-
50
-
51
- async def get_peers_ips(dht, dht_node):
52
- return await dht_node.p2p.list_peers()
53
-
54
- @functools.cache
55
- def get_location(ip_address):
56
- try:
57
- response = requests.get(f"http://ip-api.com/json/{ip_address}")
58
- if response.status_code == 200:
59
- return response.json()
60
- except Exception:
61
- pass
62
- return {}
63
-
64
- def extract_peer_ip_info(multiaddr_str):
65
- if ip_match := re.search(r"/ip4/(\d+\.\d+\.\d+\.\d+)", multiaddr_str):
66
- return get_location(ip_match[1])
67
- return {}